Fix #33107: cycles fixed threads 1 was still having two cores do work, since the main thread also executes tasks but was not counted against the fixed thread count.
intern/cycles/util/util_task.cpp
/*
 * Copyright 2011, Blender Foundation.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

#include "util_debug.h"
#include "util_foreach.h"
#include "util_system.h"
#include "util_task.h"

CCL_NAMESPACE_BEGIN

/* Task Pool */

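/* Example usage (an illustrative sketch only, not part of this file;
 * my_run_task is a hypothetical TaskRunFunction-compatible callback):
 *
 *   TaskPool pool;
 *
 *   for(int i = 0; i < 100; i++)
 *       pool.push(function_bind(&my_run_task, i));
 *
 *   pool.wait_work();
 */
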
TaskPool::TaskPool()
{
	num = 0;
	do_cancel = false;
}

TaskPool::~TaskPool()
{
	stop();
}

void TaskPool::push(Task *task, bool front)
{
	TaskScheduler::Entry entry;

	entry.task = task;
	entry.pool = this;

	TaskScheduler::push(entry, front);
}

void TaskPool::push(const TaskRunFunction& run, bool front)
{
	push(new Task(run), front);
}

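/* The calling thread participates in the work: it keeps running tasks from
 * this pool until the pool's task count reaches zero. This is why, for a
 * fixed thread count, TaskScheduler::init() launches one worker fewer: the
 * thread calling wait_work() counts as one of the threads. */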
void TaskPool::wait_work()
{
	thread_scoped_lock num_lock(num_mutex);

	while(num != 0) {
		num_lock.unlock();

		thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

		/* find a task from this pool; if we executed a task from
		 * another pool here, we could deadlock */
		TaskScheduler::Entry work_entry;
		bool found_entry = false;
		list<TaskScheduler::Entry>::iterator it;

		for(it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
			TaskScheduler::Entry& entry = *it;

			if(entry.pool == this) {
				work_entry = entry;
				found_entry = true;
				TaskScheduler::queue.erase(it);
				break;
			}
		}

		queue_lock.unlock();

		/* if we found a task, run it; otherwise wait until other tasks are done */
		if(found_entry) {
			/* run task */
			work_entry.task->run();

			/* delete task */
			delete work_entry.task;

			/* notify pool that the task was done */
			num_decrease(1);
		}

		num_lock.lock();
		if(num == 0)
			break;

		if(!found_entry)
			num_cond.wait(num_lock);
	}
}

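/* Cancel flags the pool so that running tasks can poll cancelled() and stop
 * early, drops all still-queued tasks, then waits for running tasks to finish. */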
void TaskPool::cancel()
{
	do_cancel = true;

	TaskScheduler::clear(this);

	{
		thread_scoped_lock num_lock(num_mutex);

		while(num)
			num_cond.wait(num_lock);
	}

	do_cancel = false;
}

void TaskPool::stop()
{
	TaskScheduler::clear(this);

	assert(num == 0);
}

bool TaskPool::cancelled()
{
	return do_cancel;
}

void TaskPool::num_decrease(int done)
{
	num_mutex.lock();
	num -= done;

	assert(num >= 0);
	if(num == 0)
		num_cond.notify_all();

	num_mutex.unlock();
}

void TaskPool::num_increase()
{
	thread_scoped_lock num_lock(num_mutex);
	num++;
	num_cond.notify_all();
}

/* Task Scheduler */

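/* The scheduler keeps a single global FIFO queue of tasks, served by a fixed
 * set of worker threads. It is reference counted via init()/exit() so that
 * multiple users can share the same threads. */
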
thread_mutex TaskScheduler::mutex;
int TaskScheduler::users = 0;
vector<thread*> TaskScheduler::threads;
vector<int> TaskScheduler::thread_level;
volatile bool TaskScheduler::do_exit = false;

list<TaskScheduler::Entry> TaskScheduler::queue;
thread_mutex TaskScheduler::queue_mutex;
thread_condition_variable TaskScheduler::queue_cond;

void TaskScheduler::init(int num_threads)
{
	thread_scoped_lock lock(mutex);

	/* multiple Cycles instances can use this task scheduler, sharing the same
	 * threads, so we keep track of the number of users. */
	if(users == 0) {
		do_exit = false;

		if(num_threads == 0) {
			/* automatic number of threads: the main thread plus one
			 * worker per core */
			num_threads = system_cpu_thread_count();
		}
		else {
			/* the main thread also does work, so for a fixed thread
			 * count we count it too and launch one worker fewer */
			num_threads -= 1;
		}
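		/* e.g. a fixed count of 1 launches no worker threads at all; every
		 * task then runs in the calling thread via TaskPool::wait_work(),
		 * which is the fix for #33107 */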

		/* launch threads that will be waiting for work */
		threads.resize(num_threads);
		thread_level.resize(num_threads);

		for(size_t i = 0; i < threads.size(); i++) {
			threads[i] = new thread(function_bind(&TaskScheduler::thread_run, i));
			thread_level[i] = 0;
		}
	}

	users++;
}

void TaskScheduler::exit()
{
	thread_scoped_lock lock(mutex);

	users--;

	if(users == 0) {
		/* stop all waiting threads */
		do_exit = true;
		TaskScheduler::queue_cond.notify_all();

		/* delete threads */
		foreach(thread *t, threads) {
			t->join();
			delete t;
		}

		threads.clear();
		thread_level.clear();
	}
}

bool TaskScheduler::thread_wait_pop(Entry& entry)
{
	thread_scoped_lock queue_lock(queue_mutex);

	while(queue.empty() && !do_exit)
		queue_cond.wait(queue_lock);

	if(queue.empty()) {
		assert(do_exit);
		return false;
	}

	entry = queue.front();
	queue.pop_front();

	return true;
}

void TaskScheduler::thread_run(int thread_id)
{
	Entry entry;

	/* todo: test affinity/denormal mask */

	/* keep popping off tasks */
	while(thread_wait_pop(entry)) {
		/* run task */
		entry.task->run();

		/* delete task */
		delete entry.task;

		/* notify pool that the task was done */
		entry.pool->num_decrease(1);
	}
}

void TaskScheduler::push(Entry& entry, bool front)
{
	entry.pool->num_increase();

	/* add entry to queue */
	TaskScheduler::queue_mutex.lock();
	if(front)
		TaskScheduler::queue.push_front(entry);
	else
		TaskScheduler::queue.push_back(entry);

	TaskScheduler::queue_cond.notify_one();
	TaskScheduler::queue_mutex.unlock();
}

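/* Remove all tasks belonging to this pool from the queue without running
 * them, adjusting the pool's task count accordingly; used by
 * TaskPool::stop() and TaskPool::cancel(). */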
void TaskScheduler::clear(TaskPool *pool)
{
	thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

	/* erase all tasks from this pool from the queue */
	list<Entry>::iterator it = queue.begin();
	int done = 0;

	while(it != queue.end()) {
		Entry& entry = *it;

		if(entry.pool == pool) {
			done++;
			delete entry.task;

			it = queue.erase(it);
		}
		else
			it++;
	}

	queue_lock.unlock();

	/* notify done */
	pool->num_decrease(done);
}

CCL_NAMESPACE_END
