Cycles: Remove unneeded include statements
[blender-staging.git] / intern / cycles / util / util_task.cpp
1 /*
2  * Copyright 2011-2013 Blender Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "util/util_foreach.h"
18 #include "util/util_logging.h"
19 #include "util/util_system.h"
20 #include "util/util_task.h"
21 #include "util/util_time.h"
22
//#define THREADING_DEBUG_ENABLED

#ifdef THREADING_DEBUG_ENABLED
#include <stdio.h>
/* Verbose tracing of condition variable waits/notifies, useful when debugging
 * deadlocks in the pool/scheduler interaction. Compiles to nothing unless
 * THREADING_DEBUG_ENABLED is defined above. fflush() ensures messages are
 * visible even if a deadlock follows immediately. */
#define THREADING_DEBUG(...) do { printf(__VA_ARGS__); fflush(stdout); } while(0)
#else
#define THREADING_DEBUG(...)
#endif
31
32 CCL_NAMESPACE_BEGIN
33
34 /* Task Pool */
35
36 TaskPool::TaskPool()
37 {
38         num_tasks_handled = 0;
39         num = 0;
40         do_cancel = false;
41 }
42
/* Destructor: ensure no queued tasks still reference this pool. */
TaskPool::~TaskPool()
{
	stop();
}
47
48 void TaskPool::push(Task *task, bool front)
49 {
50         TaskScheduler::Entry entry;
51
52         entry.task = task;
53         entry.pool = this;
54
55         TaskScheduler::push(entry, front);
56 }
57
58 void TaskPool::push(const TaskRunFunction& run, bool front)
59 {
60         push(new Task(run), front);
61 }
62
/* Help execute this pool's tasks on the calling thread until the pool has
 * drained (num == 0), then optionally fill in timing statistics.
 *
 * Only tasks belonging to this pool are picked from the scheduler queue:
 * running a task from another pool here could recursively wait on this
 * pool and deadlock. When no own task is available, we sleep on num_cond
 * until worker threads finish the remaining ones. */
void TaskPool::wait_work(Summary *stats)
{
	thread_scoped_lock num_lock(num_mutex);

	while(num != 0) {
		/* Drop num_mutex before taking queue_mutex; the two are never
		 * held at the same time. */
		num_lock.unlock();

		thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

		/* find task from this pool. if we get a task from another pool,
		 * we can get into deadlock */
		TaskScheduler::Entry work_entry;
		bool found_entry = false;
		list<TaskScheduler::Entry>::iterator it;

		for(it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
			TaskScheduler::Entry& entry = *it;

			if(entry.pool == this) {
				work_entry = entry;
				found_entry = true;
				TaskScheduler::queue.erase(it);
				break;
			}
		}

		/* Run the task outside the queue lock so workers stay unblocked. */
		queue_lock.unlock();

		/* if found task, do it, otherwise wait until other tasks are done */
		if(found_entry) {
			/* run task */
			work_entry.task->run(0);

			/* delete task */
			delete work_entry.task;

			/* notify pool task was done */
			num_decrease(1);
		}

		num_lock.lock();
		if(num == 0)
			break;

		if(!found_entry) {
			/* Nothing of ours queued: wait for worker threads to finish
			 * the in-flight tasks. num_decrease()/num_increase() signal
			 * num_cond on every change. */
			THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
			num_cond.wait(num_lock);
			THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
		}
	}

	/* Summary covers the span from the first push (start_time) to now. */
	if(stats != NULL) {
		stats->time_total = time_dt() - start_time;
		stats->num_tasks_handled = num_tasks_handled;
	}
}
119
/* Cancel all tasks in this pool: queued-but-not-started tasks are dropped,
 * and we wait for already-running ones to finish. While this is in progress
 * canceled() returns true, so cooperative tasks can terminate early. */
void TaskPool::cancel()
{
	do_cancel = true;

	/* Remove queued tasks; this also decreases num accordingly. */
	TaskScheduler::clear(this);
	
	{
		thread_scoped_lock num_lock(num_mutex);

		/* Wait for tasks still executing on worker threads to drain. */
		while(num) {
			THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
			num_cond.wait(num_lock);
			THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num);
		}
	}

	do_cancel = false;
}
138
/* Remove any remaining queued tasks of this pool from the scheduler.
 * Callers must ensure no tasks are running anymore (asserted below). */
void TaskPool::stop()
{
	TaskScheduler::clear(this);

	assert(num == 0);
}
145
/* True while cancel() is in progress; long-running tasks poll this to
 * stop their work early. */
bool TaskPool::canceled()
{
	return do_cancel;
}
150
151 void TaskPool::num_decrease(int done)
152 {
153         num_mutex.lock();
154         num -= done;
155
156         assert(num >= 0);
157         if(num == 0) {
158                 THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
159                 num_cond.notify_all();
160         }
161
162         num_mutex.unlock();
163 }
164
165 void TaskPool::num_increase()
166 {
167         thread_scoped_lock num_lock(num_mutex);
168         if(num_tasks_handled == 0) {
169                 start_time = time_dt();
170         }
171         num++;
172         num_tasks_handled++;
173         THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
174         num_cond.notify_all();
175 }
176
177 /* Task Scheduler */
178
/* Static scheduler state, shared by every Cycles instance in the process. */
thread_mutex TaskScheduler::mutex;       /* Guards users/threads in init()/exit(). */
int TaskScheduler::users = 0;            /* Reference count of scheduler users. */
vector<thread*> TaskScheduler::threads;  /* Worker threads owned by the scheduler. */
bool TaskScheduler::do_exit = false;     /* Asks workers to terminate. */

list<TaskScheduler::Entry> TaskScheduler::queue;      /* Pending tasks (FIFO). */
thread_mutex TaskScheduler::queue_mutex;              /* Guards queue and do_exit. */
thread_condition_variable TaskScheduler::queue_cond;  /* Signals queue/do_exit changes. */
187
/* Create the shared worker threads, or just bump the user count if another
 * Cycles instance already created them. num_threads == 0 requests automatic
 * detection from the number of logical CPUs.
 *
 * On systems with multiple CPU (processor) groups, worker threads are
 * distributed across the groups and pinned to them, except when the
 * requested threads fit into a single group with default affinity. */
void TaskScheduler::init(int num_threads)
{
	thread_scoped_lock lock(mutex);

	/* multiple cycles instances can use this task scheduler, sharing the same
	 * threads, so we keep track of the number of users. */
	if(users == 0) {
		do_exit = false;

		const bool use_auto_threads = (num_threads == 0);
		if(use_auto_threads) {
			/* automatic number of threads */
			num_threads = system_cpu_thread_count();
		}
		VLOG(1) << "Creating pool of " << num_threads << " threads.";

		/* launch threads that will be waiting for work */
		threads.resize(num_threads);

		/* Query CPU group topology; only relevant on systems where logical
		 * CPUs are split into processor groups. */
		const int num_groups = system_cpu_group_count();
		unsigned short num_process_groups = 0;
		vector<unsigned short> process_groups;
		int current_group_threads = 0;
		if(num_groups > 1) {
			process_groups.resize(num_groups);
			num_process_groups = system_cpu_process_groups(num_groups,
			                                               &process_groups[0]);
			if(num_process_groups == 1) {
				current_group_threads = system_cpu_group_thread_count(process_groups[0]);
			}
		}
		int thread_index = 0;
		for(int group = 0; group < num_groups; ++group) {
			/* NOTE: That's not really efficient from threading point of view,
			 * but it is simple to read and it doesn't make sense to use more
			 * user-specified threads than logical threads anyway.
			 */
			/* Last group absorbs whatever thread budget is left over. */
			int num_group_threads = (group == num_groups - 1)
			        ? (threads.size() - thread_index)
			        : system_cpu_group_thread_count(group);
			for(int group_thread = 0;
			    group_thread < num_group_threads && thread_index < threads.size();
			    ++group_thread, ++thread_index)
			{
				/* NOTE: Thread group of -1 means we would not force thread affinity. */
				int thread_group;
				if(num_groups == 1) {
					/* Use default affinity if there's only one CPU group in the system. */
					thread_group = -1;
				}
				else if(use_auto_threads &&
				        num_process_groups == 1 &&
				        num_threads <= current_group_threads)
				{
					/* If we fit into curent CPU group we also don't force any affinity. */
					thread_group = -1;
				}
				else {
					thread_group = group;
				}
				/* Worker thread ids start at 1; 0 is reserved for the thread
				 * that calls wait_work(). */
				threads[thread_index] = new thread(function_bind(&TaskScheduler::thread_run,
				                                                 thread_index + 1),
				                                   thread_group);
			}
		}
	}
	
	users++;
}
257
/* Decrease the user count; when the last user exits, shut down and join all
 * worker threads. Must be paired 1:1 with calls to init(). */
void TaskScheduler::exit()
{
	thread_scoped_lock lock(mutex);

	users--;

	if(users == 0) {
		/* stop all waiting threads */
		/* do_exit is set while holding queue_mutex so a worker can not check
		 * the predicate and then miss the wakeup before it starts waiting. */
		TaskScheduler::queue_mutex.lock();
		do_exit = true;
		TaskScheduler::queue_cond.notify_all();
		TaskScheduler::queue_mutex.unlock();

		/* delete threads */
		foreach(thread *t, threads) {
			t->join();
			delete t;
		}

		threads.clear();
	}
}
280
/* Release the memory retained by the threads vector. Only valid after the
 * last user called exit() (users == 0), when no worker threads exist. */
void TaskScheduler::free_memory()
{
	assert(users == 0);
	threads.free_memory();
}
286
287 bool TaskScheduler::thread_wait_pop(Entry& entry)
288 {
289         thread_scoped_lock queue_lock(queue_mutex);
290
291         while(queue.empty() && !do_exit)
292                 queue_cond.wait(queue_lock);
293
294         if(queue.empty()) {
295                 assert(do_exit);
296                 return false;
297         }
298         
299         entry = queue.front();
300         queue.pop_front();
301
302         return true;
303 }
304
305 void TaskScheduler::thread_run(int thread_id)
306 {
307         Entry entry;
308
309         /* todo: test affinity/denormal mask */
310
311         /* keep popping off tasks */
312         while(thread_wait_pop(entry)) {
313                 /* run task */
314                 entry.task->run(thread_id);
315
316                 /* delete task */
317                 delete entry.task;
318
319                 /* notify pool task was done */
320                 entry.pool->num_decrease(1);
321         }
322 }
323
324 void TaskScheduler::push(Entry& entry, bool front)
325 {
326         entry.pool->num_increase();
327
328         /* add entry to queue */
329         TaskScheduler::queue_mutex.lock();
330         if(front)
331                 TaskScheduler::queue.push_front(entry);
332         else
333                 TaskScheduler::queue.push_back(entry);
334
335         TaskScheduler::queue_cond.notify_one();
336         TaskScheduler::queue_mutex.unlock();
337 }
338
/* Remove and delete all queued (not yet started) tasks belonging to the
 * given pool. Tasks already running on worker threads are unaffected. */
void TaskScheduler::clear(TaskPool *pool)
{
	thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

	/* erase all tasks from this pool from the queue */
	list<Entry>::iterator it = queue.begin();
	int done = 0;

	while(it != queue.end()) {
		Entry& entry = *it;

		if(entry.pool == pool) {
			done++;
			delete entry.task;

			it = queue.erase(it);
		}
		else
			it++;
	}

	/* Release queue_mutex before num_decrease() so we never hold it while
	 * taking the pool's num_mutex. */
	queue_lock.unlock();

	/* notify done */
	pool->num_decrease(done);
}
365
366 /* Dedicated Task Pool */
367
/* Create the pool and launch its single dedicated worker thread. The flags
 * must be initialized before the thread starts, since thread_run() and
 * thread_wait_pop() read them immediately. */
DedicatedTaskPool::DedicatedTaskPool()
{
	do_cancel = false;
	do_exit = false;
	num = 0;

	worker_thread = new thread(function_bind(&DedicatedTaskPool::thread_run, this));
}
376
/* Destructor: ask the worker to exit, then join and destroy the thread.
 * stop() must come first so the worker leaves its wait loop. */
DedicatedTaskPool::~DedicatedTaskPool()
{
	stop();
	worker_thread->join();
	delete worker_thread;
}
383
384 void DedicatedTaskPool::push(Task *task, bool front)
385 {
386         num_increase();
387
388         /* add task to queue */
389         queue_mutex.lock();
390         if(front)
391                 queue.push_front(task);
392         else
393                 queue.push_back(task);
394
395         queue_cond.notify_one();
396         queue_mutex.unlock();
397 }
398
399 void DedicatedTaskPool::push(const TaskRunFunction& run, bool front)
400 {
401         push(new Task(run), front);
402 }
403
/* Block until every pushed task has finished (num drops to zero).
 * num_decrease() signals num_cond on each completed task. */
void DedicatedTaskPool::wait()
{
	thread_scoped_lock num_lock(num_mutex);

	while(num)
		num_cond.wait(num_lock);
}
411
/* Cancel all tasks: drop everything still queued and wait for the task
 * currently running (if any) to finish. While in progress, canceled()
 * returns true so a cooperative task can terminate early. */
void DedicatedTaskPool::cancel()
{
	do_cancel = true;

	clear();
	wait();

	do_cancel = false;
}
421
422 void DedicatedTaskPool::stop()
423 {
424         clear();
425
426         do_exit = true;
427         queue_cond.notify_all();
428
429         wait();
430
431         assert(num == 0);
432 }
433
/* True while cancel() is in progress; the running task polls this to
 * stop its work early. */
bool DedicatedTaskPool::canceled()
{
	return do_cancel;
}
438
439 void DedicatedTaskPool::num_decrease(int done)
440 {
441         thread_scoped_lock num_lock(num_mutex);
442         num -= done;
443
444         assert(num >= 0);
445         if(num == 0)
446                 num_cond.notify_all();
447 }
448
449 void DedicatedTaskPool::num_increase()
450 {
451         thread_scoped_lock num_lock(num_mutex);
452         num++;
453         num_cond.notify_all();
454 }
455
456 bool DedicatedTaskPool::thread_wait_pop(Task*& task)
457 {
458         thread_scoped_lock queue_lock(queue_mutex);
459
460         while(queue.empty() && !do_exit)
461                 queue_cond.wait(queue_lock);
462
463         if(queue.empty()) {
464                 assert(do_exit);
465                 return false;
466         }
467
468         task = queue.front();
469         queue.pop_front();
470
471         return true;
472 }
473
474 void DedicatedTaskPool::thread_run()
475 {
476         Task *task;
477
478         /* keep popping off tasks */
479         while(thread_wait_pop(task)) {
480                 /* run task */
481                 task->run(0);
482
483                 /* delete task */
484                 delete task;
485
486                 /* notify task was done */
487                 num_decrease(1);
488         }
489 }
490
491 void DedicatedTaskPool::clear()
492 {
493         thread_scoped_lock queue_lock(queue_mutex);
494
495         /* erase all tasks from the queue */
496         list<Task*>::iterator it = queue.begin();
497         int done = 0;
498
499         while(it != queue.end()) {
500                 done++;
501                 delete *it;
502
503                 it = queue.erase(it);
504         }
505
506         queue_lock.unlock();
507
508         /* notify done */
509         num_decrease(done);
510 }
511
512 string TaskPool::Summary::full_report() const
513 {
514         string report = "";
515         report += string_printf("Total time:    %f\n", time_total);
516         report += string_printf("Tasks handled: %d\n", num_tasks_handled);
517         return report;
518 }
519
520 CCL_NAMESPACE_END
521