Applied and completed a compositor patch by Brecht to use signalling and waiting...
author Lukas Toenne <lukas.toenne@googlemail.com>
Sun, 10 Jun 2012 12:26:33 +0000 (12:26 +0000)
committer Lukas Toenne <lukas.toenne@googlemail.com>
Sun, 10 Jun 2012 12:26:33 +0000 (12:26 +0000)
source/blender/blenlib/BLI_threads.h
source/blender/blenlib/intern/threads.c
source/blender/compositor/intern/COM_ExecutionGroup.cpp
source/blender/compositor/intern/COM_WorkScheduler.cpp
source/blender/compositor/intern/COM_WorkScheduler.h

index b13da9f0dd425845cbf8e2b4e6d9b448ae443f6e..fb8771722c1a64095c47e95e93e44a05ff969b8b 100644 (file)
@@ -136,6 +136,7 @@ void *BLI_thread_queue_pop(ThreadQueue *queue);
 void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms);
 int BLI_thread_queue_size(ThreadQueue *queue);
 
+void BLI_thread_queue_wait_finish(ThreadQueue *queue);
 void BLI_thread_queue_nowait(ThreadQueue *queue);
 
 #endif
index f9f677d7c224ec7adfbe1228aedd6c21494a504f..dc4c15a82fcef696f71ab7eff25695056dbb90e3 100644 (file)
@@ -520,8 +520,10 @@ void BLI_insert_work(ThreadedWorker *worker, void *param)
 struct ThreadQueue {
        GSQueue *queue;
        pthread_mutex_t mutex;
-       pthread_cond_t cond;
-       int nowait;
+       pthread_cond_t push_cond;
+       pthread_cond_t finish_cond;
+       volatile int nowait;
+       volatile int cancelled;
 };
 
 ThreadQueue *BLI_thread_queue_init(void)
@@ -532,14 +534,17 @@ ThreadQueue *BLI_thread_queue_init(void)
        queue->queue = BLI_gsqueue_new(sizeof(void *));
 
        pthread_mutex_init(&queue->mutex, NULL);
-       pthread_cond_init(&queue->cond, NULL);
+       pthread_cond_init(&queue->push_cond, NULL);
+       pthread_cond_init(&queue->finish_cond, NULL);
 
        return queue;
 }
 
 void BLI_thread_queue_free(ThreadQueue *queue)
 {
-       pthread_cond_destroy(&queue->cond);
+       /* destroy everything, assumes no one is using queue anymore */
+       pthread_cond_destroy(&queue->finish_cond);
+       pthread_cond_destroy(&queue->push_cond);
        pthread_mutex_destroy(&queue->mutex);
 
        BLI_gsqueue_free(queue->queue);
@@ -554,7 +559,7 @@ void BLI_thread_queue_push(ThreadQueue *queue, void *work)
        BLI_gsqueue_push(queue->queue, &work);
 
        /* signal threads waiting to pop */
-       pthread_cond_signal(&queue->cond);
+       pthread_cond_signal(&queue->push_cond);
        pthread_mutex_unlock(&queue->mutex);
 }
 
@@ -565,11 +570,15 @@ void *BLI_thread_queue_pop(ThreadQueue *queue)
        /* wait until there is work */
        pthread_mutex_lock(&queue->mutex);
        while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait)
-               pthread_cond_wait(&queue->cond, &queue->mutex);
-
+               pthread_cond_wait(&queue->push_cond, &queue->mutex);
+       
        /* if we have something, pop it */
-       if (!BLI_gsqueue_is_empty(queue->queue))
+       if (!BLI_gsqueue_is_empty(queue->queue)) {
                BLI_gsqueue_pop(queue->queue, &work);
+               
+               if(BLI_gsqueue_is_empty(queue->queue))
+                       pthread_cond_broadcast(&queue->finish_cond);
+       }
 
        pthread_mutex_unlock(&queue->mutex);
 
@@ -623,16 +632,20 @@ void *BLI_thread_queue_pop_timeout(ThreadQueue *queue, int ms)
        /* wait until there is work */
        pthread_mutex_lock(&queue->mutex);
        while (BLI_gsqueue_is_empty(queue->queue) && !queue->nowait) {
-               if (pthread_cond_timedwait(&queue->cond, &queue->mutex, &timeout) == ETIMEDOUT)
+               if (pthread_cond_timedwait(&queue->push_cond, &queue->mutex, &timeout) == ETIMEDOUT)
                        break;
                else if (PIL_check_seconds_timer() - t >= ms * 0.001)
                        break;
        }
 
        /* if we have something, pop it */
-       if (!BLI_gsqueue_is_empty(queue->queue))
+       if (!BLI_gsqueue_is_empty(queue->queue)) {
                BLI_gsqueue_pop(queue->queue, &work);
-
+               
+               if(BLI_gsqueue_is_empty(queue->queue))
+                       pthread_cond_broadcast(&queue->finish_cond);
+       }
+       
        pthread_mutex_unlock(&queue->mutex);
 
        return work;
@@ -656,10 +669,23 @@ void BLI_thread_queue_nowait(ThreadQueue *queue)
        queue->nowait = 1;
 
        /* signal threads waiting to pop */
-       pthread_cond_signal(&queue->cond);
+       pthread_cond_broadcast(&queue->push_cond);
+       pthread_mutex_unlock(&queue->mutex);
+}
+
+void BLI_thread_queue_wait_finish(ThreadQueue *queue)
+{
+       /* wait for finish condition */
+       pthread_mutex_lock(&queue->mutex);
+
+    while(!BLI_gsqueue_is_empty(queue->queue))
+               pthread_cond_wait(&queue->finish_cond, &queue->mutex);
+
        pthread_mutex_unlock(&queue->mutex);
 }
 
+/* ************************************************ */
+
 void BLI_begin_threaded_malloc(void)
 {
        if (thread_levels == 0) {
@@ -674,3 +700,4 @@ void BLI_end_threaded_malloc(void)
        if (thread_levels == 0)
                MEM_set_lock_callback(NULL, NULL);
 }
+
index 44b3c8dafbb368681fec916294da957cc6a3d3ec..481b83c81a3f48330a2af7461270607363125c7c 100644 (file)
@@ -351,7 +351,8 @@ void ExecutionGroup::execute(ExecutionSystem *graph)
                                startIndex = index+1;
                        }
                }
-               PIL_sleep_ms(10);
+
+               WorkScheduler::finish();
 
                if (bTree->test_break && bTree->test_break(bTree->tbh)) {
                        breaked = true;
index 172107f720b94e220d5301f77e9d0a3bed4849db..ba8bfe55310799b7f1a70d640ffec59c42c18d56 100644 (file)
@@ -39,8 +39,6 @@
 #endif
 
 
-/// @brief global state of the WorkScheduler.
-static WorkSchedulerState state;
 /// @brief list of all CPUDevices. for every hardware thread an instance of CPUDevice is created
 static vector<CPUDevice*> cpudevices;
 
@@ -68,43 +66,29 @@ static bool openclActive = false;
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
 void *WorkScheduler::thread_execute_cpu(void *data)
 {
-       bool continueLoop = true;
        Device *device = (Device*)data;
-       while (continueLoop) {
-               WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue);
-               if (work) {
-                       device->execute(work);
-                       delete work;
-               }
-               PIL_sleep_ms(10);
-
-               if (WorkScheduler::isStopping()) {
-                       continueLoop = false;
-               }
+       WorkPackage *work;
+       
+       while ((work = (WorkPackage*)BLI_thread_queue_pop(cpuqueue))) {
+               device->execute(work);
+               delete work;
        }
+       
        return NULL;
 }
 
 void *WorkScheduler::thread_execute_gpu(void *data)
 {
-       bool continueLoop = true;
        Device *device = (Device*)data;
-       while (continueLoop) {
-               WorkPackage *work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue);
-               if (work) {
-                       device->execute(work);
-                       delete work;
-               }
-               PIL_sleep_ms(10);
-
-               if (WorkScheduler::isStopping()) {
-                       continueLoop = false;
-               }
+       WorkPackage *work;
+       
+       while ((work = (WorkPackage*)BLI_thread_queue_pop(gpuqueue))) {
+               device->execute(work);
+               delete work;
        }
+       
        return NULL;
 }
-
-bool WorkScheduler::isStopping() {return state == COM_WSS_STOPPING;}
 #endif
 
 
@@ -135,7 +119,6 @@ void WorkScheduler::start(CompositorContext &context)
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
        unsigned int index;
        cpuqueue = BLI_thread_queue_init();
-       BLI_thread_queue_nowait(cpuqueue);
        BLI_init_threads(&cputhreads, thread_execute_cpu, cpudevices.size());
        for (index = 0 ; index < cpudevices.size() ; index ++) {
                Device *device = cpudevices[index];
@@ -144,7 +127,6 @@ void WorkScheduler::start(CompositorContext &context)
 #ifdef COM_OPENCL_ENABLED
        if (context.getHasActiveOpenCLDevices()) {
                gpuqueue = BLI_thread_queue_init();
-               BLI_thread_queue_nowait(gpuqueue);
                BLI_init_threads(&gputhreads, thread_execute_gpu, gpudevices.size());
                for (index = 0 ; index < gpudevices.size() ; index ++) {
                        Device *device = gpudevices[index];
@@ -157,45 +139,39 @@ void WorkScheduler::start(CompositorContext &context)
        }
 #endif
 #endif
-       state = COM_WSS_STARTED;
 }
 void WorkScheduler::finish()
 {
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
 #ifdef COM_OPENCL_ENABLED
        if (openclActive) {
-               while (BLI_thread_queue_size(gpuqueue) + BLI_thread_queue_size(cpuqueue) > 0) {
-                       PIL_sleep_ms(10);
-               }
+               BLI_thread_queue_wait_finish(gpuqueue);
+               BLI_thread_queue_wait_finish(cpuqueue);
        }
        else {
-               while (BLI_thread_queue_size(cpuqueue) > 0) {
-                       PIL_sleep_ms(10);
-               }
+               BLI_thread_queue_wait_finish(cpuqueue);
        }
 #else
-       while (BLI_thread_queue_size(cpuqueue) > 0) {
-               PIL_sleep_ms(10);
-       }
+       BLI_thread_queue_wait_finish(cpuqueue);
 #endif
 #endif
 }
 void WorkScheduler::stop()
 {
-       state = COM_WSS_STOPPING;
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
+       BLI_thread_queue_nowait(cpuqueue);
        BLI_end_threads(&cputhreads);
        BLI_thread_queue_free(cpuqueue);
        cpuqueue = NULL;
 #ifdef COM_OPENCL_ENABLED
        if (openclActive) {
+               BLI_thread_queue_nowait(gpuqueue);
                BLI_end_threads(&gputhreads);
                BLI_thread_queue_free(gpuqueue);
                gpuqueue = NULL;
        }
 #endif
 #endif
-       state = COM_WSS_STOPPED;
 }
 
 bool WorkScheduler::hasGPUDevices()
@@ -218,8 +194,6 @@ extern void clContextError(const char *errinfo, const void *private_info, size_t
 
 void WorkScheduler::initialize()
 {
-       state = COM_WSS_UNKNOWN;
-
 #if COM_CURRENT_THREADING_MODEL == COM_TM_QUEUE
        int numberOfCPUThreads = BLI_system_thread_count();
 
@@ -298,8 +272,6 @@ void WorkScheduler::initialize()
        }
 #endif
 #endif
-
-       state = COM_WSS_INITIALIZED;
 }
 
 void WorkScheduler::deinitialize()
@@ -329,5 +301,4 @@ void WorkScheduler::deinitialize()
        }
 #endif
 #endif
-       state = COM_WSS_DEINITIALIZED;
 }
index 0de1763749e72ec360a11b08442550e2e5821f72..b03b514d1391d6c3fde89debe43b7b3870385e58 100644 (file)
@@ -31,19 +31,6 @@ extern "C" {
 #include "COM_defines.h"
 #include "COM_Device.h"
 
-// STATES
-/** @brief states of the WorkScheduler
-  * @ingroup execution
-  */
-typedef enum WorkSchedulerState {
-       COM_WSS_UNKNOWN = -1,
-       COM_WSS_INITIALIZED = 0,
-       COM_WSS_STARTED = 1,
-       COM_WSS_STOPPING = 2,
-       COM_WSS_STOPPED = 3,
-       COM_WSS_DEINITIALIZED = 4
-} WorkSchedulerState;
-
 /** @brief the workscheduler
   * @ingroup execution
   */