2 * Copyright 2011, Blender Foundation.
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; either version 2
7 * of the License, or (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software Foundation,
16 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 #include "util_debug.h"
20 #include "util_foreach.h"
21 #include "util_system.h"
22 #include "util_task.h"
/* Queue a task on the shared scheduler on behalf of this pool.
 * Declares a TaskScheduler::Entry and forwards it, together with the
 * `front` flag, to TaskScheduler::push().
 * NOTE(review): the statements that fill in `entry` are not visible here;
 * presumably they store `task` and this pool in it - confirm. */
39 void TaskPool::push(Task *task, bool front)
41 TaskScheduler::Entry entry;
46 TaskScheduler::push(entry, front);
/* Convenience overload: wraps the run function in a Task allocated with
 * `new` and passes it on to push() together with the unchanged `front`
 * flag. */
49 void TaskPool::push(const TaskRunFunction& run, bool front)
51 push(new Task(run), front);
/* Let the calling thread work on this pool's queued tasks.
 * Under num_mutex and the scheduler's queue_mutex it scans the shared
 * queue for an entry whose pool is this pool, removes it from the queue,
 * then runs and deletes its task. When nothing is found it waits on
 * num_cond instead.
 * NOTE(review): the enclosing loop, its exit condition and the points
 * where the two locks are released/re-acquired are not visible here -
 * confirm before changing the locking order. */
54 void TaskPool::wait_work()
56 thread_scoped_lock num_lock(num_mutex);
/* the shared queue is only inspected while holding queue_mutex */
61 thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
63 /* find task from this pool. if we get a task from another pool,
64 * we can get into deadlock */
65 TaskScheduler::Entry work_entry;
66 bool found_entry = false;
67 list<TaskScheduler::Entry>::iterator it;
69 for(it = TaskScheduler::queue.begin(); it != TaskScheduler::queue.end(); it++) {
70 TaskScheduler::Entry& entry = *it;
72 if(entry.pool == this) {
/* NOTE(review): erase() invalidates `it`, so the loop must not advance
 * it again; presumably the entry is copied to work_entry before this
 * line and the loop is left right after it - confirm. */
75 TaskScheduler::queue.erase(it);
82 /* if found task, do it, otherwise wait until other tasks are done */
85 work_entry.task->run();
/* the task object is destroyed here once it has run */
88 delete work_entry.task;
90 /* notify pool task was done */
/* blocks on num_cond, which num_decrease() and num_increase() signal
 * with notify_all() */
99 num_cond.wait(num_lock);
/* Cancel the pool: removes this pool's entries from the scheduler queue
 * through TaskScheduler::clear(this), then takes num_mutex and waits on
 * num_cond.
 * NOTE(review): the condition guarding the wait (presumably "tasks still
 * outstanding") and any cancel-flag updates are not visible here -
 * confirm. */
103 void TaskPool::cancel()
107 TaskScheduler::clear(this);
110 thread_scoped_lock num_lock(num_mutex);
113 num_cond.wait(num_lock);
/* Removes this pool's queued entries from the scheduler via
 * TaskScheduler::clear(this). Unlike cancel(), no wait on num_cond is
 * visible here.
 * NOTE(review): the rest of the body is not shown - confirm there is
 * nothing else to document. */
119 void TaskPool::stop()
121 TaskScheduler::clear(this);
/* Returns a bool; presumably whether cancel() has been requested for this
 * pool.
 * NOTE(review): the body is not visible here - confirm. */
126 bool TaskPool::cancelled()
/* Called with the number of entries that finished or were dropped:
 * TaskScheduler::thread_run() passes 1 per task, TaskScheduler::clear()
 * passes its `done` value. Wakes every thread waiting on num_cond.
 * NOTE(review): the counter update and its locking are not visible here;
 * presumably the pool's outstanding-task count is reduced by `done` under
 * num_mutex - confirm. */
131 void TaskPool::num_decrease(int done)
138 num_cond.notify_all();
/* Called by TaskScheduler::push() before an entry is added to the queue.
 * Takes num_mutex and wakes every thread waiting on num_cond.
 * NOTE(review): the counter increment itself is not visible here -
 * confirm. */
143 void TaskPool::num_increase()
145 thread_scoped_lock num_lock(num_mutex);
147 num_cond.notify_all();
/* Definitions of the TaskScheduler static members. */

/* locked by init() and exit() */
152 thread_mutex TaskScheduler::mutex;
/* number of users sharing the scheduler, see the comment in init() */
153 int TaskScheduler::users = 0;
/* thread objects created in init() */
154 vector<thread*> TaskScheduler::threads;
/* resized together with `threads` in init(), cleared in exit() */
155 vector<int> TaskScheduler::thread_level;
/* checked by thread_wait_pop() so waiting threads stop waiting.
 * NOTE(review): `volatile` alone is not a synchronization primitive;
 * safety presumably relies on the flag being changed under queue_mutex -
 * confirm in exit(). */
156 volatile bool TaskScheduler::do_exit = false;
/* pending entries; accessed under queue_mutex */
158 list<TaskScheduler::Entry> TaskScheduler::queue;
159 thread_mutex TaskScheduler::queue_mutex;
/* signalled by push() when an entry is added and by exit() */
160 thread_condition_variable TaskScheduler::queue_cond;
/* Set up the shared scheduler, holding TaskScheduler::mutex throughout.
 * A num_threads of 0 requests automatic selection: the count is taken
 * from system_cpu_thread_count(). `threads` and `thread_level` are sized
 * to the resulting count, and each thread is started on thread_run() with
 * its index in `threads` as argument.
 * NOTE(review): the user counting and the handling of a non-zero
 * num_threads are not visible here - confirm. */
162 void TaskScheduler::init(int num_threads)
164 thread_scoped_lock lock(mutex);
166 /* multiple cycles instances can use this task scheduler, sharing the same
167 * threads, so we keep track of the number of users. */
171 if(num_threads == 0) {
172 /* automatic number of threads will be main thread + num cores */
173 num_threads = system_cpu_thread_count();
176 /* main thread will also work, for fixed threads we count it too */
180 /* launch threads that will be waiting for work */
181 threads.resize(num_threads);
182 thread_level.resize(num_threads);
184 for(size_t i = 0; i < threads.size(); i++) {
/* the loop index becomes the thread_id parameter of thread_run() */
185 threads[i] = new thread(function_bind(&TaskScheduler::thread_run, i));
/* Counterpart of init(), holding TaskScheduler::mutex throughout.
 * Wakes all threads waiting on queue_cond, iterates over `threads` and
 * clears `thread_level`.
 * NOTE(review): the user-count check, the setting of do_exit and the
 * per-thread join/delete are not visible here - confirm. */
193 void TaskScheduler::exit()
195 thread_scoped_lock lock(mutex);
200 /* stop all waiting threads */
/* threads blocked in thread_wait_pop() re-check their wait condition
 * after this wake-up */
202 TaskScheduler::queue_cond.notify_all();
205 foreach(thread *t, threads) {
211 thread_level.clear();
/* Block until the queue has an entry or do_exit is set, holding
 * queue_mutex except while waiting on queue_cond. The front entry of the
 * queue is copied into `entry`.
 * NOTE(review): the return statements and the removal of the front entry
 * are not visible here; presumably false is returned on exit and true
 * when an entry was obtained, since thread_run() loops on the result -
 * confirm. */
215 bool TaskScheduler::thread_wait_pop(Entry& entry)
217 thread_scoped_lock queue_lock(queue_mutex);
/* loop rather than a single wait, so the condition is re-checked after
 * every wake-up */
219 while(queue.empty() && !do_exit)
220 queue_cond.wait(queue_lock);
227 entry = queue.front();
/* Entry point of the threads started in init(); thread_id is the index
 * passed there. Keeps fetching entries with thread_wait_pop() until it
 * returns false, and reports each one to its pool with num_decrease(1).
 * NOTE(review): the lines that run and dispose of the task are not
 * visible here - confirm. */
233 void TaskScheduler::thread_run(int thread_id)
237 /* todo: test affinity/denormal mask */
239 /* keep popping off tasks */
240 while(thread_wait_pop(entry)) {
247 /* notify pool task was done */
248 entry.pool->num_decrease(1);
/* Add an entry to the shared queue.
 * The owning pool's num_increase() is called before the entry is added.
 * The queue is modified between an explicit queue_mutex lock()/unlock()
 * pair, and one thread waiting on queue_cond is notified while the mutex
 * is still held.
 * NOTE(review): the branch choosing between push_front and push_back is
 * not visible here; presumably it tests `front` - confirm. */
252 void TaskScheduler::push(Entry& entry, bool front)
254 entry.pool->num_increase();
256 /* add entry to queue */
257 TaskScheduler::queue_mutex.lock();
259 TaskScheduler::queue.push_front(entry);
261 TaskScheduler::queue.push_back(entry);
263 TaskScheduler::queue_cond.notify_one();
264 TaskScheduler::queue_mutex.unlock();
/* Remove every queued entry that belongs to `pool`, scanning the queue
 * under queue_mutex, then report the result to the pool through
 * num_decrease(done).
 * NOTE(review): the binding of `entry`, the handling of non-matching
 * entries, the disposal of removed tasks and the computation of `done`
 * (presumably the number of removed entries) are not visible here -
 * confirm. */
267 void TaskScheduler::clear(TaskPool *pool)
269 thread_scoped_lock queue_lock(TaskScheduler::queue_mutex);
271 /* erase all tasks from this pool from the queue */
272 list<Entry>::iterator it = queue.begin();
275 while(it != queue.end()) {
278 if(entry.pool == pool) {
/* erase() returns the iterator after the removed element, so the scan
 * continues from there without touching the invalidated iterator */
282 it = queue.erase(it);
291 pool->num_decrease(done);