Cycles: threading optimizations
[blender.git] intern/cycles/util/util_task.cpp
/*
 * Copyright 2011, Blender Foundation.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

#include "util_debug.h"
#include "util_foreach.h"
#include "util_system.h"
#include "util_task.h"

CCL_NAMESPACE_BEGIN

/* Task Pool */

TaskPool::TaskPool()
{
	num = 0;
	num_done = 0;

	do_cancel = false;
}

TaskPool::~TaskPool()
{
	stop();
}

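/* the pool takes ownership of pushed tasks; they are deleted by the thread
 * that runs them, or by TaskScheduler::clear() on cancel */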
void TaskPool::push(Task *task, bool front)
{
	TaskScheduler::Entry entry;

	entry.task = task;
	entry.pool = this;

	TaskScheduler::push(entry, front);
}

void TaskPool::push(const TaskRunFunction& run, bool front)
{
	push(new Task(run), front);
}

void TaskPool::wait_work()
{
	thread_scoped_lock done_lock(done_mutex);

	while(num_done != num) {
		thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

		/* find task from this pool. if we get a task from another pool,
		 * we can get into deadlock */
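		/* (for instance, a task from another pool may itself wait on tasks
		 * from this pool, so the two waits could block each other) */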
		TaskScheduler::Entry work_entry;
		bool found_entry = false;
		list<TaskScheduler::Entry>::iterator it;

		for(it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
			TaskScheduler::Entry& entry = *it;

			if(entry.pool == this) {
				work_entry = entry;
				found_entry = true;
				TaskScheduler::queue.erase(it);
				break;
			}
		}

		queue_lock.unlock();

		/* if found task, do it, otherwise wait until other tasks are done */
		if(found_entry) {
			done_lock.unlock();

			/* run task */
			work_entry.task->run();

			/* delete task */
			delete work_entry.task;

			/* notify pool task was done */
			done_increase(1);

			done_lock.lock();
		}
		else
			done_cond.wait(done_lock);
	}
}
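
/* Example usage (a minimal sketch, not part of this file; my_task_function is
 * a hypothetical caller-provided function):
 *
 *   TaskPool pool;
 *
 *   for(int i = 0; i < 16; i++)
 *       pool.push(function_bind(&my_task_function, i));
 *
 *   pool.wait_work();  // the caller helps run tasks until the pool is done
 */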

void TaskPool::cancel()
{
	TaskScheduler::clear(this);

	do_cancel = true;
	{
		thread_scoped_lock lock(done_mutex);

		while(num_done != num)
			done_cond.wait(lock);
	}
	do_cancel = false;
}

void TaskPool::stop()
{
	TaskScheduler::clear(this);

	assert(num_done == num);
}

bool TaskPool::cancelled()
{
	return do_cancel;
}

void TaskPool::done_increase(int done)
{
	done_mutex.lock();
	num_done += done;
	done_mutex.unlock();

	assert(num_done <= num);
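	/* wake up threads blocked in wait_work() or cancel() */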
	done_cond.notify_all();
}

/* Task Scheduler */

thread_mutex TaskScheduler::mutex;
int TaskScheduler::users = 0;
vector<thread*> TaskScheduler::threads;
vector<int> TaskScheduler::thread_level;
volatile bool TaskScheduler::do_exit = false;

list<TaskScheduler::Entry> TaskScheduler::queue;
thread_mutex TaskScheduler::queue_mutex;
thread_condition_variable TaskScheduler::queue_cond;

void TaskScheduler::init(int num_threads)
{
	thread_scoped_lock lock(mutex);

	/* multiple Cycles instances can use this task scheduler, sharing the same
	 * threads, so we keep track of the number of users. */
	if(users == 0) {
		do_exit = false;

		/* launch threads that will be waiting for work */
		if(num_threads == 0)
			num_threads = system_cpu_thread_count();

		threads.resize(num_threads);
		thread_level.resize(num_threads);

		for(size_t i = 0; i < threads.size(); i++) {
			threads[i] = new thread(function_bind(&TaskScheduler::thread_run, i));
			thread_level[i] = 0;
		}
	}

	users++;
}

void TaskScheduler::exit()
{
	thread_scoped_lock lock(mutex);

	users--;

	if(users == 0) {
		/* stop all waiting threads */
		do_exit = true;
		TaskScheduler::queue_cond.notify_all();

		/* delete threads */
		foreach(thread *t, threads) {
			t->join();
			delete t;
		}

		threads.clear();
		thread_level.clear();
	}
}
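
/* Example usage (a minimal sketch, not part of this file): init() and exit()
 * are reference counted, so each user of the scheduler pairs them:
 *
 *   TaskScheduler::init(0);  // 0 = autodetect the CPU thread count
 *   // ... push and wait on task pools ...
 *   TaskScheduler::exit();
 */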

bool TaskScheduler::thread_wait_pop(Entry& entry)
{
	thread_scoped_lock lock(queue_mutex);

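	/* sleep until a task arrives or the scheduler shuts down; the loop
	 * guards against spurious wakeups */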
	while(queue.empty() && !do_exit)
		queue_cond.wait(lock);

	if(queue.empty()) {
		assert(do_exit);
		return false;
	}

	entry = queue.front();
	queue.pop_front();

	return true;
}

void TaskScheduler::thread_run(int thread_id)
{
	Entry entry;

	/* todo: test affinity/denormal mask */

	/* keep popping off tasks */
	while(thread_wait_pop(entry)) {
		/* run task */
		entry.task->run();

		/* delete task */
		delete entry.task;

		/* notify pool task was done */
		entry.pool->done_increase(1);
	}
}

void TaskScheduler::push(Entry& entry, bool front)
{
	/* add entry to queue */
	TaskScheduler::queue_mutex.lock();
	if(front)
		TaskScheduler::queue.push_front(entry);
	else
		TaskScheduler::queue.push_back(entry);
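	/* count the task as pending on its pool; wait_work() and cancel()
	 * compare this against num_done */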
	entry.pool->num++;
	TaskScheduler::queue_mutex.unlock();

	TaskScheduler::queue_cond.notify_one();
}

void TaskScheduler::clear(TaskPool *pool)
{
	thread_scoped_lock lock(queue_mutex);

	/* erase all tasks from this pool from the queue */
	list<Entry>::iterator it = queue.begin();
	int done = 0;

	while(it != queue.end()) {
		Entry& entry = *it;

		if(entry.pool == pool) {
			done++;
			delete entry.task;

			it = queue.erase(it);
		}
		else
			it++;
	}

	/* notify done */
	pool->done_increase(done);
}

CCL_NAMESPACE_END