2b6fb52c49c296f9debda3c421d271669383870d
[blender.git] / source / blender / blenlib / intern / threads.c
1 /*
2  * ***** BEGIN GPL LICENSE BLOCK *****
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version. 
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software Foundation,
16  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17  *
18  * The Original Code is Copyright (C) 2006 Blender Foundation
19  * All rights reserved.
20  *
21  * The Original Code is: all of this file.
22  *
23  * Contributor(s): none yet.
24  *
25  * ***** END GPL LICENSE BLOCK *****
26  */
27
28 /** \file blender/blenlib/intern/threads.c
29  *  \ingroup bli
30  */
31
32 #include <stdlib.h>
33 #include <errno.h>
34 #include <string.h>
35
36 #include "MEM_guardedalloc.h"
37
38 #include "BLI_listbase.h"
39 #include "BLI_gsqueue.h"
40 #include "BLI_threads.h"
41
42 #include "PIL_time.h"
43
44 /* for checking system threads - BLI_system_thread_count */
45 #ifdef WIN32
46 #  include <windows.h>
47 #  include <sys/timeb.h>
48 #elif defined(__APPLE__)
49 #  include <sys/types.h>
50 #  include <sys/sysctl.h>
51 #else
52 #  include <unistd.h>
53 #  include <sys/time.h>
54 #endif
55
56 #if defined(__APPLE__) && defined(_OPENMP) && (__GNUC__ == 4) && (__GNUC_MINOR__ == 2)
57 #  define USE_APPLE_OMP_FIX
58 #endif
59
60 #ifdef USE_APPLE_OMP_FIX
61 /* ************** libgomp (Apple gcc 4.2.1) TLS bug workaround *************** */
62 extern pthread_key_t gomp_tls_key;
63 static void *thread_tls_data;
64 #endif
65
66 /* ********** basic thread control API ************ 
67  * 
68  * Many thread cases have an X amount of jobs, and only an Y amount of
69  * threads are useful (typically amount of cpus)
70  *
71  * This code can be used to start a maximum amount of 'thread slots', which
72  * then can be filled in a loop with an idle timer. 
73  *
74  * A sample loop can look like this (pseudo c);
75  *
76  *     ListBase lb;
77  *     int maxthreads = 2;
78  *     int cont = 1;
79  * 
80  *     BLI_init_threads(&lb, do_something_func, maxthreads);
81  * 
82  *     while (cont) {
83  *         if (BLI_available_threads(&lb) && !(escape loop event)) {
84  *             // get new job (data pointer)
 *             // tag job 'processed'
86  *             BLI_insert_thread(&lb, job);
87  *         }
88  *         else PIL_sleep_ms(50);
89  *         
 *         // find if a job is ready; do_something_func() should write readiness into the job somewhere
91  *         cont = 0;
92  *         for (go over all jobs)
93  *             if (job is ready) {
94  *                 if (job was not removed) {
95  *                     BLI_remove_thread(&lb, job);
96  *                 }
97  *             }
98  *             else cont = 1;
99  *         }
100  *         // conditions to exit loop 
 *         if (escape loop event) {
 *             if (BLI_available_threads(&lb) == maxthreads)
103  *                 break;
104  *         }
105  *     }
106  * 
107  *     BLI_end_threads(&lb);
108  *
109  ************************************************ */
/* Statically-initialized mutexes backing BLI_lock_thread()/BLI_unlock_thread();
 * one mutex per LOCK_* type declared in BLI_threads.h. */
static pthread_mutex_t _malloc_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _image_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _image_draw_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _viewer_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _custom1_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _rcache_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _opengl_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _nodes_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _movieclip_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _colormanage_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_t mainid;               /* set in BLI_threadapi_init(); read by BLI_thread_is_main() */
static int thread_levels = 0;  /* threads can be invoked inside threads */
static int num_threads_override = 0;   /* when > 0, forces BLI_system_thread_count()'s result */

/* just a max for security reasons */
#define RE_MAX_THREAD BLENDER_MAX_THREADS
126
/* One slot in a thread list created by BLI_init_threads().
 * A slot is 'avail' until BLI_insert_thread() starts a pthread in it. */
typedef struct ThreadSlot {
        struct ThreadSlot *next, *prev;  /* ListBase links */
        void *(*do_thread)(void *);      /* worker callback, shared by all slots of a list */
        void *callerdata;                /* job data handed to do_thread */
        pthread_t pthread;               /* only meaningful while avail == 0 */
        int avail;                       /* 1 = free slot, 0 = thread running */
} ThreadSlot;
134
/* guardedalloc lock callback, installed while any thread level is active. */
static void BLI_lock_malloc_thread(void)
{
        pthread_mutex_lock(&_malloc_lock);
}
139
/* guardedalloc unlock callback, counterpart of BLI_lock_malloc_thread(). */
static void BLI_unlock_malloc_thread(void)
{
        pthread_mutex_unlock(&_malloc_lock);
}
144
/* Record the calling thread's id as the main thread.
 * Must run once, from the main thread, before BLI_thread_is_main() is used. */
void BLI_threadapi_init(void)
{
        mainid = pthread_self();
}
149
/* tot = 0 only initializes the malloc mutex in a safe way (see sequence.c)
 * problem otherwise: scene render will kill off the mutex!
 */
153
/* Create 'tot' idle thread slots in threadbase (clamped to [1, RE_MAX_THREAD])
 * and, on the outermost nesting level, make guardedalloc thread-safe.
 * With threadbase == NULL or tot <= 0 only the malloc-lock level is bumped
 * (see comment above). Pairs with BLI_end_threads(). */
void BLI_init_threads(ListBase *threadbase, void *(*do_thread)(void *), int tot)
{
        int a;

        if (threadbase != NULL && tot > 0) {
                threadbase->first = threadbase->last = NULL;
        
                /* clamp requested slot count to a sane range */
                if (tot > RE_MAX_THREAD) tot = RE_MAX_THREAD;
                else if (tot < 1) tot = 1;
        
                for (a = 0; a < tot; a++) {
                        ThreadSlot *tslot = MEM_callocN(sizeof(ThreadSlot), "threadslot");
                        BLI_addtail(threadbase, tslot);
                        tslot->do_thread = do_thread;
                        tslot->avail = 1;  /* slot free until BLI_insert_thread() claims it */
                }
        }
        
        if (thread_levels == 0) {
                /* first level: install the malloc lock so allocations are thread-safe */
                MEM_set_lock_callback(BLI_lock_malloc_thread, BLI_unlock_malloc_thread);

#ifdef USE_APPLE_OMP_FIX
                /* workaround for Apple gcc 4.2.1 omp vs background thread bug,
                 * we copy gomp thread local storage pointer to setting it again
                 * inside the thread that we start */
                thread_tls_data = pthread_getspecific(gomp_tls_key);
#endif
        }

        thread_levels++;
}
185
186 /* amount of available threads */
187 int BLI_available_threads(ListBase *threadbase)
188 {
189         ThreadSlot *tslot;
190         int counter = 0;
191         
192         for (tslot = threadbase->first; tslot; tslot = tslot->next) {
193                 if (tslot->avail)
194                         counter++;
195         }
196         return counter;
197 }
198
199 /* returns thread number, for sample patterns or threadsafe tables */
200 int BLI_available_thread_index(ListBase *threadbase)
201 {
202         ThreadSlot *tslot;
203         int counter = 0;
204         
205         for (tslot = threadbase->first; tslot; tslot = tslot->next, counter++) {
206                 if (tslot->avail)
207                         return counter;
208         }
209         return 0;
210 }
211
/* pthread entry point for a slot: applies the Apple OMP TLS workaround
 * (when enabled) and then runs the slot's do_thread on its callerdata. */
static void *tslot_thread_start(void *tslot_p)
{
        ThreadSlot *tslot = (ThreadSlot *)tslot_p;

#ifdef USE_APPLE_OMP_FIX
        /* workaround for Apple gcc 4.2.1 omp vs background thread bug,
         * set gomp thread local storage pointer which was copied beforehand */
        pthread_setspecific(gomp_tls_key, thread_tls_data);
#endif

        return tslot->do_thread(tslot->callerdata);
}
224
/* True when called from the thread that ran BLI_threadapi_init(). */
int BLI_thread_is_main(void)
{
        return pthread_equal(pthread_self(), mainid);
}
229
230 void BLI_insert_thread(ListBase *threadbase, void *callerdata)
231 {
232         ThreadSlot *tslot;
233         
234         for (tslot = threadbase->first; tslot; tslot = tslot->next) {
235                 if (tslot->avail) {
236                         tslot->avail = 0;
237                         tslot->callerdata = callerdata;
238                         pthread_create(&tslot->pthread, NULL, tslot_thread_start, tslot);
239                         return;
240                 }
241         }
242         printf("ERROR: could not insert thread slot\n");
243 }
244
245 void BLI_remove_thread(ListBase *threadbase, void *callerdata)
246 {
247         ThreadSlot *tslot;
248         
249         for (tslot = threadbase->first; tslot; tslot = tslot->next) {
250                 if (tslot->callerdata == callerdata) {
251                         pthread_join(tslot->pthread, NULL);
252                         tslot->callerdata = NULL;
253                         tslot->avail = 1;
254                 }
255         }
256 }
257
258 void BLI_remove_thread_index(ListBase *threadbase, int index)
259 {
260         ThreadSlot *tslot;
261         int counter = 0;
262         
263         for (tslot = threadbase->first; tslot; tslot = tslot->next, counter++) {
264                 if (counter == index && tslot->avail == 0) {
265                         pthread_join(tslot->pthread, NULL);
266                         tslot->callerdata = NULL;
267                         tslot->avail = 1;
268                         break;
269                 }
270         }
271 }
272
273 void BLI_remove_threads(ListBase *threadbase)
274 {
275         ThreadSlot *tslot;
276         
277         for (tslot = threadbase->first; tslot; tslot = tslot->next) {
278                 if (tslot->avail == 0) {
279                         pthread_join(tslot->pthread, NULL);
280                         tslot->callerdata = NULL;
281                         tslot->avail = 1;
282                 }
283         }
284 }
285
/* Join all still-running threads, free the slot list, and drop one
 * malloc-lock nesting level (uninstalling the guardedalloc callbacks when
 * the outermost level ends). Pairs with BLI_init_threads().
 * NOTE(review): the comment below says the guard avoids decrementing
 * thread_levels on an empty threadbase, but thread_levels is in fact
 * decremented unconditionally; the guard only skips the join/free. */
void BLI_end_threads(ListBase *threadbase)
{
        ThreadSlot *tslot;
        
        /* only needed if there's actually some stuff to end
         * this way we don't end up decrementing thread_levels on an empty threadbase 
         * */
        if (threadbase && threadbase->first != NULL) {
                for (tslot = threadbase->first; tslot; tslot = tslot->next) {
                        if (tslot->avail == 0) {
                                pthread_join(tslot->pthread, NULL);
                        }
                }
                BLI_freelistN(threadbase);
        }

        thread_levels--;
        if (thread_levels == 0)
                MEM_set_lock_callback(NULL, NULL);
}
306
307 /* System Information */
308
309 /* how many threads are native on this system? */
310 int BLI_system_thread_count(void)
311 {
312         int t;
313 #ifdef WIN32
314         SYSTEM_INFO info;
315         GetSystemInfo(&info);
316         t = (int) info.dwNumberOfProcessors;
317 #else 
318 #   ifdef __APPLE__
319         int mib[2];
320         size_t len;
321         
322         mib[0] = CTL_HW;
323         mib[1] = HW_NCPU;
324         len = sizeof(t);
325         sysctl(mib, 2, &t, &len, NULL, 0);
326 #   else
327         t = (int)sysconf(_SC_NPROCESSORS_ONLN);
328 #   endif
329 #endif
330
331         if (num_threads_override > 0)
332                 return num_threads_override;
333         
334         if (t > RE_MAX_THREAD)
335                 return RE_MAX_THREAD;
336         if (t < 1)
337                 return 1;
338         
339         return t;
340 }
341
/* Force BLI_system_thread_count() to return 'num'; pass 0 to restore
 * the real hardware count. */
void BLI_system_num_threads_override_set(int num)
{
        num_threads_override = num;
}
346
/* Current thread-count override (0 means no override is active). */
int BLI_system_num_threads_override_get(void)
{
        return num_threads_override;
}
351
352 /* Global Mutex Locks */
353
354 void BLI_lock_thread(int type)
355 {
356         if (type == LOCK_IMAGE)
357                 pthread_mutex_lock(&_image_lock);
358         else if (type == LOCK_DRAW_IMAGE)
359                 pthread_mutex_lock(&_image_draw_lock);
360         else if (type == LOCK_VIEWER)
361                 pthread_mutex_lock(&_viewer_lock);
362         else if (type == LOCK_CUSTOM1)
363                 pthread_mutex_lock(&_custom1_lock);
364         else if (type == LOCK_RCACHE)
365                 pthread_mutex_lock(&_rcache_lock);
366         else if (type == LOCK_OPENGL)
367                 pthread_mutex_lock(&_opengl_lock);
368         else if (type == LOCK_NODES)
369                 pthread_mutex_lock(&_nodes_lock);
370         else if (type == LOCK_MOVIECLIP)
371                 pthread_mutex_lock(&_movieclip_lock);
372         else if (type == LOCK_COLORMANAGE)
373                 pthread_mutex_lock(&_colormanage_lock);
374 }
375
376 void BLI_unlock_thread(int type)
377 {
378         if (type == LOCK_IMAGE)
379                 pthread_mutex_unlock(&_image_lock);
380         else if (type == LOCK_DRAW_IMAGE)
381                 pthread_mutex_unlock(&_image_draw_lock);
382         else if (type == LOCK_VIEWER)
383                 pthread_mutex_unlock(&_viewer_lock);
384         else if (type == LOCK_CUSTOM1)
385                 pthread_mutex_unlock(&_custom1_lock);
386         else if (type == LOCK_RCACHE)
387                 pthread_mutex_unlock(&_rcache_lock);
388         else if (type == LOCK_OPENGL)
389                 pthread_mutex_unlock(&_opengl_lock);
390         else if (type == LOCK_NODES)
391                 pthread_mutex_unlock(&_nodes_lock);
392         else if (type == LOCK_MOVIECLIP)
393                 pthread_mutex_unlock(&_movieclip_lock);
394         else if (type == LOCK_COLORMANAGE)
395                 pthread_mutex_unlock(&_colormanage_lock);
396 }
397
398 /* Mutex Locks */
399
/* Initialize a caller-owned mutex with default attributes. */
void BLI_mutex_init(ThreadMutex *mutex)
{
        pthread_mutex_init(mutex, NULL);
}
404
/* Block until the mutex is acquired. */
void BLI_mutex_lock(ThreadMutex *mutex)
{
        pthread_mutex_lock(mutex);
}
409
/* Release a mutex previously acquired with BLI_mutex_lock(). */
void BLI_mutex_unlock(ThreadMutex *mutex)
{
        pthread_mutex_unlock(mutex);
}
414
/* Destroy a mutex initialized with BLI_mutex_init(); it must be unlocked. */
void BLI_mutex_end(ThreadMutex *mutex)
{
        pthread_mutex_destroy(mutex);
}
419
/* Heap-allocate and initialize a mutex; free with BLI_mutex_free(). */
ThreadMutex *BLI_mutex_alloc(void)
{
        ThreadMutex *mutex = MEM_callocN(sizeof(ThreadMutex), "ThreadMutex");
        BLI_mutex_init(mutex);
        return mutex;
}
426
/* Destroy and free a mutex allocated with BLI_mutex_alloc(). */
void BLI_mutex_free(ThreadMutex *mutex)
{
        BLI_mutex_end(mutex);
        MEM_freeN(mutex);
}
432
433 /* Spin Locks */
434
/* Initialize a spin lock; OS X uses OSSpinLock since it lacks
 * pthread spinlocks, everywhere else uses pthread_spin_init. */
void BLI_spin_init(SpinLock *spin)
{
#ifdef __APPLE__
        *spin = OS_SPINLOCK_INIT;
#else
        pthread_spin_init(spin, 0);
#endif
}
443
/* Busy-wait until the spin lock is acquired. */
void BLI_spin_lock(SpinLock *spin)
{
#ifdef __APPLE__
        OSSpinLockLock(spin);
#else
        pthread_spin_lock(spin);
#endif
}
452
/* Release a spin lock acquired with BLI_spin_lock(). */
void BLI_spin_unlock(SpinLock *spin)
{
#ifdef __APPLE__
        OSSpinLockUnlock(spin);
#else
        pthread_spin_unlock(spin);
#endif
}
461
#ifndef __APPLE__
/* Destroy a spin lock initialized by BLI_spin_init(). */
void BLI_spin_end(SpinLock *spin)
{
        pthread_spin_destroy(spin);
}
#else
/* OSSpinLock is a plain value type on OS X; nothing to destroy. */
void BLI_spin_end(SpinLock *UNUSED(spin))
{
}
#endif
472
473 /* Read/Write Mutex Lock */
474
/* Initialize a read/write lock with default attributes. */
void BLI_rw_mutex_init(ThreadRWMutex *mutex)
{
        pthread_rwlock_init(mutex, NULL);
}
479
480 void BLI_rw_mutex_lock(ThreadRWMutex *mutex, int mode)
481 {
482         if (mode == THREAD_LOCK_READ)
483                 pthread_rwlock_rdlock(mutex);
484         else
485                 pthread_rwlock_wrlock(mutex);
486 }
487
/* Release the rwlock (both read and write holds). */
void BLI_rw_mutex_unlock(ThreadRWMutex *mutex)
{
        pthread_rwlock_unlock(mutex);
}
492
/* Destroy an rwlock initialized with BLI_rw_mutex_init(). */
void BLI_rw_mutex_end(ThreadRWMutex *mutex)
{
        pthread_rwlock_destroy(mutex);
}
497
/* Heap-allocate and initialize an rwlock; free with BLI_rw_mutex_free(). */
ThreadRWMutex *BLI_rw_mutex_alloc(void)
{
        ThreadRWMutex *mutex = MEM_callocN(sizeof(ThreadRWMutex), "ThreadRWMutex");
        BLI_rw_mutex_init(mutex);
        return mutex;
}
504
/* Destroy and free an rwlock allocated with BLI_rw_mutex_alloc(). */
void BLI_rw_mutex_free(ThreadRWMutex *mutex)
{
        BLI_rw_mutex_end(mutex);
        MEM_freeN(mutex);
}
510
511 /* Ticket Mutex Lock */
512
/* FIFO-fair mutex: each waiter takes a ticket (queue_tail) and is admitted
 * when queue_head reaches its number, so acquisition is strictly in
 * arrival order. */
struct TicketMutex {
        pthread_cond_t cond;                  /* signaled each time the head advances */
        pthread_mutex_t mutex;                /* protects the two counters */
        unsigned int queue_head, queue_tail;  /* next ticket served / next ticket issued */
};
518
/* Allocate and initialize a ticket mutex; counters start at zero
 * (MEM_callocN zeroes the struct). Free with BLI_ticket_mutex_free(). */
TicketMutex *BLI_ticket_mutex_alloc(void)
{
        TicketMutex *ticket = MEM_callocN(sizeof(TicketMutex), "TicketMutex");

        pthread_cond_init(&ticket->cond, NULL);
        pthread_mutex_init(&ticket->mutex, NULL);

        return ticket;
}
528
/* Destroy and free a ticket mutex; no thread may be waiting on it. */
void BLI_ticket_mutex_free(TicketMutex *ticket)
{
        pthread_mutex_destroy(&ticket->mutex);
        pthread_cond_destroy(&ticket->cond);
        MEM_freeN(ticket);
}
535
/* Acquire the ticket mutex: take the next ticket number and wait until the
 * queue head reaches it, guaranteeing FIFO fairness among waiters. */
void BLI_ticket_mutex_lock(TicketMutex *ticket)
{
        unsigned int queue_me;

        pthread_mutex_lock(&ticket->mutex);
        queue_me = ticket->queue_tail++;  /* our ticket number */

        /* wait for our turn; unsigned wrap-around keeps the != test valid */
        while (queue_me != ticket->queue_head)
                pthread_cond_wait(&ticket->cond, &ticket->mutex);

        pthread_mutex_unlock(&ticket->mutex);
}
548
/* Release the ticket mutex: advance the head and wake all waiters so the
 * holder of the next ticket can proceed (broadcast, since waiters must each
 * re-check their own ticket). */
void BLI_ticket_mutex_unlock(TicketMutex *ticket)
{
        pthread_mutex_lock(&ticket->mutex);
        ticket->queue_head++;
        pthread_cond_broadcast(&ticket->cond);
        pthread_mutex_unlock(&ticket->mutex);
}
556
557 /* ************************************************ */
558
/* Fixed-size worker pool built on the thread-slot API above. */
typedef struct ThreadedWorker {
        ListBase threadbase;         /* slot list managed by BLI_init_threads() */
        void *(*work_fnct)(void *);  /* user work callback */
        char busy[RE_MAX_THREAD];    /* per-slot flag, cleared by exec_work_fnct() on completion */
        int total;                   /* number of usable slots */
        int sleep_time;              /* poll interval (ms) used by BLI_insert_work() */
} ThreadedWorker;

/* Per-job wrapper passed to exec_work_fnct(); freed by that function. */
typedef struct WorkParam {
        ThreadedWorker *worker;  /* owning pool */
        void *param;             /* user job data */
        int index;               /* slot this job occupies */
} WorkParam;
572
573 static void *exec_work_fnct(void *v_param)
574 {
575         WorkParam *p = (WorkParam *)v_param;
576         void *value;
577         
578         value = p->worker->work_fnct(p->param);
579         
580         p->worker->busy[p->index] = 0;
581         MEM_freeN(p);
582         
583         return value;
584 }
585
586 ThreadedWorker *BLI_create_worker(void *(*do_thread)(void *), int tot, int sleep_time)
587 {
588         ThreadedWorker *worker;
589         
590         (void)sleep_time; /* unused */
591         
592         worker = MEM_callocN(sizeof(ThreadedWorker), "threadedworker");
593         
594         if (tot > RE_MAX_THREAD) {
595                 tot = RE_MAX_THREAD;
596         }
597         else if (tot < 1) {
598                 tot = 1;
599         }
600         
601         worker->total = tot;
602         worker->work_fnct = do_thread;
603         
604         BLI_init_threads(&worker->threadbase, exec_work_fnct, tot);
605         
606         return worker;
607 }
608
/* Wait for all outstanding jobs by joining every busy slot in the pool. */
void BLI_end_worker(ThreadedWorker *worker)
{
        BLI_remove_threads(&worker->threadbase);
}
613
/* Finish all jobs, then free the slot list and the worker itself. */
void BLI_destroy_worker(ThreadedWorker *worker)
{
        BLI_end_worker(worker);
        BLI_freelistN(&worker->threadbase);
        MEM_freeN(worker);
}
620
/* Hand one unit of work to the pool, blocking until a slot is available.
 * When every slot is busy this polls: it sleeps worker->sleep_time ms per
 * iteration, then scans for a finished job (busy[i] == 0) and joins that
 * slot to free it. NOTE(review): with sleep_time == 0 this degenerates into
 * a busy-wait -- verify the value set at worker creation. */
void BLI_insert_work(ThreadedWorker *worker, void *param)
{
        WorkParam *p = MEM_callocN(sizeof(WorkParam), "workparam");
        int index;
        
        if (BLI_available_threads(&worker->threadbase) == 0) {
                /* index == worker->total means "no finished slot found yet" */
                index = worker->total;
                while (index == worker->total) {
                        PIL_sleep_ms(worker->sleep_time);
                        
                        for (index = 0; index < worker->total; index++) {
                                if (worker->busy[index] == 0) {
                                        BLI_remove_thread_index(&worker->threadbase, index);
                                        break;
                                }
                        }
                }
        }
        else {
                index = BLI_available_thread_index(&worker->threadbase);
        }
        
        worker->busy[index] = 1;
        
        p->param = param;
        p->index = index;
        p->worker = worker;
        
        BLI_insert_thread(&worker->threadbase, p);
}
651
652 /* ************************************************ */
653
/* Thread-safe FIFO work queue built on GSQueue plus a mutex and two
 * condition variables. */
struct ThreadQueue {
        GSQueue *queue;              /* underlying FIFO of void* items */
        pthread_mutex_t mutex;       /* protects queue and flags */
        pthread_cond_t push_cond;    /* signaled on push and on nowait */
        pthread_cond_t finish_cond;  /* broadcast when the queue drains empty */
        volatile int nowait;         /* when set, pop no longer blocks on empty */
        volatile int cancelled;      /* set externally; not read in this file */
};
662
/* Allocate an empty thread queue; flags start at 0 (MEM_callocN).
 * Free with BLI_thread_queue_free(). */
ThreadQueue *BLI_thread_queue_init(void)
{
        ThreadQueue *queue;

        queue = MEM_callocN(sizeof(ThreadQueue), "ThreadQueue");
        queue->queue = BLI_gsqueue_new(sizeof(void *));

        pthread_mutex_init(&queue->mutex, NULL);
        pthread_cond_init(&queue->push_cond, NULL);
        pthread_cond_init(&queue->finish_cond, NULL);

        return queue;
}
676
/* Free the queue and its synchronization objects.
 * Caller must guarantee no thread is still using the queue. */
void BLI_thread_queue_free(ThreadQueue *queue)
{
        /* destroy everything, assumes no one is using queue anymore */
        pthread_cond_destroy(&queue->finish_cond);
        pthread_cond_destroy(&queue->push_cond);
        pthread_mutex_destroy(&queue->mutex);

        BLI_gsqueue_free(queue->queue);

        MEM_freeN(queue);
}
688
/* Append a work pointer to the queue and wake one waiting consumer. */
void BLI_thread_queue_push(ThreadQueue *queue, void *work)
{
        pthread_mutex_lock(&queue->mutex);

        BLI_gsqueue_push(queue->queue, &work);

        /* signal threads waiting to pop */
        pthread_cond_signal(&queue->push_cond);
        pthread_mutex_unlock(&queue->mutex);
}
699
/* Remove and return the oldest work pointer, blocking while the queue is
 * empty. Returns NULL only when the queue is empty and nowait mode is on
 * (see BLI_thread_queue_nowait()). */
void *BLI_thread_queue_pop(ThreadQueue *queue)
{
        void *work = NULL;

        /* wait until there is work */
        pthread_mutex_lock(&queue->mutex);
        while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
                pthread_cond_wait(&queue->push_cond, &queue->mutex);
        
        /* if we have something, pop it */
        if (!BLI_gsqueue_is_empty(queue->queue)) {
                BLI_gsqueue_pop(queue->queue, &work);
                
                /* last item taken: wake threads blocked in wait_finish() */
                if (BLI_gsqueue_is_empty(queue->queue))
                        pthread_cond_broadcast(&queue->finish_cond);
        }

        pthread_mutex_unlock(&queue->mutex);

        return work;
}
721
/* Fill 'timeout' with the absolute wall-clock time 'ms' milliseconds from
 * now, in the form pthread_cond_timedwait() expects. */
static void wait_timeout(struct timespec *timeout, int ms)
{
        ldiv_t div_result;
        long sec, usec, x;

        /* current wall-clock time as seconds + microseconds */
#ifdef WIN32
        {
                struct _timeb now;
                _ftime(&now);
                sec = now.time;
                usec = now.millitm * 1000; /* microsecond precision would be better */
        }
#else
        {
                struct timeval now;
                gettimeofday(&now, NULL);
                sec = now.tv_sec;
                usec = now.tv_usec;
        }
#endif

        /* add current time + millisecond offset */
        div_result = ldiv(ms, 1000);
        timeout->tv_sec = sec + div_result.quot;

        x = usec + (div_result.rem * 1000);

        /* carry a microsecond overflow into seconds; x < 2,000,000 here,
         * so one subtraction suffices */
        if (x >= 1000000) {
                timeout->tv_sec++;
                x -= 1000000;
        }

        timeout->tv_nsec = x * 1000;
}
756
/* Like BLI_thread_queue_pop() but give up after roughly 'ms' milliseconds;
 * returns NULL on timeout, or when empty in nowait mode. */
void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
{
        double t;
        void *work = NULL;
        struct timespec timeout;

        t = PIL_check_seconds_timer();
        wait_timeout(&timeout, ms);

        /* wait until there is work */
        pthread_mutex_lock(&queue->mutex);
        while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
                /* checked two ways: the condvar's absolute deadline, plus elapsed
                 * wall time as a guard against spurious wakeups */
                if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT)
                        break;
                else if (PIL_check_seconds_timer() - t >= ms * 0.001)
                        break;
        }

        /* if we have something, pop it */
        if (!BLI_gsqueue_is_empty(queue->queue)) {
                BLI_gsqueue_pop(queue->queue, &work);
                
                /* last item taken: wake threads blocked in wait_finish() */
                if (BLI_gsqueue_is_empty(queue->queue))
                        pthread_cond_broadcast(&queue->finish_cond);
        }
        
        pthread_mutex_unlock(&queue->mutex);

        return work;
}
787
788 int BLI_thread_queue_size(ThreadQueue *queue)
789 {
790         int size;
791
792         pthread_mutex_lock(&queue->mutex);
793         size = BLI_gsqueue_size(queue->queue);
794         pthread_mutex_unlock(&queue->mutex);
795
796         return size;
797 }
798
/* Switch the queue to non-blocking mode: all current and future pops return
 * immediately (NULL when empty). Used to shut consumers down. */
void BLI_thread_queue_nowait(ThreadQueue *queue)
{
        pthread_mutex_lock(&queue->mutex);

        queue->nowait = 1;

        /* signal threads waiting to pop */
        pthread_cond_broadcast(&queue->push_cond);
        pthread_mutex_unlock(&queue->mutex);
}
809
/* Block until the queue is fully drained (every pushed item has been
 * popped); woken by the finish_cond broadcast in the pop functions. */
void BLI_thread_queue_wait_finish(ThreadQueue *queue)
{
        /* wait for finish condition */
        pthread_mutex_lock(&queue->mutex);

        while (!BLI_gsqueue_is_empty(queue->queue))
                pthread_cond_wait(&queue->finish_cond, &queue->mutex);

        pthread_mutex_unlock(&queue->mutex);
}
820
821 /* ************************************************ */
822
/* Enter a threaded section: install guardedalloc's lock callbacks on the
 * outermost level. Nestable; pair every call with BLI_end_threaded_malloc(). */
void BLI_begin_threaded_malloc(void)
{
        /* Used for debug only */
        /* BLI_assert(thread_levels >= 0); */

        if (thread_levels == 0) {
                MEM_set_lock_callback(BLI_lock_malloc_thread, BLI_unlock_malloc_thread);
        }
        thread_levels++;
}
833
/* Leave a threaded section: uninstall guardedalloc's lock callbacks when the
 * outermost nesting level ends. Counterpart of BLI_begin_threaded_malloc(). */
void BLI_end_threaded_malloc(void)
{
        /* Used for debug only */
        /* BLI_assert(thread_levels >= 0); */

        thread_levels--;
        if (thread_levels == 0)
                MEM_set_lock_callback(NULL, NULL);
}
843