netrender:
[blender.git] / release / scripts / io / netrender / master.py
1 # ##### BEGIN GPL LICENSE BLOCK #####
2 #
3 #  This program is free software; you can redistribute it and/or
4 #  modify it under the terms of the GNU General Public License
5 #  as published by the Free Software Foundation; either version 2
6 #  of the License, or (at your option) any later version.
7 #
8 #  This program is distributed in the hope that it will be useful,
9 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
10 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 #  GNU General Public License for more details.
12 #
13 #  You should have received a copy of the GNU General Public License
14 #  along with this program; if not, write to the Free Software Foundation,
15 #  Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16 #
17 # ##### END GPL LICENSE BLOCK #####
18
19 import sys, os
20 import http, http.client, http.server, urllib, socket
21 import subprocess, shutil, time, hashlib
22 import select # for select.error
23
24 from netrender.utils import *
25 import netrender.model
26 import netrender.balancing
27 import netrender.master_html
28
29 class MRenderFile(netrender.model.RenderFile):
30     def __init__(self, filepath, index, start, end):
31         super().__init__(filepath, index, start, end)
32         self.found = False
33
34     def test(self):
35         self.found = os.path.exists(self.filepath)
36         return self.found
37
38
39 class MRenderSlave(netrender.model.RenderSlave):
40     def __init__(self, name, address, stats):
41         super().__init__()
42         self.id = hashlib.md5(bytes(repr(name) + repr(address), encoding='utf8')).hexdigest()
43         self.name = name
44         self.address = address
45         self.stats = stats
46         self.last_seen = time.time()
47
48         self.job = None
49         self.job_frames = []
50
51         netrender.model.RenderSlave._slave_map[self.id] = self
52
53     def seen(self):
54         self.last_seen = time.time()
55
56     def finishedFrame(self, frame_number):
57         self.job_frames.remove(frame_number)
58         if not self.job_frames:
59             self.job = None
60
61 class MRenderJob(netrender.model.RenderJob):
62     def __init__(self, job_id, job_info):
63         super().__init__(job_info)
64         self.id = job_id
65         self.last_dispatched = time.time()
66
67         # force one chunk for process jobs
68         if self.type == netrender.model.JOB_PROCESS:
69             self.chunks = 1
70
71         # Force WAITING status on creation
72         self.status = JOB_WAITING
73
74         # special server properties
75         self.last_update = 0
76         self.save_path = ""
77         self.files = [MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end) for rfile in job_info.files]
78
79     def save(self):
80         if self.save_path:
81             f = open(self.save_path + "job.txt", "w")
82             f.write(repr(self.serialize()))
83             f.close()
84
85     def edit(self, info_map):
86         if "status" in info_map:
87             self.status = info_map["status"]
88
89         if "priority" in info_map:
90             self.priority = info_map["priority"]
91
92         if "chunks" in info_map:
93             self.chunks = info_map["chunks"]
94
95     def testStart(self):
96         for f in self.files:
97             if not f.test():
98                 return False
99
100         self.start()
101         return True
102
103     def testFinished(self):
104         for f in self.frames:
105             if f.status == QUEUED or f.status == DISPATCHED:
106                 break
107         else:
108             self.status = JOB_FINISHED
109             
110     def pause(self, status = None):
111         if self.status not in {JOB_PAUSED, JOB_QUEUED}:
112             return 
113         
114         if status == None:
115             self.status = JOB_PAUSED if self.status == JOB_QUEUED else JOB_QUEUED
116         elif status:
117             self.status = JOB_QUEUED
118         else:
119             self.status = JOB_PAUSED
120
121     def start(self):
122         self.status = JOB_QUEUED
123
124     def addLog(self, frames):
125         log_name = "_".join(("%04d" % f for f in frames)) + ".log"
126         log_path = self.save_path + log_name
127
128         for number in frames:
129             frame = self[number]
130             if frame:
131                 frame.log_path = log_path
132
133     def addFrame(self, frame_number, command):
134         frame = MRenderFrame(frame_number, command)
135         self.frames.append(frame)
136         return frame
137
138     def reset(self, all):
139         for f in self.frames:
140             f.reset(all)
141
142     def getFrames(self):
143         frames = []
144         for f in self.frames:
145             if f.status == QUEUED:
146                 self.last_dispatched = time.time()
147                 frames.append(f)
148                 if len(frames) >= self.chunks:
149                     break
150
151         return frames
152
153 class MRenderFrame(netrender.model.RenderFrame):
154     def __init__(self, frame, command):
155         super().__init__()
156         self.number = frame
157         self.slave = None
158         self.time = 0
159         self.status = QUEUED
160         self.command = command
161
162         self.log_path = None
163
164     def reset(self, all):
165         if all or self.status == ERROR:
166             self.log_path = None
167             self.slave = None
168             self.time = 0
169             self.status = QUEUED
170
171
172 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
173 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
174 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
175 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
176 file_pattern = re.compile("/file_([a-zA-Z0-9]+)_([0-9]+)")
177 render_pattern = re.compile("/render_([a-zA-Z0-9]+)_([0-9]+).(exr|jpg)")
178 log_pattern = re.compile("/log_([a-zA-Z0-9]+)_([0-9]+).log")
179 reset_pattern = re.compile("/reset(all|)_([a-zA-Z0-9]+)_([0-9]+)")
180 cancel_pattern = re.compile("/cancel_([a-zA-Z0-9]+)")
181 pause_pattern = re.compile("/pause_([a-zA-Z0-9]+)")
182 edit_pattern = re.compile("/edit_([a-zA-Z0-9]+)")
183
184 class RenderHandler(http.server.BaseHTTPRequestHandler):
185     def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"):
186         self.send_response(code)
187         self.send_header("Content-type", content)
188
189         for key, value in headers.items():
190             self.send_header(key, value)
191
192         self.end_headers()
193
194     def do_HEAD(self):
195
196         if self.path == "/status":
197             job_id = self.headers.get('job-id', "")
198             job_frame = int(self.headers.get('job-frame', -1))
199
200             job = self.server.getJobID(job_id)
201             if job:
202                 frame = job[job_frame]
203
204
205                 if frame:
206                     self.send_head(http.client.OK)
207                 else:
208                     # no such frame
209                     self.send_head(http.client.NO_CONTENT)
210             else:
211                 # no such job id
212                 self.send_head(http.client.NO_CONTENT)
213
214     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
215     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
216     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
217     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
218     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
219
220     def do_GET(self):
221
222         if self.path == "/version":
223             self.send_head()
224             self.server.stats("", "Version check")
225             self.wfile.write(VERSION)
226         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
227         elif self.path.startswith("/render"):
228             match = render_pattern.match(self.path)
229
230             if match:
231                 job_id = match.groups()[0]
232                 frame_number = int(match.groups()[1])
233                 
234                 exr = match.groups()[2] == "exr"
235
236                 job = self.server.getJobID(job_id)
237
238                 if job:
239                     frame = job[frame_number]
240
241                     if frame:
242                         if frame.status in (QUEUED, DISPATCHED):
243                             self.send_head(http.client.ACCEPTED)
244                         elif frame.status == DONE:
245                             self.server.stats("", "Sending result to client")
246                             
247                             if exr:
248                                 f = open(job.save_path + "%04d" % frame_number + ".exr", 'rb')
249                                 self.send_head(content = "image/x-exr")
250                             else:
251                                 filename = job.save_path + "%04d" % frame_number + ".jpg"
252                                 
253                                 if not os.path.exists(filename):
254                                     import bpy
255                                     sce = bpy.data.scenes[0]
256                                     sce.render_data.file_format = "JPEG"
257                                     sce.render_data.quality = 90
258                                     bpy.ops.image.open(path = job.save_path + "%04d" % frame_number + ".exr")
259                                     img = bpy.data.images["%04d" % frame_number + ".exr"]
260                                     img.save(filename)
261                                     
262                                     try:
263                                         process = subprocess.Popen(["convert", filename, "-resize", "300x300", filename])
264                                         process.wait()                                        
265                                     except:
266                                         pass
267                                 
268                                 f = open(filename, 'rb')
269                                 self.send_head(content = "image/jpeg")
270
271
272                             shutil.copyfileobj(f, self.wfile)
273
274                             f.close()
275                         elif frame.status == ERROR:
276                             self.send_head(http.client.PARTIAL_CONTENT)
277                     else:
278                         # no such frame
279                         self.send_head(http.client.NO_CONTENT)
280                 else:
281                     # no such job id
282                     self.send_head(http.client.NO_CONTENT)
283             else:
284                 # invalid url
285                 self.send_head(http.client.NO_CONTENT)
286         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
287         elif self.path.startswith("/log"):
288             match = log_pattern.match(self.path)
289
290             if match:
291                 job_id = match.groups()[0]
292                 frame_number = int(match.groups()[1])
293
294                 job = self.server.getJobID(job_id)
295
296                 if job:
297                     frame = job[frame_number]
298
299                     if frame:
300                         if not frame.log_path or frame.status in (QUEUED, DISPATCHED):
301                             self.send_head(http.client.PROCESSING)
302                         else:
303                             self.server.stats("", "Sending log to client")
304                             f = open(frame.log_path, 'rb')
305
306                             self.send_head(content = "text/plain")
307
308                             shutil.copyfileobj(f, self.wfile)
309
310                             f.close()
311                     else:
312                         # no such frame
313                         self.send_head(http.client.NO_CONTENT)
314                 else:
315                     # no such job id
316                     self.send_head(http.client.NO_CONTENT)
317             else:
318                 # invalid URL
319                 self.send_head(http.client.NO_CONTENT)
320         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
321         elif self.path == "/status":
322             job_id = self.headers.get('job-id', "")
323             job_frame = int(self.headers.get('job-frame', -1))
324
325             if job_id:
326
327                 job = self.server.getJobID(job_id)
328                 if job:
329                     if job_frame != -1:
330                         frame = job[frame]
331
332                         if frame:
333                             message = frame.serialize()
334                         else:
335                             # no such frame
336                             self.send_heat(http.client.NO_CONTENT)
337                             return
338                     else:
339                         message = job.serialize()
340                 else:
341                     # no such job id
342                     self.send_head(http.client.NO_CONTENT)
343                     return
344             else: # status of all jobs
345                 message = []
346
347                 for job in self.server:
348                     message.append(job.serialize())
349
350
351             self.server.stats("", "Sending status")
352             self.send_head()
353             self.wfile.write(bytes(repr(message), encoding='utf8'))
354
355         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
356         elif self.path == "/job":
357             self.server.balance()
358
359             slave_id = self.headers['slave-id']
360
361             slave = self.server.getSeenSlave(slave_id)
362
363             if slave: # only if slave id is valid
364                 job, frames = self.server.newDispatch(slave_id)
365
366                 if job and frames:
367                     for f in frames:
368                         print("dispatch", f.number)
369                         f.status = DISPATCHED
370                         f.slave = slave
371
372                     slave.job = job
373                     slave.job_frames = [f.number for f in frames]
374
375                     self.send_head(headers={"job-id": job.id})
376
377                     message = job.serialize(frames)
378
379                     self.wfile.write(bytes(repr(message), encoding='utf8'))
380
381                     self.server.stats("", "Sending job to slave")
382                 else:
383                     # no job available, return error code
384                     slave.job = None
385                     slave.job_frames = []
386
387                     self.send_head(http.client.ACCEPTED)
388             else: # invalid slave id
389                 self.send_head(http.client.NO_CONTENT)
390         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
391         elif self.path.startswith("/file"):
392             match = file_pattern.match(self.path)
393
394             if match:
395                 slave_id = self.headers['slave-id']
396                 slave = self.server.getSeenSlave(slave_id)
397
398                 if not slave:
399                     # invalid slave id
400                     print("invalid slave id")
401
402                 job_id = match.groups()[0]
403                 file_index = int(match.groups()[1])
404
405                 job = self.server.getJobID(job_id)
406
407                 if job:
408                     render_file = job.files[file_index]
409
410                     if render_file:
411                         self.server.stats("", "Sending file to slave")
412                         f = open(render_file.filepath, 'rb')
413
414                         self.send_head()
415                         shutil.copyfileobj(f, self.wfile)
416
417                         f.close()
418                     else:
419                         # no such file
420                         self.send_head(http.client.NO_CONTENT)
421                 else:
422                     # no such job id
423                     self.send_head(http.client.NO_CONTENT)
424             else: # invalid url
425                 self.send_head(http.client.NO_CONTENT)
426         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
427         elif self.path == "/slaves":
428             message = []
429
430             self.server.stats("", "Sending slaves status")
431
432             for slave in self.server.slaves:
433                 message.append(slave.serialize())
434
435             self.send_head()
436
437             self.wfile.write(bytes(repr(message), encoding='utf8'))
438         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
439         else:
440             # hand over the rest to the html section
441             netrender.master_html.get(self)
442
443     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
444     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
445     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
446     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
447     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
448     def do_POST(self):
449
450         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
451         if self.path == "/job":
452
453             length = int(self.headers['content-length'])
454
455             job_info = netrender.model.RenderJob.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
456
457             job_id = self.server.nextJobID()
458
459             job = MRenderJob(job_id, job_info)
460
461             for frame in job_info.frames:
462                 frame = job.addFrame(frame.number, frame.command)
463
464             self.server.addJob(job)
465
466             headers={"job-id": job_id}
467
468             if job.testStart():
469                 self.server.stats("", "New job, started")
470                 self.send_head(headers=headers)
471             else:
472                 self.server.stats("", "New job, missing files (%i total)" % len(job.files))
473                 self.send_head(http.client.ACCEPTED, headers=headers)
474         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
475         elif self.path.startswith("/edit"):
476             match = edit_pattern.match(self.path)
477
478             if match:
479                 job_id = match.groups()[0]
480
481                 job = self.server.getJobID(job_id)
482
483                 if job:
484                     length = int(self.headers['content-length'])
485                     info_map = eval(str(self.rfile.read(length), encoding='utf8'))
486
487                     job.edit(info_map)
488                     self.send_head()
489                 else:
490                     # no such job id
491                     self.send_head(http.client.NO_CONTENT)
492             else:
493                 # invalid url
494                 self.send_head(http.client.NO_CONTENT)
495         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
496         elif self.path == "/balance_limit":
497             length = int(self.headers['content-length'])
498             info_map = eval(str(self.rfile.read(length), encoding='utf8'))
499             for rule_id, limit in info_map.items():
500                 try:
501                     rule = self.server.balancer.ruleByID(rule_id)
502                     if rule:
503                         rule.setLimit(limit)
504                 except:
505                     pass # invalid type
506                         
507             self.send_head()
508         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
509         elif self.path == "/balance_enable":
510             length = int(self.headers['content-length'])
511             info_map = eval(str(self.rfile.read(length), encoding='utf8'))
512             for rule_id, enabled in info_map.items():
513                 rule = self.server.balancer.ruleByID(rule_id)
514                 if rule:
515                     rule.enabled = enabled
516                         
517             self.send_head()
518         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
519         elif self.path.startswith("/cancel"):
520             match = cancel_pattern.match(self.path)
521
522             if match:
523                 length = int(self.headers['content-length'])
524                 
525                 if length > 0:
526                     info_map = eval(str(self.rfile.read(length), encoding='utf8'))
527                     clear = info_map.get("clear", False)
528                 else:
529                     clear = False
530                 
531                 job_id = match.groups()[0]
532
533                 job = self.server.getJobID(job_id)
534
535                 if job:
536                     self.server.stats("", "Cancelling job")
537                     self.server.removeJob(job, clear)
538                     self.send_head()
539                 else:
540                     # no such job id
541                     self.send_head(http.client.NO_CONTENT)
542             else:
543                 # invalid url
544                 self.send_head(http.client.NO_CONTENT)
545         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
546         elif self.path.startswith("/pause"):
547             match = pause_pattern.match(self.path)
548
549             if match:
550                 length = int(self.headers['content-length'])
551                 
552                 if length > 0:
553                     info_map = eval(str(self.rfile.read(length), encoding='utf8'))
554                     status = info_map.get("status", None)
555                 else:
556                     status = None
557                 
558                 job_id = match.groups()[0]
559
560                 job = self.server.getJobID(job_id)
561
562                 if job:
563                     self.server.stats("", "Pausing job")
564                     job.pause(status)
565                     self.send_head()
566                 else:
567                     # no such job id
568                     self.send_head(http.client.NO_CONTENT)
569             else:
570                 # invalid url
571                 self.send_head(http.client.NO_CONTENT)
572         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
573         elif self.path == "/clear":
574             # cancel all jobs
575             length = int(self.headers['content-length'])
576             
577             if length > 0:
578                 info_map = eval(str(self.rfile.read(length), encoding='utf8'))
579                 clear = info_map.get("clear", False)
580             else:
581                 clear = False
582
583             self.server.stats("", "Clearing jobs")
584             self.server.clear(clear)
585
586             self.send_head()
587         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
588         elif self.path.startswith("/reset"):
589             match = reset_pattern.match(self.path)
590
591             if match:
592                 all = match.groups()[0] == 'all'
593                 job_id = match.groups()[1]
594                 job_frame = int(match.groups()[2])
595
596                 job = self.server.getJobID(job_id)
597
598                 if job:
599                     if job_frame != 0:
600
601                         frame = job[job_frame]
602                         if frame:
603                             self.server.stats("", "Reset job frame")
604                             frame.reset(all)
605                             self.send_head()
606                         else:
607                             # no such frame
608                             self.send_head(http.client.NO_CONTENT)
609
610                     else:
611                         self.server.stats("", "Reset job")
612                         job.reset(all)
613                         self.send_head()
614
615                 else: # job not found
616                     self.send_head(http.client.NO_CONTENT)
617             else: # invalid url
618                 self.send_head(http.client.NO_CONTENT)
619         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
620         elif self.path == "/slave":
621             length = int(self.headers['content-length'])
622             job_frame_string = self.headers['job-frame']
623
624             self.server.stats("", "New slave connected")
625
626             slave_info = netrender.model.RenderSlave.materialize(eval(str(self.rfile.read(length), encoding='utf8')), cache = False)
627
628             slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)
629
630             self.send_head(headers = {"slave-id": slave_id})
631         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
632         elif self.path == "/log":
633             length = int(self.headers['content-length'])
634
635             log_info = netrender.model.LogFile.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
636
637             slave_id = log_info.slave_id
638
639             slave = self.server.getSeenSlave(slave_id)
640
641             if slave: # only if slave id is valid
642                 job = self.server.getJobID(log_info.job_id)
643
644                 if job:
645                     self.server.stats("", "Log announcement")
646                     job.addLog(log_info.frames)
647                     self.send_head(http.client.OK)
648                 else:
649                     # no such job id
650                     self.send_head(http.client.NO_CONTENT)
651             else: # invalid slave id
652                 self.send_head(http.client.NO_CONTENT)
653     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
654     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
655     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
656     # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
657     # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
658     def do_PUT(self):
659
660         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
661         if self.path.startswith("/file"):
662             match = file_pattern.match(self.path)
663
664             if match:
665                 self.server.stats("", "Receiving job")
666
667                 length = int(self.headers['content-length'])
668                 job_id = match.groups()[0]
669                 file_index = int(match.groups()[1])
670
671                 job = self.server.getJobID(job_id)
672
673                 if job:
674
675                     render_file = job.files[file_index]
676
677                     if render_file:
678                         main_file = job.files[0].filepath # filename of the first file
679
680                         main_path, main_name = os.path.split(main_file)
681
682                         if file_index > 0:
683                             file_path = prefixPath(job.save_path, render_file.filepath, main_path)
684                         else:
685                             file_path = job.save_path + main_name
686
687                         buf = self.rfile.read(length)
688
689                         # add same temp file + renames as slave
690
691                         f = open(file_path, "wb")
692                         f.write(buf)
693                         f.close()
694                         del buf
695
696                         render_file.filepath = file_path # set the new path
697
698                         if job.testStart():
699                             self.server.stats("", "File upload, starting job")
700                             self.send_head(http.client.OK)
701                         else:
702                             self.server.stats("", "File upload, file missings")
703                             self.send_head(http.client.ACCEPTED)
704                     else: # invalid file
705                         print("file not found", job_id, file_index)
706                         self.send_head(http.client.NO_CONTENT)
707                 else: # job not found
708                     print("job not found", job_id, file_index)
709                     self.send_head(http.client.NO_CONTENT)
710             else: # invalid url
711                 print("no match")
712                 self.send_head(http.client.NO_CONTENT)
713         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
714         elif self.path == "/render":
715             self.server.stats("", "Receiving render result")
716
717             # need some message content here or the slave doesn't like it
718             self.wfile.write(bytes("foo", encoding='utf8'))
719
720             slave_id = self.headers['slave-id']
721
722             slave = self.server.getSeenSlave(slave_id)
723
724             if slave: # only if slave id is valid
725                 job_id = self.headers['job-id']
726
727                 job = self.server.getJobID(job_id)
728
729                 if job:
730                     job_frame = int(self.headers['job-frame'])
731                     job_result = int(self.headers['job-result'])
732                     job_time = float(self.headers['job-time'])
733
734                     frame = job[job_frame]
735
736                     if frame:
737                         if job.type == netrender.model.JOB_BLENDER:
738                             if job_result == DONE:
739                                 length = int(self.headers['content-length'])
740                                 buf = self.rfile.read(length)
741                                 f = open(job.save_path + "%04d" % job_frame + ".exr", 'wb')
742                                 f.write(buf)
743                                 f.close()
744
745                                 del buf
746                             elif job_result == ERROR:
747                                 # blacklist slave on this job on error
748                                 # slaves might already be in blacklist if errors on the whole chunk
749                                 if not slave.id in job.blacklist:
750                                     job.blacklist.append(slave.id)
751
752                         self.server.stats("", "Receiving result")
753
754                         slave.finishedFrame(job_frame)
755
756                         frame.status = job_result
757                         frame.time = job_time
758
759                         job.testFinished()
760
761                         self.send_head()
762                     else: # frame not found
763                         self.send_head(http.client.NO_CONTENT)
764                 else: # job not found
765                     self.send_head(http.client.NO_CONTENT)
766             else: # invalid slave id
767                 self.send_head(http.client.NO_CONTENT)
768         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
769         elif self.path.startswith("/log"):
770             self.server.stats("", "Receiving log file")
771
772             match = log_pattern.match(self.path)
773
774             if match:
775                 job_id = match.groups()[0]
776
777                 job = self.server.getJobID(job_id)
778
779                 if job:
780                     job_frame = int(match.groups()[1])
781
782                     frame = job[job_frame]
783
784                     if frame and frame.log_path:
785                         length = int(self.headers['content-length'])
786                         buf = self.rfile.read(length)
787                         f = open(frame.log_path, 'ab')
788                         f.write(buf)
789                         f.close()
790
791                         del buf
792
793                         self.server.getSeenSlave(self.headers['slave-id'])
794
795                         self.send_head()
796                     else: # frame not found
797                         self.send_head(http.client.NO_CONTENT)
798                 else: # job not found
799                     self.send_head(http.client.NO_CONTENT)
800             else: # invalid url
801                 self.send_head(http.client.NO_CONTENT)
802
803 class RenderMasterServer(http.server.HTTPServer):
804     def __init__(self, address, handler_class, path):
805         super().__init__(address, handler_class)
806         self.jobs = []
807         self.jobs_map = {}
808         self.slaves = []
809         self.slaves_map = {}
810         self.job_id = 0
811         self.path = path + "master_" + str(os.getpid()) + os.sep
812
813         self.slave_timeout = 30 # 30 mins: need a parameter for that
814
815         self.balancer = netrender.balancing.Balancer()
816         self.balancer.addRule(netrender.balancing.RatingUsageByCategory(self.getJobs))
817         self.balancer.addRule(netrender.balancing.RatingUsage())
818         self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
819         self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9))
820         self.balancer.addPriority(netrender.balancing.NewJobPriority())
821         self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2))
822
823         if not os.path.exists(self.path):
824             os.mkdir(self.path)
825
826     def nextJobID(self):
827         self.job_id += 1
828         return str(self.job_id)
829
830     def addSlave(self, name, address, stats):
831         slave = MRenderSlave(name, address, stats)
832         self.slaves.append(slave)
833         self.slaves_map[slave.id] = slave
834
835         return slave.id
836
837     def removeSlave(self, slave):
838         self.slaves.remove(slave)
839         self.slaves_map.pop(slave.id)
840
841     def getSlave(self, slave_id):
842         return self.slaves_map.get(slave_id)
843
844     def getSeenSlave(self, slave_id):
845         slave = self.getSlave(slave_id)
846         if slave:
847             slave.seen()
848
849         return slave
850
851     def timeoutSlaves(self):
852         removed = []
853
854         t = time.time()
855
856         for slave in self.slaves:
857             if (t - slave.last_seen) / 60 > self.slave_timeout:
858                 removed.append(slave)
859
860                 if slave.job:
861                     for f in slave.job_frames:
862                         slave.job[f].status = ERROR
863
864         for slave in removed:
865             self.removeSlave(slave)
866
867     def updateUsage(self):
868         blend = 0.5
869         for job in self.jobs:
870             job.usage *= (1 - blend)
871
872         if self.slaves:
873             slave_usage = blend / self.countSlaves()
874
875             for slave in self.slaves:
876                 if slave.job:
877                     slave.job.usage += slave_usage
878
879
880     def clear(self, clear_files = False):
881         removed = self.jobs[:]
882
883         for job in removed:
884             self.removeJob(job, clear_files)
885
886     def balance(self):
887         self.balancer.balance(self.jobs)
888
889     def getJobs(self):
890         return self.jobs
891
892     def countJobs(self, status = JOB_QUEUED):
893         total = 0
894         for j in self.jobs:
895             if j.status == status:
896                 total += 1
897
898         return total
899
900     def countSlaves(self):
901         return len(self.slaves)
902
903     def removeJob(self, job, clear_files = False):
904         self.jobs.remove(job)
905         self.jobs_map.pop(job.id)
906         
907         if clear_files:
908             shutil.rmtree(job.save_path)
909         
910         for slave in self.slaves:
911             if slave.job == job:
912                 slave.job = None
913                 slave.job_frames = []
914
915     def addJob(self, job):
916         self.jobs.append(job)
917         self.jobs_map[job.id] = job
918
919         # create job directory
920         job.save_path = self.path + "job_" + job.id + os.sep
921         if not os.path.exists(job.save_path):
922             os.mkdir(job.save_path)
923
924         job.save()
925
926     def getJobID(self, id):
927         return self.jobs_map.get(id)
928
929     def __iter__(self):
930         for job in self.jobs:
931             yield job
932
933     def newDispatch(self, slave_id):
934         if self.jobs:
935             for job in self.jobs:
936                 if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist:
937                     return job, job.getFrames()
938
939         return None, None
940
941 def clearMaster(path):
942     shutil.rmtree(path)
943
944 def runMaster(address, broadcast, clear, path, update_stats, test_break):
945         httpd = RenderMasterServer(address, RenderHandler, path)
946         httpd.timeout = 1
947         httpd.stats = update_stats
948
949         if broadcast:
950             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
951             s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
952
953         start_time = time.time()
954
955         while not test_break():
956             try:
957                 httpd.handle_request()
958             except select.error:
959                 pass
960
961             if time.time() - start_time >= 10: # need constant here
962                 httpd.timeoutSlaves()
963
964                 httpd.updateUsage()
965
966                 if broadcast:
967                         print("broadcasting address")
968                         s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', 8000))
969                         start_time = time.time()
970
971         httpd.server_close()
972         if clear:
973             clearMaster(httpd.path)