Fix T60227: Crash when Cycles uses more than system threads
author	Sergey Sharybin <sergey.vfx@gmail.com>
	Fri, 11 Jan 2019 14:01:54 +0000 (15:01 +0100)
committer	Sergey Sharybin <sergey.vfx@gmail.com>
	Fri, 11 Jan 2019 14:03:48 +0000 (15:03 +0100)
Tweaked scheduling so it survives this situation by scattering
"extra" threads uniformly over all the NUMA nodes.

There are still tweaks possible to make some specific hardware
configurations work better.

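For illustration, the policy described above can be sketched in isolation. The following is a minimal standalone C++ sketch with hypothetical node capacities, not the actual Cycles code (the real implementation is in the diff below): fill every node up to its processor count first, then scatter the remaining "extra" threads round-robin over the available nodes.

#include <cstdio>
#include <vector>

int main()
{
	/* Hypothetical capacities: processors per NUMA node (0 = unavailable). */
	std::vector<int> capacity = {16, 0, 16};
	const int num_nodes = (int)capacity.size();
	const int num_threads = 36;  /* More threads than processors in total. */
	std::vector<int> node_of_thread(num_threads, -1);
	int thread_index = 0;
	/* First pass: fill each node up to its capacity. */
	for(int node = 0; node < num_nodes; ++node) {
		for(int i = 0; i < capacity[node] && thread_index < num_threads; ++i) {
			node_of_thread[thread_index++] = node;
		}
	}
	/* Second pass: scatter the extra threads uniformly over available nodes. */
	int node = 0;
	while(thread_index < num_threads) {
		while(capacity[node] == 0) {
			node = (node + 1) % num_nodes;
		}
		node_of_thread[thread_index++] = node;
		node = (node + 1) % num_nodes;
	}
	for(int i = 0; i < num_threads; ++i) {
		printf("thread %d -> node %d\n", i, node_of_thread[i]);
	}
	return 0;
}
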
intern/cycles/util/util_task.cpp
intern/cycles/util/util_thread.cpp
intern/cycles/util/util_thread.h

diff --git a/intern/cycles/util/util_task.cpp b/intern/cycles/util/util_task.cpp
index 50a2bb1..7e9f731 100644
--- a/intern/cycles/util/util_task.cpp
+++ b/intern/cycles/util/util_task.cpp
@@ -185,59 +185,149 @@ list<TaskScheduler::Entry> TaskScheduler::queue;
 thread_mutex TaskScheduler::queue_mutex;
 thread_condition_variable TaskScheduler::queue_cond;
 
-void TaskScheduler::init(int num_threads)
+namespace {
+
+/* Get the number of processors on each of the available nodes. The result is
+ * sized by the number of nodes, and each element holds the number of
+ * processors on that node.
+ * If a node is not available, the corresponding number of processors is
+ * zero. */
+void get_per_node_num_processors(vector<int>* num_per_node_processors)
 {
-       thread_scoped_lock lock(mutex);
-
-       /* multiple cycles instances can use this task scheduler, sharing the same
-        * threads, so we keep track of the number of users. */
-       if(users == 0) {
-               do_exit = false;
-
-               const bool use_auto_threads = (num_threads == 0);
-               if(use_auto_threads) {
-                       /* automatic number of threads */
-                       num_threads = system_cpu_thread_count();
+       const int num_nodes = system_cpu_num_numa_nodes();
+       if(num_nodes == 0) {
+               LOG(ERROR) << "Zero available NUMA nodes, this is not supposed to happen.";
+               return;
+       }
+       num_per_node_processors->resize(num_nodes);
+       for(int node = 0; node < num_nodes; ++node) {
+               if(!system_cpu_is_numa_node_available(node)) {
+                       (*num_per_node_processors)[node] = 0;
+                       continue;
                }
-               VLOG(1) << "Creating pool of " << num_threads << " threads.";
+               (*num_per_node_processors)[node] =
+                       system_cpu_num_numa_node_processors(node);
+       }
+}
 
-               /* launch threads that will be waiting for work */
-               threads.resize(num_threads);
+/* Calculate the total number of processors on all the available nodes.
+ * This is similar to system_cpu_thread_count(), but uses the pre-calculated
+ * number of processors on each of the nodes, avoiding extra system calls and
+ * checks for node availability. */
+int get_num_total_processors(const vector<int>& num_per_node_processors)
+{
+       int num_total_processors = 0;
+       foreach(int num_node_processors, num_per_node_processors) {
+               num_total_processors += num_node_processors;
+       }
+       return num_total_processors;
+}
 
-               const int num_nodes = system_cpu_num_numa_nodes();
-               int thread_index = 0;
-               for (int node = 0;
-                    node < num_nodes && thread_index < threads.size();
-                    ++node)
+/* Assign every thread a node on which it should be running, for the best
+ * performance. */
+void distribute_threads_on_nodes(const vector<thread*>& threads)
+{
+       const int num_threads = threads.size();
+       /* TODO(sergey): Skip overriding affinity if the threads fit into the
+        * current nodes/CPU group. This will allow the user to tweak affinity for
+        * weird and wonderful reasons. */
+       vector<int> num_per_node_processors;
+       get_per_node_num_processors(&num_per_node_processors);
+       if(num_per_node_processors.size() == 0) {
+               /* The error was already reported; we can't do anything here, so we
+                * simply leave the default affinity for all the worker threads. */
+               return;
+       }
+       const int num_nodes = num_per_node_processors.size();
+       int thread_index = 0;
+       /* First pass: fill in all the nodes to their maximum.
+        *
+        * If there are fewer threads than the overall node capacity, some of the
+        * nodes or parts of them will idle.
+        *
+        * TODO(sergey): Consider picking the fastest nodes if the number of
+        * threads fits on them. For example, on Threadripper2 we might consider
+        * using nodes 0 and 2 if the user requested 32 render threads. */
+       const int num_total_node_processors =
+               get_num_total_processors(num_per_node_processors);
+       int current_node_index = 0;
+       while(thread_index < num_total_node_processors &&
+             thread_index < num_threads) {
+               const int num_node_processors =
+                       num_per_node_processors[current_node_index];
+               for(int processor_index = 0;
+                   processor_index < num_node_processors;
+                   ++processor_index)
                {
-                       if (!system_cpu_is_numa_node_available(node)) {
-                               continue;
-                       }
-                       const int num_node_processors =
-                               system_cpu_num_numa_node_processors(node);
-                       for (int i = 0;
-                            i < num_node_processors && thread_index < threads.size();
-                            ++i)
-                       {
-                               threads[thread_index] = new thread(
-                                       function_bind(&TaskScheduler::thread_run,
-                                                     thread_index + 1),
-                                       node);
-                               thread_index++;
+                       VLOG(1) << "Scheduling thread " << thread_index << " to node "
+                               << current_node_index << ".";
+                       threads[thread_index]->schedule_to_node(current_node_index);
+                       ++thread_index;
+                       if(thread_index == num_threads) {
+                               /* All threads are scheduled on their nodes. */
+                               return;
                        }
                }
+               ++current_node_index;
        }
+       /* Second pass: keep scheduling threads to each node one by one,
+        * uniformly filling them in.
+        * This is where things become tricky to predict for the maximum
+        * performance: on the one hand this avoids too much threading overhead on
+        * a few nodes, but for the final performance having all the overhead on
+        * one node might be a better idea (since the other nodes will have a
+        * better chance of rendering faster).
+        * Even trickier is that nodes might have different capacity, so we might
+        * want to do some weighted scheduling. For example, if node 0 has 16
+        * processors and node 1 has 32 processors, we'd better schedule 1 extra
+        * thread on node 0 and 2 extra threads on node 1. */
+       current_node_index = 0;
+       while(thread_index < num_threads) {
+               /* Skip unavailable nodes. */
+               /* TODO(sergey): Add sanity check against deadlock. */
+               while(num_per_node_processors[current_node_index] == 0) {
+                       current_node_index = (current_node_index + 1) % num_nodes;
+               }
+               VLOG(1) << "Scheduling thread " << thread_index << " to node "
+                       << current_node_index << ".";
+               threads[thread_index]->schedule_to_node(current_node_index);
+               ++thread_index;
+               current_node_index = (current_node_index + 1) % num_nodes;
+       }
+}
+
+}  // namespace
 
-       users++;
+void TaskScheduler::init(int num_threads)
+{
+       thread_scoped_lock lock(mutex);
+       /* Multiple Cycles instances can use this task scheduler, sharing the same
+        * threads, so we keep track of the number of users. */
+       ++users;
+       if(users != 1) {
+               return;
+       }
+       do_exit = false;
+       const bool use_auto_threads = (num_threads == 0);
+       if(use_auto_threads) {
+               /* Automatic number of threads. */
+               num_threads = system_cpu_thread_count();
+       }
+       VLOG(1) << "Creating pool of " << num_threads << " threads.";
+       /* Launch threads that will be waiting for work. */
+       threads.resize(num_threads);
+       for(int thread_index = 0; thread_index < num_threads; ++thread_index) {
+               threads[thread_index] = new thread(
+                       function_bind(&TaskScheduler::thread_run, thread_index + 1));
+       }
+       distribute_threads_on_nodes(threads);
 }
 
 void TaskScheduler::exit()
 {
        thread_scoped_lock lock(mutex);
-
        users--;
-
        if(users == 0) {
+               VLOG(1) << "De-initializing thread pool of task scheduler.";
                /* stop all waiting threads */
                TaskScheduler::queue_mutex.lock();
                do_exit = true;
@@ -249,7 +339,6 @@ void TaskScheduler::exit()
                        t->join();
                        delete t;
                }
-
                threads.clear();
        }
 }
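
The weighted scheduling mentioned in the second-pass TODO can be made concrete. Below is a standalone sketch (hypothetical helper, not part of the patch) that apportions the extra threads proportionally to node capacity. With the capacities from the comment (16 and 32) and three extra threads, it yields one extra thread on node 0 and two on node 1, matching the example given there.

#include <vector>

/* Sketch of the weighted variant from the TODO above: give every node a
 * share of the extra threads proportional to its processor count, then
 * hand out the integer-division remainder round-robin over the available
 * nodes. Hypothetical standalone code, not part of the patch. */
std::vector<int> distribute_extra_weighted(const std::vector<int>& capacity,
                                           const int num_extra_threads)
{
	std::vector<int> extra(capacity.size(), 0);
	int num_total_processors = 0;
	for(const int num_node_processors : capacity) {
		num_total_processors += num_node_processors;
	}
	if(num_total_processors == 0) {
		return extra;
	}
	/* Proportional share, rounded down. */
	int num_assigned = 0;
	for(size_t node = 0; node < capacity.size(); ++node) {
		extra[node] = num_extra_threads * capacity[node] / num_total_processors;
		num_assigned += extra[node];
	}
	/* Remainder goes round-robin over the available nodes. */
	size_t node = 0;
	while(num_assigned < num_extra_threads) {
		if(capacity[node] != 0) {
			++extra[node];
			++num_assigned;
		}
		node = (node + 1) % capacity.size();
	}
	return extra;
}
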
diff --git a/intern/cycles/util/util_thread.cpp b/intern/cycles/util/util_thread.cpp
index 4d30e3f..1880eef 100644
--- a/intern/cycles/util/util_thread.cpp
+++ b/intern/cycles/util/util_thread.cpp
@@ -58,4 +58,9 @@ bool thread::join()
        }
 }
 
+void thread::schedule_to_node(int node)
+{
+       node_ = node;
+}
+
 CCL_NAMESPACE_END
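
This hunk only adds the setter; where node_ actually takes effect is outside the diff. Presumably the thread entry point consults it before running the callback, roughly as in the following sketch. The affinity helper named here is an assumption for illustration, not code shown in the patch:

/* Hypothetical sketch of how node_ could be consumed at thread startup;
 * the actual entry point is not part of this diff.
 * system_cpu_run_thread_on_node() stands in for whatever helper pins the
 * calling thread to a NUMA node. */
void *thread::run(void *arg)
{
	thread *self = (thread*)arg;
	if(self->node_ != -1) {
		/* Pin the current thread to the requested node before any work runs. */
		system_cpu_run_thread_on_node(self->node_);
	}
	self->run_cb_();
	return NULL;
}
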
diff --git a/intern/cycles/util/util_thread.h b/intern/cycles/util/util_thread.h
index d54199a..d21a7a8 100644
--- a/intern/cycles/util/util_thread.h
+++ b/intern/cycles/util/util_thread.h
@@ -46,12 +46,18 @@ typedef std::condition_variable thread_condition_variable;
 
 class thread {
 public:
+       /* NOTE: A node index of -1 means that the affinity will be inherited
+        * from the parent thread, with no override on top of that. */
        thread(function<void()> run_cb, int node = -1);
        ~thread();
 
        static void *run(void *arg);
        bool join();
 
+       /* For an existing thread descriptor which is NOT running yet, assign the
+        * node on which it should be running. */
+       void schedule_to_node(int node);
+
 protected:
        function<void()> run_cb_;
        std::thread thread_;
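
Taken together, the header comments imply a two-step usage: construct the descriptor (node defaults to -1, i.e. inherited affinity), then assign a node before the thread body starts doing real work, which is what distribute_threads_on_nodes() above does for the whole pool. A condensed hypothetical usage sketch, where worker_main is a placeholder callback:

/* Hypothetical usage of the extended API; worker_main is a placeholder. */
thread *worker = new thread(function_bind(&worker_main));  /* node = -1: inherit. */
worker->schedule_to_node(2);  /* Override: run the worker on NUMA node 2. */
/* ... let the worker do its job ... */
worker->join();
delete worker;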