Cycles: svn merge -r41182:41205 ^/trunk/blender
[blender-staging.git] / intern / cycles / device / device_network.h
1 /*
2  * Copyright 2011, Blender Foundation.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software Foundation,
16  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17  */
18
19 #ifndef __DEVICE_NETWORK_H__
20 #define __DEVICE_NETWORK_H__
21
22 #ifdef WITH_NETWORK
23
24 #include <boost/archive/text_iarchive.hpp>
25 #include <boost/archive/text_oarchive.hpp>
26 #include <boost/array.hpp>
27 #include <boost/asio.hpp>
28 #include <boost/bind.hpp>
29 #include <boost/serialization/vector.hpp>
30 #include <boost/thread.hpp>
31
32 #include <iostream>
33
34 #include "util_foreach.h"
35 #include "util_list.h"
36 #include "util_string.h"
37
38 CCL_NAMESPACE_BEGIN
39
40 using std::cout;
41 using std::cerr;
42 using std::endl;
43 using std::hex;
44 using std::setw;
45 using std::exception;
46
47 using boost::asio::ip::tcp;
48
49 static const int SERVER_PORT = 5120;
50 static const int DISCOVER_PORT = 5121;
51 static const string DISCOVER_REQUEST_MSG = "REQUEST_RENDER_SERVER_IP";
52 static const string DISCOVER_REPLY_MSG = "REPLY_RENDER_SERVER_IP";
53
54 typedef struct RPCSend {
55         RPCSend(tcp::socket& socket_, const string& name_ = "")
56         : name(name_), socket(socket_), archive(archive_stream)
57         {
58                 archive & name_;
59         }
60
61         void write()
62         {
63                 boost::system::error_code error;
64
65                 /* get string from stream */
66                 string archive_str = archive_stream.str();
67
68                 /* first send fixed size header with size of following data */
69                 ostringstream header_stream;
70                 header_stream << setw(8) << hex << archive_str.size();
71                 string header_str = header_stream.str();
72
73                 boost::asio::write(socket,
74                         boost::asio::buffer(header_str),
75                         boost::asio::transfer_all(), error);
76
77                 if(error.value())
78                         cout << "Network send error: " << error.message() << "\n";
79
80                 /* then send actual data */
81                 boost::asio::write(socket,
82                         boost::asio::buffer(archive_str),
83                         boost::asio::transfer_all(), error);
84                 
85                 if(error.value())
86                         cout << "Network send error: " << error.message() << "\n";
87         }
88
89         void write_buffer(void *buffer, size_t size)
90         {
91                 boost::system::error_code error;
92
93                 boost::asio::write(socket,
94                         boost::asio::buffer(buffer, size),
95                         boost::asio::transfer_all(), error);
96                 
97                 if(error.value())
98                         cout << "Network send error: " << error.message() << "\n";
99         }
100
101         string name;
102         tcp::socket& socket;
103         ostringstream archive_stream;
104         boost::archive::text_oarchive archive;
105 } RPCSend;
106
107 typedef struct RPCReceive {
108         RPCReceive(tcp::socket& socket_)
109         : socket(socket_), archive_stream(NULL), archive(NULL)
110         {
111                 /* read head with fixed size */
112                 vector<char> header(8);
113                 size_t len = boost::asio::read(socket, boost::asio::buffer(header));
114
115                 /* verify if we got something */
116                 if(len == header.size()) {
117                         /* decode header */
118                         string header_str(&header[0], header.size());
119                         istringstream header_stream(header_str);
120
121                         size_t data_size;
122
123                         if((header_stream >> hex >> data_size)) {
124                                 vector<char> data(data_size);
125                                 size_t len = boost::asio::read(socket, boost::asio::buffer(data));
126
127                                 if(len == data_size) {
128                                         archive_str = (data.size())? string(&data[0], data.size()): string("");
129                                         /*istringstream archive_stream(archive_str);
130                                         boost::archive::text_iarchive archive(archive_stream);*/
131                                         archive_stream = new istringstream(archive_str);
132                                         archive = new boost::archive::text_iarchive(*archive_stream);
133
134                                         *archive & name;
135                                 }
136                                 else
137                                         cout << "Network receive error: data size doens't match header\n";
138                         }
139                         else
140                                 cout << "Network receive error: can't decode data size from header\n";
141                 }
142                 else
143                         cout << "Network receive error: invalid header size\n";
144         }
145
146         ~RPCReceive()
147         {
148                 delete archive;
149                 delete archive_stream;
150         }
151
152         void read_buffer(void *buffer, size_t size)
153         {
154                 size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size));
155
156                 if(len != size)
157                         cout << "Network receive error: buffer size doesn't match expected size\n";
158         }
159
160         string name;
161         tcp::socket& socket;
162         string archive_str;
163         istringstream *archive_stream;
164         boost::archive::text_iarchive *archive;
165 } RPCReceive;
166
167 class ServerDiscovery {
168 public:
169         ServerDiscovery(bool discover = false)
170         : listen_socket(io_service), collect_servers(false)
171         {
172                 /* setup listen socket */
173                 listen_endpoint.address(boost::asio::ip::address_v4::any());
174                 listen_endpoint.port(DISCOVER_PORT);
175
176                 listen_socket.open(listen_endpoint.protocol());
177
178                 boost::asio::socket_base::reuse_address option(true);
179                 listen_socket.set_option(option);
180
181                 listen_socket.bind(listen_endpoint);
182
183                 /* setup receive callback */
184                 async_receive();
185
186                 /* start server discovery */
187                 if(discover) {
188                         collect_servers = true;
189                         servers.clear();
190
191                         broadcast_message(DISCOVER_REQUEST_MSG);
192                 }
193
194                 /* start thread */
195                 work = new boost::asio::io_service::work(io_service);
196                 thread = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service));
197         }
198
199         ~ServerDiscovery()
200         {
201                 io_service.stop();
202                 thread->join();
203                 delete thread;
204                 delete work;
205         }
206
207         list<string> get_server_list()
208         {
209                 list<string> result;
210
211                 mutex.lock();
212                 result = servers;
213                 mutex.unlock();
214
215                 return result;
216         }
217
218 private:
219         void handle_receive_from(const boost::system::error_code& error, size_t size)
220         {
221                 if(error) {
222                         cout << "Server discovery receive error: " << error.message() << "\n";
223                         return;
224                 }
225
226                 if(size > 0) {
227                         string msg = string(receive_buffer, size);
228
229                         /* handle incoming message */
230                         if(collect_servers) {
231                                 if(msg == DISCOVER_REPLY_MSG) {
232                                         string address = receive_endpoint.address().to_string();
233
234                                         mutex.lock();
235
236                                         /* add address if it's not already in the list */
237                                         bool found = false;
238
239                                         foreach(string& server, servers)
240                                                 if(server == address)
241                                                         found = true;
242
243                                         if(!found)
244                                                 servers.push_back(address);
245
246                                         mutex.unlock();
247                                 }
248                         }
249                         else {
250                                 /* reply to request */
251                                 if(msg == DISCOVER_REQUEST_MSG)
252                                         broadcast_message(DISCOVER_REPLY_MSG);
253                         }
254                 }
255
256                 async_receive();
257         }
258
259         void async_receive()
260         {
261                 listen_socket.async_receive_from(
262                         boost::asio::buffer(receive_buffer), receive_endpoint,
263                         boost::bind(&ServerDiscovery::handle_receive_from, this,
264                         boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
265         }
266
267         void broadcast_message(const string& msg)
268         {
269                 /* setup broadcast socket */
270                 boost::asio::ip::udp::socket socket(io_service);
271
272                 socket.open(boost::asio::ip::udp::v4());
273
274                 boost::asio::socket_base::broadcast option(true);
275                 socket.set_option(option);
276
277                 boost::asio::ip::udp::endpoint broadcast_endpoint(
278                         boost::asio::ip::address::from_string("255.255.255.255"), DISCOVER_PORT);
279
280                 /* broadcast message */
281                 socket.send_to(boost::asio::buffer(msg), broadcast_endpoint);
282         }
283
284         /* network service and socket */
285         boost::asio::io_service io_service;
286         boost::asio::ip::udp::endpoint listen_endpoint;
287         boost::asio::ip::udp::socket listen_socket;
288
289         /* threading */
290         boost::thread *thread;
291         boost::asio::io_service::work *work;
292         boost::mutex mutex;
293
294         /* buffer and endpoint for receiving messages */
295         char receive_buffer[256];
296         boost::asio::ip::udp::endpoint receive_endpoint;
297
298         /* collection of server addresses in list */
299         bool collect_servers;
300         list<string> servers;
301 };
302
303 CCL_NAMESPACE_END
304
305 #endif
306
307 #endif /* __DEVICE_NETWORK_H__ */
308