1bff5f6340b4e1ca6fc19c52c649bf25850db774
[blender.git] / release / io / netrender / master.py
1 import sys, os
2 import http, http.client, http.server, urllib, socket
3 import subprocess, shutil, time, hashlib
4
5 from netrender.utils import *
6 import netrender.model
7 import netrender.balancing
8
class MRenderFile:
        """A file belonging to a render job, together with its frame range.

        `found` holds the outcome of the most recent `test()` call and is
        False until `test()` has been called.
        """

        def __init__(self, filepath, start, end):
                # Nothing has been checked on disk yet.
                self.found = False
                self.filepath, self.start, self.end = filepath, start, end

        def test(self):
                """Check whether `filepath` exists; store the answer in `found` and return it."""
                exists = os.path.exists(self.filepath)
                self.found = exists
                return exists
19
20
class MRenderSlave(netrender.model.RenderSlave):
        """Master-side record of a render slave.

        The id is the MD5 hex digest of repr(name) + repr(address). Each new
        instance registers itself in RenderSlave._slave_map under that id.
        """

        def __init__(self, name, address, stats):
                super().__init__()

                id_source = (repr(name) + repr(address)).encode('utf8')
                self.id = hashlib.md5(id_source).hexdigest()

                self.name, self.address, self.stats = name, address, stats
                self.last_seen = time.time()

                # no job or frame attached at creation
                self.job = self.frame = None

                netrender.model.RenderSlave._slave_map[self.id] = self

        def seen(self):
                """Set `last_seen` to the current time."""
                self.last_seen = time.time()
37
class MRenderJob(netrender.model.RenderJob):
        """Master-side render job.

        Adds the server-only state used in this file to the shared job model:
        the save path, a per-file lookup table, the job status and a credit
        balance that is adjusted each time a frame is handed out.
        """

        def __init__(self, job_id, name, files, chunks = 1, priority = 1, credits = 100.0, blacklist = None):
                """Create a job in the JOB_WAITING state.

                job_id    -- stored as `self.id`
                name      -- job name
                files     -- iterable of (path, start, end) triples
                chunks    -- upper bound on the frames returned by one getFrames() call
                priority  -- stored as given
                credits   -- starting credit balance
                blacklist -- list of slave ids; a new empty list when omitted
                """
                super().__init__()
                self.id = job_id
                self.name = name
                self.files = files
                self.frames = []
                self.chunks = chunks
                self.priority = priority
                self.credits = credits
                # A default of `[]` in the signature would be one list shared by every
                # job created without an explicit blacklist, so build it per instance.
                self.blacklist = [] if blacklist is None else blacklist
                self.last_dispatched = time.time()

                # special server properties
                self.save_path = ""
                self.files_map = {path: MRenderFile(path, start, end) for path, start, end in files}
                self.status = JOB_WAITING

        def save(self):
                """Write repr(self.serialize()) to "job.txt" under save_path.

                Does nothing while save_path is empty.
                """
                if not self.save_path:
                        return

                # `with` closes the file even if serialize() or write() raises.
                with open(self.save_path + "job.txt", "w") as f:
                        f.write(repr(self.serialize()))

        def testStart(self):
                """Start the job if every file passes its test(); return whether it started."""
                for f in self.files_map.values():
                        if not f.test():
                                return False

                self.start()
                return True

        def start(self):
                """Mark the job as JOB_QUEUED."""
                self.status = JOB_QUEUED

        def update(self):
                """Charge one frame and credit the time elapsed since the last dispatch."""
                # Sample the clock once so the credited interval and the new
                # last_dispatched stamp refer to the same instant.
                now = time.time()
                self.credits -= 5 # cost of one frame
                self.credits += (now - self.last_dispatched) / 60
                self.last_dispatched = now

        def addLog(self, frames):
                """Point every listed frame of this job at one shared log file.

                The log name is the zero-padded frame numbers joined with "_",
                plus ".log", placed under save_path. Frame numbers that the job
                lookup does not resolve are skipped.
                """
                log_name = "_".join("%04d" % f for f in frames) + ".log"
                log_path = self.save_path + log_name

                for number in frames:
                        # relies on the base class supporting job[number] -- defined outside this file
                        frame = self[number]
                        if frame:
                                frame.log_path = log_path

        def addFrame(self, frame_number):
                """Append a new MRenderFrame for `frame_number` and return it."""
                frame = MRenderFrame(frame_number)
                self.frames.append(frame)
                return frame

        def reset(self, all):
                """Forward reset(all) to every frame of the job."""
                for f in self.frames:
                        f.reset(all)

        def getFrames(self):
                """Return queued frames in list order, stopping once `chunks` are collected.

                update() is called once for every frame returned.
                """
                frames = []
                for f in self.frames:
                        if f.status == QUEUED:
                                self.update()
                                frames.append(f)
                                if len(frames) >= self.chunks:
                                        break

                return frames
106
class MRenderFrame(netrender.model.RenderFrame):
        """Master-side state of a single frame: number, slave, time, status and log path."""

        def __init__(self, frame):
                super().__init__()
                self.number = frame
                self.log_path = None
                self._requeue()

        def _requeue(self):
                # Clear the slave and time and put the frame back in QUEUED.
                self.slave = None
                self.time = 0
                self.status = QUEUED

        def reset(self, all):
                """Requeue the frame when `all` is true or the frame is in ERROR; otherwise leave it."""
                if not all and self.status != ERROR:
                        return
                self._requeue()
121
122
123 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
124 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
125 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
126 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
127
class RenderHandler(http.server.BaseHTTPRequestHandler):
        """HTTP request handler of the render master.

        Every do_* entry point prints the request path and runs the private
        route method registered for it in the tables at the end of the class.
        A path with no route gets no response at all. Jobs, frames and slaves
        are looked up through `self.server`. Structured payloads travel as
        repr() text encoded as UTF-8.
        """

        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- shared helpers -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

        def send_head(self, code = http.client.OK, headers = None):
                """Send the status line and the response headers.

                code    -- HTTP status code, 200 OK by default
                headers -- optional mapping of extra header names to values
                """
                self.send_response(code)
                self.send_header("Content-type", "application/octet-stream")

                for key, value in (headers or {}).items():
                        self.send_header(key, value)

                self.end_headers()

        @staticmethod
        def _parse_flag(text):
                """Turn a header value into a bool: "true", "1" and "yes" (any case) are True."""
                # bool("False") is True, so the text has to be compared explicitly.
                return str(text).strip().lower() in ("true", "1", "yes")

        def _dispatch(self, routes):
                """Print the request path and run its route from `routes`, if there is one."""
                print(self.path)

                route = routes.get(self.path)
                if route is not None:
                        route(self)

        def _read_body(self):
                """Read `content-length` bytes of request body and return them."""
                length = int(self.headers['content-length'])
                return self.rfile.read(length)

        def _read_payload(self):
                """Decode the request body as UTF-8 and evaluate it as a Python expression.

                SECURITY NOTE: eval() executes whatever expression the peer sent, so
                any host able to reach this server can run code in this process.
                Left in place to keep the wire format; ast.literal_eval would be the
                safer parser if payloads are plain literals -- TODO confirm with the
                client and slave code.
                """
                return eval(str(self._read_body(), encoding='utf8'))

        def _write_repr(self, message):
                """Write repr(message), UTF-8 encoded, to the response body."""
                self.wfile.write(bytes(repr(message), encoding='utf8'))

        def _send_file(self, path):
                """Send a 200 header followed by the content of the file at `path`."""
                # Open before sending the header so a missing file raises before
                # anything has been written; `with` closes it on any error.
                with open(path, 'rb') as f:
                        self.send_head()
                        shutil.copyfileobj(f, self.wfile)

        def _lookup_status_target(self):
                """Resolve the job and frame named by the 'job-id' / 'job-frame' headers.

                Returns (ok, job, frame). `ok` is False when a job id or frame
                number was given but nothing matches it. `job` is None when no
                job id was given; `frame` is None when no frame number was given.
                """
                job_id = self.headers.get('job-id', "")
                job_frame = int(self.headers.get('job-frame', -1))

                if not job_id:
                        return True, None, None

                print("status:", job_id, "\n")

                job = self.server.getJobByID(job_id)
                if not job:
                        # no such job id
                        return False, None, None

                if job_frame == -1:
                        return True, job, None

                frame = job[job_frame]
                if not frame:
                        # no such frame
                        return False, job, None

                return True, job, frame

        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- HEAD -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

        def do_HEAD(self):
                """Handle HEAD requests; only "/status" has a route."""
                self._dispatch(self._HEAD_ROUTES)

        def _head_status(self):
                # 200 when the requested job / frame exists (or none was named), else 204.
                ok, job, frame = self._lookup_status_target()
                self.send_head(http.client.OK if ok else http.client.NO_CONTENT)

        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- GET -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

        def do_GET(self):
                """Handle GET requests: /version, /render, /log, /status, /job, /file, /slave."""
                self._dispatch(self._GET_ROUTES)

        def _get_version(self):
                # Header first, then the VERSION constant as the body.
                self.send_head()
                self.server.stats("", "New client connection")
                self.wfile.write(VERSION)

        def _get_render(self):
                # Send back the rendered .exr of a frame, or a status code describing its state.
                job_id = self.headers['job-id']
                job_frame = int(self.headers['job-frame'])
                print("render:", job_id, job_frame)

                job = self.server.getJobByID(job_id)
                frame = job[job_frame] if job else None

                if not frame:
                        # no such job id, or no such frame
                        self.send_head(http.client.NO_CONTENT)
                elif frame.status in (QUEUED, DISPATCHED):
                        self.send_head(http.client.ACCEPTED)
                elif frame.status == DONE:
                        self.server.stats("", "Sending result back to client")
                        self._send_file(job.save_path + "%04d" % job_frame + ".exr")
                elif frame.status == ERROR:
                        self.send_head(http.client.PARTIAL_CONTENT)

        def _get_log(self):
                # Send back the log of a frame once it has a log path and has left the queue.
                job_id = self.headers['job-id']
                job_frame = int(self.headers['job-frame'])
                print("log:", job_id, job_frame)

                job = self.server.getJobByID(job_id)
                frame = job[job_frame] if job else None

                if not frame:
                        # no such job id, or no such frame
                        self.send_head(http.client.NO_CONTENT)
                elif not frame.log_path or frame.status in (QUEUED, DISPATCHED):
                        self.send_head(http.client.PROCESSING)
                else:
                        self.server.stats("", "Sending log back to client")
                        self._send_file(frame.log_path)

        def _get_status(self):
                # Serialize one frame, one job, or every job, depending on the headers given.
                ok, job, frame = self._lookup_status_target()

                if not ok:
                        self.send_head(http.client.NO_CONTENT)
                        return

                if frame is not None:
                        message = frame.serialize()
                elif job is not None:
                        message = job.serialize()
                else: # status of all jobs
                        message = [each.serialize() for each in self.server]

                self.send_head()
                self._write_repr(message)

        def _get_job(self):
                # Hand the next job and its frames to the requesting slave.
                self.server.update()

                slave_id = self.headers['slave-id']

                print("slave-id", slave_id)

                slave = self.server.updateSlave(slave_id)

                if not slave: # invalid slave id
                        self.send_head(http.client.NO_CONTENT)
                        return

                job, frames = self.server.getNewJob(slave_id)

                if not (job and frames):
                        # no job available, return error code
                        self.send_head(http.client.ACCEPTED)
                        return

                for f in frames:
                        print("dispatch", f.number)
                        f.status = DISPATCHED
                        f.slave = slave

                self.send_head(headers={"job-id": job.id})
                self._write_repr(job.serialize(frames))

                self.server.stats("", "Sending job frame to render node")

        def _get_file(self):
                # Send one of a job's files to a slave.
                slave_id = self.headers['slave-id']

                slave = self.server.updateSlave(slave_id)

                if not slave: # invalid slave id
                        self.send_head(http.client.NO_CONTENT)
                        return

                job_id = self.headers['job-id']
                job_file = self.headers['job-file']
                print("job:", job_id, "\n")
                print("file:", job_file, "\n")

                job = self.server.getJobByID(job_id)
                render_file = job.files_map.get(job_file, None) if job else None

                if not render_file:
                        # no such job id, or no such file
                        self.send_head(http.client.NO_CONTENT)
                        return

                self.server.stats("", "Sending file to render node")
                self._send_file(render_file.filepath)

        def _get_slave(self):
                # Serialize every slave of the server.
                message = [slave.serialize() for slave in self.server.slaves]

                self.send_head()
                self._write_repr(message)

        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- POST -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

        def do_POST(self):
                """Handle POST requests: /job, /cancel, /reset, /slave, /log."""
                self._dispatch(self._POST_ROUTES)

        def _post_job(self):
                # Register a new job; 200 when it can start right away, 202 otherwise.
                print("posting job info")
                self.server.stats("", "Receiving job")

                job_info = netrender.model.RenderJob.materialize(self._read_payload())

                job_id = self.server.nextJobID()

                print(job_info.files)

                job = MRenderJob(job_id, job_info.name, job_info.files, chunks = job_info.chunks, priority = job_info.priority, blacklist = job_info.blacklist)

                for frame in job_info.frames:
                        job.addFrame(frame.number)

                self.server.addJob(job)

                headers = {"job-id": job_id}

                if job.testStart():
                        self.send_head(headers=headers)
                else:
                        self.send_head(http.client.ACCEPTED, headers=headers)

        def _post_cancel(self):
                # Remove one job, or all jobs when no job id is given.
                job_id = self.headers.get('job-id', "")
                if job_id:
                        print("cancel:", job_id, "\n")
                        self.server.removeJob(job_id)
                else: # cancel all jobs
                        self.server.clear()

                self.send_head()

        def _post_reset(self):
                # Reset one frame, or every frame of a job when no frame number is given.
                job_id = self.headers.get('job-id', "")
                job_frame = int(self.headers.get('job-frame', "-1"))
                reset_all = self._parse_flag(self.headers.get('reset-all', "False"))

                job = self.server.getJobByID(job_id)

                if not job: # job not found
                        self.send_head(http.client.NO_CONTENT)
                        return

                if job_frame != -1:
                        frame = job[job_frame]
                        if not frame: # no such frame
                                self.send_head(http.client.NO_CONTENT)
                                return
                        frame.reset(reset_all)
                else:
                        job.reset(reset_all)

                self.send_head()

        def _post_slave(self):
                # Register a slave and answer with its id in the 'slave-id' header.
                slave_info = netrender.model.RenderSlave.materialize(self._read_payload())

                slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)

                self.send_head(headers = {"slave-id": slave_id})

        def _post_log(self):
                # Attach a log file path to the frames listed in the payload.
                slave_id = self.headers['slave-id']

                slave = self.server.updateSlave(slave_id)

                if not slave: # invalid slave id
                        self.send_head(http.client.NO_CONTENT)
                        return

                log_info = netrender.model.LogFile.materialize(self._read_payload())

                job = self.server.getJobByID(log_info.job_id)

                if job:
                        job.addLog(log_info.frames)
                        self.send_head(http.client.OK)
                else:
                        # no such job id
                        self.send_head(http.client.NO_CONTENT)

        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- PUT -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

        def do_PUT(self):
                """Handle PUT requests: /file, /render, /log."""
                self._dispatch(self._PUT_ROUTES)

        def _put_file(self):
                # Store an uploaded job file under the job's save path.
                print("writing blend file")
                self.server.stats("", "Receiving job")

                job_id = self.headers['job-id']
                job_file = self.headers['job-file']

                job = self.server.getJobByID(job_id)
                render_file = job.files_map.get(job_file, None) if job else None

                if not render_file:
                        # job not found, or invalid file
                        self.send_head(http.client.NO_CONTENT)
                        return

                main_file = job.files[0][0] # filename of the first file

                main_path, main_name = os.path.split(main_file)

                if job_file != main_file:
                        file_path = prefixPath(job.save_path, job_file, main_path)
                else:
                        file_path = job.save_path + main_name

                buf = self._read_body()

                # add same temp file + renames as slave

                with open(file_path, "wb") as f:
                        f.write(buf)
                del buf

                render_file.filepath = file_path # set the new path

                if job.testStart():
                        self.send_head(http.client.OK)
                else:
                        self.send_head(http.client.ACCEPTED)

        def _put_render(self):
                # Record the result of a frame; on DONE the body is saved as the frame's .exr.
                print("writing result file")
                self.server.stats("", "Receiving render result")

                slave_id = self.headers['slave-id']

                slave = self.server.updateSlave(slave_id)

                if not slave: # invalid slave id
                        self.send_head(http.client.NO_CONTENT)
                        return

                job = self.server.getJobByID(self.headers['job-id'])

                if not job: # job not found
                        self.send_head(http.client.NO_CONTENT)
                        return

                job_frame = int(self.headers['job-frame'])
                job_result = int(self.headers['job-result'])
                job_time = float(self.headers['job-time'])

                frame = job[job_frame]

                if not frame: # no such frame
                        self.send_head(http.client.NO_CONTENT)
                        return

                if job_result == DONE:
                        buf = self._read_body()
                        with open(job.save_path + "%04d" % job_frame + ".exr", 'wb') as f:
                                f.write(buf)

                        del buf
                elif job_result == ERROR:
                        # blacklist slave on this job on error
                        job.blacklist.append(slave.id)

                frame.status = job_result
                frame.time = job_time

                self.server.updateSlave(slave_id)

                self.send_head()

        def _put_log(self):
                # Append the request body to the log file of a frame.
                print("writing log file")
                self.server.stats("", "Receiving log file")

                job_id = self.headers['job-id']

                job = self.server.getJobByID(job_id)

                if not job: # job not found
                        self.send_head(http.client.NO_CONTENT)
                        return

                job_frame = int(self.headers['job-frame'])

                frame = job[job_frame]

                if not (frame and frame.log_path): # frame not found
                        self.send_head(http.client.NO_CONTENT)
                        return

                buf = self._read_body()
                with open(frame.log_path, 'ab') as f:
                        f.write(buf)

                del buf

                self.server.updateSlave(self.headers['slave-id'])

                self.send_head()

        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- route tables -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        # Request path -> route method, one table per HTTP verb.

        _HEAD_ROUTES = {
                "/status": _head_status,
        }

        _GET_ROUTES = {
                "/version": _get_version,
                "/render": _get_render,
                "/log": _get_log,
                "/status": _get_status,
                "/job": _get_job,
                "/file": _get_file,
                "/slave": _get_slave,
        }

        _POST_ROUTES = {
                "/job": _post_job,
                "/cancel": _post_cancel,
                "/reset": _post_reset,
                "/slave": _post_slave,
                "/log": _post_log,
        }

        _PUT_ROUTES = {
                "/file": _put_file,
                "/render": _put_render,
                "/log": _put_log,
        }
564
class RenderMasterServer(http.server.HTTPServer):
        """HTTP server that keeps the master's job queue and slave registry.

        Jobs and slaves are each stored twice: in a list (iteration order) and
        in a dict keyed by id (direct lookup). Both containers must be kept in
        sync by the add/remove methods below.
        """

        def __init__(self, address, handler_class, path):
                """Create the server and its working directory.

                address and handler_class are forwarded unchanged to
                http.server.HTTPServer. path is the base location under which a
                "master_<pid>" directory is created; it is concatenated as-is,
                so it presumably has to end with a path separator (verify
                against the caller).
                """
                super().__init__(address, handler_class)
                self.jobs = []
                self.jobs_map = {}
                self.slaves = []
                self.slaves_map = {}
                self.job_id = 0  # last id handed out by nextJobID()
                self.path = path + "master_" + str(os.getpid()) + os.sep

                self.balancer = netrender.balancing.Balancer()
                self.balancer.addRule(netrender.balancing.RatingCredit())
                self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
                self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves))
                self.balancer.addPriority(netrender.balancing.NewJobPriority())
                self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority())

                if not os.path.exists(self.path):
                        os.mkdir(self.path)

        def nextJobID(self):
                """Increment the job counter and return the new id as a string."""
                self.job_id += 1
                return str(self.job_id)

        def addSlave(self, name, address, stats):
                """Register a new MRenderSlave and return its id."""
                slave = MRenderSlave(name, address, stats)
                self.slaves.append(slave)
                self.slaves_map[slave.id] = slave

                return slave.id

        def getSlave(self, slave_id):
                """Return the slave registered under slave_id, or None."""
                return self.slaves_map.get(slave_id)

        def updateSlave(self, slave_id):
                """Mark the slave as seen just now.

                Returns the slave, or None when slave_id is not registered.
                """
                slave = self.getSlave(slave_id)
                if slave:
                        slave.seen()

                return slave

        def clear(self):
                """Forget every job (slaves are left untouched)."""
                self.jobs_map = {}
                self.jobs = []

        def update(self):
                """Let the balancer process the current job list."""
                self.balancer.balance(self.jobs)

        def countJobs(self, status = JOB_QUEUED):
                """Return the number of jobs whose status equals status."""
                return sum(1 for job in self.jobs if job.status == status)

        def countSlaves(self):
                """Return the number of registered slaves."""
                return len(self.slaves)

        def removeJob(self, id):
                """Remove the job registered under id; do nothing if it is unknown."""
                # A default is required here: without it an unknown id raises
                # KeyError and the "not found" case below can never be reached.
                job = self.jobs_map.pop(id, None)

                # Compare against None rather than relying on truthiness, in
                # case the job type defines __len__ and an empty job is falsy;
                # that would leave the list and the map out of sync.
                if job is not None:
                        self.jobs.remove(job)

        def addJob(self, job):
                """Register job, create its "job_<id>" directory and save it."""
                self.jobs.append(job)
                self.jobs_map[job.id] = job

                # create job directory
                job.save_path = self.path + "job_" + job.id + os.sep
                if not os.path.exists(job.save_path):
                        os.mkdir(job.save_path)

                job.save()

        def getJobByID(self, id):
                """Return the job registered under id, or None."""
                return self.jobs_map.get(id)

        def __iter__(self):
                """Iterate over the jobs in list order."""
                yield from self.jobs

        def getNewJob(self, slave_id):
                """Pick work for a slave.

                Returns (job, job.getFrames()) for the first job in list order
                that the balancer's exceptions do not exclude and whose
                blacklist does not contain slave_id, or (None, None) when no
                job qualifies.
                """
                for job in self.jobs:
                        if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist:
                                return job, job.getFrames()

                return None, None
655
def runMaster(address, broadcast, path, update_stats, test_break):
        """Run the render master until test_break() returns a true value.

        address      -- (host, port) pair the HTTP server binds to.
        broadcast    -- when true, the server port is periodically announced
                        over UDP broadcast.
        path         -- base directory handed to RenderMasterServer.
        update_stats -- callable stored on the server as its `stats` attribute.
        test_break   -- callable polled once per loop iteration; the loop stops
                        as soon as it returns a true value.

        Both the HTTP server socket and the broadcast socket are closed before
        returning, including when an exception escapes the loop.
        """
        broadcast_interval = 10  # seconds between two announcements
        broadcast_port = 8000  # UDP port the announcement is sent to

        httpd = RenderMasterServer(address, RenderHandler, path)
        # handle_request() gives up after this many seconds without a request,
        # which is what lets test_break() be polled regularly.
        httpd.timeout = 1
        httpd.stats = update_stats

        s = None
        try:
                if broadcast:
                        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

                        start_time = time.time()

                while not test_break():
                        httpd.handle_request()

                        if broadcast and time.time() - start_time >= broadcast_interval:
                                print("broadcasting address")
                                s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', broadcast_port))
                                start_time = time.time()
        finally:
                # Release the sockets; the original leaked both on return.
                if s is not None:
                        s.close()
                httpd.server_close()