ClangFormat: apply to source, most of intern
[blender.git] / intern / cycles / util / util_task.cpp
1 /*
2  * Copyright 2011-2013 Blender Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "util/util_foreach.h"
18 #include "util/util_logging.h"
19 #include "util/util_system.h"
20 #include "util/util_task.h"
21 #include "util/util_time.h"
22
23 //#define THREADING_DEBUG_ENABLED
24
25 #ifdef THREADING_DEBUG_ENABLED
26 #  include <stdio.h>
27 #  define THREADING_DEBUG(...) \
28     do { \
29       printf(__VA_ARGS__); \
30       fflush(stdout); \
31     } while (0)
32 #else
33 #  define THREADING_DEBUG(...)
34 #endif
35
36 CCL_NAMESPACE_BEGIN
37
38 /* Task Pool */
39
40 TaskPool::TaskPool()
41 {
42   num_tasks_handled = 0;
43   num = 0;
44   do_cancel = false;
45 }
46
47 TaskPool::~TaskPool()
48 {
49   stop();
50 }
51
52 void TaskPool::push(Task *task, bool front)
53 {
54   TaskScheduler::Entry entry;
55
56   entry.task = task;
57   entry.pool = this;
58
59   TaskScheduler::push(entry, front);
60 }
61
62 void TaskPool::push(const TaskRunFunction &run, bool front)
63 {
64   push(new Task(run), front);
65 }
66
67 void TaskPool::wait_work(Summary *stats)
68 {
69   thread_scoped_lock num_lock(num_mutex);
70
71   while (num != 0) {
72     num_lock.unlock();
73
74     thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
75
76     /* find task from this pool. if we get a task from another pool,
77      * we can get into deadlock */
78     TaskScheduler::Entry work_entry;
79     bool found_entry = false;
80     list<TaskScheduler::Entry>::iterator it;
81
82     for (it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
83       TaskScheduler::Entry &entry = *it;
84
85       if (entry.pool == this) {
86         work_entry = entry;
87         found_entry = true;
88         TaskScheduler::queue.erase(it);
89         break;
90       }
91     }
92
93     queue_lock.unlock();
94
95     /* if found task, do it, otherwise wait until other tasks are done */
96     if (found_entry) {
97       /* run task */
98       work_entry.task->run(0);
99
100       /* delete task */
101       delete work_entry.task;
102
103       /* notify pool task was done */
104       num_decrease(1);
105     }
106
107     num_lock.lock();
108     if (num == 0)
109       break;
110
111     if (!found_entry) {
112       THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
113       num_cond.wait(num_lock);
114       THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
115     }
116   }
117
118   if (stats != NULL) {
119     stats->time_total = time_dt() - start_time;
120     stats->num_tasks_handled = num_tasks_handled;
121   }
122 }
123
124 void TaskPool::cancel()
125 {
126   do_cancel = true;
127
128   TaskScheduler::clear(this);
129
130   {
131     thread_scoped_lock num_lock(num_mutex);
132
133     while (num) {
134       THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
135       num_cond.wait(num_lock);
136       THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num);
137     }
138   }
139
140   do_cancel = false;
141 }
142
143 void TaskPool::stop()
144 {
145   TaskScheduler::clear(this);
146
147   assert(num == 0);
148 }
149
150 bool TaskPool::canceled()
151 {
152   return do_cancel;
153 }
154
155 bool TaskPool::finished()
156 {
157   thread_scoped_lock num_lock(num_mutex);
158   return num == 0;
159 }
160
161 void TaskPool::num_decrease(int done)
162 {
163   num_mutex.lock();
164   num -= done;
165
166   assert(num >= 0);
167   if (num == 0) {
168     THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
169     num_cond.notify_all();
170   }
171
172   num_mutex.unlock();
173 }
174
175 void TaskPool::num_increase()
176 {
177   thread_scoped_lock num_lock(num_mutex);
178   if (num_tasks_handled == 0) {
179     start_time = time_dt();
180   }
181   num++;
182   num_tasks_handled++;
183   THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
184   num_cond.notify_all();
185 }
186
187 /* Task Scheduler */
188
189 thread_mutex TaskScheduler::mutex;
190 int TaskScheduler::users = 0;
191 vector<thread *> TaskScheduler::threads;
192 bool TaskScheduler::do_exit = false;
193
194 list<TaskScheduler::Entry> TaskScheduler::queue;
195 thread_mutex TaskScheduler::queue_mutex;
196 thread_condition_variable TaskScheduler::queue_cond;
197
198 namespace {
199
200 /* Get number of processors on each of the available nodes. The result is sized
201  * by the highest node index, and element corresponds to number of processors on
202  * that node.
203  * If node is not available, then the corresponding number of processors is
204  * zero. */
205 void get_per_node_num_processors(vector<int> *num_per_node_processors)
206 {
207   const int num_nodes = system_cpu_num_numa_nodes();
208   if (num_nodes == 0) {
209     LOG(ERROR) << "Zero available NUMA nodes, is not supposed to happen.";
210     return;
211   }
212   num_per_node_processors->resize(num_nodes);
213   for (int node = 0; node < num_nodes; ++node) {
214     if (!system_cpu_is_numa_node_available(node)) {
215       (*num_per_node_processors)[node] = 0;
216       continue;
217     }
218     (*num_per_node_processors)[node] = system_cpu_num_numa_node_processors(node);
219   }
220 }
221
222 /* Calculate total number of processors on all available nodes.
223  * This is similar to system_cpu_thread_count(), but uses pre-calculated number
224  * of processors on each of the node, avoiding extra system calls and checks for
225  * the node availability. */
226 int get_num_total_processors(const vector<int> &num_per_node_processors)
227 {
228   int num_total_processors = 0;
229   foreach (int num_node_processors, num_per_node_processors) {
230     num_total_processors += num_node_processors;
231   }
232   return num_total_processors;
233 }
234
235 /* Compute NUMA node for every thread to run on, for the best performance. */
236 vector<int> distribute_threads_on_nodes(const int num_threads)
237 {
238   /* Start with all threads unassigned to any specific NUMA node. */
239   vector<int> thread_nodes(num_threads, -1);
240   const int num_active_group_processors = system_cpu_num_active_group_processors();
241   VLOG(1) << "Detected " << num_active_group_processors << " processors "
242           << "in active group.";
243   if (num_active_group_processors >= num_threads) {
244     /* If the current thread is set up in a way that its affinity allows to
245      * use at least requested number of threads we do not explicitly set
246      * affinity to the worker therads.
247      * This way we allow users to manually edit affinity of the parent
248      * thread, and here we follow that affinity. This way it's possible to
249      * have two Cycles/Blender instances running manually set to a different
250      * dies on a CPU. */
251     VLOG(1) << "Not setting thread group affinity.";
252     return thread_nodes;
253   }
254   vector<int> num_per_node_processors;
255   get_per_node_num_processors(&num_per_node_processors);
256   if (num_per_node_processors.size() == 0) {
257     /* Error was already repported, here we can't do anything, so we simply
258      * leave default affinity to all the worker threads. */
259     return thread_nodes;
260   }
261   const int num_nodes = num_per_node_processors.size();
262   int thread_index = 0;
263   /* First pass: fill in all the nodes to their maximum.
264   *
265    * If there is less threads than the overall nodes capacity, some of the
266    * nodes or parts of them will idle.
267    *
268    * TODO(sergey): Consider picking up fastest nodes if number of threads
269    * fits on them. For example, on Threadripper2 we might consider using nodes
270    * 0 and 2 if user requested 32 render threads. */
271   const int num_total_node_processors = get_num_total_processors(num_per_node_processors);
272   int current_node_index = 0;
273   while (thread_index < num_total_node_processors && thread_index < num_threads) {
274     const int num_node_processors = num_per_node_processors[current_node_index];
275     for (int processor_index = 0; processor_index < num_node_processors; ++processor_index) {
276       VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
277       thread_nodes[thread_index] = current_node_index;
278       ++thread_index;
279       if (thread_index == num_threads) {
280         /* All threads are scheduled on their nodes. */
281         return thread_nodes;
282       }
283     }
284     ++current_node_index;
285   }
286   /* Second pass: keep scheduling threads to each node one by one, uniformly
287    * fillign them in.
288    * This is where things becomes tricky to predict for the maximum
289    * performance: on the one hand this avoids too much threading overhead on
290    * few nodes, but for the final performance having all the overhead on one
291    * node might be better idea (since other nodes will have better chance of
292    * rendering faster).
293    * But more tricky is that nodes might have difference capacity, so we might
294    * want to do some weighted scheduling. For example, if node 0 has 16
295    * processors and node 1 has 32 processors, we'd better schedule 1 extra
296    * thread on node 0 and 2 extra threads on node 1. */
297   current_node_index = 0;
298   while (thread_index < num_threads) {
299     /* Skip unavailable nodes. */
300     /* TODO(sergey): Add sanity check against deadlock. */
301     while (num_per_node_processors[current_node_index] == 0) {
302       current_node_index = (current_node_index + 1) % num_nodes;
303     }
304     VLOG(1) << "Scheduling thread " << thread_index << " to node " << current_node_index << ".";
305     ++thread_index;
306     current_node_index = (current_node_index + 1) % num_nodes;
307   }
308
309   return thread_nodes;
310 }
311
312 }  // namespace
313
314 void TaskScheduler::init(int num_threads)
315 {
316   thread_scoped_lock lock(mutex);
317   /* Multiple cycles instances can use this task scheduler, sharing the same
318    * threads, so we keep track of the number of users. */
319   ++users;
320   if (users != 1) {
321     return;
322   }
323   do_exit = false;
324   const bool use_auto_threads = (num_threads == 0);
325   if (use_auto_threads) {
326     /* Automatic number of threads. */
327     num_threads = system_cpu_thread_count();
328   }
329   VLOG(1) << "Creating pool of " << num_threads << " threads.";
330
331   /* Compute distribution on NUMA nodes. */
332   vector<int> thread_nodes = distribute_threads_on_nodes(num_threads);
333
334   /* Launch threads that will be waiting for work. */
335   threads.resize(num_threads);
336   for (int thread_index = 0; thread_index < num_threads; ++thread_index) {
337     threads[thread_index] = new thread(function_bind(&TaskScheduler::thread_run, thread_index + 1),
338                                        thread_nodes[thread_index]);
339   }
340 }
341
342 void TaskScheduler::exit()
343 {
344   thread_scoped_lock lock(mutex);
345   users--;
346   if (users == 0) {
347     VLOG(1) << "De-initializing thread pool of task scheduler.";
348     /* stop all waiting threads */
349     TaskScheduler::queue_mutex.lock();
350     do_exit = true;
351     TaskScheduler::queue_cond.notify_all();
352     TaskScheduler::queue_mutex.unlock();
353
354     /* delete threads */
355     foreach (thread *t, threads) {
356       t->join();
357       delete t;
358     }
359     threads.clear();
360   }
361 }
362
363 void TaskScheduler::free_memory()
364 {
365   assert(users == 0);
366   threads.free_memory();
367 }
368
369 bool TaskScheduler::thread_wait_pop(Entry &entry)
370 {
371   thread_scoped_lock queue_lock(queue_mutex);
372
373   while (queue.empty() && !do_exit)
374     queue_cond.wait(queue_lock);
375
376   if (queue.empty()) {
377     assert(do_exit);
378     return false;
379   }
380
381   entry = queue.front();
382   queue.pop_front();
383
384   return true;
385 }
386
387 void TaskScheduler::thread_run(int thread_id)
388 {
389   Entry entry;
390
391   /* todo: test affinity/denormal mask */
392
393   /* keep popping off tasks */
394   while (thread_wait_pop(entry)) {
395     /* run task */
396     entry.task->run(thread_id);
397
398     /* delete task */
399     delete entry.task;
400
401     /* notify pool task was done */
402     entry.pool->num_decrease(1);
403   }
404 }
405
406 void TaskScheduler::push(Entry &entry, bool front)
407 {
408   entry.pool->num_increase();
409
410   /* add entry to queue */
411   TaskScheduler::queue_mutex.lock();
412   if (front)
413     TaskScheduler::queue.push_front(entry);
414   else
415     TaskScheduler::queue.push_back(entry);
416
417   TaskScheduler::queue_cond.notify_one();
418   TaskScheduler::queue_mutex.unlock();
419 }
420
421 void TaskScheduler::clear(TaskPool *pool)
422 {
423   thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
424
425   /* erase all tasks from this pool from the queue */
426   list<Entry>::iterator it = queue.begin();
427   int done = 0;
428
429   while (it != queue.end()) {
430     Entry &entry = *it;
431
432     if (entry.pool == pool) {
433       done++;
434       delete entry.task;
435
436       it = queue.erase(it);
437     }
438     else
439       it++;
440   }
441
442   queue_lock.unlock();
443
444   /* notify done */
445   pool->num_decrease(done);
446 }
447
448 /* Dedicated Task Pool */
449
450 DedicatedTaskPool::DedicatedTaskPool()
451 {
452   do_cancel = false;
453   do_exit = false;
454   num = 0;
455
456   worker_thread = new thread(function_bind(&DedicatedTaskPool::thread_run, this));
457 }
458
459 DedicatedTaskPool::~DedicatedTaskPool()
460 {
461   stop();
462   worker_thread->join();
463   delete worker_thread;
464 }
465
466 void DedicatedTaskPool::push(Task *task, bool front)
467 {
468   num_increase();
469
470   /* add task to queue */
471   queue_mutex.lock();
472   if (front)
473     queue.push_front(task);
474   else
475     queue.push_back(task);
476
477   queue_cond.notify_one();
478   queue_mutex.unlock();
479 }
480
481 void DedicatedTaskPool::push(const TaskRunFunction &run, bool front)
482 {
483   push(new Task(run), front);
484 }
485
486 void DedicatedTaskPool::wait()
487 {
488   thread_scoped_lock num_lock(num_mutex);
489
490   while (num)
491     num_cond.wait(num_lock);
492 }
493
494 void DedicatedTaskPool::cancel()
495 {
496   do_cancel = true;
497
498   clear();
499   wait();
500
501   do_cancel = false;
502 }
503
504 void DedicatedTaskPool::stop()
505 {
506   clear();
507
508   do_exit = true;
509   queue_cond.notify_all();
510
511   wait();
512
513   assert(num == 0);
514 }
515
516 bool DedicatedTaskPool::canceled()
517 {
518   return do_cancel;
519 }
520
521 void DedicatedTaskPool::num_decrease(int done)
522 {
523   thread_scoped_lock num_lock(num_mutex);
524   num -= done;
525
526   assert(num >= 0);
527   if (num == 0)
528     num_cond.notify_all();
529 }
530
531 void DedicatedTaskPool::num_increase()
532 {
533   thread_scoped_lock num_lock(num_mutex);
534   num++;
535   num_cond.notify_all();
536 }
537
538 bool DedicatedTaskPool::thread_wait_pop(Task *&task)
539 {
540   thread_scoped_lock queue_lock(queue_mutex);
541
542   while (queue.empty() && !do_exit)
543     queue_cond.wait(queue_lock);
544
545   if (queue.empty()) {
546     assert(do_exit);
547     return false;
548   }
549
550   task = queue.front();
551   queue.pop_front();
552
553   return true;
554 }
555
556 void DedicatedTaskPool::thread_run()
557 {
558   Task *task;
559
560   /* keep popping off tasks */
561   while (thread_wait_pop(task)) {
562     /* run task */
563     task->run(0);
564
565     /* delete task */
566     delete task;
567
568     /* notify task was done */
569     num_decrease(1);
570   }
571 }
572
573 void DedicatedTaskPool::clear()
574 {
575   thread_scoped_lock queue_lock(queue_mutex);
576
577   /* erase all tasks from the queue */
578   list<Task *>::iterator it = queue.begin();
579   int done = 0;
580
581   while (it != queue.end()) {
582     done++;
583     delete *it;
584
585     it = queue.erase(it);
586   }
587
588   queue_lock.unlock();
589
590   /* notify done */
591   num_decrease(done);
592 }
593
594 string TaskPool::Summary::full_report() const
595 {
596   string report = "";
597   report += string_printf("Total time:    %f\n", time_total);
598   report += string_printf("Tasks handled: %d\n", num_tasks_handled);
599   return report;
600 }
601
602 CCL_NAMESPACE_END