Merged changes in the trunk up to revision 53280.
[blender-staging.git] / intern / cycles / device / device_network.h
1 /*
2  * Copyright 2011, Blender Foundation.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software Foundation,
16  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17  */
18
19 #ifndef __DEVICE_NETWORK_H__
20 #define __DEVICE_NETWORK_H__
21
22 #ifdef WITH_NETWORK
23
24 #include <boost/archive/text_iarchive.hpp>
25 #include <boost/archive/text_oarchive.hpp>
26 #include <boost/array.hpp>
27 #include <boost/asio.hpp>
28 #include <boost/bind.hpp>
29 #include <boost/serialization/vector.hpp>
30 #include <boost/thread.hpp>
31
32 #include <iostream>
33
34 #include "buffers.h"
35
36 #include "util_foreach.h"
37 #include "util_list.h"
38 #include "util_map.h"
39 #include "util_string.h"
40
41 CCL_NAMESPACE_BEGIN
42
43 using std::cout;
44 using std::cerr;
45 using std::hex;
46 using std::setw;
47 using std::exception;
48
49 using boost::asio::ip::tcp;
50
51 static const int SERVER_PORT = 5120;
52 static const int DISCOVER_PORT = 5121;
53 static const string DISCOVER_REQUEST_MSG = "REQUEST_RENDER_SERVER_IP";
54 static const string DISCOVER_REPLY_MSG = "REPLY_RENDER_SERVER_IP";
55
56 /* Serialization of device memory */
57
58 class network_device_memory : public device_memory
59 {
60 public:
61         network_device_memory() {}
62         ~network_device_memory() { device_pointer = 0; };
63
64         vector<char> local_data;
65 };
66
67 /* Remote procedure call Send */
68
69 class RPCSend {
70 public:
71         RPCSend(tcp::socket& socket_, const string& name_ = "")
72         : name(name_), socket(socket_), archive(archive_stream), sent(false)
73         {
74                 archive & name_;
75         }
76
77         ~RPCSend()
78         {
79                 if(!sent)
80                         fprintf(stderr, "Error: RPC %s not sent\n", name.c_str());
81         }
82
83         void add(const device_memory& mem)
84         {
85                 archive & mem.data_type & mem.data_elements & mem.data_size;
86                 archive & mem.data_width & mem.data_height & mem.device_pointer;
87         }
88
89         template<typename T> void add(const T& data)
90         {
91                 archive & data;
92         }
93
94         void add(const DeviceTask& task)
95         {
96                 int type = (int)task.type;
97
98                 archive & type & task.x & task.y & task.w & task.h;
99                 archive & task.rgba & task.buffer & task.sample & task.num_samples;
100                 archive & task.resolution & task.offset & task.stride;
101                 archive & task.shader_input & task.shader_output & task.shader_eval_type;
102                 archive & task.shader_x & task.shader_w;
103         }
104
105         void add(const RenderTile& tile)
106         {
107                 archive & tile.x & tile.y & tile.w & tile.h;
108                 archive & tile.start_sample & tile.num_samples & tile.sample;
109                 archive & tile.resolution & tile.offset & tile.stride;
110                 archive & tile.buffer & tile.rng_state & tile.rgba;
111         }
112
113         void write()
114         {
115                 boost::system::error_code error;
116
117                 /* get string from stream */
118                 string archive_str = archive_stream.str();
119
120                 /* first send fixed size header with size of following data */
121                 ostringstream header_stream;
122                 header_stream << setw(8) << hex << archive_str.size();
123                 string header_str = header_stream.str();
124
125                 boost::asio::write(socket,
126                         boost::asio::buffer(header_str),
127                         boost::asio::transfer_all(), error);
128
129                 if(error.value())
130                         cout << "Network send error: " << error.message() << "\n";
131
132                 /* then send actual data */
133                 boost::asio::write(socket,
134                         boost::asio::buffer(archive_str),
135                         boost::asio::transfer_all(), error);
136                 
137                 if(error.value())
138                         cout << "Network send error: " << error.message() << "\n";
139
140                 sent = true;
141         }
142
143         void write_buffer(void *buffer, size_t size)
144         {
145                 boost::system::error_code error;
146
147                 boost::asio::write(socket,
148                         boost::asio::buffer(buffer, size),
149                         boost::asio::transfer_all(), error);
150                 
151                 if(error.value())
152                         cout << "Network send error: " << error.message() << "\n";
153         }
154
155 protected:
156         string name;
157         tcp::socket& socket;
158         ostringstream archive_stream;
159         boost::archive::text_oarchive archive;
160         bool sent;
161 };
162
163 /* Remote procedure call Receive */
164
165 class RPCReceive {
166 public:
167         RPCReceive(tcp::socket& socket_)
168         : socket(socket_), archive_stream(NULL), archive(NULL)
169         {
170                 /* read head with fixed size */
171                 vector<char> header(8);
172                 size_t len = boost::asio::read(socket, boost::asio::buffer(header));
173
174                 /* verify if we got something */
175                 if(len == header.size()) {
176                         /* decode header */
177                         string header_str(&header[0], header.size());
178                         istringstream header_stream(header_str);
179
180                         size_t data_size;
181
182                         if((header_stream >> hex >> data_size)) {
183                                 vector<char> data(data_size);
184                                 size_t len = boost::asio::read(socket, boost::asio::buffer(data));
185
186                                 if(len == data_size) {
187                                         archive_str = (data.size())? string(&data[0], data.size()): string("");
188 #if 0
189                                         istringstream archive_stream(archive_str);
190                                         boost::archive::text_iarchive archive(archive_stream);
191 #endif
192                                         archive_stream = new istringstream(archive_str);
193                                         archive = new boost::archive::text_iarchive(*archive_stream);
194
195                                         *archive & name;
196                                 }
197                                 else
198                                         cout << "Network receive error: data size doens't match header\n";
199                         }
200                         else
201                                 cout << "Network receive error: can't decode data size from header\n";
202                 }
203                 else
204                         cout << "Network receive error: invalid header size\n";
205         }
206
207         ~RPCReceive()
208         {
209                 delete archive;
210                 delete archive_stream;
211         }
212
213         void read(network_device_memory& mem)
214         {
215                 *archive & mem.data_type & mem.data_elements & mem.data_size;
216                 *archive & mem.data_width & mem.data_height & mem.device_pointer;
217
218                 mem.data_pointer = 0;
219         }
220
221         template<typename T> void read(T& data)
222         {
223                 *archive & data;
224         }
225
226         void read_buffer(void *buffer, size_t size)
227         {
228                 size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size));
229
230                 if(len != size)
231                         cout << "Network receive error: buffer size doesn't match expected size\n";
232         }
233
234         void read(DeviceTask& task)
235         {
236                 int type;
237
238                 *archive & type & task.x & task.y & task.w & task.h;
239                 *archive & task.rgba & task.buffer & task.sample & task.num_samples;
240                 *archive & task.resolution & task.offset & task.stride;
241                 *archive & task.shader_input & task.shader_output & task.shader_eval_type;
242                 *archive & task.shader_x & task.shader_w;
243
244                 task.type = (DeviceTask::Type)type;
245         }
246
247         void read(RenderTile& tile)
248         {
249                 *archive & tile.x & tile.y & tile.w & tile.h;
250                 *archive & tile.start_sample & tile.num_samples & tile.sample;
251                 *archive & tile.resolution & tile.offset & tile.stride;
252                 *archive & tile.buffer & tile.rng_state & tile.rgba;
253
254                 tile.buffers = NULL;
255         }
256
257         string name;
258
259 protected:
260         tcp::socket& socket;
261         string archive_str;
262         istringstream *archive_stream;
263         boost::archive::text_iarchive *archive;
264 };
265
266 /* Server auto discovery */
267
268 class ServerDiscovery {
269 public:
270         ServerDiscovery(bool discover = false)
271         : listen_socket(io_service), collect_servers(false)
272         {
273                 /* setup listen socket */
274                 listen_endpoint.address(boost::asio::ip::address_v4::any());
275                 listen_endpoint.port(DISCOVER_PORT);
276
277                 listen_socket.open(listen_endpoint.protocol());
278
279                 boost::asio::socket_base::reuse_address option(true);
280                 listen_socket.set_option(option);
281
282                 listen_socket.bind(listen_endpoint);
283
284                 /* setup receive callback */
285                 async_receive();
286
287                 /* start server discovery */
288                 if(discover) {
289                         collect_servers = true;
290                         servers.clear();
291
292                         broadcast_message(DISCOVER_REQUEST_MSG);
293                 }
294
295                 /* start thread */
296                 work = new boost::asio::io_service::work(io_service);
297                 thread = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service));
298         }
299
300         ~ServerDiscovery()
301         {
302                 io_service.stop();
303                 thread->join();
304                 delete thread;
305                 delete work;
306         }
307
308         list<string> get_server_list()
309         {
310                 list<string> result;
311
312                 mutex.lock();
313                 result = servers;
314                 mutex.unlock();
315
316                 return result;
317         }
318
319 private:
320         void handle_receive_from(const boost::system::error_code& error, size_t size)
321         {
322                 if(error) {
323                         cout << "Server discovery receive error: " << error.message() << "\n";
324                         return;
325                 }
326
327                 if(size > 0) {
328                         string msg = string(receive_buffer, size);
329
330                         /* handle incoming message */
331                         if(collect_servers) {
332                                 if(msg == DISCOVER_REPLY_MSG) {
333                                         string address = receive_endpoint.address().to_string();
334
335                                         mutex.lock();
336
337                                         /* add address if it's not already in the list */
338                                         bool found = false;
339
340                                         foreach(string& server, servers)
341                                                 if(server == address)
342                                                         found = true;
343
344                                         if(!found)
345                                                 servers.push_back(address);
346
347                                         mutex.unlock();
348                                 }
349                         }
350                         else {
351                                 /* reply to request */
352                                 if(msg == DISCOVER_REQUEST_MSG)
353                                         broadcast_message(DISCOVER_REPLY_MSG);
354                         }
355                 }
356
357                 async_receive();
358         }
359
360         void async_receive()
361         {
362                 listen_socket.async_receive_from(
363                         boost::asio::buffer(receive_buffer), receive_endpoint,
364                         boost::bind(&ServerDiscovery::handle_receive_from, this,
365                         boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
366         }
367
368         void broadcast_message(const string& msg)
369         {
370                 /* setup broadcast socket */
371                 boost::asio::ip::udp::socket socket(io_service);
372
373                 socket.open(boost::asio::ip::udp::v4());
374
375                 boost::asio::socket_base::broadcast option(true);
376                 socket.set_option(option);
377
378                 boost::asio::ip::udp::endpoint broadcast_endpoint(
379                         boost::asio::ip::address::from_string("255.255.255.255"), DISCOVER_PORT);
380
381                 /* broadcast message */
382                 socket.send_to(boost::asio::buffer(msg), broadcast_endpoint);
383         }
384
385         /* network service and socket */
386         boost::asio::io_service io_service;
387         boost::asio::ip::udp::endpoint listen_endpoint;
388         boost::asio::ip::udp::socket listen_socket;
389
390         /* threading */
391         boost::thread *thread;
392         boost::asio::io_service::work *work;
393         boost::mutex mutex;
394
395         /* buffer and endpoint for receiving messages */
396         char receive_buffer[256];
397         boost::asio::ip::udp::endpoint receive_endpoint;
398
399         /* collection of server addresses in list */
400         bool collect_servers;
401         list<string> servers;
402 };
403
404 CCL_NAMESPACE_END
405
406 #endif
407
408 #endif /* __DEVICE_NETWORK_H__ */
409