/*
 * Copyright 2011, Blender Foundation.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

#include "util_debug.h"
#include "util_foreach.h"
#include "util_system.h"
#include "util_task.h"

//#define THREADING_DEBUG_ENABLED

#ifdef THREADING_DEBUG_ENABLED
#include <stdio.h>
#define THREADING_DEBUG(...) do { printf(__VA_ARGS__); fflush(stdout); } while(0)
#else
#define THREADING_DEBUG(...)
#endif

CCL_NAMESPACE_BEGIN

/* Task Pool */

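/* Example usage (an illustrative sketch only; 'render_tile' stands in for
 * any TaskRunFunction and is not defined in this file):
 *
 *   TaskScheduler::init(0);            // shared scheduler, auto thread count
 *
 *   TaskPool pool;
 *   for(int tile = 0; tile < 16; tile++)
 *       pool.push(function_bind(&render_tile, tile));
 *   pool.wait_work();                  // calling thread helps run the tasks
 *
 *   TaskScheduler::exit();
 */
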
TaskPool::TaskPool()
{
	num = 0;
	do_cancel = false;
}

TaskPool::~TaskPool()
{
	stop();
}

void TaskPool::push(Task *task, bool front)
{
	TaskScheduler::Entry entry;

	entry.task = task;
	entry.pool = this;

	TaskScheduler::push(entry, front);
}

void TaskPool::push(const TaskRunFunction& run, bool front)
{
	push(new Task(run), front);
}

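/* Wait for all tasks in this pool to finish. The calling thread does not
 * simply block: while tasks from this pool are still queued, it pops and runs
 * them itself, so a thread waiting on a pool keeps making progress instead of
 * idling while the workers are busy. */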
void TaskPool::wait_work()
{
	thread_scoped_lock num_lock(num_mutex);

	while(num != 0) {
		num_lock.unlock();

		thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

		/* only take tasks from this pool; running a task from another
		 * pool here could lead to deadlock */
		TaskScheduler::Entry work_entry;
		bool found_entry = false;
		list<TaskScheduler::Entry>::iterator it;

		for(it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
			TaskScheduler::Entry& entry = *it;

			if(entry.pool == this) {
				work_entry = entry;
				found_entry = true;
				TaskScheduler::queue.erase(it);
				break;
			}
		}

		queue_lock.unlock();

		/* if we found a task, run it, otherwise wait until other tasks are done */
		if(found_entry) {
			/* run task */
			work_entry.task->run();

			/* delete task */
			delete work_entry.task;

			/* notify pool task was done */
			num_decrease(1);
		}

		num_lock.lock();
		if(num == 0)
			break;

		if(!found_entry) {
			THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::wait_work !found_entry\n", num);
			num_cond.wait(num_lock);
			THREADING_DEBUG("num==%d, condition wait done in TaskPool::wait_work !found_entry\n", num);
		}
	}
}

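/* Cancel all tasks in this pool: entries still sitting in the queue are
 * dropped, then we wait for tasks that are already running to finish. */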
void TaskPool::cancel()
{
	do_cancel = true;

	TaskScheduler::clear(this);

	{
		thread_scoped_lock num_lock(num_mutex);

		while(num) {
			THREADING_DEBUG("num==%d, Waiting for condition in TaskPool::cancel\n", num);
			num_cond.wait(num_lock);
			THREADING_DEBUG("num==%d condition wait done in TaskPool::cancel\n", num);
		}
	}

	do_cancel = false;
}

void TaskPool::stop()
{
	TaskScheduler::clear(this);

	assert(num == 0);
}

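/* Tasks are not interrupted once running; a long-running task can poll this
 * flag and return early when its pool is being cancelled. */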
bool TaskPool::cancelled()
{
	return do_cancel;
}

void TaskPool::num_decrease(int done)
{
	thread_scoped_lock num_lock(num_mutex);
	num -= done;

	assert(num >= 0);
	if(num == 0) {
		THREADING_DEBUG("num==%d, notifying all in TaskPool::num_decrease\n", num);
		num_cond.notify_all();
	}
}

void TaskPool::num_increase()
{
	thread_scoped_lock num_lock(num_mutex);
	num++;
	THREADING_DEBUG("num==%d, notifying all in TaskPool::num_increase\n", num);
	num_cond.notify_all();
}

/* Task Scheduler */

thread_mutex TaskScheduler::mutex;
int TaskScheduler::users = 0;
vector<thread*> TaskScheduler::threads;
volatile bool TaskScheduler::do_exit = false;

list<TaskScheduler::Entry> TaskScheduler::queue;
thread_mutex TaskScheduler::queue_mutex;
thread_condition_variable TaskScheduler::queue_cond;

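/* Lifecycle sketch (illustrative): init()/exit() calls must be balanced,
 * since the worker threads are shared between all users:
 *
 *   TaskScheduler::init(8);  // first user: spawns 7 workers + main thread
 *   TaskScheduler::init(8);  // second user: only bumps the user count
 *   ...
 *   TaskScheduler::exit();   // users 2 -> 1, workers keep running
 *   TaskScheduler::exit();   // users 1 -> 0, workers are joined and freed
 */
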
void TaskScheduler::init(int num_threads)
{
	thread_scoped_lock lock(mutex);

	/* multiple Cycles instances can use this task scheduler, sharing the same
	 * threads, so we keep track of the number of users. */
	if(users == 0) {
		do_exit = false;

		if(num_threads == 0) {
			/* automatic number of threads will be main thread + num cores */
			num_threads = system_cpu_thread_count();
		}
		else {
			/* main thread will also work, for fixed threads we count it too */
			num_threads -= 1;
		}

		/* launch threads that will be waiting for work */
		threads.resize(num_threads);

		for(size_t i = 0; i < threads.size(); i++)
			threads[i] = new thread(function_bind(&TaskScheduler::thread_run, i));
	}

	users++;
}

void TaskScheduler::exit()
{
	thread_scoped_lock lock(mutex);

	users--;

	if(users == 0) {
		/* stop all waiting threads */
		do_exit = true;
		TaskScheduler::queue_cond.notify_all();

		/* delete threads */
		foreach(thread *t, threads) {
			t->join();
			delete t;
		}

		threads.clear();
	}
}

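/* Pop the next entry from the queue, blocking until either work arrives or
 * do_exit is set. Returns false only when the scheduler is shutting down and
 * the queue has been drained. */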
bool TaskScheduler::thread_wait_pop(Entry& entry)
{
	thread_scoped_lock queue_lock(queue_mutex);

	while(queue.empty() && !do_exit)
		queue_cond.wait(queue_lock);

	if(queue.empty()) {
		assert(do_exit);
		return false;
	}

	entry = queue.front();
	queue.pop_front();

	return true;
}

void TaskScheduler::thread_run(int thread_id)
{
	Entry entry;

	/* todo: test affinity/denormal mask */

	/* keep popping off tasks */
	while(thread_wait_pop(entry)) {
		/* run task */
		entry.task->run();

		/* delete task */
		delete entry.task;

		/* notify pool task was done */
		entry.pool->num_decrease(1);
	}
}

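/* Note that the pool's pending-task count is increased before the entry
 * becomes visible in the queue. If it were increased afterwards, a concurrent
 * wait_work() could observe num == 0 and return while this task is still
 * waiting to run. */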
void TaskScheduler::push(Entry& entry, bool front)
{
	entry.pool->num_increase();

	/* add entry to queue */
	TaskScheduler::queue_mutex.lock();
	if(front)
		TaskScheduler::queue.push_front(entry);
	else
		TaskScheduler::queue.push_back(entry);

	TaskScheduler::queue_cond.notify_one();
	TaskScheduler::queue_mutex.unlock();
}

void TaskScheduler::clear(TaskPool *pool)
{
	thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);

	/* erase all tasks from this pool from the queue */
	list<Entry>::iterator it = queue.begin();
	int done = 0;

	while(it != queue.end()) {
		Entry& entry = *it;

		if(entry.pool == pool) {
			done++;
			delete entry.task;

			it = queue.erase(it);
		}
		else
			it++;
	}

	queue_lock.unlock();

	/* notify done */
	pool->num_decrease(done);
}

CCL_NAMESPACE_END