intern/cycles/util/util_task.cpp
/*
 * Copyright 2011, Blender Foundation.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

#include "util_debug.h"
#include "util_foreach.h"
#include "util_system.h"
#include "util_task.h"

CCL_NAMESPACE_BEGIN

/* Task Pool */

TaskPool::TaskPool()
{
	num = 0;
	do_cancel = false;
}

TaskPool::~TaskPool()
{
	stop();
}

void TaskPool::push(Task *task, bool front)
{
	thread_scoped_lock num_lock(num_mutex);

	TaskScheduler::Entry entry;

	entry.task = task;
	entry.pool = this;

	TaskScheduler::push(entry, front);
}

void TaskPool::push(const TaskRunFunction& run, bool front)
{
	push(new Task(run), front);
}

void TaskPool::wait_work()
{
	thread_scoped_lock num_lock(num_mutex);

	while(num != 0) {
		num_lock.unlock();

		thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

		/* find a task from this pool. if we pick up a task from another
		 * pool, we can run into a deadlock */
		TaskScheduler::Entry work_entry;
		bool found_entry = false;
		list<TaskScheduler::Entry>::iterator it;

		for(it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
			TaskScheduler::Entry& entry = *it;

			if(entry.pool == this) {
				work_entry = entry;
				found_entry = true;
				TaskScheduler::queue.erase(it);
				break;
			}
		}

		queue_lock.unlock();

		/* if we found a task, run it, otherwise wait until other tasks are done */
		if(found_entry) {
			/* run task */
			work_entry.task->run();

			/* delete task */
			delete work_entry.task;
		}

		/* re-acquire num_mutex before notifying the pool: num_decrease
		 * modifies num and signals num_cond, so the mutex must be held */
		num_lock.lock();

		if(found_entry)
			num_decrease(1);

		if(num == 0)
			break;

		if(!found_entry)
			num_cond.wait(num_lock);
	}
}

void TaskPool::cancel()
{
	thread_scoped_lock num_lock(num_mutex);

	do_cancel = true;

	TaskScheduler::clear(this);
}

void TaskPool::stop()
{
	thread_scoped_lock num_lock(num_mutex);

	TaskScheduler::clear(this);

	assert(num == 0);
}

bool TaskPool::cancelled()
{
	return do_cancel;
}

void TaskPool::num_decrease(int done)
{
	/* caller must hold num_mutex */
	num -= done;
	assert(num >= 0);

	if(num == 0) {
		do_cancel = false;

		num_cond.notify_all();
	}
}

void TaskPool::num_increase()
{
	/* caller must hold num_mutex */
	num++;

	num_cond.notify_all();
}

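/* Example: typical TaskPool usage, as an illustrative sketch. "render_tile"
 * and "num_tiles" are hypothetical names; any callable matching
 * TaskRunFunction (for instance one built with function_bind) can be pushed
 * the same way, assuming TaskScheduler::init() was called beforehand.
 *
 *   TaskPool pool;
 *
 *   for(int tile = 0; tile < num_tiles; tile++)
 *       pool.push(function_bind(&render_tile, tile), false);
 *
 *   pool.wait_work();  // help execute tasks until this pool is empty
 *   pool.stop();       // or pool.cancel() to discard tasks still queued
 */
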
/* Task Scheduler */

thread_mutex TaskScheduler::mutex;
int TaskScheduler::users = 0;
vector<thread*> TaskScheduler::threads;
vector<int> TaskScheduler::thread_level;
volatile bool TaskScheduler::do_exit = false;

list<TaskScheduler::Entry> TaskScheduler::queue;
thread_mutex TaskScheduler::queue_mutex;
thread_condition_variable TaskScheduler::queue_cond;

void TaskScheduler::init(int num_threads)
{
	thread_scoped_lock lock(mutex);

	/* multiple Cycles instances can use this task scheduler, sharing the same
	 * threads, so we keep track of the number of users. */
	if(users == 0) {
		do_exit = false;

		/* launch threads that will be waiting for work */
		if(num_threads == 0)
			num_threads = system_cpu_thread_count();

		threads.resize(num_threads);
		thread_level.resize(num_threads);

		for(size_t i = 0; i < threads.size(); i++) {
			threads[i] = new thread(function_bind(&TaskScheduler::thread_run, i));
			thread_level[i] = 0;
		}
	}

	users++;
}

void TaskScheduler::exit()
{
	thread_scoped_lock lock(mutex);

	users--;

	if(users == 0) {
		/* stop all waiting threads */
		do_exit = true;
		TaskScheduler::queue_cond.notify_all();

		/* delete threads */
		foreach(thread *t, threads) {
			t->join();
			delete t;
		}

		threads.clear();
		thread_level.clear();
	}
}

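/* Example: scheduler lifecycle, as an illustrative sketch. init() and exit()
 * are reference counted, so every user pairs them; the worker threads are
 * only launched on the first init() and joined again on the last exit().
 *
 *   TaskScheduler::init(0);  // first user, 0 = autodetect CPU thread count
 *   TaskScheduler::init(0);  // second user, reuses the existing threads
 *
 *   TaskScheduler::exit();   // second user done
 *   TaskScheduler::exit();   // last user, threads are stopped and joined
 */
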
bool TaskScheduler::thread_wait_pop(Entry& entry)
{
	thread_scoped_lock queue_lock(queue_mutex);

	while(queue.empty() && !do_exit)
		queue_cond.wait(queue_lock);

	if(queue.empty()) {
		assert(do_exit);
		return false;
	}

	entry = queue.front();
	queue.pop_front();

	return true;
}

void TaskScheduler::thread_run(int thread_id)
{
	Entry entry;

	/* todo: test affinity/denormal mask */

	/* keep popping off tasks */
	while(thread_wait_pop(entry)) {
		/* run task */
		entry.task->run();

		/* delete task */
		delete entry.task;

		/* notify pool task was done */
		{
			/* not called from TaskPool, have to explicitly lock the mutex here */
			thread_scoped_lock num_lock(entry.pool->num_mutex);
			entry.pool->num_decrease(1);
		}
	}
}

void TaskScheduler::push(Entry& entry, bool front)
{
	entry.pool->num_increase();

	/* add entry to queue */
	TaskScheduler::queue_mutex.lock();
	if(front)
		TaskScheduler::queue.push_front(entry);
	else
		TaskScheduler::queue.push_back(entry);

	TaskScheduler::queue_cond.notify_one();
	TaskScheduler::queue_mutex.unlock();
}

void TaskScheduler::clear(TaskPool *pool)
{
	thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

	/* erase all tasks from this pool from the queue */
	list<Entry>::iterator it = queue.begin();
	int done = 0;

	while(it != queue.end()) {
		Entry& entry = *it;

		if(entry.pool == pool) {
			done++;
			delete entry.task;

			it = queue.erase(it);
		}
		else
			it++;
	}

	queue_lock.unlock();

	/* notify done */
	pool->num_decrease(done);
}

CCL_NAMESPACE_END