Task scheduler ported to C
author    Sergey Sharybin <sergey.vfx@gmail.com>
          Mon, 1 Jul 2013 21:23:20 +0000 (21:23 +0000)
committer Sergey Sharybin <sergey.vfx@gmail.com>
          Mon, 1 Jul 2013 21:23:20 +0000 (21:23 +0000)
Patch by Brecht, which in its original version also
gets rid of ThreadedWorker in favor of the new task
scheduler and ports some areas to it.

Kudos to Brecht for this work!

source/blender/blenlib/BLI_task.h [new file with mode: 0644]
source/blender/blenlib/BLI_threads.h
source/blender/blenlib/CMakeLists.txt
source/blender/blenlib/intern/task.c [new file with mode: 0644]
source/blender/blenlib/intern/threads.c

diff --git a/source/blender/blenlib/BLI_task.h b/source/blender/blenlib/BLI_task.h
new file mode 100644
index 0000000..f57d428
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * ***** END GPL LICENSE BLOCK *****
+ */
+
+#ifndef __BLI_TASK_H__
+#define __BLI_TASK_H__
+
+/** \file BLI_task.h
+ *  \ingroup bli
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "BLI_threads.h"
+#include "BLI_utildefines.h"
+
+/* Task Scheduler
+ *
+ * Central scheduler that holds running threads ready to execute tasks. A single
+ * queue holds the tasks from all pools.
+ *
+ * Init/exit must be called before/after any task pools are created/freed, and
+ * must be called from the main thread. All other scheduler and pool functions
+ * are thread-safe. */
+
+typedef struct TaskScheduler TaskScheduler;
+
+enum {
+       TASK_SCHEDULER_AUTO_THREADS = 0,
+       TASK_SCHEDULER_SINGLE_THREAD = 1
+};
+
+TaskScheduler *BLI_task_scheduler_create(int num_threads);
+void BLI_task_scheduler_free(TaskScheduler *scheduler);
+
+int BLI_task_scheduler_num_threads(TaskScheduler *scheduler);
+
+/* Task Pool
+ *
+ * Pool of tasks that will be executed by the central TaskScheduler. For each
+ * pool, we can wait for all tasks to be done, or cancel them before they are
+ * done.
+ *
+ * Running tasks may spawn new tasks.
+ *
+ * Pools may be nested, i.e. a thread running a task can create another task
+ * pool with smaller tasks. When other threads are busy they will continue
+ * working on their own tasks; if not, they will join in. No new threads
+ * will be launched.
+ */
+
+typedef enum TaskPriority {
+       TASK_PRIORITY_LOW,
+       TASK_PRIORITY_HIGH
+} TaskPriority;
+
+typedef struct TaskPool TaskPool;
+typedef void (*TaskRunFunction)(TaskPool *pool, void *taskdata, int threadid);
+
+TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata);
+void BLI_task_pool_free(TaskPool *pool);
+
+void BLI_task_pool_push(TaskPool *pool, TaskRunFunction run,
+       void *taskdata, bool free_taskdata, TaskPriority priority);
+
+/* work and wait until all tasks are done */
+void BLI_task_pool_work_and_wait(TaskPool *pool);
+/* cancel all tasks, keep worker threads running */
+void BLI_task_pool_cancel(TaskPool *pool);
+/* clear all remaining tasks without executing them (worker threads keep running) */
+void BLI_task_pool_stop(TaskPool *pool);
+
+/* for worker threads, test if cancelled */
+bool BLI_task_pool_cancelled(TaskPool *pool);
+
+/* optional userdata pointer to pass along to run function */
+void *BLI_task_pool_userdata(TaskPool *pool);
+
+/* optional mutex to use from run function */
+ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool);
+
+/* number of tasks done, for stats, don't use this to make decisions */
+size_t BLI_task_pool_tasks_done(TaskPool *pool);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
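A minimal usage sketch of the API above; the ChunkData type, calc_chunk worker
and example() driver are hypothetical, for illustration only:

#include "MEM_guardedalloc.h"
#include "BLI_task.h"

typedef struct ChunkData {
    int start, end;
} ChunkData;

/* TaskRunFunction: receives the pool, the pushed taskdata and a thread id */
static void calc_chunk(TaskPool *pool, void *taskdata, int threadid)
{
    ChunkData *chunk = taskdata;
    /* ... process elements chunk->start up to chunk->end ... */
    (void)pool; (void)threadid;
}

void example(void)
{
    TaskScheduler *scheduler = BLI_task_scheduler_create(TASK_SCHEDULER_AUTO_THREADS);
    TaskPool *pool = BLI_task_pool_create(scheduler, NULL);
    int i;

    for (i = 0; i < 8; i++) {
        ChunkData *chunk = MEM_callocN(sizeof(ChunkData), "ChunkData");
        chunk->start = i * 100;
        chunk->end = chunk->start + 100;
        /* free_taskdata=true: the pool frees chunk once the task has run */
        BLI_task_pool_push(pool, calc_chunk, chunk, true, TASK_PRIORITY_LOW);
    }

    /* the main thread joins in on the work until the pool is empty */
    BLI_task_pool_work_and_wait(pool);

    BLI_task_pool_free(pool);
    BLI_task_scheduler_free(scheduler);
}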
diff --git a/source/blender/blenlib/BLI_threads.h b/source/blender/blenlib/BLI_threads.h
index 331cac3ed7686e1e588dcbe6725cf35f07b6c635..a64745d1dfe3ea04a36e4419cb1742d1d836397d 100644
@@ -100,6 +100,7 @@ ThreadMutex *BLI_mutex_alloc(void);
 void BLI_mutex_free(ThreadMutex *mutex);
 
 void BLI_mutex_lock(ThreadMutex *mutex);
+bool BLI_mutex_trylock(ThreadMutex *mutex);
 void BLI_mutex_unlock(ThreadMutex *mutex);
 
 /* Spin Lock */
@@ -170,6 +171,16 @@ int BLI_thread_queue_size(ThreadQueue *queue);
 void BLI_thread_queue_wait_finish(ThreadQueue *queue);
 void BLI_thread_queue_nowait(ThreadQueue *queue);
 
+/* Condition */
+
+typedef pthread_cond_t ThreadCondition;
+
+void BLI_condition_init(ThreadCondition *cond);
+void BLI_condition_wait(ThreadCondition *cond, ThreadMutex *mutex);
+void BLI_condition_notify_one(ThreadCondition *cond);
+void BLI_condition_notify_all(ThreadCondition *cond);
+void BLI_condition_end(ThreadCondition *cond);
+
 #ifdef __cplusplus
 }
 #endif
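The new condition wrappers follow the usual pthreads pattern: the predicate is
checked under the mutex and the wait sits in a loop to tolerate spurious
wakeups. A minimal sketch, with a hypothetical Shared struct (mutex and
condition assumed already initialized via BLI_mutex_init()/BLI_condition_init()):

#include "BLI_threads.h"

typedef struct Shared {
    ThreadMutex mutex;
    ThreadCondition cond;
    int count;
} Shared;

static void consumer(Shared *shared)
{
    BLI_mutex_lock(&shared->mutex);
    while (shared->count == 0) {
        /* atomically releases the mutex while blocked, reacquires it on wakeup */
        BLI_condition_wait(&shared->cond, &shared->mutex);
    }
    shared->count--;
    BLI_mutex_unlock(&shared->mutex);
}

static void producer(Shared *shared)
{
    BLI_mutex_lock(&shared->mutex);
    shared->count++;
    BLI_condition_notify_one(&shared->cond);
    BLI_mutex_unlock(&shared->mutex);
}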
diff --git a/source/blender/blenlib/CMakeLists.txt b/source/blender/blenlib/CMakeLists.txt
index 1d94ca9afbca3a7cbfc9b5deaaa6476efd662266..3ae6e8f96564451b539ead86155591e52a9c5a67 100644
@@ -91,6 +91,7 @@ set(SRC
        intern/string.c
        intern/string_cursor_utf8.c
        intern/string_utf8.c
+       intern/task.c
        intern/threads.c
        intern/time.c
        intern/uvproject.c
@@ -150,6 +151,7 @@ set(SRC
        BLI_string_cursor_utf8.h
        BLI_string_utf8.h
        BLI_sys_types.h
+       BLI_task.h
        BLI_threads.h
        BLI_utildefines.h
        BLI_uvproject.h
diff --git a/source/blender/blenlib/intern/task.c b/source/blender/blenlib/intern/task.c
new file mode 100644
index 0000000..5a8b000
--- /dev/null
@@ -0,0 +1,404 @@
+/*
+ * ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * ***** END GPL LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+
+#include "MEM_guardedalloc.h"
+
+#include "BLI_listbase.h"
+#include "BLI_task.h"
+#include "BLI_threads.h"
+
+/* Types */
+
+typedef struct Task {
+       struct Task *next, *prev;
+
+       TaskRunFunction run;
+       void *taskdata;
+       bool free_taskdata;
+       TaskPool *pool;
+} Task;
+
+struct TaskPool {
+       TaskScheduler *scheduler;
+
+       volatile size_t num;
+       volatile size_t done;
+       ThreadMutex num_mutex;
+       ThreadCondition num_cond;
+
+       void *userdata;
+       ThreadMutex user_mutex;
+
+       volatile bool do_cancel;
+};
+
+struct TaskScheduler {
+       pthread_t *threads;
+       int num_threads;
+
+       ListBase queue;
+       ThreadMutex queue_mutex;
+       ThreadCondition queue_cond;
+
+       volatile bool do_exit;
+};
+
+typedef struct TaskThread {
+       TaskScheduler *scheduler;
+       int id;
+} TaskThread;
+
+/* Task Scheduler */
+
+static void task_pool_num_decrease(TaskPool *pool, size_t done)
+{
+       BLI_mutex_lock(&pool->num_mutex);
+       BLI_assert(pool->num >= done); /* num is unsigned, so check before subtracting */
+       pool->num -= done;
+       pool->done += done;
+
+       if(pool->num == 0)
+               BLI_condition_notify_all(&pool->num_cond);
+
+       BLI_mutex_unlock(&pool->num_mutex);
+}
+
+static void task_pool_num_increase(TaskPool *pool)
+{
+       BLI_mutex_lock(&pool->num_mutex);
+
+       pool->num++;
+       BLI_condition_notify_all(&pool->num_cond);
+
+       BLI_mutex_unlock(&pool->num_mutex);
+}
+
+static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
+{
+       BLI_mutex_lock(&scheduler->queue_mutex);
+
+       while(!scheduler->queue.first && !scheduler->do_exit)
+               BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
+
+       if(!scheduler->queue.first) {
+               BLI_mutex_unlock(&scheduler->queue_mutex);
+               BLI_assert(scheduler->do_exit);
+               return false;
+       }
+       
+       *task = scheduler->queue.first;
+       BLI_remlink(&scheduler->queue, *task);
+
+       BLI_mutex_unlock(&scheduler->queue_mutex);
+
+       return true;
+}
+
+static void *task_scheduler_thread_run(void *thread_p)
+{
+       TaskThread *thread = (TaskThread*)thread_p;
+       TaskScheduler *scheduler = thread->scheduler;
+       int thread_id = thread->id;
+       Task *task;
+
+       /* keep popping off tasks */
+       while(task_scheduler_thread_wait_pop(scheduler, &task)) {
+               /* run task */
+               task->run(task->pool, task->taskdata, thread_id);
+
+               /* notify pool task was done */
+               task_pool_num_decrease(task->pool, 1);
+
+               /* delete task */
+               if(task->free_taskdata)
+                       MEM_freeN(task->taskdata);
+               MEM_freeN(task);
+       }
+
+       MEM_freeN(thread);
+
+       return NULL;
+}
+
+TaskScheduler *BLI_task_scheduler_create(int num_threads)
+{
+       TaskScheduler *scheduler = MEM_callocN(sizeof(TaskScheduler), "TaskScheduler");
+
+       /* multiple places can use this task scheduler, sharing the same
+        * worker threads through the single task queue */
+       scheduler->do_exit = false;
+
+       scheduler->queue.first = scheduler->queue.last = NULL;
+       BLI_mutex_init(&scheduler->queue_mutex);
+       BLI_condition_init(&scheduler->queue_cond);
+
+       if(num_threads == 0) {
+               /* automatic: one thread per core, counting the main thread below */
+               num_threads = BLI_system_thread_count();
+       }
+
+       /* main thread will also work, so we count it too */
+       num_threads -= 1;
+
+       /* launch threads that will be waiting for work */
+       if(num_threads > 0) {
+               int i;
+
+               scheduler->num_threads = num_threads;
+               scheduler->threads = MEM_callocN(sizeof(pthread_t)*num_threads, "TaskScheduler threads");
+
+               for(i = 0; i < num_threads; i++) {
+                       TaskThread *thread = MEM_callocN(sizeof(TaskThread), "TaskThread");
+                       thread->scheduler = scheduler;
+                       thread->id = i+1;
+
+                       if(pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) {
+                               fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads);
+                               MEM_freeN(thread);
+                       }
+               }
+       }
+       
+       return scheduler;
+}
+
+void BLI_task_scheduler_free(TaskScheduler *scheduler)
+{
+       Task *task;
+
+       /* stop all waiting threads; set the flag under the queue mutex so no wakeup is missed */
+       BLI_mutex_lock(&scheduler->queue_mutex);
+       scheduler->do_exit = true;
+       BLI_condition_notify_all(&scheduler->queue_cond);
+       BLI_mutex_unlock(&scheduler->queue_mutex);
+
+       /* delete threads */
+       if(scheduler->threads) {
+               int i;
+
+               for(i = 0; i < scheduler->num_threads; i++) {
+                       if(pthread_join(scheduler->threads[i], NULL) != 0)
+                               fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads);
+               }
+
+               MEM_freeN(scheduler->threads);
+       }
+
+       /* delete leftover tasks */
+       for(task = scheduler->queue.first; task; task = task->next)
+               if(task->free_taskdata)
+                       MEM_freeN(task->taskdata);
+       BLI_freelistN(&scheduler->queue);
+
+       /* delete mutex/condition */
+       BLI_mutex_end(&scheduler->queue_mutex);
+       BLI_condition_end(&scheduler->queue_cond);
+
+       MEM_freeN(scheduler);
+}
+
+int BLI_task_scheduler_num_threads(TaskScheduler *scheduler)
+{
+       return scheduler->num_threads + 1;
+}
+
+static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority)
+{
+       task_pool_num_increase(task->pool);
+
+       /* add task to queue */
+       BLI_mutex_lock(&scheduler->queue_mutex);
+
+       if(priority == TASK_PRIORITY_HIGH)
+               BLI_addhead(&scheduler->queue, task);
+       else
+               BLI_addtail(&scheduler->queue, task);
+
+       BLI_condition_notify_one(&scheduler->queue_cond);
+       BLI_mutex_unlock(&scheduler->queue_mutex);
+}
+
+static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
+{
+       Task *task, *nexttask;
+       size_t done = 0;
+
+       BLI_mutex_lock(&scheduler->queue_mutex);
+
+       /* free all tasks from this pool from the queue */
+       for(task = scheduler->queue.first; task; task = nexttask) {
+               nexttask = task->next;
+
+               if(task->pool == pool) {
+                       if(task->free_taskdata)
+                               MEM_freeN(task->taskdata);
+                       BLI_freelinkN(&scheduler->queue, task);
+
+                       done++;
+               }
+       }
+
+       BLI_mutex_unlock(&scheduler->queue_mutex);
+
+       /* notify done */
+       task_pool_num_decrease(pool, done);
+}
+
+/* Task Pool */
+
+TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata)
+{
+       TaskPool *pool = MEM_callocN(sizeof(TaskPool), "TaskPool");
+
+       pool->scheduler = scheduler;
+       pool->num = 0;
+       pool->do_cancel = false;
+
+       BLI_mutex_init(&pool->num_mutex);
+       BLI_condition_init(&pool->num_cond);
+
+       pool->userdata = userdata;
+       BLI_mutex_init(&pool->user_mutex);
+
+       return pool;
+}
+
+void BLI_task_pool_free(TaskPool *pool)
+{
+       BLI_task_pool_stop(pool);
+
+       BLI_mutex_end(&pool->num_mutex);
+       BLI_condition_end(&pool->num_cond);
+
+       BLI_mutex_end(&pool->user_mutex);
+
+       MEM_freeN(pool);
+}
+
+void BLI_task_pool_push(TaskPool *pool, TaskRunFunction run,
+       void *taskdata, bool free_taskdata, TaskPriority priority)
+{
+       Task *task = MEM_callocN(sizeof(Task), "Task");
+
+       task->run = run;
+       task->taskdata = taskdata;
+       task->free_taskdata = free_taskdata;
+       task->pool = pool;
+
+       task_scheduler_push(pool->scheduler, task, priority);
+}
+
+void BLI_task_pool_work_and_wait(TaskPool *pool)
+{
+       TaskScheduler *scheduler = pool->scheduler;
+
+       BLI_mutex_lock(&pool->num_mutex);
+
+       while(pool->num != 0) {
+               Task *task, *work_task = NULL;
+               bool found_task = false;
+
+               BLI_mutex_unlock(&pool->num_mutex);
+
+               BLI_mutex_lock(&scheduler->queue_mutex);
+
+               /* only take tasks from this pool; executing a task from
+                * another pool here could lead to deadlock */
+
+               for(task = scheduler->queue.first; task; task = task->next) {
+                       if(task->pool == pool) {
+                               work_task = task;
+                               found_task = true;
+                               BLI_remlink(&scheduler->queue, task);
+                               break;
+                       }
+               }
+
+               BLI_mutex_unlock(&scheduler->queue_mutex);
+
+               /* if found task, do it, otherwise wait until other tasks are done */
+               if(found_task) {
+                       /* run task */
+                       work_task->run(pool, work_task->taskdata, 0);
+
+                       /* delete task */
+                       if(work_task->free_taskdata)
+                               MEM_freeN(work_task->taskdata);
+                       MEM_freeN(work_task);
+
+                       /* notify pool task was done */
+                       task_pool_num_decrease(pool, 1);
+               }
+
+               BLI_mutex_lock(&pool->num_mutex);
+               if(pool->num == 0)
+                       break;
+
+               if(!found_task)
+                       BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
+       }
+
+       BLI_mutex_unlock(&pool->num_mutex);
+}
+
+void BLI_task_pool_cancel(TaskPool *pool)
+{
+       pool->do_cancel = true;
+
+       task_scheduler_clear(pool->scheduler, pool);
+
+       /* wait until all entries are cleared */
+       BLI_mutex_lock(&pool->num_mutex);
+       while(pool->num)
+               BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
+       BLI_mutex_unlock(&pool->num_mutex);
+
+       pool->do_cancel = false;
+}
+
+void BLI_task_pool_stop(TaskPool *pool)
+{
+       task_scheduler_clear(pool->scheduler, pool);
+
+       BLI_assert(pool->num == 0);
+}
+
+bool BLI_task_pool_cancelled(TaskPool *pool)
+{
+       return pool->do_cancel;
+}
+
+void *BLI_task_pool_userdata(TaskPool *pool)
+{
+       return pool->userdata;
+}
+
+ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
+{
+       return &pool->user_mutex;
+}
+
+size_t BLI_task_pool_tasks_done(TaskPool *pool)
+{
+       return pool->done;
+}
+
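A sketch of the nested-pool pattern described in the header, assuming
(hypothetically) that the parent pool was created with its scheduler as
userdata. Since BLI_task_pool_work_and_wait() only pops tasks belonging to its
own pool, waiting inside a task cannot deadlock on unrelated pools, and idle
workers will join in on the subtasks:

#include "BLI_task.h"

static void sub_task(TaskPool *pool, void *taskdata, int threadid)
{
    /* ... process one sub-range ... */
    (void)pool; (void)taskdata; (void)threadid;
}

static void parent_task(TaskPool *pool, void *taskdata, int threadid)
{
    /* assumes the creator passed the scheduler as the pool's userdata */
    TaskScheduler *scheduler = BLI_task_pool_userdata(pool);
    TaskPool *subpool = BLI_task_pool_create(scheduler, NULL);
    int i;

    for (i = 0; i < 4; i++)
        BLI_task_pool_push(subpool, sub_task, NULL, false, TASK_PRIORITY_HIGH);

    /* this thread works on subpool tasks itself until they are all done */
    BLI_task_pool_work_and_wait(subpool);
    BLI_task_pool_free(subpool);

    (void)taskdata; (void)threadid;
}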
diff --git a/source/blender/blenlib/intern/threads.c b/source/blender/blenlib/intern/threads.c
index e0ea3bbf685afbf5a44edeedb043fc3a5b51b7de..cd3132682b6e05c5f02e7fffb6b04e4fcc03f3b9 100644
@@ -412,6 +412,11 @@ void BLI_mutex_unlock(ThreadMutex *mutex)
        pthread_mutex_unlock(mutex);
 }
 
+bool BLI_mutex_trylock(ThreadMutex *mutex)
+{
+       return (pthread_mutex_trylock(mutex) == 0);
+}
+
 void BLI_mutex_end(ThreadMutex *mutex)
 {
        pthread_mutex_destroy(mutex);
@@ -772,6 +777,32 @@ void BLI_thread_queue_wait_finish(ThreadQueue *queue)
        pthread_mutex_unlock(&queue->mutex);
 }
 
+/* Condition */
+void BLI_condition_init(ThreadCondition *cond)
+{
+       pthread_cond_init(cond, NULL);
+}
+
+void BLI_condition_wait(ThreadCondition *cond, ThreadMutex *mutex)
+{
+       pthread_cond_wait(cond, mutex);
+}
+
+void BLI_condition_notify_one(ThreadCondition *cond)
+{
+       pthread_cond_signal(cond);
+}
+
+void BLI_condition_notify_all(ThreadCondition *cond)
+{
+       pthread_cond_broadcast(cond);
+}
+
+void BLI_condition_end(ThreadCondition *cond)
+{
+       pthread_cond_destroy(cond);
+}
+
 /* ************************************************ */
 
 void BLI_begin_threaded_malloc(void)
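BLI_mutex_trylock() wraps pthread_mutex_trylock() and returns true only when
the lock was acquired without blocking. A sketch of the opportunistic-locking
pattern it enables; the Cache type and cache_update() are hypothetical:

#include "BLI_threads.h"

typedef struct Cache {
    ThreadMutex mutex;
    /* ... shared cached data ... */
} Cache;

void cache_update(Cache *cache); /* hypothetical helper */

static void maybe_update(Cache *cache)
{
    if (BLI_mutex_trylock(&cache->mutex)) {
        /* acquired the lock without blocking; safe to touch shared data */
        cache_update(cache);
        BLI_mutex_unlock(&cache->mutex);
    }
    /* else: another thread holds the lock; skip and do other work */
}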