* fix snprintf error with mingw
[blender.git] / release / io / netrender / master.py
1 import sys, os
2 import http, http.client, http.server, urllib, socket
3 import subprocess, shutil, time, hashlib
4
5 from netrender.utils import *
6 import netrender.model
7 import netrender.balancing
8 import netrender.master_html
9
class MRenderFile:
        """A file a job depends on, tracked by its path.

        `start` and `end` are stored exactly as given; `found` holds the
        result of the most recent call to test() (False until then).
        """

        def __init__(self, filepath, start, end):
                self.filepath, self.start, self.end = filepath, start, end
                self.found = False

        def test(self):
                """Check whether `filepath` exists, remember the answer and return it."""
                exists = os.path.exists(self.filepath)
                self.found = exists
                return exists
20
21
class MRenderSlave(netrender.model.RenderSlave):
        """Master-side record of a render slave."""

        def __init__(self, name, address, stats):
                super().__init__()

                # the id is the md5 hex digest of repr(name) followed by repr(address)
                digest_source = repr(name) + repr(address)
                self.id = hashlib.md5(digest_source.encode('utf8')).hexdigest()

                self.name, self.address, self.stats = name, address, stats
                self.last_seen = time.time()

                # nothing assigned yet
                self.job = self.frame = None

                # register in the map kept on the base class
                netrender.model.RenderSlave._slave_map[self.id] = self

        def seen(self):
                """Stamp the slave with the current time."""
                now = time.time()
                self.last_seen = now
38
class MRenderJob(netrender.model.RenderJob):
        """Master-side render job: its frames, its files and its scheduling state."""

        def __init__(self, job_id, name, files, chunks = 1, priority = 1, credits = 100.0, blacklist = None):
                """Create a job in the JOB_WAITING state.

                `files` is an iterable of (path, start, end) triples; each one is
                wrapped in an MRenderFile and indexed by path in `files_map`.
                `blacklist` is used as given when provided, otherwise every job
                gets its own empty list.
                """
                super().__init__()
                self.id = job_id
                self.name = name
                self.files = files
                self.frames = []
                self.chunks = chunks
                self.priority = priority
                self.credits = credits
                # A `blacklist = []` default would be one list shared by every job
                # created without an explicit blacklist, so appending a slave id
                # to one job would blacklist it on all of them.
                self.blacklist = [] if blacklist is None else blacklist
                self.last_dispatched = time.time()

                # special server properties
                self.last_update = 0
                self.save_path = ""
                self.files_map = {path: MRenderFile(path, start, end) for path, start, end in files}
                self.status = JOB_WAITING

        def save(self):
                """Write repr(self.serialize()) to <save_path>job.txt; no-op while save_path is empty."""
                if self.save_path:
                        # `with` closes the file even if serialize() or write() raises
                        with open(self.save_path + "job.txt", "w") as f:
                                f.write(repr(self.serialize()))

        def testStart(self):
                """Start the job if every file in `files_map` passes its test().

                Returns True when the job was started, False as soon as one file
                fails (the remaining files are then not tested).
                """
                for f in self.files_map.values():
                        if not f.test():
                                return False

                self.start()
                return True

        def testFinished(self):
                """Mark the job JOB_FINISHED when no frame is QUEUED or DISPATCHED."""
                for f in self.frames:
                        if f.status == QUEUED or f.status == DISPATCHED:
                                break
                else:
                        # loop ran to completion: nothing left to render
                        self.status = JOB_FINISHED

        def start(self):
                """Move the job to the JOB_QUEUED state."""
                self.status = JOB_QUEUED

        def update(self):
                """Add one credit per minute elapsed since the previous update.

                On the first call (last_update == 0) the elapsed time is measured
                from `last_dispatched` instead.
                """
                if self.last_update == 0:
                        self.credits += (time.time() - self.last_dispatched) / 60
                else:
                        self.credits += (time.time() - self.last_update) / 60

                self.last_update = time.time()

        def addLog(self, frames):
                """Point every known frame in `frames` at one shared log file.

                The log name is the zero-padded frame numbers joined with "_",
                plus ".log", placed under `save_path`. Frame numbers for which
                self[number] is falsy are skipped.
                """
                log_name = "_".join("%04d" % f for f in frames) + ".log"
                log_path = self.save_path + log_name

                for number in frames:
                        frame = self[number]
                        if frame:
                                frame.log_path = log_path

        def addFrame(self, frame_number):
                """Append a new MRenderFrame for `frame_number` and return it."""
                frame = MRenderFrame(frame_number)
                self.frames.append(frame)
                return frame

        def reset(self, all):
                """Forward reset(all) to every frame of the job."""
                for f in self.frames:
                        f.reset(all)

        def getFrames(self):
                """Collect QUEUED frames in order, stopping once `chunks` are gathered.

                Each collected frame costs the job one credit and refreshes
                `last_dispatched`. The frames' status is not changed here.
                """
                frames = []
                for f in self.frames:
                        if f.status == QUEUED:
                                self.credits -= 1 # cost of one frame
                                self.last_dispatched = time.time()
                                frames.append(f)
                                if len(frames) >= self.chunks:
                                        break

                return frames
119
class MRenderFrame(netrender.model.RenderFrame):
        """Master-side state of a single frame of a job."""

        def __init__(self, frame):
                super().__init__()
                self.number = frame
                # a new frame has no slave, no render time, and is waiting in the queue
                self.slave, self.time, self.status = None, 0, QUEUED
                self.log_path = None

        def reset(self, all):
                """Put the frame back in the queue.

                With a falsy `all`, only a frame whose status is ERROR is touched.
                """
                if not all and self.status != ERROR:
                        return

                self.slave, self.time, self.status = None, 0, QUEUED
134
135
136 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
137 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
138 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
139 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
140
141 class RenderHandler(http.server.BaseHTTPRequestHandler):
142         def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"):
143                 self.send_response(code)
144                 self.send_header("Content-type", content)
145                 
146                 for key, value in headers.items():
147                         self.send_header(key, value)
148                 
149                 self.end_headers()
150
151         def do_HEAD(self):
152                 print(self.path)
153         
154                 if self.path == "/status":
155                         job_id = self.headers.get('job-id', "")
156                         job_frame = int(self.headers.get('job-frame', -1))
157                         
158                         if job_id:
159                                 print("status:", job_id, "\n")
160                                 
161                                 job = self.server.getJobByID(job_id)
162                                 if job:
163                                         if job_frame != -1:
164                                                 frame = job[frame]
165                                                 
166                                                 if not frame:
167                                                         # no such frame
168                                                         self.send_heat(http.client.NO_CONTENT)
169                                                         return
170                                 else:
171                                         # no such job id
172                                         self.send_head(http.client.NO_CONTENT)
173                                         return
174                         
175                         self.send_head()
176         
177         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
178         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
179         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
180         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
181         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
182         
183         def do_GET(self):
184                 print(self.path)
185                 
186                 if self.path == "/version":
187                         self.send_head()
188                         self.server.stats("", "New client connection")
189                         self.wfile.write(VERSION)
190                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
191                 elif self.path == "/render":
192                         job_id = self.headers['job-id']
193                         job_frame = int(self.headers['job-frame'])
194                         print("render:", job_id, job_frame)
195                         
196                         job = self.server.getJobByID(job_id)
197                         
198                         if job:
199                                 frame = job[job_frame]
200                                 
201                                 if frame:
202                                         if frame.status in (QUEUED, DISPATCHED):
203                                                 self.send_head(http.client.ACCEPTED)
204                                         elif frame.status == DONE:
205                                                 self.server.stats("", "Sending result back to client")
206                                                 f = open(job.save_path + "%04d" % job_frame + ".exr", 'rb')
207                                                 
208                                                 self.send_head()
209                                                 
210                                                 shutil.copyfileobj(f, self.wfile)
211                                                 
212                                                 f.close()
213                                         elif frame.status == ERROR:
214                                                 self.send_head(http.client.PARTIAL_CONTENT)
215                                 else:
216                                         # no such frame
217                                         self.send_head(http.client.NO_CONTENT)
218                         else:
219                                 # no such job id
220                                 self.send_head(http.client.NO_CONTENT)
221                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
222                 elif self.path == "/log":
223                         job_id = self.headers['job-id']
224                         job_frame = int(self.headers['job-frame'])
225                         print("log:", job_id, job_frame)
226                         
227                         job = self.server.getJobByID(job_id)
228                         
229                         if job:
230                                 frame = job[job_frame]
231                                 
232                                 if frame:
233                                         if not frame.log_path or frame.status in (QUEUED, DISPATCHED):
234                                                 self.send_head(http.client.PROCESSING)
235                                         else:
236                                                 self.server.stats("", "Sending log back to client")
237                                                 f = open(frame.log_path, 'rb')
238                                                 
239                                                 self.send_head()
240                                                 
241                                                 shutil.copyfileobj(f, self.wfile)
242                                                 
243                                                 f.close()
244                                 else:
245                                         # no such frame
246                                         self.send_head(http.client.NO_CONTENT)
247                         else:
248                                 # no such job id
249                                 self.send_head(http.client.NO_CONTENT)
250                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
251                 elif self.path == "/status":
252                         job_id = self.headers.get('job-id', "")
253                         job_frame = int(self.headers.get('job-frame', -1))
254                         
255                         if job_id:
256                                 print("status:", job_id, "\n")
257                                 
258                                 job = self.server.getJobByID(job_id)
259                                 if job:
260                                         if job_frame != -1:
261                                                 frame = job[frame]
262                                                 
263                                                 if frame:
264                                                         message = frame.serialize()
265                                                 else:
266                                                         # no such frame
267                                                         self.send_heat(http.client.NO_CONTENT)
268                                                         return
269                                         else:
270                                                 message = job.serialize()
271                                 else:
272                                         # no such job id
273                                         self.send_head(http.client.NO_CONTENT)
274                                         return
275                         else: # status of all jobs
276                                 message = []
277                                 
278                                 for job in self.server:
279                                         message.append(job.serialize())
280                         
281                         self.send_head()
282                         self.wfile.write(bytes(repr(message), encoding='utf8'))
283                         
284                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
285                 elif self.path == "/job":
286                         self.server.update()
287                         
288                         slave_id = self.headers['slave-id']
289                         
290                         print("slave-id", slave_id)
291                         
292                         slave = self.server.updateSlave(slave_id)
293                         
294                         if slave: # only if slave id is valid
295                                 job, frames = self.server.getNewJob(slave_id)
296                                 
297                                 if job and frames:
298                                         for f in frames:
299                                                 print("dispatch", f.number)
300                                                 f.status = DISPATCHED
301                                                 f.slave = slave
302                                         
303                                         self.send_head(headers={"job-id": job.id})
304                                         
305                                         message = job.serialize(frames)
306                                         
307                                         self.wfile.write(bytes(repr(message), encoding='utf8'))
308                                         
309                                         self.server.stats("", "Sending job frame to render node")
310                                 else:
311                                         # no job available, return error code
312                                         self.send_head(http.client.ACCEPTED)
313                         else: # invalid slave id
314                                 self.send_head(http.client.NO_CONTENT)
315                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
316                 elif self.path == "/file":
317                         slave_id = self.headers['slave-id']
318                         
319                         slave = self.server.updateSlave(slave_id)
320                         
321                         if slave: # only if slave id is valid
322                                 job_id = self.headers['job-id']
323                                 job_file = self.headers['job-file']
324                                 print("job:", job_id, "\n")
325                                 print("file:", job_file, "\n")
326                                 
327                                 job = self.server.getJobByID(job_id)
328                                 
329                                 if job:
330                                         render_file = job.files_map.get(job_file, None)
331                                         
332                                         if render_file:
333                                                 self.server.stats("", "Sending file to render node")
334                                                 f = open(render_file.filepath, 'rb')
335                                                 
336                                                 self.send_head()
337                                                 shutil.copyfileobj(f, self.wfile)
338                                                 
339                                                 f.close()
340                                         else:
341                                                 # no such file
342                                                 self.send_head(http.client.NO_CONTENT)
343                                 else:
344                                         # no such job id
345                                         self.send_head(http.client.NO_CONTENT)
346                         else: # invalid slave id
347                                 self.send_head(http.client.NO_CONTENT)
348                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
349                 elif self.path == "/slave":
350                         message = []
351                         
352                         for slave in self.server.slaves:
353                                 message.append(slave.serialize())
354                         
355                         self.send_head()
356                         
357                         self.wfile.write(bytes(repr(message), encoding='utf8'))
358                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
359                 else:
360                         # hand over the rest to the html section
361                         netrender.master_html.get(self)
362
363         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
364         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
365         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
366         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
367         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
368         def do_POST(self):
369                 print(self.path)
370         
371                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
372                 if self.path == "/job":
373                         print("posting job info")
374                         self.server.stats("", "Receiving job")
375                         
376                         length = int(self.headers['content-length'])
377                         
378                         job_info = netrender.model.RenderJob.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
379                         
380                         job_id = self.server.nextJobID()
381                         
382                         print(job_info.files)
383                         
384                         job = MRenderJob(job_id, job_info.name, job_info.files, chunks = job_info.chunks, priority = job_info.priority, blacklist = job_info.blacklist)
385                         
386                         for frame in job_info.frames:
387                                 frame = job.addFrame(frame.number)
388                         
389                         self.server.addJob(job)
390                         
391                         headers={"job-id": job_id}
392                         
393                         if job.testStart():
394                                 self.send_head(headers=headers)
395                         else:
396                                 self.send_head(http.client.ACCEPTED, headers=headers)
397                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
398                 elif self.path == "/cancel":
399                         job_id = self.headers.get('job-id', "")
400                         if job_id:
401                                 print("cancel:", job_id, "\n")
402                                 self.server.removeJob(job_id)
403                         else: # cancel all jobs
404                                 self.server.clear()
405                                 
406                         self.send_head()
407                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
408                 elif self.path == "/reset":
409                         job_id = self.headers.get('job-id', "")
410                         job_frame = int(self.headers.get('job-frame', "-1"))
411                         all = bool(self.headers.get('reset-all', "False"))
412                         
413                         job = self.server.getJobByID(job_id)
414                         
415                         if job:
416                                 if job_frame != -1:
417                                         job[job_frame].reset(all)
418                                 else:
419                                         job.reset(all)
420                                         
421                                 self.send_head()
422                         else: # job not found
423                                 self.send_head(http.client.NO_CONTENT)
424                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
425                 elif self.path == "/slave":
426                         length = int(self.headers['content-length'])
427                         job_frame_string = self.headers['job-frame']
428                         
429                         slave_info = netrender.model.RenderSlave.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
430                         
431                         slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)
432                         
433                         self.send_head(headers = {"slave-id": slave_id})
434                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
435                 elif self.path == "/log":
436                         slave_id = self.headers['slave-id']
437                         
438                         slave = self.server.updateSlave(slave_id)
439                         
440                         if slave: # only if slave id is valid
441                                 length = int(self.headers['content-length'])
442                                 
443                                 log_info = netrender.model.LogFile.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
444                                 
445                                 job = self.server.getJobByID(log_info.job_id)
446                                 
447                                 if job:
448                                         job.addLog(log_info.frames)
449                                         self.send_head(http.client.OK)
450                                 else:
451                                         # no such job id
452                                         self.send_head(http.client.NO_CONTENT)
453                         else: # invalid slave id
454                                 self.send_head(http.client.NO_CONTENT)  
455         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
456         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
457         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
458         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
459         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
460         def do_PUT(self):
461                 print(self.path)
462                 
463                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
464                 if self.path == "/file":
465                         print("writing blend file")
466                         self.server.stats("", "Receiving job")
467                         
468                         length = int(self.headers['content-length'])
469                         job_id = self.headers['job-id']
470                         job_file = self.headers['job-file']
471                         
472                         job = self.server.getJobByID(job_id)
473                         
474                         if job:
475                                 
476                                 render_file = job.files_map.get(job_file, None)
477                                 
478                                 if render_file:
479                                         main_file = job.files[0][0] # filename of the first file
480                                         
481                                         main_path, main_name = os.path.split(main_file)
482                                         
483                                         if job_file != main_file:
484                                                 file_path = prefixPath(job.save_path, job_file, main_path)
485                                         else:
486                                                 file_path = job.save_path + main_name
487                                         
488                                         buf = self.rfile.read(length)
489                                         
490                                         # add same temp file + renames as slave
491                                         
492                                         f = open(file_path, "wb")
493                                         f.write(buf)
494                                         f.close()
495                                         del buf
496                                         
497                                         render_file.filepath = file_path # set the new path
498                                         
499                                         if job.testStart():
500                                                 self.send_head(http.client.OK)
501                                         else:
502                                                 self.send_head(http.client.ACCEPTED)
503                                 else: # invalid file
504                                         self.send_head(http.client.NO_CONTENT)
505                         else: # job not found
506                                 self.send_head(http.client.NO_CONTENT)
507                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
508                 elif self.path == "/render":
509                         print("writing result file")
510                         self.server.stats("", "Receiving render result")
511                         
512                         slave_id = self.headers['slave-id']
513                         
514                         slave = self.server.updateSlave(slave_id)
515                         
516                         if slave: # only if slave id is valid
517                                 job_id = self.headers['job-id']
518                                 
519                                 job = self.server.getJobByID(job_id)
520                                 
521                                 if job:
522                                         job_frame = int(self.headers['job-frame'])
523                                         job_result = int(self.headers['job-result'])
524                                         job_time = float(self.headers['job-time'])
525                                         
526                                         frame = job[job_frame]
527
528                                         if job_result == DONE:
529                                                 length = int(self.headers['content-length'])
530                                                 buf = self.rfile.read(length)
531                                                 f = open(job.save_path + "%04d" % job_frame + ".exr", 'wb')
532                                                 f.write(buf)
533                                                 f.close()
534                                                 
535                                                 del buf
536                                         elif job_result == ERROR:
537                                                 # blacklist slave on this job on error
538                                                 job.blacklist.append(slave.id)
539                                                 
540                                         frame.status = job_result
541                                         frame.time = job_time
542
543                                         job.testFinished()
544                         
545                                         self.server.updateSlave(self.headers['slave-id'])
546                                         
547                                         self.send_head()
548                                 else: # job not found
549                                         self.send_head(http.client.NO_CONTENT)
550                         else: # invalid slave id
551                                 self.send_head(http.client.NO_CONTENT)
552                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
553                 elif self.path == "/log":
554                         print("writing log file")
555                         self.server.stats("", "Receiving log file")
556                         
557                         job_id = self.headers['job-id']
558                         
559                         job = self.server.getJobByID(job_id)
560                         
561                         if job:
562                                 job_frame = int(self.headers['job-frame'])
563                                 
564                                 frame = job[job_frame]
565                                 
566                                 if frame and frame.log_path:
567                                         length = int(self.headers['content-length'])
568                                         buf = self.rfile.read(length)
569                                         f = open(frame.log_path, 'ab')
570                                         f.write(buf)
571                                         f.close()
572                                                 
573                                         del buf
574                                         
575                                         self.server.updateSlave(self.headers['slave-id'])
576                                         
577                                         self.send_head()
578                                 else: # frame not found
579                                         self.send_head(http.client.NO_CONTENT)
580                         else: # job not found
581                                 self.send_head(http.client.NO_CONTENT)
582
class RenderMasterServer(http.server.HTTPServer):
    """HTTP server holding the render master's job and slave registries.

    Jobs and slaves are each stored twice: in a list (keeps insertion order)
    and in a dict keyed by id (direct lookup). Job data is written below a
    per-process directory named "master_<pid>".
    """

    def __init__(self, address, handler_class, path):
        """Bind the server and prepare the master's storage directory.

        address -- server address handed to HTTPServer
        handler_class -- request handler class handed to HTTPServer
        path -- prefix of the storage directory; "master_<pid>" is appended
                verbatim, so it is expected to end with a path separator
        """
        super().__init__(address, handler_class)
        self.jobs = []
        self.jobs_map = {}
        self.slaves = []
        self.slaves_map = {}
        self.job_id = 0  # last id handed out by nextJobID()
        self.path = path + "master_" + str(os.getpid()) + os.sep

        # Balancer configuration; it is run over self.jobs in update() and
        # consulted through applyExceptions() in getNewJob().
        self.balancer = netrender.balancing.Balancer()
        self.balancer.addRule(netrender.balancing.RatingCredit())
        self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
        self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves))
        self.balancer.addPriority(netrender.balancing.NewJobPriority())
        self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2))

        if not os.path.exists(self.path):
            os.mkdir(self.path)

    def nextJobID(self):
        """Advance the job counter and return the new id as a string."""
        self.job_id += 1
        return str(self.job_id)

    def addSlave(self, name, address, stats):
        """Create and register a slave, returning its id."""
        slave = MRenderSlave(name, address, stats)
        self.slaves.append(slave)
        self.slaves_map[slave.id] = slave

        return slave.id

    def getSlave(self, slave_id):
        """Return the slave registered under slave_id, or None."""
        return self.slaves_map.get(slave_id)

    def updateSlave(self, slave_id):
        """Call seen() on the slave if it is known; return it, or None."""
        slave = self.getSlave(slave_id)
        if slave:
            slave.seen()

        return slave

    def clear(self):
        """Forget every registered job (slaves and the id counter are kept)."""
        self.jobs_map = {}
        self.jobs = []

    def update(self):
        """Update every job, then let the balancer process the job list."""
        for job in self.jobs:
            job.update()
        self.balancer.balance(self.jobs)

    def countJobs(self, status = JOB_QUEUED):
        """Return how many registered jobs currently have the given status."""
        return sum(1 for job in self.jobs if job.status == status)

    def countSlaves(self):
        """Return the number of registered slaves."""
        return len(self.slaves)

    def removeJob(self, id):
        """Unregister the job stored under id; unknown ids are ignored."""
        # A default is required here: pop() without one raises KeyError for
        # an unknown id, which made the guard below unreachable in that case.
        job = self.jobs_map.pop(id, None)

        # Identity test rather than truthiness, so that a job object which
        # happens to evaluate false is still taken out of the list.
        if job is not None:
            self.jobs.remove(job)

    def addJob(self, job):
        """Register job, create its directory under the master path and save it."""
        self.jobs.append(job)
        self.jobs_map[job.id] = job

        # create job directory
        job.save_path = self.path + "job_" + job.id + os.sep
        if not os.path.exists(job.save_path):
            os.mkdir(job.save_path)

        job.save()

    def getJobByID(self, id):
        """Return the job registered under id, or None."""
        return self.jobs_map.get(id)

    def __iter__(self):
        """Iterate over the registered jobs in list order."""
        return iter(self.jobs)

    def getNewJob(self, slave_id):
        """Pick work for a slave.

        Returns (job, frames) for the first job in list order that the
        balancer does not exclude and that has not blacklisted slave_id,
        where frames is the result of job.getFrames(). Returns (None, None)
        when no job qualifies.
        """
        for job in self.jobs:
            if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist:
                return job, job.getFrames()

        return None, None
675
def runMaster(address, broadcast, path, update_stats, test_break,
              broadcast_interval=10, broadcast_port=8000):
    """Serve render master requests until test_break() returns true.

    address -- server address; address[1] is the port that gets announced
    broadcast -- when true, announce the server port over UDP broadcast
    path -- storage path prefix handed to RenderMasterServer
    update_stats -- callable stored on the server as "stats" for use by
                    the request handlers
    test_break -- callable taking no arguments, polled before each request;
                  a true result ends the loop
    broadcast_interval -- minimum number of seconds between announcements
    broadcast_port -- UDP port the announcements are sent to

    Both the listening socket and the broadcast socket are closed on exit,
    including when an exception escapes the loop.
    """
    httpd = RenderMasterServer(address, RenderHandler, path)
    # keep handle_request() from blocking forever so test_break is polled
    httpd.timeout = 1
    httpd.stats = update_stats

    announcer = None
    try:
        if broadcast:
            announcer = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            announcer.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

            last_broadcast = time.time()

        while not test_break():
            httpd.handle_request()

            if broadcast and time.time() - last_broadcast >= broadcast_interval:
                print("broadcasting address")
                announcer.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', broadcast_port))
                last_broadcast = time.time()
    finally:
        # release both sockets explicitly instead of relying on garbage collection
        if announcer is not None:
            announcer.close()
        httpd.server_close()