/*
 * ***** BEGIN GPL LICENSE BLOCK *****
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 *
 * The Original Code is Copyright (C) 2006 Blender Foundation
 * All rights reserved.
 *
 * The Original Code is: all of this file.
 *
 * Contributor(s): none yet.
 *
 * ***** END GPL LICENSE BLOCK *****
 */

/** \file blender/blenlib/intern/threads.c
 *  \ingroup bli
 */

#include <stdlib.h>
#include <errno.h>
#include <string.h>

#include "MEM_guardedalloc.h"

#include "BLI_listbase.h"
#include "BLI_gsqueue.h"
#include "BLI_task.h"
#include "BLI_threads.h"

#include "PIL_time.h"

/* for checking system threads - BLI_system_thread_count */
#ifdef WIN32
#  include <windows.h>
#  include <sys/timeb.h>
#elif defined(__APPLE__)
#  include <sys/types.h>
#  include <sys/sysctl.h>
#else
#  include <unistd.h>
#  include <sys/time.h>
#endif

#include "atomic_ops.h"

#if defined(__APPLE__) && defined(_OPENMP) && (__GNUC__ == 4) && (__GNUC_MINOR__ == 2) && !defined(__clang__)
#  define USE_APPLE_OMP_FIX
#endif

#ifdef USE_APPLE_OMP_FIX
/* ************** libgomp (Apple gcc 4.2.1) TLS bug workaround *************** */
extern pthread_key_t gomp_tls_key;
static void *thread_tls_data;
#endif

/* We're using one global task scheduler for all kinds of tasks. */
static TaskScheduler *task_scheduler = NULL;

/* ********** basic thread control API ************
 *
 * Many thread cases have an X number of jobs, while only a Y number of
 * threads are useful (typically the number of CPUs).
 *
 * This code can be used to start a maximum number of 'thread slots', which
 * then can be filled in a loop with an idle timer.
 *
 * A sample loop can look like this (pseudo c):
 *
 *     ListBase lb;
 *     int maxthreads = 2;
 *     int cont = 1;
 *
 *     BLI_threadpool_init(&lb, do_something_func, maxthreads);
 *
 *     while (cont) {
 *         if (BLI_available_threads(&lb) && !(escape loop event)) {
 *             // get new job (data pointer)
 *             // tag job 'processed'
 *             BLI_threadpool_insert(&lb, job);
 *         }
 *         else PIL_sleep_ms(50);
 *
 *         // find if a job is ready; for this, do_something_func() should write into the job somewhere
 *         cont = 0;
 *         for (go over all jobs) {
 *             if (job is ready) {
 *                 if (job was not removed) {
 *                     BLI_threadpool_remove(&lb, job);
 *                 }
 *             }
 *             else cont = 1;
 *         }
 *         // conditions to exit loop
 *         if (escape loop event) {
 *             if (BLI_available_threads(&lb) == maxthreads)
 *                 break;
 *         }
 *     }
 *
 *     BLI_threadpool_end(&lb);
 *
 ************************************************ */
static SpinLock _malloc_lock;
static pthread_mutex_t _image_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _image_draw_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _viewer_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _custom1_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _rcache_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _opengl_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _nodes_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _movieclip_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _colormanage_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _fftw_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _view3d_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_t mainid;
static unsigned int thread_levels = 0;  /* threads can be invoked inside threads */
static int num_threads_override = 0;

/* just a max for security reasons */
#define RE_MAX_THREAD BLENDER_MAX_THREADS

typedef struct ThreadSlot {
        struct ThreadSlot *next, *prev;
        void *(*do_thread)(void *);
        void *callerdata;
        pthread_t pthread;
        int avail;
} ThreadSlot;

static void BLI_lock_malloc_thread(void)
{
        BLI_spin_lock(&_malloc_lock);
}

static void BLI_unlock_malloc_thread(void)
{
        BLI_spin_unlock(&_malloc_lock);
}

void BLI_threadapi_init(void)
{
        mainid = pthread_self();

        BLI_spin_init(&_malloc_lock);
}

void BLI_threadapi_exit(void)
{
        if (task_scheduler) {
                BLI_task_scheduler_free(task_scheduler);
        }
        BLI_spin_end(&_malloc_lock);
}

TaskScheduler *BLI_task_scheduler_get(void)
{
        if (task_scheduler == NULL) {
                int tot_thread = BLI_system_thread_count();

                /* Initialize lazily, so this happens after
                 * command-line argument parsing.
                 */
                task_scheduler = BLI_task_scheduler_create(tot_thread);
        }

        return task_scheduler;
}

/* tot = 0 only initializes the malloc mutex in a safe way (see sequence.c);
 * the problem otherwise: a scene render would kill off the mutex!
 */

void BLI_threadpool_init(ListBase *threadbase, void *(*do_thread)(void *), int tot)
{
        int a;

        if (threadbase != NULL && tot > 0) {
                BLI_listbase_clear(threadbase);

                if (tot > RE_MAX_THREAD) tot = RE_MAX_THREAD;
                else if (tot < 1) tot = 1;

                for (a = 0; a < tot; a++) {
                        ThreadSlot *tslot = MEM_callocN(sizeof(ThreadSlot), "threadslot");
                        BLI_addtail(threadbase, tslot);
                        tslot->do_thread = do_thread;
                        tslot->avail = 1;
                }
        }

        unsigned int level = atomic_fetch_and_add_u(&thread_levels, 1);
        if (level == 0) {
                MEM_set_lock_callback(BLI_lock_malloc_thread, BLI_unlock_malloc_thread);

#ifdef USE_APPLE_OMP_FIX
                /* workaround for the Apple gcc 4.2.1 omp vs background thread bug:
                 * we copy the gomp thread-local storage pointer here, so we can
                 * set it again inside the threads that we start */
                thread_tls_data = pthread_getspecific(gomp_tls_key);
#endif
        }
}

/* number of available threads */
int BLI_available_threads(ListBase *threadbase)
{
        ThreadSlot *tslot;
        int counter = 0;

        for (tslot = threadbase->first; tslot; tslot = tslot->next) {
                if (tslot->avail)
                        counter++;
        }
        return counter;
}

/* returns thread number, for sample patterns or threadsafe tables */
int BLI_threadpool_available_thread_index(ListBase *threadbase)
{
        ThreadSlot *tslot;
        int counter = 0;

        for (tslot = threadbase->first; tslot; tslot = tslot->next, counter++) {
                if (tslot->avail)
                        return counter;
        }
        return 0;
}

static void *tslot_thread_start(void *tslot_p)
{
        ThreadSlot *tslot = (ThreadSlot *)tslot_p;

#ifdef USE_APPLE_OMP_FIX
        /* workaround for the Apple gcc 4.2.1 omp vs background thread bug:
         * set the gomp thread-local storage pointer which was copied beforehand */
        pthread_setspecific(gomp_tls_key, thread_tls_data);
#endif

        return tslot->do_thread(tslot->callerdata);
}

int BLI_thread_is_main(void)
{
        return pthread_equal(pthread_self(), mainid);
}

void BLI_threadpool_insert(ListBase *threadbase, void *callerdata)
{
        ThreadSlot *tslot;

        for (tslot = threadbase->first; tslot; tslot = tslot->next) {
                if (tslot->avail) {
                        tslot->avail = 0;
                        tslot->callerdata = callerdata;
                        pthread_create(&tslot->pthread, NULL, tslot_thread_start, tslot);
                        return;
                }
        }
        printf("ERROR: could not insert thread slot\n");
}

void BLI_threadpool_remove(ListBase *threadbase, void *callerdata)
{
        ThreadSlot *tslot;

        for (tslot = threadbase->first; tslot; tslot = tslot->next) {
                if (tslot->callerdata == callerdata) {
                        pthread_join(tslot->pthread, NULL);
                        tslot->callerdata = NULL;
                        tslot->avail = 1;
                }
        }
}

void BLI_threadpool_remove_index(ListBase *threadbase, int index)
{
        ThreadSlot *tslot;
        int counter = 0;

        for (tslot = threadbase->first; tslot; tslot = tslot->next, counter++) {
                if (counter == index && tslot->avail == 0) {
                        pthread_join(tslot->pthread, NULL);
                        tslot->callerdata = NULL;
                        tslot->avail = 1;
                        break;
                }
        }
}

void BLI_threadpool_clear(ListBase *threadbase)
{
        ThreadSlot *tslot;

        for (tslot = threadbase->first; tslot; tslot = tslot->next) {
                if (tslot->avail == 0) {
                        pthread_join(tslot->pthread, NULL);
                        tslot->callerdata = NULL;
                        tslot->avail = 1;
                }
        }
}

void BLI_threadpool_end(ListBase *threadbase)
{
        ThreadSlot *tslot;

        /* Only needed if there's actually some stuff to end;
         * this way we don't end up decrementing thread_levels on an empty threadbase.
         */
        if (threadbase && (BLI_listbase_is_empty(threadbase) == false)) {
                for (tslot = threadbase->first; tslot; tslot = tslot->next) {
                        if (tslot->avail == 0) {
                                pthread_join(tslot->pthread, NULL);
                        }
                }
                BLI_freelistN(threadbase);
        }

        unsigned int level = atomic_sub_and_fetch_u(&thread_levels, 1);
        if (level == 0) {
                MEM_set_lock_callback(NULL, NULL);
        }
}

/* System Information */

/* how many threads are native on this system? */
int BLI_system_thread_count(void)
{
        static int t = -1;

        if (num_threads_override != 0) {
                return num_threads_override;
        }
        else if (LIKELY(t != -1)) {
                return t;
        }

        {
#ifdef WIN32
                SYSTEM_INFO info;
                GetSystemInfo(&info);
                t = (int) info.dwNumberOfProcessors;
#else
#   ifdef __APPLE__
                int mib[2];
                size_t len;

                mib[0] = CTL_HW;
                mib[1] = HW_NCPU;
                len = sizeof(t);
                sysctl(mib, 2, &t, &len, NULL, 0);
#   else
                t = (int)sysconf(_SC_NPROCESSORS_ONLN);
#   endif
#endif
        }

        CLAMP(t, 1, RE_MAX_THREAD);

        return t;
}

void BLI_system_num_threads_override_set(int num)
{
        num_threads_override = num;
}

int BLI_system_num_threads_override_get(void)
{
        return num_threads_override;
}

/* Global Mutex Locks */

static ThreadMutex *global_mutex_from_type(const int type)
{
        switch (type) {
                case LOCK_IMAGE:
                        return &_image_lock;
                case LOCK_DRAW_IMAGE:
                        return &_image_draw_lock;
                case LOCK_VIEWER:
                        return &_viewer_lock;
                case LOCK_CUSTOM1:
                        return &_custom1_lock;
                case LOCK_RCACHE:
                        return &_rcache_lock;
                case LOCK_OPENGL:
                        return &_opengl_lock;
                case LOCK_NODES:
                        return &_nodes_lock;
                case LOCK_MOVIECLIP:
                        return &_movieclip_lock;
                case LOCK_COLORMANAGE:
                        return &_colormanage_lock;
                case LOCK_FFTW:
                        return &_fftw_lock;
                case LOCK_VIEW3D:
                        return &_view3d_lock;
                default:
                        BLI_assert(0);
                        return NULL;
        }
}

void BLI_thread_lock(int type)
{
        pthread_mutex_lock(global_mutex_from_type(type));
}

void BLI_thread_unlock(int type)
{
        pthread_mutex_unlock(global_mutex_from_type(type));
}

/* Mutex Locks */

void BLI_mutex_init(ThreadMutex *mutex)
{
        pthread_mutex_init(mutex, NULL);
}

void BLI_mutex_lock(ThreadMutex *mutex)
{
        pthread_mutex_lock(mutex);
}

void BLI_mutex_unlock(ThreadMutex *mutex)
{
        pthread_mutex_unlock(mutex);
}

bool BLI_mutex_trylock(ThreadMutex *mutex)
{
        return (pthread_mutex_trylock(mutex) == 0);
}

void BLI_mutex_end(ThreadMutex *mutex)
{
        pthread_mutex_destroy(mutex);
}

ThreadMutex *BLI_mutex_alloc(void)
{
        ThreadMutex *mutex = MEM_callocN(sizeof(ThreadMutex), "ThreadMutex");
        BLI_mutex_init(mutex);
        return mutex;
}

void BLI_mutex_free(ThreadMutex *mutex)
{
        BLI_mutex_end(mutex);
        MEM_freeN(mutex);
}
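
/* A minimal usage sketch (illustrative, not part of this file's API surface):
 * heap-allocated mutexes pair BLI_mutex_alloc() with BLI_mutex_free(), while
 * embedded ones pair BLI_mutex_init() with BLI_mutex_end():
 *
 *     ThreadMutex *mutex = BLI_mutex_alloc();
 *
 *     BLI_mutex_lock(mutex);
 *     // ... access shared data ...
 *     BLI_mutex_unlock(mutex);
 *
 *     BLI_mutex_free(mutex);
 */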

/* Spin Locks */

void BLI_spin_init(SpinLock *spin)
{
#if defined(__APPLE__)
        *spin = OS_SPINLOCK_INIT;
#elif defined(_MSC_VER)
        *spin = 0;
#else
        pthread_spin_init(spin, 0);
#endif
}

void BLI_spin_lock(SpinLock *spin)
{
#if defined(__APPLE__)
        OSSpinLockLock(spin);
#elif defined(_MSC_VER)
        while (InterlockedExchangeAcquire(spin, 1)) {
                while (*spin) {
                        /* Spinlock hint for processors with hyperthreading. */
                        YieldProcessor();
                }
        }
#else
        pthread_spin_lock(spin);
#endif
}

void BLI_spin_unlock(SpinLock *spin)
{
#if defined(__APPLE__)
        OSSpinLockUnlock(spin);
#elif defined(_MSC_VER)
        _ReadWriteBarrier();
        *spin = 0;
#else
        pthread_spin_unlock(spin);
#endif
}

#if defined(__APPLE__) || defined(_MSC_VER)
void BLI_spin_end(SpinLock *UNUSED(spin))
{
}
#else
void BLI_spin_end(SpinLock *spin)
{
        pthread_spin_destroy(spin);
}
#endif
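
/* Illustrative sketch (shared_counter is a hypothetical variable): spin locks
 * suit very short critical sections, e.g. bumping a counter, and follow the
 * same init/lock/unlock/end pattern:
 *
 *     static SpinLock lock;
 *
 *     BLI_spin_init(&lock);
 *     BLI_spin_lock(&lock);
 *     shared_counter++;
 *     BLI_spin_unlock(&lock);
 *     BLI_spin_end(&lock);
 */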

/* Read/Write Mutex Lock */

void BLI_rw_mutex_init(ThreadRWMutex *mutex)
{
        pthread_rwlock_init(mutex, NULL);
}

void BLI_rw_mutex_lock(ThreadRWMutex *mutex, int mode)
{
        if (mode == THREAD_LOCK_READ)
                pthread_rwlock_rdlock(mutex);
        else
                pthread_rwlock_wrlock(mutex);
}

void BLI_rw_mutex_unlock(ThreadRWMutex *mutex)
{
        pthread_rwlock_unlock(mutex);
}

void BLI_rw_mutex_end(ThreadRWMutex *mutex)
{
        pthread_rwlock_destroy(mutex);
}

ThreadRWMutex *BLI_rw_mutex_alloc(void)
{
        ThreadRWMutex *mutex = MEM_callocN(sizeof(ThreadRWMutex), "ThreadRWMutex");
        BLI_rw_mutex_init(mutex);
        return mutex;
}

void BLI_rw_mutex_free(ThreadRWMutex *mutex)
{
        BLI_rw_mutex_end(mutex);
        MEM_freeN(mutex);
}
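
/* Illustrative sketch (shared_list is hypothetical): any number of readers may
 * hold the lock at once, while writers get exclusive access:
 *
 *     BLI_rw_mutex_lock(mutex, THREAD_LOCK_READ);
 *     // ... read shared_list ...
 *     BLI_rw_mutex_unlock(mutex);
 *
 *     BLI_rw_mutex_lock(mutex, THREAD_LOCK_WRITE);
 *     // ... modify shared_list ...
 *     BLI_rw_mutex_unlock(mutex);
 */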

/* Ticket Mutex Lock */

struct TicketMutex {
        pthread_cond_t cond;
        pthread_mutex_t mutex;
        unsigned int queue_head, queue_tail;
};

TicketMutex *BLI_ticket_mutex_alloc(void)
{
        TicketMutex *ticket = MEM_callocN(sizeof(TicketMutex), "TicketMutex");

        pthread_cond_init(&ticket->cond, NULL);
        pthread_mutex_init(&ticket->mutex, NULL);

        return ticket;
}

void BLI_ticket_mutex_free(TicketMutex *ticket)
{
        pthread_mutex_destroy(&ticket->mutex);
        pthread_cond_destroy(&ticket->cond);
        MEM_freeN(ticket);
}

void BLI_ticket_mutex_lock(TicketMutex *ticket)
{
        unsigned int queue_me;

        pthread_mutex_lock(&ticket->mutex);
        queue_me = ticket->queue_tail++;

        while (queue_me != ticket->queue_head)
                pthread_cond_wait(&ticket->cond, &ticket->mutex);

        pthread_mutex_unlock(&ticket->mutex);
}

void BLI_ticket_mutex_unlock(TicketMutex *ticket)
{
        pthread_mutex_lock(&ticket->mutex);
        ticket->queue_head++;
        pthread_cond_broadcast(&ticket->cond);
        pthread_mutex_unlock(&ticket->mutex);
}
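
/* Unlike a plain mutex, the ticket mutex above serves lockers in FIFO order:
 * each locker takes a ticket from queue_tail and waits until queue_head
 * reaches it, so no waiting thread can starve. A minimal usage sketch
 * (illustrative only):
 *
 *     TicketMutex *ticket = BLI_ticket_mutex_alloc();
 *
 *     BLI_ticket_mutex_lock(ticket);
 *     // ... contended resource, served in arrival order ...
 *     BLI_ticket_mutex_unlock(ticket);
 *
 *     BLI_ticket_mutex_free(ticket);
 */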

/* ************************************************ */

/* Condition */

void BLI_condition_init(ThreadCondition *cond)
{
        pthread_cond_init(cond, NULL);
}

void BLI_condition_wait(ThreadCondition *cond, ThreadMutex *mutex)
{
        pthread_cond_wait(cond, mutex);
}

void BLI_condition_wait_global_mutex(ThreadCondition *cond, const int type)
{
        pthread_cond_wait(cond, global_mutex_from_type(type));
}

void BLI_condition_notify_one(ThreadCondition *cond)
{
        pthread_cond_signal(cond);
}

void BLI_condition_notify_all(ThreadCondition *cond)
{
        pthread_cond_broadcast(cond);
}

void BLI_condition_end(ThreadCondition *cond)
{
        pthread_cond_destroy(cond);
}
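
/* Illustrative sketch of the standard wait pattern (data_ready, mutex and cond
 * are hypothetical): the predicate must be re-checked in a loop, since pthread
 * condition waits may wake spuriously:
 *
 *     // consumer:
 *     BLI_mutex_lock(&mutex);
 *     while (!data_ready)
 *         BLI_condition_wait(&cond, &mutex);
 *     // ... consume data ...
 *     BLI_mutex_unlock(&mutex);
 *
 *     // producer:
 *     BLI_mutex_lock(&mutex);
 *     data_ready = true;
 *     BLI_condition_notify_one(&cond);
 *     BLI_mutex_unlock(&mutex);
 */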

/* ************************************************ */

struct ThreadQueue {
        GSQueue *queue;
        pthread_mutex_t mutex;
        pthread_cond_t push_cond;
        pthread_cond_t finish_cond;
        volatile int nowait;
        volatile int canceled;
};

ThreadQueue *BLI_thread_queue_init(void)
{
        ThreadQueue *queue;

        queue = MEM_callocN(sizeof(ThreadQueue), "ThreadQueue");
        queue->queue = BLI_gsqueue_new(sizeof(void *));

        pthread_mutex_init(&queue->mutex, NULL);
        pthread_cond_init(&queue->push_cond, NULL);
        pthread_cond_init(&queue->finish_cond, NULL);

        return queue;
}

void BLI_thread_queue_free(ThreadQueue *queue)
{
        /* destroy everything, assumes no one is using queue anymore */
        pthread_cond_destroy(&queue->finish_cond);
        pthread_cond_destroy(&queue->push_cond);
        pthread_mutex_destroy(&queue->mutex);

        BLI_gsqueue_free(queue->queue);

        MEM_freeN(queue);
}

void BLI_thread_queue_push(ThreadQueue *queue, void *work)
{
        pthread_mutex_lock(&queue->mutex);

        BLI_gsqueue_push(queue->queue, &work);

        /* signal threads waiting to pop */
        pthread_cond_signal(&queue->push_cond);
        pthread_mutex_unlock(&queue->mutex);
}

void *BLI_thread_queue_pop(ThreadQueue *queue)
{
        void *work = NULL;

        /* wait until there is work */
        pthread_mutex_lock(&queue->mutex);
        while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
                pthread_cond_wait(&queue->push_cond, &queue->mutex);

        /* if we have something, pop it */
        if (!BLI_gsqueue_is_empty(queue->queue)) {
                BLI_gsqueue_pop(queue->queue, &work);

                if (BLI_gsqueue_is_empty(queue->queue))
                        pthread_cond_broadcast(&queue->finish_cond);
        }

        pthread_mutex_unlock(&queue->mutex);

        return work;
}

static void wait_timeout(struct timespec *timeout, int ms)
{
        ldiv_t div_result;
        long sec, usec, x;

#ifdef WIN32
        {
                struct _timeb now;
                _ftime(&now);
                sec = now.time;
                usec = now.millitm * 1000; /* microsecond precision would be better */
        }
#else
        {
                struct timeval now;
                gettimeofday(&now, NULL);
                sec = now.tv_sec;
                usec = now.tv_usec;
        }
#endif

        /* add current time + millisecond offset */
        div_result = ldiv(ms, 1000);
        timeout->tv_sec = sec + div_result.quot;

        x = usec + (div_result.rem * 1000);

        if (x >= 1000000) {
                timeout->tv_sec++;
                x -= 1000000;
        }

        timeout->tv_nsec = x * 1000;
}
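
/* Worked example of the arithmetic above (illustrative values): ms = 1500 with
 * now = {sec = 10, usec = 900000} gives div_result = {quot = 1, rem = 500},
 * so tv_sec = 11 and x = 900000 + 500000 = 1400000 >= 1000000, which carries
 * over to tv_sec = 12, tv_nsec = 400000 * 1000. */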

void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
{
        double t;
        void *work = NULL;
        struct timespec timeout;

        t = PIL_check_seconds_timer();
        wait_timeout(&timeout, ms);

        /* wait until there is work */
        pthread_mutex_lock(&queue->mutex);
        while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
                if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT)
                        break;
                else if (PIL_check_seconds_timer() - t >= ms * 0.001)
                        break;
        }

        /* if we have something, pop it */
        if (!BLI_gsqueue_is_empty(queue->queue)) {
                BLI_gsqueue_pop(queue->queue, &work);

                if (BLI_gsqueue_is_empty(queue->queue))
                        pthread_cond_broadcast(&queue->finish_cond);
        }

        pthread_mutex_unlock(&queue->mutex);

        return work;
}

int BLI_thread_queue_len(ThreadQueue *queue)
{
        int size;

        pthread_mutex_lock(&queue->mutex);
        size = BLI_gsqueue_len(queue->queue);
        pthread_mutex_unlock(&queue->mutex);

        return size;
}

bool BLI_thread_queue_is_empty(ThreadQueue *queue)
{
        bool is_empty;

        pthread_mutex_lock(&queue->mutex);
        is_empty = BLI_gsqueue_is_empty(queue->queue);
        pthread_mutex_unlock(&queue->mutex);

        return is_empty;
}

void BLI_thread_queue_nowait(ThreadQueue *queue)
{
        pthread_mutex_lock(&queue->mutex);

        queue->nowait = 1;

        /* signal threads waiting to pop */
        pthread_cond_broadcast(&queue->push_cond);
        pthread_mutex_unlock(&queue->mutex);
}

void BLI_thread_queue_wait_finish(ThreadQueue *queue)
{
        /* wait for finish condition */
        pthread_mutex_lock(&queue->mutex);

        while (!BLI_gsqueue_is_empty(queue->queue))
                pthread_cond_wait(&queue->finish_cond, &queue->mutex);

        pthread_mutex_unlock(&queue->mutex);
}
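
/* Illustrative producer/consumer sketch (the worker threads and `job` pointers
 * are hypothetical): workers pop until BLI_thread_queue_nowait() releases them
 * with a NULL result, and the pushing thread can block until all work has been
 * consumed:
 *
 *     ThreadQueue *queue = BLI_thread_queue_init();
 *
 *     // in each worker thread:
 *     void *job;
 *     while ((job = BLI_thread_queue_pop(queue))) {
 *         // ... process job ...
 *     }
 *
 *     // in the pushing thread:
 *     BLI_thread_queue_push(queue, job);
 *     BLI_thread_queue_wait_finish(queue);  // block until the queue is empty
 *     BLI_thread_queue_nowait(queue);       // let workers exit their pop loop
 *     BLI_thread_queue_free(queue);
 */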

/* ************************************************ */

void BLI_threaded_malloc_begin(void)
{
        unsigned int level = atomic_fetch_and_add_u(&thread_levels, 1);
        if (level == 0) {
                MEM_set_lock_callback(BLI_lock_malloc_thread, BLI_unlock_malloc_thread);
                /* There is a small chance that two threads will need to access a
                 * scheduler which was not yet created from the main thread, which
                 * could cause the scheduler to be created multiple times.
                 */
                BLI_task_scheduler_get();
        }
}

void BLI_threaded_malloc_end(void)
{
        unsigned int level = atomic_sub_and_fetch_u(&thread_levels, 1);
        if (level == 0) {
                MEM_set_lock_callback(NULL, NULL);
        }
}