Cycles Network rendering, remove some exception throwing, replace with saner error...
authorMartijn Berger <martijn.berger@gmail.com>
Wed, 5 Feb 2014 20:55:51 +0000 (21:55 +0100)
committerMartijn Berger <martijn.berger@gmail.com>
Wed, 5 Feb 2014 20:55:51 +0000 (21:55 +0100)
This patch adds a network_error() function, more like how other devices handle errors:

- it adds an error check in load_kernels so we do not crash when rendering without a server.
- it uses the non-throwing overload of boost::asio::read.

Reviewers: brecht

Reviewed By: brecht

CC: brecht
Differential Revision: https://developer.blender.org/D86

intern/cycles/device/device_network.cpp
intern/cycles/device/device_network.h

index 90339b89ccec585afccd1398b2748cca3dd9650e..bffd993818fe0728bface42d0cfa11fbac72b779 100644 (file)
@@ -53,6 +53,7 @@ public:
        NetworkDevice(DeviceInfo& info, Stats &stats, const char *address)
        : Device(info, stats, true), socket(io_service)
        {
+               error_func = NetworkError();
                stringstream portstr;
                portstr << SERVER_PORT;
 
@@ -69,14 +70,14 @@ public:
                }
 
                if(error)
-                       throw boost::system::system_error(error);
+                       error_func.network_error(error.message());
 
                mem_counter = 0;
        }
 
        ~NetworkDevice()
        {
-               RPCSend snd(socket, "stop");
+               RPCSend snd(socket, &error_func, "stop");
                snd.write();
        }
 
@@ -86,7 +87,7 @@ public:
 
                mem.device_pointer = ++mem_counter;
 
-               RPCSend snd(socket, "mem_alloc");
+               RPCSend snd(socket, &error_func, "mem_alloc");
 
                snd.add(mem);
                snd.add(type);
@@ -97,7 +98,7 @@ public:
        {
                thread_scoped_lock lock(rpc_lock);
 
-               RPCSend snd(socket, "mem_copy_to");
+               RPCSend snd(socket, &error_func, "mem_copy_to");
 
                snd.add(mem);
                snd.write();
@@ -110,7 +111,7 @@ public:
 
                size_t data_size = mem.memory_size();
 
-               RPCSend snd(socket, "mem_copy_from");
+               RPCSend snd(socket, &error_func, "mem_copy_from");
 
                snd.add(mem);
                snd.add(y);
@@ -119,7 +120,7 @@ public:
                snd.add(elem);
                snd.write();
 
-               RPCReceive rcv(socket);
+               RPCReceive rcv(socket, &error_func);
                rcv.read_buffer((void*)mem.data_pointer, data_size);
        }
 
@@ -127,7 +128,7 @@ public:
        {
                thread_scoped_lock lock(rpc_lock);
 
-               RPCSend snd(socket, "mem_zero");
+               RPCSend snd(socket, &error_func, "mem_zero");
 
                snd.add(mem);
                snd.write();
@@ -138,7 +139,7 @@ public:
                if(mem.device_pointer) {
                        thread_scoped_lock lock(rpc_lock);
 
-                       RPCSend snd(socket, "mem_free");
+                       RPCSend snd(socket, &error_func, "mem_free");
 
                        snd.add(mem);
                        snd.write();
@@ -151,7 +152,7 @@ public:
        {
                thread_scoped_lock lock(rpc_lock);
 
-               RPCSend snd(socket, "const_copy_to");
+               RPCSend snd(socket, &error_func, "const_copy_to");
 
                string name_string(name);
 
@@ -167,7 +168,7 @@ public:
 
                mem.device_pointer = ++mem_counter;
 
-               RPCSend snd(socket, "tex_alloc");
+               RPCSend snd(socket, &error_func, "tex_alloc");
 
                string name_string(name);
 
@@ -184,7 +185,7 @@ public:
                if(mem.device_pointer) {
                        thread_scoped_lock lock(rpc_lock);
 
-                       RPCSend snd(socket, "tex_free");
+                       RPCSend snd(socket, &error_func, "tex_free");
 
                        snd.add(mem);
                        snd.write();
@@ -195,14 +196,17 @@ public:
 
        bool load_kernels(bool experimental)
        {
+               if(error_func.have_error())
+                       return false;
+
                thread_scoped_lock lock(rpc_lock);
 
-               RPCSend snd(socket, "load_kernels");
+               RPCSend snd(socket, &error_func, "load_kernels");
                snd.add(experimental);
                snd.write();
 
                bool result;
-               RPCReceive rcv(socket);
+               RPCReceive rcv(socket, &error_func);
                rcv.read(result);
 
                return result;
@@ -214,7 +218,7 @@ public:
 
                the_task = task;
 
-               RPCSend snd(socket, "task_add");
+               RPCSend snd(socket, &error_func, "task_add");
                snd.add(task);
                snd.write();
        }
@@ -223,7 +227,7 @@ public:
        {
                thread_scoped_lock lock(rpc_lock);
 
-               RPCSend snd(socket, "task_wait");
+               RPCSend snd(socket, &error_func, "task_wait");
                snd.write();
 
                lock.unlock();
@@ -232,10 +236,13 @@ public:
 
                /* todo: run this threaded for connecting to multiple clients */
                for(;;) {
+                       if(error_func.have_error())
+                               break;
+
                        RenderTile tile;
 
                        lock.lock();
-                       RPCReceive rcv(socket);
+                       RPCReceive rcv(socket, &error_func);
 
                        if(rcv.name == "acquire_tile") {
                                lock.unlock();
@@ -245,14 +252,14 @@ public:
                                        the_tiles.push_back(tile);
 
                                        lock.lock();
-                                       RPCSend snd(socket, "acquire_tile");
+                                       RPCSend snd(socket, &error_func, "acquire_tile");
                                        snd.add(tile);
                                        snd.write();
                                        lock.unlock();
                                }
                                else {
                                        lock.lock();
-                                       RPCSend snd(socket, "acquire_tile_none");
+                                       RPCSend snd(socket, &error_func, "acquire_tile_none");
                                        snd.write();
                                        lock.unlock();
                                }
@@ -272,7 +279,7 @@ public:
                                the_task.release_tile(tile);
 
                                lock.lock();
-                               RPCSend snd(socket, "release_tile");
+                               RPCSend snd(socket, &error_func, "release_tile");
                                snd.write();
                                lock.unlock();
                        }
@@ -288,9 +295,12 @@ public:
        void task_cancel()
        {
                thread_scoped_lock lock(rpc_lock);
-               RPCSend snd(socket, "task_cancel");
+               RPCSend snd(socket, &error_func, "task_cancel");
                snd.write();
        }
+
+private:
+       NetworkError error_func;
 };
 
 Device *device_network_create(DeviceInfo& info, Stats &stats, const char *address)
@@ -316,9 +326,16 @@ class DeviceServer {
 public:
        thread_mutex rpc_lock;
 
+       void network_error(const string &message){
+               error_func.network_error(message);
+       }
+
+       bool have_error() { return error_func.have_error(); }
+
        DeviceServer(Device *device_, tcp::socket& socket_)
        : device(device_), socket(socket_), stop(false), blocked_waiting(false)
        {
+               error_func = NetworkError();
        }
 
        void listen()
@@ -336,7 +353,7 @@ protected:
        void listen_step()
        {
                thread_scoped_lock lock(rpc_lock);
-               RPCReceive rcv(socket);
+               RPCReceive rcv(socket, &error_func);
 
                if(rcv.name == "stop")
                        stop = true;
@@ -493,7 +510,7 @@ protected:
 
                        size_t data_size = mem.memory_size();
 
-                       RPCSend snd(socket);
+                       RPCSend snd(socket, &error_func, "mem_copy_from");
                        snd.write();
                        snd.write_buffer((uint8_t*)mem.data_pointer, data_size);
                        lock.unlock();
@@ -588,7 +605,7 @@ protected:
 
                        bool result;
                        result = device->load_kernels(experimental);
-                       RPCSend snd(socket);
+                       RPCSend snd(socket, &error_func, "load_kernels");
                        snd.add(result);
                        snd.write();
                        lock.unlock();
@@ -631,7 +648,7 @@ protected:
                        blocked_waiting = false;
 
                        lock.lock();
-                       RPCSend snd(socket, "task_wait_done");
+                       RPCSend snd(socket, &error_func, "task_wait_done");
                        snd.write();
                        lock.unlock();
                }
@@ -670,7 +687,7 @@ protected:
 
                bool result = false;
 
-               RPCSend snd(socket, "acquire_tile");
+               RPCSend snd(socket, &error_func, "acquire_tile");
                snd.write();
 
                do {
@@ -700,7 +717,7 @@ protected:
                                        cout << "Error: unexpected acquire RPC receive call \"" + entry.name + "\"\n";
                                }
                        }
-               } while(acquire_queue.empty() && !stop);
+               } while(acquire_queue.empty() && !stop && !have_error());
 
                return result;
        }
@@ -724,7 +741,7 @@ protected:
 
                {
                        thread_scoped_lock lock(rpc_lock);
-                       RPCSend snd(socket, "release_tile");
+                       RPCSend snd(socket, &error_func, "release_tile");
                        snd.add(tile);
                        snd.write();
                        lock.unlock();
@@ -776,8 +793,11 @@ protected:
 
        bool stop;
        bool blocked_waiting;
+private:
+       NetworkError error_func;
 
        /* todo: free memory and device (osl) on network error */
+
 };
 
 void Device::server_run()
index d639450b9ea4539d392a1ea205c863f60f68f1b7..bf8f3c70c495bb439654cf92c7086512fa6694a2 100644 (file)
@@ -21,6 +21,8 @@
 
 #include <boost/archive/text_iarchive.hpp>
 #include <boost/archive/text_oarchive.hpp>
+#include <boost/archive/binary_iarchive.hpp>
+#include <boost/archive/binary_oarchive.hpp>
 #include <boost/array.hpp>
 #include <boost/asio.hpp>
 #include <boost/bind.hpp>
@@ -53,6 +55,14 @@ static const int DISCOVER_PORT = 5121;
 static const string DISCOVER_REQUEST_MSG = "REQUEST_RENDER_SERVER_IP";
 static const string DISCOVER_REPLY_MSG = "REPLY_RENDER_SERVER_IP";
 
+#if 0
+typedef boost::archive::text_oarchive o_archive;
+typedef boost::archive::text_iarchive i_archive;
+#else
+typedef boost::archive::binary_oarchive o_archive;
+typedef boost::archive::binary_iarchive i_archive;
+#endif
+
 /* Serialization of device memory */
 
 class network_device_memory : public device_memory
@@ -64,15 +74,40 @@ public:
        vector<char> local_data;
 };
 
+/* Common netowrk error function / object for both DeviceNetwork and DeviceServer*/
+class NetworkError {
+public:
+       NetworkError() {
+               error = "";
+               error_count = 0;
+       }
+
+       ~NetworkError() {}
+
+       void network_error(const string& message) {
+               error = message;
+               error_count += 1;
+       }
+
+       bool have_error() {
+               return true ? error_count > 0 : false;
+       }
+
+private:
+       string error;
+       int error_count;
+};
+
+
 /* Remote procedure call Send */
 
 class RPCSend {
 public:
-       RPCSend(tcp::socket& socket_, const string& name_ = "")
+       RPCSend(tcp::socket& socket_, NetworkError* e, const string& name_ = "")
        : name(name_), socket(socket_), archive(archive_stream), sent(false)
        {
                archive & name_;
-
+               error_func = e;
                fprintf(stderr, "rpc send %s\n", name.c_str());
        }
 
@@ -94,7 +129,6 @@ public:
        void add(const DeviceTask& task)
        {
                int type = (int)task.type;
-
                archive & type & task.x & task.y & task.w & task.h;
                archive & task.rgba_byte & task.rgba_half & task.buffer & task.sample & task.num_samples;
                archive & task.offset & task.stride;
@@ -128,7 +162,7 @@ public:
                        boost::asio::transfer_all(), error);
 
                if(error.value())
-                       cout << "Network send error: " << error.message() << "\n";
+                       error_func->network_error(error.message());
 
                /* then send actual data */
                boost::asio::write(socket,
@@ -136,7 +170,7 @@ public:
                        boost::asio::transfer_all(), error);
                
                if(error.value())
-                       cout << "Network send error: " << error.message() << "\n";
+                       error_func->network_error(error.message());
 
                sent = true;
        }
@@ -150,27 +184,34 @@ public:
                        boost::asio::transfer_all(), error);
                
                if(error.value())
-                       cout << "Network send error: " << error.message() << "\n";
+                       error_func->network_error(error.message());
        }
 
 protected:
        string name;
        tcp::socket& socket;
        ostringstream archive_stream;
-       boost::archive::text_oarchive archive;
+       o_archive archive;
        bool sent;
+       NetworkError *error_func;
 };
 
 /* Remote procedure call Receive */
 
 class RPCReceive {
 public:
-       RPCReceive(tcp::socket& socket_)
+       RPCReceive(tcp::socket& socket_, NetworkError* e )
        : socket(socket_), archive_stream(NULL), archive(NULL)
        {
+               error_func = e;
                /* read head with fixed size */
                vector<char> header(8);
-               size_t len = boost::asio::read(socket, boost::asio::buffer(header));
+               boost::system::error_code error;
+               size_t len = boost::asio::read(socket, boost::asio::buffer(header), error);
+
+               if(error.value()){
+                       error_func->network_error(error.message());
+               }
 
                /* verify if we got something */
                if(len == header.size()) {
@@ -183,30 +224,31 @@ public:
                        if((header_stream >> hex >> data_size)) {
 
                                vector<char> data(data_size);
-                               size_t len = boost::asio::read(socket, boost::asio::buffer(data));
+                               size_t len = boost::asio::read(socket, boost::asio::buffer(data), error);
+
+                               if(error.value())
+                                       error_func->network_error(error.message());
+
 
                                if(len == data_size) {
                                        archive_str = (data.size())? string(&data[0], data.size()): string("");
-#if 0
-                                       istringstream archive_stream(archive_str);
-                                       boost::archive::text_iarchive archive(archive_stream);
-#endif
+
                                        archive_stream = new istringstream(archive_str);
-                                       archive = new boost::archive::text_iarchive(*archive_stream);
+                                       archive = new i_archive(*archive_stream);
 
                                        *archive & name;
                                        fprintf(stderr, "rpc receive %s\n", name.c_str());
                                }
                                else {
-                                       cout << "Network receive error: data size doesn't match header\n";
+                                       error_func->network_error("Network receive error: data size doesn't match header");
                                }
                        }
                        else {
-                               cout << "Network receive error: can't decode data size from header\n";
+                               error_func->network_error("Network receive error: can't decode data size from header");
                        }
                }
                else {
-                       cout << "Network receive error: invalid header size\n";
+                       error_func->network_error("Network receive error: invalid header size");
                }
        }
 
@@ -231,7 +273,12 @@ public:
 
        void read_buffer(void *buffer, size_t size)
        {
-               size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size));
+               boost::system::error_code error;
+               size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size), error);
+
+               if(error.value()){
+                       error_func->network_error(error.message());
+               }
 
                if(len != size)
                        cout << "Network receive error: buffer size doesn't match expected size\n";
@@ -267,7 +314,8 @@ protected:
        tcp::socket& socket;
        string archive_str;
        istringstream *archive_stream;
-       boost::archive::text_iarchive *archive;
+       i_archive *archive;
+       NetworkError *error_func;
 };
 
 /* Server auto discovery */