/*
 * Copyright 2011, Blender Foundation.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

#include "util_debug.h"
#include "util_foreach.h"
#include "util_system.h"
#include "util_task.h"

//#define THREADING_DEBUG_ENABLED

#ifdef THREADING_DEBUG_ENABLED
#include <stdio.h>
#define THREADING_DEBUG(...) do { printf(__VA_ARGS__); fflush(stdout); } while(0)
#else
#define THREADING_DEBUG(...)
#endif

CCL_NAMESPACE_BEGIN

/* Task Pool */

TaskPool::TaskPool()
{
	num = 0;
	do_cancel = false;
}

TaskPool::~TaskPool()
{
	stop();
}

void TaskPool::push(Task *task, bool front)
{
	TaskScheduler::Entry entry;

	entry.task = task;
	entry.pool = this;

	TaskScheduler::push(entry, front);
}

void TaskPool::push(const TaskRunFunction& run, bool front)
{
	push(new Task(run), front);
}

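/* Example usage, a minimal sketch: `render_tile` and `num_tiles` are
 * hypothetical names, not part of this file. A pool collects tasks and the
 * caller then blocks in wait_work(), helping to execute them:
 *
 *   TaskPool pool;
 *
 *   for(int i = 0; i < num_tiles; i++)
 *       pool.push(function_bind(render_tile, i));
 *
 *   pool.wait_work();  // returns once all tasks in this pool are done
 */
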
/* wait until all tasks in this pool are done. the calling thread does not
 * just block: it helps execute queued tasks from this pool in the meantime */
void TaskPool::wait_work()
{
	thread_scoped_lock num_lock(num_mutex);

	while(num != 0) {
		num_lock.unlock();

		thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

		/* find a task from this pool. if we executed a task from another
		 * pool here, we could deadlock waiting on work we indirectly
		 * depend on */
		TaskScheduler::Entry work_entry;
		bool found_entry = false;
		list<TaskScheduler::Entry>::iterator it;

		for(it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
			TaskScheduler::Entry& entry = *it;

			if(entry.pool == this) {
				work_entry = entry;
				found_entry = true;
				TaskScheduler::queue.erase(it);
				break;
			}
		}

		queue_lock.unlock();

		/* if we found a task, execute it; otherwise wait until the
		 * remaining tasks are done */
		if(found_entry) {
			/* run task */
			work_entry.task->run();

			/* delete task */
			delete work_entry.task;

			/* notify pool task was done */
			num_decrease(1);
		}

		num_lock.lock();
		if(num == 0)
			break;

		if(!found_entry) {
			THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
			num_cond.wait(num_lock);
			THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
		}
	}
}
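
/* Note: because wait_work() only executes tasks from its own pool, it appears
 * safe for a task to create a nested pool and wait on it, a sketch where
 * `child_task` is a hypothetical function:
 *
 *   void parent_task()
 *   {
 *       TaskPool subpool;
 *       subpool.push(function_bind(child_task));
 *       subpool.wait_work();
 *   }
 */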

/* cancel remaining queued tasks and wait for currently running ones */
void TaskPool::cancel()
{
	do_cancel = true;

	TaskScheduler::clear(this);

	{
		thread_scoped_lock num_lock(num_mutex);

		while(num) {
			THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
			num_cond.wait(num_lock);
			THREADING_DEBUG("num==%d, condition wait done in TaskPool::cancel\n", num);
		}
	}

	do_cancel = false;
}

void TaskPool::stop()
{
	TaskScheduler::clear(this);

	assert(num == 0);
}

bool TaskPool::cancelled()
{
	return do_cancel;
}

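/* Example: a long-running task can poll cancelled() to stop early. A sketch:
 * `more_work_to_do` and `do_some_work` are hypothetical, and how the task
 * obtains its `pool` pointer is up to the caller:
 *
 *   void my_task(TaskPool *pool)
 *   {
 *       while(more_work_to_do()) {
 *           if(pool->cancelled())
 *               return;  // abandon remaining work after cancel()
 *
 *           do_some_work();
 *       }
 *   }
 */
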
void TaskPool::num_decrease(int done)
{
	num_mutex.lock();
	num -= done;

	assert(num >= 0);
	if(num == 0) {
		THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
		num_cond.notify_all();
	}

	num_mutex.unlock();
}

void TaskPool::num_increase()
{
	thread_scoped_lock num_lock(num_mutex);
	num++;
	THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
	num_cond.notify_all();
}

/* Task Scheduler */

thread_mutex TaskScheduler::mutex;
int TaskScheduler::users = 0;
vector<thread*> TaskScheduler::threads;
volatile bool TaskScheduler::do_exit = false;

list<TaskScheduler::Entry> TaskScheduler::queue;
thread_mutex TaskScheduler::queue_mutex;
thread_condition_variable TaskScheduler::queue_cond;

void TaskScheduler::init(int num_threads)
{
	thread_scoped_lock lock(mutex);

	/* multiple Cycles instances can use this task scheduler, sharing the same
	 * threads, so we keep track of the number of users. */
	if(users == 0) {
		do_exit = false;

		if(num_threads == 0) {
			/* automatic number of threads */
			num_threads = system_cpu_thread_count();
		}

		/* launch threads that will be waiting for work */
		threads.resize(num_threads);

		for(size_t i = 0; i < threads.size(); i++)
			threads[i] = new thread(function_bind(&TaskScheduler::thread_run, i));
	}

	users++;
}

void TaskScheduler::exit()
{
	thread_scoped_lock lock(mutex);

	users--;

	if(users == 0) {
		/* stop all waiting threads */
		do_exit = true;
		TaskScheduler::queue_cond.notify_all();

		/* delete threads */
		foreach(thread *t, threads) {
			t->join();
			delete t;
		}

		threads.clear();
	}
}

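/* Example of scheduler lifetime, a sketch: each user pairs init() with exit(),
 * and `some_task` is a hypothetical function:
 *
 *   TaskScheduler::init(0);  // 0 = auto-detect the CPU thread count
 *
 *   TaskPool pool;
 *   pool.push(function_bind(some_task));
 *   pool.wait_work();
 *
 *   TaskScheduler::exit();   // threads are joined when the last user exits
 */
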
bool TaskScheduler::thread_wait_pop(Entry& entry)
{
	thread_scoped_lock queue_lock(queue_mutex);

	/* sleep until a task is available or we are asked to exit. the loop
	 * re-checks the predicate to handle spurious wakeups */
	while(queue.empty() && !do_exit)
		queue_cond.wait(queue_lock);

	if(queue.empty()) {
		assert(do_exit);
		return false;
	}

	entry = queue.front();
	queue.pop_front();

	return true;
}

void TaskScheduler::thread_run(int thread_id)
{
	Entry entry;

	/* todo: test affinity/denormal mask */

	/* keep popping off tasks */
	while(thread_wait_pop(entry)) {
		/* run task */
		entry.task->run();

		/* delete task */
		delete entry.task;

		/* notify pool task was done */
		entry.pool->num_decrease(1);
	}
}

void TaskScheduler::push(Entry& entry, bool front)
{
	entry.pool->num_increase();

	/* add entry to queue */
	TaskScheduler::queue_mutex.lock();
	if(front)
		TaskScheduler::queue.push_front(entry);
	else
		TaskScheduler::queue.push_back(entry);

	TaskScheduler::queue_cond.notify_one();
	TaskScheduler::queue_mutex.unlock();
}

void TaskScheduler::clear(TaskPool *pool)
{
	thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

	/* erase all tasks from this pool from the queue */
	list<Entry>::iterator it = queue.begin();
	int done = 0;

	while(it != queue.end()) {
		Entry& entry = *it;

		if(entry.pool == pool) {
			done++;
			delete entry.task;

			it = queue.erase(it);
		}
		else
			it++;
	}

	queue_lock.unlock();

	/* notify done */
	pool->num_decrease(done);
}

CCL_NAMESPACE_END