Get rid of `BLI_task_pool_stop()`.
[blender-staging.git] / source / blender / blenlib / intern / task.c
1 /*
2  * ***** BEGIN GPL LICENSE BLOCK *****
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software Foundation,
16  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17  *
18  * ***** END GPL LICENSE BLOCK *****
19  */
20
21 /** \file blender/blenlib/intern/task.c
22  *  \ingroup bli
23  *
24  * A generic task system which can be used for any task based subsystem.
25  */
26
27 #include <stdlib.h>
28
29 #include "MEM_guardedalloc.h"
30
31 #include "DNA_listBase.h"
32
33 #include "BLI_listbase.h"
34 #include "BLI_math.h"
35 #include "BLI_task.h"
36 #include "BLI_threads.h"
37
38 #include "atomic_ops.h"
39
40 /* Define this to enable some detailed statistic print. */
41 #undef DEBUG_STATS
42
43 /* Types */
44
45 /* Number of per-thread pre-allocated tasks.
46  *
47  * For more details see description of TaskMemPool.
48  */
49 #define MEMPOOL_SIZE 256
50
51 typedef struct Task {
52         struct Task *next, *prev;
53
54         TaskRunFunction run;
55         void *taskdata;
56         bool free_taskdata;
57         TaskFreeFunction freedata;
58         TaskPool *pool;
59 } Task;
60
61 /* This is a per-thread storage of pre-allocated tasks.
62  *
63  * The idea behind this is simple: reduce amount of malloc() calls when pushing
64  * new task to the pool. This is done by keeping memory from the tasks which
65  * were finished already, so instead of freeing that memory we put it to the
66  * pool for the later re-use.
67  *
68  * The tricky part here is to avoid any inter-thread synchronization, hence no
69  * lock must exist around this pool. The pool will become an owner of the pointer
70  * from freed task, and only corresponding thread will be able to use this pool
71  * (no memory stealing and such).
72  *
73  * This leads to the following use of the pool:
74  *
75  * - task_push() should provide proper thread ID from which the task is being
76  *   pushed from.
77  *
78  * - Task allocation function which check corresponding memory pool and if there
79  *   is any memory in there it'll mark memory as re-used, remove it from the pool
80  *   and use that memory for the new task.
81  *
82  *   At this moment task queue owns the memory.
83  *
84  * - When task is done and task_free() is called the memory will be put to the
85  *  pool which corresponds to a thread which handled the task.
86  */
87 typedef struct TaskMemPool {
88         /* Number of pre-allocated tasks in the pool. */
89         int num_tasks;
90         /* Pre-allocated task memory pointers. */
91         Task *tasks[MEMPOOL_SIZE];
92 } TaskMemPool;
93
94 #ifdef DEBUG_STATS
95 typedef struct TaskMemPoolStats {
96         /* Number of allocations. */
97         int num_alloc;
98         /* Number of avoided allocations (pointer was re-used from the pool). */
99         int num_reuse;
100         /* Number of discarded memory due to pool saturation, */
101         int num_discard;
102 } TaskMemPoolStats;
103 #endif
104
105 struct TaskPool {
106         TaskScheduler *scheduler;
107
108         volatile size_t num;
109         size_t num_threads;
110         size_t currently_running_tasks;
111         ThreadMutex num_mutex;
112         ThreadCondition num_cond;
113
114         void *userdata;
115         ThreadMutex user_mutex;
116
117         volatile bool do_cancel;
118
119         /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler
120          * has to use its special background fallback thread in case we are in
121          * single-threaded situation.
122          */
123         bool run_in_background;
124
125         /* This pool is used for caching task pointers for thread id 0.
126          * This could either point to a global scheduler's task_mempool[0] if the
127          * pool is handled form the main thread or point to task_mempool_local
128          * otherwise.
129          *
130          * This way we solve possible threading conflicts accessing same global
131          * memory pool from multiple threads from which wait_work() is called.
132          */
133         TaskMemPool *task_mempool;
134         TaskMemPool task_mempool_local;
135
136 #ifdef DEBUG_STATS
137         TaskMemPoolStats *mempool_stats;
138 #endif
139 };
140
141 struct TaskScheduler {
142         pthread_t *threads;
143         struct TaskThread *task_threads;
144         TaskMemPool *task_mempool;
145         int num_threads;
146         bool background_thread_only;
147
148         ListBase queue;
149         ThreadMutex queue_mutex;
150         ThreadCondition queue_cond;
151
152         volatile bool do_exit;
153 };
154
155 typedef struct TaskThread {
156         TaskScheduler *scheduler;
157         int id;
158 } TaskThread;
159
160 /* Helper */
161 static void task_data_free(Task *task, const int thread_id)
162 {
163         if (task->free_taskdata) {
164                 if (task->freedata) {
165                         task->freedata(task->pool, task->taskdata, thread_id);
166                 }
167                 else {
168                         MEM_freeN(task->taskdata);
169                 }
170         }
171 }
172
173 BLI_INLINE TaskMemPool *get_task_mempool(TaskPool *pool, const int thread_id)
174 {
175         if (thread_id == 0) {
176                 return pool->task_mempool;
177         }
178         return &pool->scheduler->task_mempool[thread_id];
179 }
180
181 static Task *task_alloc(TaskPool *pool, const int thread_id)
182 {
183         assert(thread_id <= pool->scheduler->num_threads);
184         if (thread_id != -1) {
185                 assert(thread_id >= 0);
186                 TaskMemPool *mem_pool = get_task_mempool(pool, thread_id);
187                 /* Try to re-use task memory from a thread local storage. */
188                 if (mem_pool->num_tasks > 0) {
189                         --mem_pool->num_tasks;
190                         /* Success! We've just avoided task allocation. */
191 #ifdef DEBUG_STATS
192                         pool->mempool_stats[thread_id].num_reuse++;
193 #endif
194                         return mem_pool->tasks[mem_pool->num_tasks];
195                 }
196                 /* We are doomed to allocate new task data. */
197 #ifdef DEBUG_STATS
198                 pool->mempool_stats[thread_id].num_alloc++;
199 #endif
200         }
201         return MEM_mallocN(sizeof(Task), "New task");
202 }
203
204 static void task_free(TaskPool *pool, Task *task, const int thread_id)
205 {
206         task_data_free(task, thread_id);
207         assert(thread_id >= 0);
208         assert(thread_id <= pool->scheduler->num_threads);
209         TaskMemPool *mem_pool = get_task_mempool(pool, thread_id);
210         if (mem_pool->num_tasks < MEMPOOL_SIZE - 1) {
211                 /* Successfully allowed the task to be re-used later. */
212                 mem_pool->tasks[mem_pool->num_tasks] = task;
213                 ++mem_pool->num_tasks;
214         }
215         else {
216                 /* Local storage saturated, no other way than just discard
217                  * the memory.
218                  *
219                  * TODO(sergey): We can perhaps store such pointer in a global
220                  * scheduler pool, maybe it'll be faster than discarding and
221                  * allocating again.
222                  */
223                 MEM_freeN(task);
224 #ifdef DEBUG_STATS
225                 pool->mempool_stats[thread_id].num_discard++;
226 #endif
227         }
228 }
229
230 /* Task Scheduler */
231
232 static void task_pool_num_decrease(TaskPool *pool, size_t done)
233 {
234         BLI_mutex_lock(&pool->num_mutex);
235
236         BLI_assert(pool->num >= done);
237
238         pool->num -= done;
239         atomic_sub_and_fetch_z(&pool->currently_running_tasks, done);
240
241         if (pool->num == 0)
242                 BLI_condition_notify_all(&pool->num_cond);
243
244         BLI_mutex_unlock(&pool->num_mutex);
245 }
246
247 static void task_pool_num_increase(TaskPool *pool)
248 {
249         BLI_mutex_lock(&pool->num_mutex);
250
251         pool->num++;
252         BLI_condition_notify_all(&pool->num_cond);
253
254         BLI_mutex_unlock(&pool->num_mutex);
255 }
256
257 static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task)
258 {
259         bool found_task = false;
260         BLI_mutex_lock(&scheduler->queue_mutex);
261
262         while (!scheduler->queue.first && !scheduler->do_exit)
263                 BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
264
265         do {
266                 Task *current_task;
267
268                 /* Assuming we can only have a void queue in 'exit' case here seems logical (we should only be here after
269                  * our worker thread has been woken up from a condition_wait(), which only happens after a new task was
270                  * added to the queue), but it is wrong.
271                  * Waiting on condition may wake up the thread even if condition is not signaled (spurious wake-ups), and some
272                  * race condition may also empty the queue **after** condition has been signaled, but **before** awoken thread
273                  * reaches this point...
274                  * See http://stackoverflow.com/questions/8594591
275                  *
276                  * So we only abort here if do_exit is set.
277                  */
278                 if (scheduler->do_exit) {
279                         BLI_mutex_unlock(&scheduler->queue_mutex);
280                         return false;
281                 }
282
283                 for (current_task = scheduler->queue.first;
284                      current_task != NULL;
285                      current_task = current_task->next)
286                 {
287                         TaskPool *pool = current_task->pool;
288
289                         if (scheduler->background_thread_only && !pool->run_in_background) {
290                                 continue;
291                         }
292
293                         if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads ||
294                             pool->num_threads == 0)
295                         {
296                                 *task = current_task;
297                                 found_task = true;
298                                 BLI_remlink(&scheduler->queue, *task);
299                                 break;
300                         }
301                         else {
302                                 atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1);
303                         }
304                 }
305                 if (!found_task)
306                         BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex);
307         } while (!found_task);
308
309         BLI_mutex_unlock(&scheduler->queue_mutex);
310
311         return true;
312 }
313
314 static void *task_scheduler_thread_run(void *thread_p)
315 {
316         TaskThread *thread = (TaskThread *) thread_p;
317         TaskScheduler *scheduler = thread->scheduler;
318         int thread_id = thread->id;
319         Task *task;
320
321         /* keep popping off tasks */
322         while (task_scheduler_thread_wait_pop(scheduler, &task)) {
323                 TaskPool *pool = task->pool;
324
325                 /* run task */
326                 task->run(pool, task->taskdata, thread_id);
327
328                 /* delete task */
329                 task_free(pool, task, thread_id);
330
331                 /* notify pool task was done */
332                 task_pool_num_decrease(pool, 1);
333         }
334
335         return NULL;
336 }
337
338 TaskScheduler *BLI_task_scheduler_create(int num_threads)
339 {
340         TaskScheduler *scheduler = MEM_callocN(sizeof(TaskScheduler), "TaskScheduler");
341
342         /* multiple places can use this task scheduler, sharing the same
343          * threads, so we keep track of the number of users. */
344         scheduler->do_exit = false;
345
346         BLI_listbase_clear(&scheduler->queue);
347         BLI_mutex_init(&scheduler->queue_mutex);
348         BLI_condition_init(&scheduler->queue_cond);
349
350         if (num_threads == 0) {
351                 /* automatic number of threads will be main thread + num cores */
352                 num_threads = BLI_system_thread_count();
353         }
354
355         /* main thread will also work, so we count it too */
356         num_threads -= 1;
357
358         /* Add background-only thread if needed. */
359         if (num_threads == 0) {
360                 scheduler->background_thread_only = true;
361                 num_threads = 1;
362         }
363
364         /* launch threads that will be waiting for work */
365         if (num_threads > 0) {
366                 int i;
367
368                 scheduler->num_threads = num_threads;
369                 scheduler->threads = MEM_callocN(sizeof(pthread_t) * num_threads, "TaskScheduler threads");
370                 scheduler->task_threads = MEM_callocN(sizeof(TaskThread) * num_threads, "TaskScheduler task threads");
371
372                 for (i = 0; i < num_threads; i++) {
373                         TaskThread *thread = &scheduler->task_threads[i];
374                         thread->scheduler = scheduler;
375                         thread->id = i + 1;
376
377                         if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) {
378                                 fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads);
379                         }
380                 }
381
382                 scheduler->task_mempool = MEM_callocN(sizeof(*scheduler->task_mempool) * (num_threads + 1),
383                                                       "TaskScheduler task_mempool");
384         }
385
386         return scheduler;
387 }
388
389 void BLI_task_scheduler_free(TaskScheduler *scheduler)
390 {
391         Task *task;
392
393         /* stop all waiting threads */
394         BLI_mutex_lock(&scheduler->queue_mutex);
395         scheduler->do_exit = true;
396         BLI_condition_notify_all(&scheduler->queue_cond);
397         BLI_mutex_unlock(&scheduler->queue_mutex);
398
399         /* delete threads */
400         if (scheduler->threads) {
401                 int i;
402
403                 for (i = 0; i < scheduler->num_threads; i++) {
404                         if (pthread_join(scheduler->threads[i], NULL) != 0)
405                                 fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads);
406                 }
407
408                 MEM_freeN(scheduler->threads);
409         }
410
411         /* Delete task thread data */
412         if (scheduler->task_threads) {
413                 MEM_freeN(scheduler->task_threads);
414         }
415
416         /* Delete task memory pool */
417         if (scheduler->task_mempool) {
418                 for (int i = 0; i <= scheduler->num_threads; ++i) {
419                         for (int j = 0; j < scheduler->task_mempool[i].num_tasks; ++j) {
420                                 MEM_freeN(scheduler->task_mempool[i].tasks[j]);
421                         }
422                 }
423                 MEM_freeN(scheduler->task_mempool);
424         }
425
426         /* delete leftover tasks */
427         for (task = scheduler->queue.first; task; task = task->next) {
428                 task_data_free(task, 0);
429         }
430         BLI_freelistN(&scheduler->queue);
431
432         /* delete mutex/condition */
433         BLI_mutex_end(&scheduler->queue_mutex);
434         BLI_condition_end(&scheduler->queue_cond);
435
436         MEM_freeN(scheduler);
437 }
438
439 int BLI_task_scheduler_num_threads(TaskScheduler *scheduler)
440 {
441         return scheduler->num_threads + 1;
442 }
443
444 static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority)
445 {
446         task_pool_num_increase(task->pool);
447
448         /* add task to queue */
449         BLI_mutex_lock(&scheduler->queue_mutex);
450
451         if (priority == TASK_PRIORITY_HIGH)
452                 BLI_addhead(&scheduler->queue, task);
453         else
454                 BLI_addtail(&scheduler->queue, task);
455
456         BLI_condition_notify_one(&scheduler->queue_cond);
457         BLI_mutex_unlock(&scheduler->queue_mutex);
458 }
459
460 static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool)
461 {
462         Task *task, *nexttask;
463         size_t done = 0;
464
465         BLI_mutex_lock(&scheduler->queue_mutex);
466
467         /* free all tasks from this pool from the queue */
468         for (task = scheduler->queue.first; task; task = nexttask) {
469                 nexttask = task->next;
470
471                 if (task->pool == pool) {
472                         task_data_free(task, 0);
473                         BLI_freelinkN(&scheduler->queue, task);
474
475                         done++;
476                 }
477         }
478
479         BLI_mutex_unlock(&scheduler->queue_mutex);
480
481         /* notify done */
482         task_pool_num_decrease(pool, done);
483 }
484
485 /* Task Pool */
486
487 static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, const bool is_background)
488 {
489         TaskPool *pool = MEM_mallocN(sizeof(TaskPool), "TaskPool");
490
491 #ifndef NDEBUG
492         /* Assert we do not try to create a background pool from some parent task - those only work OK from main thread. */
493         if (is_background) {
494                 const pthread_t thread_id = pthread_self();
495                 int i = scheduler->num_threads;
496
497                 while (i--) {
498                         BLI_assert(!pthread_equal(scheduler->threads[i], thread_id));
499                 }
500         }
501 #endif
502
503         pool->scheduler = scheduler;
504         pool->num = 0;
505         pool->num_threads = 0;
506         pool->currently_running_tasks = 0;
507         pool->do_cancel = false;
508         pool->run_in_background = is_background;
509
510         BLI_mutex_init(&pool->num_mutex);
511         BLI_condition_init(&pool->num_cond);
512
513         pool->userdata = userdata;
514         BLI_mutex_init(&pool->user_mutex);
515
516         if (BLI_thread_is_main()) {
517                 pool->task_mempool = scheduler->task_mempool;
518         }
519         else {
520                 pool->task_mempool = &pool->task_mempool_local;
521                 pool->task_mempool_local.num_tasks = 0;
522         }
523
524 #ifdef DEBUG_STATS
525         pool->mempool_stats =
526                 MEM_callocN(sizeof(*pool->mempool_stats) * (scheduler->num_threads + 1),
527                             "per-taskpool mempool stats");
528 #endif
529
530         /* Ensure malloc will go fine from threads,
531          *
532          * This is needed because we could be in main thread here
533          * and malloc could be non-threda safe at this point because
534          * no other jobs are running.
535          */
536         BLI_begin_threaded_malloc();
537
538         return pool;
539 }
540
541 /**
542  * Create a normal task pool.
543  * This means that in single-threaded context, it will not be executed at all until you call
544  * \a BLI_task_pool_work_and_wait() on it.
545  */
546 TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata)
547 {
548         return task_pool_create_ex(scheduler, userdata, false);
549 }
550
551 /**
552  * Create a background task pool.
553  * In multi-threaded context, there is no differences with \a BLI_task_pool_create(), but in single-threaded case
554  * it is ensured to have at least one worker thread to run on (i.e. you do not have to call
555  * \a BLI_task_pool_work_and_wait() on it to be sure it will be processed).
556  *
557  * \note Background pools are non-recursive (that is, you should not create other background pools in tasks assigned
558  *       to a background pool, they could end never being executed, since the 'fallback' background thread is already
559  *       busy with parent task in single-threaded context).
560  */
561 TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler, void *userdata)
562 {
563         return task_pool_create_ex(scheduler, userdata, true);
564 }
565
566 void BLI_task_pool_free(TaskPool *pool)
567 {
568         BLI_task_pool_cancel(pool);
569
570         BLI_mutex_end(&pool->num_mutex);
571         BLI_condition_end(&pool->num_cond);
572
573         BLI_mutex_end(&pool->user_mutex);
574
575         /* Free local memory pool, those pointers are lost forever. */
576         if (pool->task_mempool == &pool->task_mempool_local) {
577                 for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) {
578                         MEM_freeN(pool->task_mempool_local.tasks[i]);
579                 }
580         }
581
582 #ifdef DEBUG_STATS
583         printf("Thread ID    Allocated   Reused   Discarded\n");
584         for (int i = 0; i < pool->scheduler->num_threads + 1; ++i) {
585                 printf("%02d           %05d       %05d    %05d\n",
586                        i,
587                        pool->mempool_stats[i].num_alloc,
588                        pool->mempool_stats[i].num_reuse,
589                        pool->mempool_stats[i].num_discard);
590         }
591         MEM_freeN(pool->mempool_stats);
592 #endif
593
594         MEM_freeN(pool);
595
596         BLI_end_threaded_malloc();
597 }
598
599 static void task_pool_push(
600         TaskPool *pool, TaskRunFunction run, void *taskdata,
601         bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority,
602         int thread_id)
603 {
604         Task *task = task_alloc(pool, thread_id);
605
606         task->run = run;
607         task->taskdata = taskdata;
608         task->free_taskdata = free_taskdata;
609         task->freedata = freedata;
610         task->pool = pool;
611
612         task_scheduler_push(pool->scheduler, task, priority);
613 }
614
615 void BLI_task_pool_push_ex(
616         TaskPool *pool, TaskRunFunction run, void *taskdata,
617         bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority)
618 {
619         task_pool_push(pool, run, taskdata, free_taskdata, freedata, priority, -1);
620 }
621
622 void BLI_task_pool_push(
623         TaskPool *pool, TaskRunFunction run, void *taskdata, bool free_taskdata, TaskPriority priority)
624 {
625         BLI_task_pool_push_ex(pool, run, taskdata, free_taskdata, NULL, priority);
626 }
627
628 void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run,
629         void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id)
630 {
631         task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id);
632 }
633
634 void BLI_task_pool_work_and_wait(TaskPool *pool)
635 {
636         TaskScheduler *scheduler = pool->scheduler;
637
638         BLI_mutex_lock(&pool->num_mutex);
639
640         while (pool->num != 0) {
641                 Task *task, *work_task = NULL;
642                 bool found_task = false;
643
644                 BLI_mutex_unlock(&pool->num_mutex);
645
646                 BLI_mutex_lock(&scheduler->queue_mutex);
647
648                 /* find task from this pool. if we get a task from another pool,
649                  * we can get into deadlock */
650
651                 if (pool->num_threads == 0 ||
652                     pool->currently_running_tasks < pool->num_threads)
653                 {
654                         for (task = scheduler->queue.first; task; task = task->next) {
655                                 if (task->pool == pool) {
656                                         work_task = task;
657                                         found_task = true;
658                                         BLI_remlink(&scheduler->queue, task);
659                                         break;
660                                 }
661                         }
662                 }
663
664                 BLI_mutex_unlock(&scheduler->queue_mutex);
665
666                 /* if found task, do it, otherwise wait until other tasks are done */
667                 if (found_task) {
668                         /* run task */
669                         atomic_add_and_fetch_z(&pool->currently_running_tasks, 1);
670                         work_task->run(pool, work_task->taskdata, 0);
671
672                         /* delete task */
673                         task_free(pool, task, 0);
674
675                         /* notify pool task was done */
676                         task_pool_num_decrease(pool, 1);
677                 }
678
679                 BLI_mutex_lock(&pool->num_mutex);
680                 if (pool->num == 0)
681                         break;
682
683                 if (!found_task)
684                         BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
685         }
686
687         BLI_mutex_unlock(&pool->num_mutex);
688 }
689
690 void BLI_pool_set_num_threads(TaskPool *pool, int num_threads)
691 {
692         /* NOTE: Don't try to modify threads while tasks are running! */
693         pool->num_threads = num_threads;
694 }
695
696 void BLI_task_pool_cancel(TaskPool *pool)
697 {
698         pool->do_cancel = true;
699
700         task_scheduler_clear(pool->scheduler, pool);
701
702         /* wait until all entries are cleared */
703         BLI_mutex_lock(&pool->num_mutex);
704         while (pool->num)
705                 BLI_condition_wait(&pool->num_cond, &pool->num_mutex);
706         BLI_mutex_unlock(&pool->num_mutex);
707
708         pool->do_cancel = false;
709 }
710
711 bool BLI_task_pool_canceled(TaskPool *pool)
712 {
713         return pool->do_cancel;
714 }
715
716 void *BLI_task_pool_userdata(TaskPool *pool)
717 {
718         return pool->userdata;
719 }
720
721 ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool)
722 {
723         return &pool->user_mutex;
724 }
725
726 /* Parallel range routines */
727
728 /**
729  *
730  * Main functions:
731  * - #BLI_task_parallel_range
732  * - #BLI_task_parallel_listbase (#ListBase - double linked list)
733  *
734  * TODO:
735  * - #BLI_task_parallel_foreach_link (#Link - single linked list)
736  * - #BLI_task_parallel_foreach_ghash/gset (#GHash/#GSet - hash & set)
737  * - #BLI_task_parallel_foreach_mempool (#BLI_mempool - iterate over mempools)
738  *
739  */
740
741 /* Allows to avoid using malloc for userdata_chunk in tasks, when small enough. */
742 #define MALLOCA(_size) ((_size) <= 8192) ? alloca((_size)) : MEM_mallocN((_size), __func__)
743 #define MALLOCA_FREE(_mem, _size) if (((_mem) != NULL) && ((_size) > 8192)) MEM_freeN((_mem))
744
745 typedef struct ParallelRangeState {
746         int start, stop;
747         void *userdata;
748
749         TaskParallelRangeFunc func;
750         TaskParallelRangeFuncEx func_ex;
751
752         int iter;
753         int chunk_size;
754 } ParallelRangeState;
755
756 BLI_INLINE bool parallel_range_next_iter_get(
757         ParallelRangeState * __restrict state,
758         int * __restrict iter, int * __restrict count)
759 {
760         uint32_t uval = atomic_fetch_and_add_uint32((uint32_t *)(&state->iter), state->chunk_size);
761         int previter = *(int32_t*)&uval;
762
763         *iter = previter;
764         *count = max_ii(0, min_ii(state->chunk_size, state->stop - previter));
765
766         return (previter < state->stop);
767 }
768
769 static void parallel_range_func(
770         TaskPool * __restrict pool,
771         void *userdata_chunk,
772         int threadid)
773 {
774         ParallelRangeState * __restrict state = BLI_task_pool_userdata(pool);
775         int iter, count;
776
777         while (parallel_range_next_iter_get(state, &iter, &count)) {
778                 int i;
779
780                 if (state->func_ex) {
781                         for (i = 0; i < count; ++i) {
782                                 state->func_ex(state->userdata, userdata_chunk, iter + i, threadid);
783                         }
784                 }
785                 else {
786                         for (i = 0; i < count; ++i) {
787                                 state->func(state->userdata, iter + i);
788                         }
789                 }
790         }
791 }
792
793 /**
794  * This function allows to parallelized for loops in a similar way to OpenMP's 'parallel for' statement.
795  *
796  * See public API doc for description of parameters.
797  */
798 static void task_parallel_range_ex(
799         int start, int stop,
800         void *userdata,
801         void *userdata_chunk,
802         const size_t userdata_chunk_size,
803         TaskParallelRangeFunc func,
804         TaskParallelRangeFuncEx func_ex,
805         TaskParallelRangeFuncFinalize func_finalize,
806         const bool use_threading,
807         const bool use_dynamic_scheduling)
808 {
809         TaskScheduler *task_scheduler;
810         TaskPool *task_pool;
811         ParallelRangeState state;
812         int i, num_threads, num_tasks;
813
814         void *userdata_chunk_local = NULL;
815         void *userdata_chunk_array = NULL;
816         const bool use_userdata_chunk = (func_ex != NULL) && (userdata_chunk_size != 0) && (userdata_chunk != NULL);
817
818         if (start == stop) {
819                 return;
820         }
821
822         BLI_assert(start < stop);
823         if (userdata_chunk_size != 0) {
824                 BLI_assert(func_ex != NULL && func == NULL);
825                 BLI_assert(userdata_chunk != NULL);
826         }
827
828         /* If it's not enough data to be crunched, don't bother with tasks at all,
829          * do everything from the main thread.
830          */
831         if (!use_threading) {
832                 if (func_ex) {
833                         if (use_userdata_chunk) {
834                                 userdata_chunk_local = MALLOCA(userdata_chunk_size);
835                                 memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
836                         }
837
838                         for (i = start; i < stop; ++i) {
839                                 func_ex(userdata, userdata_chunk_local, i, 0);
840                         }
841
842                         if (func_finalize) {
843                                 func_finalize(userdata, userdata_chunk_local);
844                         }
845
846                         MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size);
847                 }
848                 else {
849                         for (i = start; i < stop; ++i) {
850                                 func(userdata, i);
851                         }
852                 }
853
854                 return;
855         }
856
857         task_scheduler = BLI_task_scheduler_get();
858         task_pool = BLI_task_pool_create(task_scheduler, &state);
859         num_threads = BLI_task_scheduler_num_threads(task_scheduler);
860
861         /* The idea here is to prevent creating task for each of the loop iterations
862          * and instead have tasks which are evenly distributed across CPU cores and
863          * pull next iter to be crunched using the queue.
864          */
865         num_tasks = num_threads * 2;
866
867         state.start = start;
868         state.stop = stop;
869         state.userdata = userdata;
870         state.func = func;
871         state.func_ex = func_ex;
872         state.iter = start;
873         if (use_dynamic_scheduling) {
874                 state.chunk_size = 32;
875         }
876         else {
877                 state.chunk_size = max_ii(1, (stop - start) / (num_tasks));
878         }
879
880         num_tasks = min_ii(num_tasks, (stop - start) / state.chunk_size);
881         atomic_fetch_and_add_uint32((uint32_t *)(&state.iter), 0);
882
883         if (use_userdata_chunk) {
884         userdata_chunk_array = MALLOCA(userdata_chunk_size * num_tasks);
885         }
886
887         for (i = 0; i < num_tasks; i++) {
888                 if (use_userdata_chunk) {
889                         userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
890                         memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
891                 }
892                 /* Use this pool's pre-allocated tasks. */
893                 BLI_task_pool_push_from_thread(task_pool,
894                                                parallel_range_func,
895                                                userdata_chunk_local, false,
896                                                TASK_PRIORITY_HIGH, 0);
897         }
898
899         BLI_task_pool_work_and_wait(task_pool);
900         BLI_task_pool_free(task_pool);
901
902         if (use_userdata_chunk) {
903                 if (func_finalize) {
904                         for (i = 0; i < num_tasks; i++) {
905                                 userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
906                                 func_finalize(userdata, userdata_chunk_local);
907                         }
908                 }
909                 MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks);
910         }
911 }
912
913 /**
914  * This function allows to parallelize for loops in a similar way to OpenMP's 'parallel for' statement.
915  *
916  * \param start First index to process.
917  * \param stop Index to stop looping (excluded).
918  * \param userdata Common userdata passed to all instances of \a func.
919  * \param userdata_chunk Optional, each instance of looping chunks will get a copy of this data
920  *                       (similar to OpenMP's firstprivate).
921  * \param userdata_chunk_size Memory size of \a userdata_chunk.
922  * \param func_ex Callback function (advanced version).
923  * \param use_threading If \a true, actually split-execute loop in threads, else just do a sequential forloop
924  *                      (allows caller to use any kind of test to switch on parallelization or not).
925  * \param use_dynamic_scheduling If \a true, the whole range is divided in a lot of small chunks (of size 32 currently),
926  *                               otherwise whole range is split in a few big chunks (num_threads * 2 chunks currently).
927  */
928 void BLI_task_parallel_range_ex(
929         int start, int stop,
930         void *userdata,
931         void *userdata_chunk,
932         const size_t userdata_chunk_size,
933         TaskParallelRangeFuncEx func_ex,
934         const bool use_threading,
935         const bool use_dynamic_scheduling)
936 {
937         task_parallel_range_ex(
938                     start, stop, userdata, userdata_chunk, userdata_chunk_size, NULL, func_ex, NULL,
939                     use_threading, use_dynamic_scheduling);
940 }
941
942 /**
943  * A simpler version of \a BLI_task_parallel_range_ex, which does not use \a use_dynamic_scheduling,
944  * and does not handle 'firstprivate'-like \a userdata_chunk.
945  *
946  * \param start First index to process.
947  * \param stop Index to stop looping (excluded).
948  * \param userdata Common userdata passed to all instances of \a func.
949  * \param func Callback function (simple version).
950  * \param use_threading If \a true, actually split-execute loop in threads, else just do a sequential forloop
951  *                      (allows caller to use any kind of test to switch on parallelization or not).
952  */
953 void BLI_task_parallel_range(
954         int start, int stop,
955         void *userdata,
956         TaskParallelRangeFunc func,
957         const bool use_threading)
958 {
959         task_parallel_range_ex(start, stop, userdata, NULL, 0, func, NULL, NULL, use_threading, false);
960 }
961
962 /**
963  * This function allows to parallelize for loops in a similar way to OpenMP's 'parallel for' statement,
964  * with an additional 'finalize' func called from calling thread once whole range have been processed.
965  *
966  * \param start First index to process.
967  * \param stop Index to stop looping (excluded).
968  * \param userdata Common userdata passed to all instances of \a func.
969  * \param userdata_chunk Optional, each instance of looping chunks will get a copy of this data
970  *                       (similar to OpenMP's firstprivate).
971  * \param userdata_chunk_size Memory size of \a userdata_chunk.
972  * \param func_ex Callback function (advanced version).
973  * \param func_finalize Callback function, called after all workers have finished,
974  * useful to finalize accumulative tasks.
975  * \param use_threading If \a true, actually split-execute loop in threads, else just do a sequential forloop
976  *                      (allows caller to use any kind of test to switch on parallelization or not).
977  * \param use_dynamic_scheduling If \a true, the whole range is divided in a lot of small chunks (of size 32 currently),
978  *                               otherwise whole range is split in a few big chunks (num_threads * 2 chunks currently).
979  */
980 void BLI_task_parallel_range_finalize(
981         int start, int stop,
982         void *userdata,
983         void *userdata_chunk,
984         const size_t userdata_chunk_size,
985         TaskParallelRangeFuncEx func_ex,
986         TaskParallelRangeFuncFinalize func_finalize,
987         const bool use_threading,
988         const bool use_dynamic_scheduling)
989 {
990         task_parallel_range_ex(
991                     start, stop, userdata, userdata_chunk, userdata_chunk_size, NULL, func_ex, func_finalize,
992                     use_threading, use_dynamic_scheduling);
993 }
994
995 #undef MALLOCA
996 #undef MALLOCA_FREE
997
998 typedef struct ParallelListbaseState {
999         void *userdata;
1000         TaskParallelListbaseFunc func;
1001
1002         int chunk_size;
1003         int index;
1004         Link *link;
1005         SpinLock lock;
1006 } ParallelListState;
1007
1008 BLI_INLINE Link *parallel_listbase_next_iter_get(
1009         ParallelListState * __restrict state,
1010         int * __restrict index,
1011         int * __restrict count)
1012 {
1013         int task_count = 0;
1014         BLI_spin_lock(&state->lock);
1015         Link *result = state->link;
1016         if (LIKELY(result != NULL)) {
1017                 *index = state->index;
1018                 while (state->link != NULL && task_count < state->chunk_size) {
1019                         ++task_count;
1020                         state->link = state->link->next;
1021                 }
1022                 state->index += task_count;
1023         }
1024         BLI_spin_unlock(&state->lock);
1025         *count = task_count;
1026         return result;
1027 }
1028
1029 static void parallel_listbase_func(
1030         TaskPool * __restrict pool,
1031         void *UNUSED(taskdata),
1032         int UNUSED(threadid))
1033 {
1034         ParallelListState * __restrict state = BLI_task_pool_userdata(pool);
1035         Link *link;
1036         int index, count;
1037
1038         while ((link = parallel_listbase_next_iter_get(state, &index, &count)) != NULL) {
1039                 for (int i = 0; i < count; ++i) {
1040                         state->func(state->userdata, link, index + i);
1041                         link = link->next;
1042                 }
1043         }
1044 }
1045
1046 /**
1047  * This function allows to parallelize for loops over ListBase items.
1048  *
1049  * \param listbase The double linked list to loop over.
1050  * \param userdata Common userdata passed to all instances of \a func.
1051  * \param func Callback function.
1052  * \param use_threading If \a true, actually split-execute loop in threads, else just do a sequential forloop
1053  *                      (allows caller to use any kind of test to switch on parallelization or not).
1054  *
1055  * \note There is no static scheduling here, since it would need another full loop over items to count them...
1056  */
1057 void BLI_task_parallel_listbase(
1058         struct ListBase *listbase,
1059         void *userdata,
1060         TaskParallelListbaseFunc func,
1061         const bool use_threading)
1062 {
1063         TaskScheduler *task_scheduler;
1064         TaskPool *task_pool;
1065         ParallelListState state;
1066         int i, num_threads, num_tasks;
1067
1068         if (BLI_listbase_is_empty(listbase)) {
1069                 return;
1070         }
1071
1072         if (!use_threading) {
1073                 i = 0;
1074                 for (Link *link = listbase->first; link != NULL; link = link->next, ++i) {
1075                         func(userdata, link, i);
1076                 }
1077                 return;
1078         }
1079
1080         task_scheduler = BLI_task_scheduler_get();
1081         task_pool = BLI_task_pool_create(task_scheduler, &state);
1082         num_threads = BLI_task_scheduler_num_threads(task_scheduler);
1083
1084         /* The idea here is to prevent creating task for each of the loop iterations
1085          * and instead have tasks which are evenly distributed across CPU cores and
1086          * pull next iter to be crunched using the queue.
1087          */
1088         num_tasks = num_threads * 2;
1089
1090         state.index = 0;
1091         state.link = listbase->first;
1092         state.userdata = userdata;
1093         state.func = func;
1094         state.chunk_size = 32;
1095         BLI_spin_init(&state.lock);
1096
1097         for (i = 0; i < num_tasks; i++) {
1098                 /* Use this pool's pre-allocated tasks. */
1099                 BLI_task_pool_push_from_thread(task_pool,
1100                                                parallel_listbase_func,
1101                                                NULL, false,
1102                                                TASK_PRIORITY_HIGH, 0);
1103         }
1104
1105         BLI_task_pool_work_and_wait(task_pool);
1106         BLI_task_pool_free(task_pool);
1107
1108         BLI_spin_end(&state.lock);
1109 }