aa109c8b6f345febba3d3517a991b4ab18d192a2
[blender.git] / release / scripts / io / netrender / master.py
1 # ##### BEGIN GPL LICENSE BLOCK #####
2 #
3 #  This program is free software; you can redistribute it and/or
4 #  modify it under the terms of the GNU General Public License
5 #  as published by the Free Software Foundation; either version 2
6 #  of the License, or (at your option) any later version.
7 #
8 #  This program is distributed in the hope that it will be useful,
9 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
10 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 #  GNU General Public License for more details.
12 #
13 #  You should have received a copy of the GNU General Public License
14 #  along with this program; if not, write to the Free Software Foundation,
15 #  Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16 #
17 # ##### END GPL LICENSE BLOCK #####
18
19 import sys, os
20 import http, http.client, http.server, urllib, socket
21 import subprocess, shutil, time, hashlib
22 import select # for select.error
23
24 from netrender.utils import *
25 import netrender.model
26 import netrender.balancing
27 import netrender.master_html
28
class MRenderFile(netrender.model.RenderFile):
    """Job file entry as tracked by the master, with an on-disk presence flag."""

    def __init__(self, filepath, index, start, end):
        super().__init__(filepath, index, start, end)
        # Refreshed by test(); stays False until the file has been seen on disk.
        self.found = False

    def test(self):
        """Re-check whether ``self.filepath`` exists, remember and return the result."""
        present = os.path.exists(self.filepath)
        self.found = present
        return present
37
38
class MRenderSlave(netrender.model.RenderSlave):
    """Master-side record of a connected slave and the work assigned to it."""

    def __init__(self, name, address, stats):
        """Create the record and register it in RenderSlave._slave_map.

        The slave id is the MD5 hex digest of repr(name) + repr(address).
        """
        super().__init__()
        self.id = hashlib.md5(bytes(repr(name) + repr(address), encoding='utf8')).hexdigest()
        self.name = name
        self.address = address
        self.stats = stats
        self.last_seen = time.time()

        # Job currently assigned to this slave and the frame numbers still pending.
        self.job = None
        self.job_frames = []

        netrender.model.RenderSlave._slave_map[self.id] = self

    def seen(self):
        """Record that the slave contacted the master just now."""
        self.last_seen = time.time()

    def finishedFrame(self, frame_number):
        """Mark ``frame_number`` as no longer pending on this slave.

        When no pending frame is left the slave's current job is cleared.
        A frame number that is not pending (for instance because the
        assignment was replaced in the meantime) is reported and otherwise
        ignored, so that the caller can still record the result.
        """
        try:
            self.job_frames.remove(frame_number)
        except ValueError:
            print("Internal error: frame %r not in job frames list %r" % (frame_number, self.job_frames))

        if not self.job_frames:
            self.job = None
60
class MRenderJob(netrender.model.RenderJob):
    """Master-side render job: adds an id, dispatch bookkeeping and a save path."""

    def __init__(self, job_id, job_info):
        """Build the master job ``job_id`` from the submitted ``job_info``."""
        super().__init__(job_info)
        self.id = job_id
        self.last_dispatched = time.time()

        # force one chunk for process jobs
        if self.type == netrender.model.JOB_PROCESS:
            self.chunks = 1

        # Force WAITING status on creation
        self.status = JOB_WAITING

        # special server properties
        self.last_update = 0
        self.save_path = ""
        self.files = [MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end) for rfile in job_info.files]

    def save(self):
        """Write repr(self.serialize()) to <save_path>job.txt; no-op without a save path."""
        if self.save_path:
            # The context manager closes the file even if serialize()/write() raises.
            with open(self.save_path + "job.txt", "w") as f:
                f.write(repr(self.serialize()))

    def edit(self, info_map):
        """Apply the "status", "priority" and "chunks" entries present in ``info_map``."""
        if "status" in info_map:
            self.status = info_map["status"]

        if "priority" in info_map:
            self.priority = info_map["priority"]

        if "chunks" in info_map:
            self.chunks = info_map["chunks"]

    def testStart(self):
        """Start the job if every file passes its test(); return whether it was started."""
        for f in self.files:
            if not f.test():
                return False

        self.start()
        return True

    def testFinished(self):
        """Set the status to JOB_FINISHED once no frame is QUEUED or DISPATCHED."""
        for f in self.frames:
            if f.status == QUEUED or f.status == DISPATCHED:
                break
        else:
            self.status = JOB_FINISHED

    def pause(self, status = None):
        """Switch between JOB_PAUSED and JOB_QUEUED.

        Does nothing unless the job is currently paused or queued.
        ``status`` None toggles the state, a true value sets JOB_QUEUED and
        any other value sets JOB_PAUSED.
        """
        if self.status not in {JOB_PAUSED, JOB_QUEUED}:
            return

        if status is None:
            self.status = JOB_PAUSED if self.status == JOB_QUEUED else JOB_QUEUED
        elif status:
            self.status = JOB_QUEUED
        else:
            self.status = JOB_PAUSED

    def start(self):
        """Put the job in the JOB_QUEUED state."""
        self.status = JOB_QUEUED

    def addLog(self, frames):
        """Point every known frame in ``frames`` at their shared log file.

        The log is named after the zero-padded frame numbers joined by "_",
        with a ".log" suffix, under ``save_path``.
        """
        log_name = "_".join("%04d" % f for f in frames) + ".log"
        log_path = self.save_path + log_name

        for number in frames:
            frame = self[number]
            if frame:
                frame.log_path = log_path

    def addFrame(self, frame_number, command):
        """Append a new MRenderFrame to the job and return it."""
        frame = MRenderFrame(frame_number, command)
        self.frames.append(frame)
        return frame

    def reset(self, all):
        """Reset every frame of the job, forwarding ``all`` to each frame's reset()."""
        for f in self.frames:
            f.reset(all)

    def getFrames(self):
        """Return up to ``self.chunks`` QUEUED frames, in job order.

        ``last_dispatched`` is refreshed whenever a frame is picked.
        """
        frames = []
        for f in self.frames:
            if f.status == QUEUED:
                self.last_dispatched = time.time()
                frames.append(f)
                if len(frames) >= self.chunks:
                    break

        return frames
152
class MRenderFrame(netrender.model.RenderFrame):
    """Master-side frame record: number, command, assigned slave, status and log."""

    def __init__(self, frame, command):
        super().__init__()
        self.number = frame
        self.command = command

        # Dispatch state; a new frame starts queued and unassigned.
        self.slave = None
        self.time = 0
        self.status = QUEUED

        self.log_path = None

    def reset(self, all):
        """Put the frame back in the queue.

        Only frames whose status is ERROR are reset, unless ``all`` is true.
        """
        if not (all or self.status == ERROR):
            return

        self.status = QUEUED
        self.time = 0
        self.slave = None
        self.log_path = None
170
171
172 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
173 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
174 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
175 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# URL patterns of the master's endpoints. Job ids are matched as alphanumeric
# strings, frame numbers and file indices as decimal digits. Raw strings are
# used and the literal dots before the file extensions are escaped so that
# they no longer match an arbitrary character.
file_pattern = re.compile(r"/file_([a-zA-Z0-9]+)_([0-9]+)")
render_pattern = re.compile(r"/render_([a-zA-Z0-9]+)_([0-9]+)\.(exr|jpg)")
log_pattern = re.compile(r"/log_([a-zA-Z0-9]+)_([0-9]+)\.log")
reset_pattern = re.compile(r"/reset(all|)_([a-zA-Z0-9]+)_([0-9]+)")
cancel_pattern = re.compile(r"/cancel_([a-zA-Z0-9]+)")
pause_pattern = re.compile(r"/pause_([a-zA-Z0-9]+)")
edit_pattern = re.compile(r"/edit_([a-zA-Z0-9]+)")
183
184 class RenderHandler(http.server.BaseHTTPRequestHandler):
185     def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"):
186         self.send_response(code)
187         self.send_header("Content-type", content)
188
189         for key, value in headers.items():
190             self.send_header(key, value)
191
192         self.end_headers()
193
194     def do_HEAD(self):
195
196         if self.path == "/status":
197             job_id = self.headers.get('job-id', "")
198             job_frame = int(self.headers.get('job-frame', -1))
199
200             job = self.server.getJobID(job_id)
201             if job:
202                 frame = job[job_frame]
203
204
205                 if frame:
206                     self.send_head(http.client.OK)
207                 else:
208                     # no such frame
209                     self.send_head(http.client.NO_CONTENT)
210             else:
211                 # no such job id
212                 self.send_head(http.client.NO_CONTENT)
213
214     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
215     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
216     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
217     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
218     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
219
220     def do_GET(self):
221
222         if self.path == "/version":
223             self.send_head()
224             self.server.stats("", "Version check")
225             self.wfile.write(VERSION)
226         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
227         elif self.path.startswith("/render"):
228             match = render_pattern.match(self.path)
229
230             if match:
231                 job_id = match.groups()[0]
232                 frame_number = int(match.groups()[1])
233                 
234                 exr = match.groups()[2] == "exr"
235
236                 job = self.server.getJobID(job_id)
237
238                 if job:
239                     frame = job[frame_number]
240
241                     if frame:
242                         if frame.status in (QUEUED, DISPATCHED):
243                             self.send_head(http.client.ACCEPTED)
244                         elif frame.status == DONE:
245                             self.server.stats("", "Sending result to client")
246                             
247                             if exr:
248                                 f = open(job.save_path + "%04d" % frame_number + ".exr", 'rb')
249                                 self.send_head(content = "image/x-exr")
250                             else:
251                                 filename = job.save_path + "%04d" % frame_number + ".jpg"
252                                 
253                                 if not os.path.exists(filename):
254                                     import bpy
255                                     sce = bpy.data.scenes[0]
256                                     sce.render_data.file_format = "JPEG"
257                                     sce.render_data.quality = 90
258                                     bpy.ops.image.open(path = job.save_path + "%04d" % frame_number + ".exr")
259                                     img = bpy.data.images["%04d" % frame_number + ".exr"]
260                                     img.save(filename)
261                                 
262                                 f = open(filename, 'rb')
263                                 self.send_head(content = "image/jpeg")
264
265
266                             shutil.copyfileobj(f, self.wfile)
267
268                             f.close()
269                         elif frame.status == ERROR:
270                             self.send_head(http.client.PARTIAL_CONTENT)
271                     else:
272                         # no such frame
273                         self.send_head(http.client.NO_CONTENT)
274                 else:
275                     # no such job id
276                     self.send_head(http.client.NO_CONTENT)
277             else:
278                 # invalid url
279                 self.send_head(http.client.NO_CONTENT)
280         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
281         elif self.path.startswith("/log"):
282             match = log_pattern.match(self.path)
283
284             if match:
285                 job_id = match.groups()[0]
286                 frame_number = int(match.groups()[1])
287
288                 job = self.server.getJobID(job_id)
289
290                 if job:
291                     frame = job[frame_number]
292
293                     if frame:
294                         if not frame.log_path or frame.status in (QUEUED, DISPATCHED):
295                             self.send_head(http.client.PROCESSING)
296                         else:
297                             self.server.stats("", "Sending log to client")
298                             f = open(frame.log_path, 'rb')
299
300                             self.send_head(content = "text/plain")
301
302                             shutil.copyfileobj(f, self.wfile)
303
304                             f.close()
305                     else:
306                         # no such frame
307                         self.send_head(http.client.NO_CONTENT)
308                 else:
309                     # no such job id
310                     self.send_head(http.client.NO_CONTENT)
311             else:
312                 # invalid URL
313                 self.send_head(http.client.NO_CONTENT)
314         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
315         elif self.path == "/status":
316             job_id = self.headers.get('job-id', "")
317             job_frame = int(self.headers.get('job-frame', -1))
318
319             if job_id:
320
321                 job = self.server.getJobID(job_id)
322                 if job:
323                     if job_frame != -1:
324                         frame = job[frame]
325
326                         if frame:
327                             message = frame.serialize()
328                         else:
329                             # no such frame
330                             self.send_heat(http.client.NO_CONTENT)
331                             return
332                     else:
333                         message = job.serialize()
334                 else:
335                     # no such job id
336                     self.send_head(http.client.NO_CONTENT)
337                     return
338             else: # status of all jobs
339                 message = []
340
341                 for job in self.server:
342                     message.append(job.serialize())
343
344
345             self.server.stats("", "Sending status")
346             self.send_head()
347             self.wfile.write(bytes(repr(message), encoding='utf8'))
348
349         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
350         elif self.path == "/job":
351             self.server.balance()
352
353             slave_id = self.headers['slave-id']
354
355             slave = self.server.getSeenSlave(slave_id)
356
357             if slave: # only if slave id is valid
358                 job, frames = self.server.newDispatch(slave_id)
359
360                 if job and frames:
361                     for f in frames:
362                         print("dispatch", f.number)
363                         f.status = DISPATCHED
364                         f.slave = slave
365
366                     slave.job = job
367                     slave.job_frames = [f.number for f in frames]
368
369                     self.send_head(headers={"job-id": job.id})
370
371                     message = job.serialize(frames)
372
373                     self.wfile.write(bytes(repr(message), encoding='utf8'))
374
375                     self.server.stats("", "Sending job to slave")
376                 else:
377                     # no job available, return error code
378                     slave.job = None
379                     slave.job_frames = []
380
381                     self.send_head(http.client.ACCEPTED)
382             else: # invalid slave id
383                 self.send_head(http.client.NO_CONTENT)
384         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
385         elif self.path.startswith("/file"):
386             match = file_pattern.match(self.path)
387
388             if match:
389                 slave_id = self.headers['slave-id']
390                 slave = self.server.getSeenSlave(slave_id)
391
392                 if not slave:
393                     # invalid slave id
394                     print("invalid slave id")
395
396                 job_id = match.groups()[0]
397                 file_index = int(match.groups()[1])
398
399                 job = self.server.getJobID(job_id)
400
401                 if job:
402                     render_file = job.files[file_index]
403
404                     if render_file:
405                         self.server.stats("", "Sending file to slave")
406                         f = open(render_file.filepath, 'rb')
407
408                         self.send_head()
409                         shutil.copyfileobj(f, self.wfile)
410
411                         f.close()
412                     else:
413                         # no such file
414                         self.send_head(http.client.NO_CONTENT)
415                 else:
416                     # no such job id
417                     self.send_head(http.client.NO_CONTENT)
418             else: # invalid url
419                 self.send_head(http.client.NO_CONTENT)
420         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
421         elif self.path == "/slaves":
422             message = []
423
424             self.server.stats("", "Sending slaves status")
425
426             for slave in self.server.slaves:
427                 message.append(slave.serialize())
428
429             self.send_head()
430
431             self.wfile.write(bytes(repr(message), encoding='utf8'))
432         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
433         else:
434             # hand over the rest to the html section
435             netrender.master_html.get(self)
436
437     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
438     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
439     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
440     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
441     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
442     def do_POST(self):
443
444         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
445         if self.path == "/job":
446
447             length = int(self.headers['content-length'])
448
449             job_info = netrender.model.RenderJob.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
450
451             job_id = self.server.nextJobID()
452
453             job = MRenderJob(job_id, job_info)
454
455             for frame in job_info.frames:
456                 frame = job.addFrame(frame.number, frame.command)
457
458             self.server.addJob(job)
459
460             headers={"job-id": job_id}
461
462             if job.testStart():
463                 self.server.stats("", "New job, started")
464                 self.send_head(headers=headers)
465             else:
466                 self.server.stats("", "New job, missing files (%i total)" % len(job.files))
467                 self.send_head(http.client.ACCEPTED, headers=headers)
468         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
469         elif self.path.startswith("/edit"):
470             match = edit_pattern.match(self.path)
471
472             if match:
473                 job_id = match.groups()[0]
474
475                 job = self.server.getJobID(job_id)
476
477                 if job:
478                     length = int(self.headers['content-length'])
479                     info_map = eval(str(self.rfile.read(length), encoding='utf8'))
480
481                     job.edit(info_map)
482                     self.send_head()
483                 else:
484                     # no such job id
485                     self.send_head(http.client.NO_CONTENT)
486             else:
487                 # invalid url
488                 self.send_head(http.client.NO_CONTENT)
489         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
490         elif self.path == "/balance_limit":
491             length = int(self.headers['content-length'])
492             info_map = eval(str(self.rfile.read(length), encoding='utf8'))
493             for rule_id, limit in info_map.items():
494                 try:
495                     rule = self.server.balancer.ruleByID(rule_id)
496                     if rule:
497                         rule.setLimit(limit)
498                 except:
499                     pass # invalid type
500                         
501             self.send_head()
502         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
503         elif self.path == "/balance_enable":
504             length = int(self.headers['content-length'])
505             info_map = eval(str(self.rfile.read(length), encoding='utf8'))
506             for rule_id, enabled in info_map.items():
507                 rule = self.server.balancer.ruleByID(rule_id)
508                 if rule:
509                     rule.enabled = enabled
510                         
511             self.send_head()
512         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
513         elif self.path.startswith("/cancel"):
514             match = cancel_pattern.match(self.path)
515
516             if match:
517                 length = int(self.headers['content-length'])
518                 
519                 if length > 0:
520                     info_map = eval(str(self.rfile.read(length), encoding='utf8'))
521                     clear = info_map.get("clear", False)
522                 else:
523                     clear = False
524                 
525                 job_id = match.groups()[0]
526
527                 job = self.server.getJobID(job_id)
528
529                 if job:
530                     self.server.stats("", "Cancelling job")
531                     self.server.removeJob(job, clear)
532                     self.send_head()
533                 else:
534                     # no such job id
535                     self.send_head(http.client.NO_CONTENT)
536             else:
537                 # invalid url
538                 self.send_head(http.client.NO_CONTENT)
539         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
540         elif self.path.startswith("/pause"):
541             match = pause_pattern.match(self.path)
542
543             if match:
544                 length = int(self.headers['content-length'])
545                 
546                 if length > 0:
547                     info_map = eval(str(self.rfile.read(length), encoding='utf8'))
548                     status = info_map.get("status", None)
549                 else:
550                     status = None
551                 
552                 job_id = match.groups()[0]
553
554                 job = self.server.getJobID(job_id)
555
556                 if job:
557                     self.server.stats("", "Pausing job")
558                     job.pause(status)
559                     self.send_head()
560                 else:
561                     # no such job id
562                     self.send_head(http.client.NO_CONTENT)
563             else:
564                 # invalid url
565                 self.send_head(http.client.NO_CONTENT)
566         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
567         elif self.path == "/clear":
568             # cancel all jobs
569             length = int(self.headers['content-length'])
570             
571             if length > 0:
572                 info_map = eval(str(self.rfile.read(length), encoding='utf8'))
573                 clear = info_map.get("clear", False)
574             else:
575                 clear = False
576
577             self.server.stats("", "Clearing jobs")
578             self.server.clear(clear)
579
580             self.send_head()
581         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
582         elif self.path.startswith("/reset"):
583             match = reset_pattern.match(self.path)
584
585             if match:
586                 all = match.groups()[0] == 'all'
587                 job_id = match.groups()[1]
588                 job_frame = int(match.groups()[2])
589
590                 job = self.server.getJobID(job_id)
591
592                 if job:
593                     if job_frame != 0:
594
595                         frame = job[job_frame]
596                         if frame:
597                             self.server.stats("", "Reset job frame")
598                             frame.reset(all)
599                             self.send_head()
600                         else:
601                             # no such frame
602                             self.send_head(http.client.NO_CONTENT)
603
604                     else:
605                         self.server.stats("", "Reset job")
606                         job.reset(all)
607                         self.send_head()
608
609                 else: # job not found
610                     self.send_head(http.client.NO_CONTENT)
611             else: # invalid url
612                 self.send_head(http.client.NO_CONTENT)
613         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
614         elif self.path == "/slave":
615             length = int(self.headers['content-length'])
616             job_frame_string = self.headers['job-frame']
617
618             self.server.stats("", "New slave connected")
619
620             slave_info = netrender.model.RenderSlave.materialize(eval(str(self.rfile.read(length), encoding='utf8')), cache = False)
621
622             slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)
623
624             self.send_head(headers = {"slave-id": slave_id})
625         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
626         elif self.path == "/log":
627             length = int(self.headers['content-length'])
628
629             log_info = netrender.model.LogFile.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
630
631             slave_id = log_info.slave_id
632
633             slave = self.server.getSeenSlave(slave_id)
634
635             if slave: # only if slave id is valid
636                 job = self.server.getJobID(log_info.job_id)
637
638                 if job:
639                     self.server.stats("", "Log announcement")
640                     job.addLog(log_info.frames)
641                     self.send_head(http.client.OK)
642                 else:
643                     # no such job id
644                     self.send_head(http.client.NO_CONTENT)
645             else: # invalid slave id
646                 self.send_head(http.client.NO_CONTENT)
647     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
648     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
649     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
650     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
651     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
652     def do_PUT(self):
653
654         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
655         if self.path.startswith("/file"):
656             match = file_pattern.match(self.path)
657
658             if match:
659                 self.server.stats("", "Receiving job")
660
661                 length = int(self.headers['content-length'])
662                 job_id = match.groups()[0]
663                 file_index = int(match.groups()[1])
664
665                 job = self.server.getJobID(job_id)
666
667                 if job:
668
669                     render_file = job.files[file_index]
670
671                     if render_file:
672                         main_file = job.files[0].filepath # filename of the first file
673
674                         main_path, main_name = os.path.split(main_file)
675
676                         if file_index > 0:
677                             file_path = prefixPath(job.save_path, render_file.filepath, main_path)
678                         else:
679                             file_path = job.save_path + main_name
680
681                         buf = self.rfile.read(length)
682
683                         # add same temp file + renames as slave
684
685                         f = open(file_path, "wb")
686                         f.write(buf)
687                         f.close()
688                         del buf
689
690                         render_file.filepath = file_path # set the new path
691
692                         if job.testStart():
693                             self.server.stats("", "File upload, starting job")
694                             self.send_head(http.client.OK)
695                         else:
696                             self.server.stats("", "File upload, file missings")
697                             self.send_head(http.client.ACCEPTED)
698                     else: # invalid file
699                         print("file not found", job_id, file_index)
700                         self.send_head(http.client.NO_CONTENT)
701                 else: # job not found
702                     print("job not found", job_id, file_index)
703                     self.send_head(http.client.NO_CONTENT)
704             else: # invalid url
705                 print("no match")
706                 self.send_head(http.client.NO_CONTENT)
707         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
708         elif self.path == "/render":
709             self.server.stats("", "Receiving render result")
710
711             # need some message content here or the slave doesn't like it
712             self.wfile.write(bytes("foo", encoding='utf8'))
713
714             slave_id = self.headers['slave-id']
715
716             slave = self.server.getSeenSlave(slave_id)
717
718             if slave: # only if slave id is valid
719                 job_id = self.headers['job-id']
720
721                 job = self.server.getJobID(job_id)
722
723                 if job:
724                     job_frame = int(self.headers['job-frame'])
725                     job_result = int(self.headers['job-result'])
726                     job_time = float(self.headers['job-time'])
727
728                     frame = job[job_frame]
729
730                     if frame:
731                         if job.type == netrender.model.JOB_BLENDER:
732                             if job_result == DONE:
733                                 length = int(self.headers['content-length'])
734                                 buf = self.rfile.read(length)
735                                 f = open(job.save_path + "%04d" % job_frame + ".exr", 'wb')
736                                 f.write(buf)
737                                 f.close()
738
739                                 del buf
740                             elif job_result == ERROR:
741                                 # blacklist slave on this job on error
742                                 # slaves might already be in blacklist if errors on the whole chunk
743                                 if not slave.id in job.blacklist:
744                                     job.blacklist.append(slave.id)
745
746                         self.server.stats("", "Receiving result")
747
748                         slave.finishedFrame(job_frame)
749
750                         frame.status = job_result
751                         frame.time = job_time
752
753                         job.testFinished()
754
755                         self.send_head()
756                     else: # frame not found
757                         self.send_head(http.client.NO_CONTENT)
758                 else: # job not found
759                     self.send_head(http.client.NO_CONTENT)
760             else: # invalid slave id
761                 self.send_head(http.client.NO_CONTENT)
762         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
763         elif self.path.startswith("/log"):
764             self.server.stats("", "Receiving log file")
765
766             match = log_pattern.match(self.path)
767
768             if match:
769                 job_id = match.groups()[0]
770
771                 job = self.server.getJobID(job_id)
772
773                 if job:
774                     job_frame = int(match.groups()[1])
775
776                     frame = job[job_frame]
777
778                     if frame and frame.log_path:
779                         length = int(self.headers['content-length'])
780                         buf = self.rfile.read(length)
781                         f = open(frame.log_path, 'ab')
782                         f.write(buf)
783                         f.close()
784
785                         del buf
786
787                         self.server.getSeenSlave(self.headers['slave-id'])
788
789                         self.send_head()
790                     else: # frame not found
791                         self.send_head(http.client.NO_CONTENT)
792                 else: # job not found
793                     self.send_head(http.client.NO_CONTENT)
794             else: # invalid url
795                 self.send_head(http.client.NO_CONTENT)
796
class RenderMasterServer(http.server.HTTPServer):
    """HTTP server that holds the master's job queue and registered slaves.

    Jobs and slaves are each stored twice: in a list (used for iteration and
    ordering) and in a dict keyed by id (used for lookup).  A balancer built
    from netrender.balancing rules is run over the job list by balance().
    """

    def __init__(self, address, handler_class, path):
        """Create the server and its per-process working directory.

        address and handler_class are forwarded to http.server.HTTPServer.
        path is the prefix of the working directory; it is concatenated as-is
        with "master_<pid>", no separator is inserted.
        """
        super().__init__(address, handler_class)
        self.jobs = []
        self.jobs_map = {}
        self.slaves = []
        self.slaves_map = {}
        self.job_id = 0
        self.path = path + "master_" + str(os.getpid()) + os.sep

        self.slave_timeout = 30 # 30 mins: need a parameter for that

        self.balancer = netrender.balancing.Balancer()
        self.balancer.addRule(netrender.balancing.RatingUsageByCategory(self.getJobs))
        self.balancer.addRule(netrender.balancing.RatingUsage())
        self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
        self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9))
        self.balancer.addPriority(netrender.balancing.NewJobPriority())
        self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2))

        if not os.path.exists(self.path):
            os.mkdir(self.path)

    def nextJobID(self):
        """Increment the job counter and return the new value as a string."""
        self.job_id += 1
        return str(self.job_id)

    def addSlave(self, name, address, stats):
        """Register a new MRenderSlave and return its id."""
        slave = MRenderSlave(name, address, stats)
        self.slaves.append(slave)
        self.slaves_map[slave.id] = slave

        return slave.id

    def removeSlave(self, slave):
        """Remove slave from the slave list and, if it owns it, the id map."""
        self.slaves.remove(slave)

        # Slave ids are derived from name and address, so a slave that
        # registers again shares its id with the stale entry still in the
        # list.  Only drop the map entry when it points at this very object;
        # otherwise removing the stale duplicate would unregister the live
        # slave (and a later removal would raise KeyError).
        if self.slaves_map.get(slave.id) is slave:
            del self.slaves_map[slave.id]

    def getSlave(self, slave_id):
        """Return the slave registered under slave_id, or None."""
        return self.slaves_map.get(slave_id)

    def getSeenSlave(self, slave_id):
        """Like getSlave(), but also calls seen() on the slave when found."""
        slave = self.getSlave(slave_id)
        if slave:
            slave.seen()

        return slave

    def timeoutSlaves(self):
        """Remove slaves not seen for more than slave_timeout minutes.

        Frames still assigned to a removed slave are marked as ERROR.
        """
        now = time.time()

        # collect first: removeSlave() mutates self.slaves
        removed = [slave for slave in self.slaves
                   if (now - slave.last_seen) / 60 > self.slave_timeout]

        for slave in removed:
            if slave.job:
                for f in slave.job_frames:
                    frame = slave.job[f]
                    # indexing a job can yield a falsy value for an unknown
                    # frame (the request handlers test for this as well)
                    if frame:
                        frame.status = ERROR

            self.removeSlave(slave)

    def updateUsage(self):
        """Decay every job's usage, then credit the jobs of busy slaves.

        Each job keeps (1 - blend) of its previous usage; every slave that
        has a job adds blend / slave count to that job's usage.
        """
        blend = 0.5
        for job in self.jobs:
            job.usage *= (1 - blend)

        if self.slaves:
            slave_usage = blend / self.countSlaves()

            for slave in self.slaves:
                if slave.job:
                    slave.job.usage += slave_usage

    def clear(self, clear_files = False):
        """Remove every job, optionally deleting the jobs' directories."""
        # iterate over a copy: removeJob() mutates self.jobs
        for job in self.jobs[:]:
            self.removeJob(job, clear_files)

    def balance(self):
        """Run the balancer over the job list."""
        self.balancer.balance(self.jobs)

    def getJobs(self):
        """Return the job list itself (not a copy)."""
        return self.jobs

    def countJobs(self, status = JOB_QUEUED):
        """Return the number of jobs whose status equals status."""
        return sum(1 for j in self.jobs if j.status == status)

    def countSlaves(self):
        """Return the number of registered slaves."""
        return len(self.slaves)

    def removeJob(self, job, clear_files = False):
        """Unregister job and detach every slave currently assigned to it.

        When clear_files is true the job's save_path directory is deleted.
        """
        self.jobs.remove(job)
        self.jobs_map.pop(job.id)

        if clear_files:
            shutil.rmtree(job.save_path)

        for slave in self.slaves:
            if slave.job == job:
                slave.job = None
                slave.job_frames = []

    def addJob(self, job):
        """Register job, create its directory under self.path and save it."""
        self.jobs.append(job)
        self.jobs_map[job.id] = job

        # create job directory
        job.save_path = self.path + "job_" + job.id + os.sep
        if not os.path.exists(job.save_path):
            os.mkdir(job.save_path)

        job.save()

    def getJobID(self, id):
        """Return the job registered under id, or None."""
        return self.jobs_map.get(id)

    def __iter__(self):
        """Iterate over the jobs in list order."""
        return iter(self.jobs)

    def newDispatch(self, slave_id):
        """Pick work for the slave identified by slave_id.

        Returns (job, frames) for the first job in list order that no
        balancer exception applies to and whose blacklist does not contain
        slave_id; returns (None, None) when there is no such job.
        """
        for job in self.jobs:
            if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist:
                return job, job.getFrames()

        return None, None
934
def clearMaster(path):
    """Recursively delete the directory tree rooted at path.

    Errors are not suppressed: anything shutil.rmtree raises propagates to
    the caller.
    """
    shutil.rmtree(path, ignore_errors=False)
937
def runMaster(address, broadcast, clear, path, update_stats, test_break):
    """Run the render master's request loop until test_break() is true.

    address      -- (host, port) pair the HTTP server binds to; the port is
                    also the payload of the broadcast datagram.
    broadcast    -- when true, periodically announce the port over UDP
                    broadcast.
    clear        -- when true, delete the master's working directory after
                    the loop ends normally.
    path         -- prefix for the master's working directory (handed to
                    RenderMasterServer).
    update_stats -- callable stored as httpd.stats for the request handlers.
    test_break   -- zero-argument callable polled once per loop iteration.
    """
    maintenance_interval = 10 # seconds; need constant here
    broadcast_port = 8000

    httpd = RenderMasterServer(address, RenderHandler, path)
    httpd.timeout = 1 # handle_request() gives up after 1s so test_break() gets polled
    httpd.stats = update_stats

    s = None

    try:
        if broadcast:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

        start_time = time.time()

        while not test_break():
            try:
                httpd.handle_request()
            except select.error:
                pass

            if time.time() - start_time >= maintenance_interval:
                httpd.timeoutSlaves()

                httpd.updateUsage()

                if broadcast:
                    print("broadcasting address")
                    s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', broadcast_port))

                # Restart the interval whether or not we broadcast; resetting
                # it only in the broadcast branch made the maintenance above
                # run on every iteration once the first interval had elapsed.
                start_time = time.time()
    finally:
        # release both sockets even if the loop raised
        if s is not None:
            s.close()
        httpd.server_close()

    if clear:
        clearMaster(httpd.path)