netrender: first draft for process jobs, to be able to run arbitrary commands on...
[blender.git] / release / scripts / io / netrender / master.py
1 import sys, os
2 import http, http.client, http.server, urllib, socket
3 import subprocess, shutil, time, hashlib
4
5 from netrender.utils import *
6 import netrender.model
7 import netrender.balancing
8 import netrender.master_html
9
10 class MRenderFile:
11         def __init__(self, filepath, start, end):
12                 self.filepath = filepath
13                 self.start = start
14                 self.end = end
15                 self.found = False
16         
17         def test(self):
18                 self.found = os.path.exists(self.filepath)
19                 return self.found
20
21
22 class MRenderSlave(netrender.model.RenderSlave):
23         def __init__(self, name, address, stats):
24                 super().__init__()
25                 self.id = hashlib.md5(bytes(repr(name) + repr(address), encoding='utf8')).hexdigest()
26                 self.name = name
27                 self.address = address
28                 self.stats = stats
29                 self.last_seen = time.time()
30                 
31                 self.job = None
32                 self.job_frames = []
33                 
34                 netrender.model.RenderSlave._slave_map[self.id] = self
35
36         def seen(self):
37                 self.last_seen = time.time()
38                 
39         def finishedFrame(self, frame_number):
40                 self.job_frames.remove(frame_number)
41                 if not self.job_frames:
42                         self.job = None
43
44 class MRenderJob(netrender.model.RenderJob):
45         def __init__(self, job_id, job_type, name, files, chunks = 1, priority = 1, blacklist = []):
46                 super().__init__()
47                 self.id = job_id
48                 self.type = job_type
49                 self.name = name
50                 self.files = files
51                 self.frames = []
52                 self.chunks = chunks
53                 self.priority = priority
54                 self.usage = 0.0
55                 self.blacklist = blacklist
56                 self.last_dispatched = time.time()
57                 
58                 # force one chunk for process jobs
59                 if self.type == netrender.model.JOB_PROCESS:
60                         self.chunks = 1
61         
62                 # special server properties
63                 self.last_update = 0
64                 self.save_path = ""
65                 self.files_map = {path: MRenderFile(path, start, end) for path, start, end in files}
66                 self.status = JOB_WAITING
67         
68         def save(self):
69                 if self.save_path:
70                         f = open(self.save_path + "job.txt", "w")
71                         f.write(repr(self.serialize()))
72                         f.close()
73         
74         def testStart(self):
75                 for f in self.files_map.values():
76                         if not f.test():
77                                 return False
78                 
79                 self.start()
80                 return True
81         
82         def testFinished(self):
83                 for f in self.frames:
84                         if f.status == QUEUED or f.status == DISPATCHED:
85                                 break
86                 else:
87                         self.status = JOB_FINISHED
88
89         def start(self):
90                 self.status = JOB_QUEUED
91         
92         def addLog(self, frames):
93                 log_name = "_".join(("%04d" % f for f in frames)) + ".log"
94                 log_path = self.save_path + log_name
95                 
96                 for number in frames:
97                         frame = self[number]
98                         if frame:
99                                 frame.log_path = log_path
100         
101         def addFrame(self, frame_number, command):
102                 frame = MRenderFrame(frame_number, command)
103                 self.frames.append(frame)
104                 return frame
105                 
106         def reset(self, all):
107                 for f in self.frames:
108                         f.reset(all)
109         
110         def getFrames(self):
111                 frames = []
112                 for f in self.frames:
113                         if f.status == QUEUED:
114                                 self.last_dispatched = time.time()
115                                 frames.append(f)
116                                 if len(frames) >= self.chunks:
117                                         break
118                 
119                 return frames
120
121 class MRenderFrame(netrender.model.RenderFrame):
122         def __init__(self, frame, command):
123                 super().__init__()
124                 self.number = frame
125                 self.slave = None
126                 self.time = 0
127                 self.status = QUEUED
128                 self.command = command
129                 
130                 self.log_path = None
131                 
132         def reset(self, all):
133                 if all or self.status == ERROR:
134                         self.slave = None
135                         self.time = 0
136                         self.status = QUEUED
137
138
139 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
140 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
141 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
142 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
143
144 class RenderHandler(http.server.BaseHTTPRequestHandler):
145         def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"):
146                 self.send_response(code)
147                 self.send_header("Content-type", content)
148                 
149                 for key, value in headers.items():
150                         self.send_header(key, value)
151                 
152                 self.end_headers()
153
154         def do_HEAD(self):
155         
156                 if self.path == "/status":
157                         job_id = self.headers.get('job-id', "")
158                         job_frame = int(self.headers.get('job-frame', -1))
159                         
160                         job = self.server.getJobID(job_id)
161                         if job:
162                                 frame = job[job_frame]
163                                 
164                                 
165                                 if frame:
166                                         self.send_head(http.client.OK)
167                                 else:
168                                         # no such frame
169                                         self.send_head(http.client.NO_CONTENT)
170                         else:
171                                 # no such job id
172                                 self.send_head(http.client.NO_CONTENT)
173         
174         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
175         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
176         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
177         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
178         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
179         
180         def do_GET(self):
181                 
182                 if self.path == "/version":
183                         self.send_head()
184                         self.server.stats("", "Version check")
185                         self.wfile.write(VERSION)
186                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
187                 elif self.path == "/render":
188                         job_id = self.headers['job-id']
189                         job_frame = int(self.headers['job-frame'])
190                         
191                         job = self.server.getJobID(job_id)
192                         
193                         if job:
194                                 frame = job[job_frame]
195                                 
196                                 if frame:
197                                         if frame.status in (QUEUED, DISPATCHED):
198                                                 self.send_head(http.client.ACCEPTED)
199                                         elif frame.status == DONE:
200                                                 self.server.stats("", "Sending result to client")
201                                                 f = open(job.save_path + "%04d" % job_frame + ".exr", 'rb')
202                                                 
203                                                 self.send_head()
204                                                 
205                                                 shutil.copyfileobj(f, self.wfile)
206                                                 
207                                                 f.close()
208                                         elif frame.status == ERROR:
209                                                 self.send_head(http.client.PARTIAL_CONTENT)
210                                 else:
211                                         # no such frame
212                                         self.send_head(http.client.NO_CONTENT)
213                         else:
214                                 # no such job id
215                                 self.send_head(http.client.NO_CONTENT)
216                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
217                 elif self.path == "/log":
218                         job_id = self.headers['job-id']
219                         job_frame = int(self.headers['job-frame'])
220                         
221                         job = self.server.getJobID(job_id)
222                         
223                         if job:
224                                 frame = job[job_frame]
225                                 
226                                 if frame:
227                                         if not frame.log_path or frame.status in (QUEUED, DISPATCHED):
228                                                 self.send_head(http.client.PROCESSING)
229                                         else:
230                                                 self.server.stats("", "Sending log to client")
231                                                 f = open(frame.log_path, 'rb')
232                                                 
233                                                 self.send_head()
234                                                 
235                                                 shutil.copyfileobj(f, self.wfile)
236                                                 
237                                                 f.close()
238                                 else:
239                                         # no such frame
240                                         self.send_head(http.client.NO_CONTENT)
241                         else:
242                                 # no such job id
243                                 self.send_head(http.client.NO_CONTENT)
244                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
245                 elif self.path == "/status":
246                         job_id = self.headers.get('job-id', "")
247                         job_frame = int(self.headers.get('job-frame', -1))
248                         
249                         if job_id:
250                                 
251                                 job = self.server.getJobID(job_id)
252                                 if job:
253                                         if job_frame != -1:
254                                                 frame = job[frame]
255                                                 
256                                                 if frame:
257                                                         message = frame.serialize()
258                                                 else:
259                                                         # no such frame
260                                                         self.send_heat(http.client.NO_CONTENT)
261                                                         return
262                                         else:
263                                                 message = job.serialize()
264                                 else:
265                                         # no such job id
266                                         self.send_head(http.client.NO_CONTENT)
267                                         return
268                         else: # status of all jobs
269                                 message = []
270                                 
271                                 for job in self.server:
272                                         message.append(job.serialize())
273                         
274                         
275                         self.server.stats("", "Sending status")
276                         self.send_head()
277                         self.wfile.write(bytes(repr(message), encoding='utf8'))
278                         
279                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
280                 elif self.path == "/job":
281                         self.server.balance()
282                         
283                         slave_id = self.headers['slave-id']
284                         
285                         slave = self.server.getSeenSlave(slave_id)
286                         
287                         if slave: # only if slave id is valid
288                                 job, frames = self.server.newDispatch(slave_id)
289                                 
290                                 if job and frames:
291                                         for f in frames:
292                                                 print("dispatch", f.number)
293                                                 f.status = DISPATCHED
294                                                 f.slave = slave
295                                         
296                                         slave.job = job
297                                         slave.job_frames = [f.number for f in frames]
298                                         
299                                         self.send_head(headers={"job-id": job.id})
300                                         
301                                         message = job.serialize(frames)
302                                         
303                                         self.wfile.write(bytes(repr(message), encoding='utf8'))
304                                         
305                                         self.server.stats("", "Sending job to slave")
306                                 else:
307                                         # no job available, return error code
308                                         slave.job = None
309                                         slave.job_frames = []
310                                         
311                                         self.send_head(http.client.ACCEPTED)
312                         else: # invalid slave id
313                                 self.send_head(http.client.NO_CONTENT)
314                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
315                 elif self.path == "/file":
316                         slave_id = self.headers['slave-id']
317                         
318                         slave = self.server.getSeenSlave(slave_id)
319                         
320                         if slave: # only if slave id is valid
321                                 job_id = self.headers['job-id']
322                                 job_file = self.headers['job-file']
323                                 
324                                 job = self.server.getJobID(job_id)
325                                 
326                                 if job:
327                                         render_file = job.files_map.get(job_file, None)
328                                         
329                                         if render_file:
330                                                 self.server.stats("", "Sending file to slave")
331                                                 f = open(render_file.filepath, 'rb')
332                                                 
333                                                 self.send_head()
334                                                 shutil.copyfileobj(f, self.wfile)
335                                                 
336                                                 f.close()
337                                         else:
338                                                 # no such file
339                                                 self.send_head(http.client.NO_CONTENT)
340                                 else:
341                                         # no such job id
342                                         self.send_head(http.client.NO_CONTENT)
343                         else: # invalid slave id
344                                 self.send_head(http.client.NO_CONTENT)
345                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
346                 elif self.path == "/slaves":
347                         message = []
348                         
349                         self.server.stats("", "Sending slaves status")
350                         
351                         for slave in self.server.slaves:
352                                 message.append(slave.serialize())
353                         
354                         self.send_head()
355                         
356                         self.wfile.write(bytes(repr(message), encoding='utf8'))
357                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
358                 else:
359                         # hand over the rest to the html section
360                         netrender.master_html.get(self)
361
362         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
363         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
364         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
365         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
366         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
367         def do_POST(self):
368         
369                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
370                 if self.path == "/job":
371                         
372                         length = int(self.headers['content-length'])
373                         
374                         job_info = netrender.model.RenderJob.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
375                         
376                         job_id = self.server.nextJobID()
377                         
378                         job = MRenderJob(job_id, job_info.type, job_info.name, job_info.files, chunks = job_info.chunks, priority = job_info.priority, blacklist = job_info.blacklist)
379                         
380                         for frame in job_info.frames:
381                                 frame = job.addFrame(frame.number, frame.command)
382                         
383                         self.server.addJob(job)
384                         
385                         headers={"job-id": job_id}
386                         
387                         if job.testStart():
388                                 self.server.stats("", "New job, missing files")
389                                 self.send_head(headers=headers)
390                         else:
391                                 self.server.stats("", "New job, started")
392                                 self.send_head(http.client.ACCEPTED, headers=headers)
393                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
394                 elif self.path == "/cancel":
395                         job_id = self.headers.get('job-id', "")
396                         
397                         job = self.server.getJobID(job_id)
398                         
399                         if job:
400                                 self.server.stats("", "Cancelling job")
401                                 self.server.removeJob(job)
402                                 self.send_head()
403                         else: 
404                                 # no such job id
405                                 self.send_head(http.client.NO_CONTENT)
406                                 
407                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
408                 elif self.path == "/clear":
409                         # cancel all jobs
410                         self.server.stats("", "Clearing jobs")
411                         self.server.clear()
412                                 
413                         self.send_head()
414                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
415                 elif self.path == "/reset":
416                         job_id = self.headers.get('job-id', "")
417                         job_frame = int(self.headers.get('job-frame', "-1"))
418                         all = bool(self.headers.get('reset-all', "False"))
419                         
420                         job = self.server.getJobID(job_id)
421                         
422                         if job:
423                                 if job_frame != -1:
424                                         
425                                         frame = job[job_frame]
426                                         if frame:
427                                                 self.server.stats("", "Reset job frame")
428                                                 frame.reset(all)
429                                                 self.send_head()
430                                         else:
431                                                 # no such frame
432                                                 self.send_head(http.client.NO_CONTENT)
433                                                 
434                                 else:
435                                         self.server.stats("", "Reset job")
436                                         job.reset(all)
437                                         self.send_head()
438                                         
439                         else: # job not found
440                                 self.send_head(http.client.NO_CONTENT)
441                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
442                 elif self.path == "/slave":
443                         length = int(self.headers['content-length'])
444                         job_frame_string = self.headers['job-frame']
445                         
446                         self.server.stats("", "New slave connected")
447                         
448                         slave_info = netrender.model.RenderSlave.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
449                         
450                         slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)
451                         
452                         self.send_head(headers = {"slave-id": slave_id})
453                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
454                 elif self.path == "/log":
455                         slave_id = self.headers['slave-id']
456                         
457                         slave = self.server.getSeenSlave(slave_id)
458                         
459                         if slave: # only if slave id is valid
460                                 length = int(self.headers['content-length'])
461                                 
462                                 log_info = netrender.model.LogFile.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
463                                 
464                                 job = self.server.getJobID(log_info.job_id)
465                                 
466                                 if job:
467                                         self.server.stats("", "Log announcement")
468                                         job.addLog(log_info.frames)
469                                         self.send_head(http.client.OK)
470                                 else:
471                                         # no such job id
472                                         self.send_head(http.client.NO_CONTENT)
473                         else: # invalid slave id
474                                 self.send_head(http.client.NO_CONTENT)  
475         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
476         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
477         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
478         # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
479         # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
480         def do_PUT(self):
481                 
482                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
483                 if self.path == "/file":
484                         self.server.stats("", "Receiving job")
485                         
486                         length = int(self.headers['content-length'])
487                         job_id = self.headers['job-id']
488                         job_file = self.headers['job-file']
489                         
490                         job = self.server.getJobID(job_id)
491                         
492                         if job:
493                                 
494                                 render_file = job.files_map.get(job_file, None)
495                                 
496                                 if render_file:
497                                         main_file = job.files[0][0] # filename of the first file
498                                         
499                                         main_path, main_name = os.path.split(main_file)
500                                         
501                                         if job_file != main_file:
502                                                 file_path = prefixPath(job.save_path, job_file, main_path)
503                                         else:
504                                                 file_path = job.save_path + main_name
505                                         
506                                         buf = self.rfile.read(length)
507                                         
508                                         # add same temp file + renames as slave
509                                         
510                                         f = open(file_path, "wb")
511                                         f.write(buf)
512                                         f.close()
513                                         del buf
514                                         
515                                         render_file.filepath = file_path # set the new path
516                                         
517                                         if job.testStart():
518                                                 self.server.stats("", "File upload, starting job")
519                                                 self.send_head(http.client.OK)
520                                         else:
521                                                 self.server.stats("", "File upload, file missings")
522                                                 self.send_head(http.client.ACCEPTED)
523                                 else: # invalid file
524                                         self.send_head(http.client.NO_CONTENT)
525                         else: # job not found
526                                 self.send_head(http.client.NO_CONTENT)
527                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
528                 elif self.path == "/render":
529                         self.server.stats("", "Receiving render result")
530                         
531                         slave_id = self.headers['slave-id']
532                         
533                         slave = self.server.getSeenSlave(slave_id)
534                         
535                         if slave: # only if slave id is valid
536                                 job_id = self.headers['job-id']
537                                 
538                                 job = self.server.getJobID(job_id)
539                                 
540                                 if job:
541                                         job_frame = int(self.headers['job-frame'])
542                                         job_result = int(self.headers['job-result'])
543                                         job_time = float(self.headers['job-time'])
544                                         
545                                         frame = job[job_frame]
546                                         
547                                         if frame:
548                                                 if job.type == netrender.model.JOB_BLENDER:
549                                                         if job_result == DONE:
550                                                                 length = int(self.headers['content-length'])
551                                                                 buf = self.rfile.read(length)
552                                                                 f = open(job.save_path + "%04d" % job_frame + ".exr", 'wb')
553                                                                 f.write(buf)
554                                                                 f.close()
555                                                         
556                                                                 del buf
557                                                         elif job_result == ERROR:
558                                                                 # blacklist slave on this job on error
559                                                                 job.blacklist.append(slave.id)
560                                                 
561                                                 self.server.stats("", "Receiving result")
562                                                 
563                                                 slave.finishedFrame(job_frame)
564                                                 
565                                                 frame.status = job_result
566                                                 frame.time = job_time
567                 
568                                                 job.testFinished()
569                                                 
570                                                 self.send_head()
571                                         else: # frame not found
572                                                 self.send_head(http.client.NO_CONTENT)
573                                 else: # job not found
574                                         self.send_head(http.client.NO_CONTENT)
575                         else: # invalid slave id
576                                 self.send_head(http.client.NO_CONTENT)
577                 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
578                 elif self.path == "/log":
579                         self.server.stats("", "Receiving log file")
580                         
581                         job_id = self.headers['job-id']
582                         
583                         job = self.server.getJobID(job_id)
584                         
585                         if job:
586                                 job_frame = int(self.headers['job-frame'])
587                                 
588                                 frame = job[job_frame]
589                                 
590                                 if frame and frame.log_path:
591                                         length = int(self.headers['content-length'])
592                                         buf = self.rfile.read(length)
593                                         f = open(frame.log_path, 'ab')
594                                         f.write(buf)
595                                         f.close()
596                                                 
597                                         del buf
598                                         
599                                         self.server.getSeenSlave(self.headers['slave-id'])
600                                         
601                                         self.send_head()
602                                 else: # frame not found
603                                         self.send_head(http.client.NO_CONTENT)
604                         else: # job not found
605                                 self.send_head(http.client.NO_CONTENT)
606
607 class RenderMasterServer(http.server.HTTPServer):
608         def __init__(self, address, handler_class, path):
609                 super().__init__(address, handler_class)
610                 self.jobs = []
611                 self.jobs_map = {}
612                 self.slaves = []
613                 self.slaves_map = {}
614                 self.job_id = 0
615                 self.path = path + "master_" + str(os.getpid()) + os.sep
616                 
617                 self.slave_timeout = 2
618                 
619                 self.balancer = netrender.balancing.Balancer()
620                 self.balancer.addRule(netrender.balancing.RatingUsage())
621                 self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
622                 self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9))
623                 self.balancer.addPriority(netrender.balancing.NewJobPriority())
624                 self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2))
625                 
626                 if not os.path.exists(self.path):
627                         os.mkdir(self.path)
628         
629         def nextJobID(self):
630                 self.job_id += 1
631                 return str(self.job_id)
632         
633         def addSlave(self, name, address, stats):
634                 slave = MRenderSlave(name, address, stats)
635                 self.slaves.append(slave)
636                 self.slaves_map[slave.id] = slave
637                 
638                 return slave.id
639         
640         def removeSlave(self, slave):
641                 self.slaves.remove(slave)
642                 self.slaves_map.pop(slave.id)
643         
644         def getSlave(self, slave_id):
645                 return self.slaves_map.get(slave_id, None)
646         
647         def getSeenSlave(self, slave_id):
648                 slave = self.getSlave(slave_id)
649                 if slave:
650                         slave.seen()
651                         
652                 return slave
653         
654         def timeoutSlaves(self):
655                 removed = []
656                 
657                 t = time.time()
658                 
659                 for slave in self.slaves:
660                         if (t - slave.last_seen) / 60 > self.slave_timeout:
661                                 removed.append(slave)
662                                 
663                                 if slave.job:
664                                         for f in slave.job_frames:
665                                                 slave.job[f].status = ERROR
666                                 
667                 for slave in removed:
668                         self.removeSlave(slave)
669         
670         def updateUsage(self):
671                 blend = 0.5
672                 for job in self.jobs:
673                         job.usage *= (1 - blend)
674                         
675                 if self.slaves:
676                         slave_usage = blend / self.countSlaves()
677                         
678                         for slave in self.slaves:
679                                 if slave.job:
680                                         slave.job.usage += slave_usage
681                 
682         
683         def clear(self):
684                 removed = self.jobs[:]
685                 
686                 for job in removed:
687                         self.removeJob(job)
688         
689         def balance(self):
690                 self.balancer.balance(self.jobs)
691         
692         def countJobs(self, status = JOB_QUEUED):
693                 total = 0
694                 for j in self.jobs:
695                         if j.status == status:
696                                 total += 1
697                 
698                 return total
699         
700         def countSlaves(self):
701                 return len(self.slaves)
702         
703         def removeJob(self, job):
704                 self.jobs.remove(job)
705                 self.jobs_map.pop(job.id)
706                 
707                 for slave in self.slaves:
708                         if slave.job == job:
709                                 slave.job = None
710                                 slave.job_frames = []
711         
712         def addJob(self, job):
713                 self.jobs.append(job)
714                 self.jobs_map[job.id] = job
715                 
716                 # create job directory
717                 job.save_path = self.path + "job_" + job.id + os.sep
718                 if not os.path.exists(job.save_path):
719                         os.mkdir(job.save_path)
720                         
721                 job.save()
722         
723         def getJobID(self, id):
724                 return self.jobs_map.get(id, None)
725         
726         def __iter__(self):
727                 for job in self.jobs:
728                         yield job
729         
730         def newDispatch(self, slave_id):
731                 if self.jobs:
732                         for job in self.jobs:
733                                 if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist:
734                                         return job, job.getFrames()
735                 
736                 return None, None
737
738 def runMaster(address, broadcast, path, update_stats, test_break):
739                 httpd = RenderMasterServer(address, RenderHandler, path)
740                 httpd.timeout = 1
741                 httpd.stats = update_stats
742                 
743                 if broadcast:
744                         s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
745                         s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
746
747                         start_time = time.time()
748                         
749                 while not test_break():
750                         httpd.handle_request()
751                         
752                         if time.time() - start_time >= 10: # need constant here
753                                 httpd.timeoutSlaves()
754                                 
755                                 httpd.updateUsage()
756                                 
757                                 if broadcast:
758                                                 print("broadcasting address")
759                                                 s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', 8000))
760                                                 start_time = time.time()