netrender: draft code for cluster usage per job calculations. Eventually, this will...
[blender.git] / release / io / netrender / master.py
1 import sys, os
2 import http, http.client, http.server, urllib, socket
3 import subprocess, shutil, time, hashlib
4
5 from netrender.utils import *
6 import netrender.model
7 import netrender.balancing
8 import netrender.master_html
9
class MRenderFile:
    """A file tracked by the master, plus a cached "does it exist" flag.

    ``start`` and ``end`` are stored as given by the caller (presumably a
    frame range -- confirm against the code that builds the job's file list).
    ``found`` holds the outcome of the most recent :meth:`test` call and is
    ``False`` until :meth:`test` has been run.
    """

    def __init__(self, filepath, start, end):
        self.found = False
        self.filepath, self.start, self.end = filepath, start, end

    def test(self):
        """Check whether ``filepath`` exists on disk.

        The result is stored in ``found`` and also returned.
        """
        exists = os.path.exists(self.filepath)
        self.found = exists
        return exists
20
21
class MRenderSlave(netrender.model.RenderSlave):
    """Master-side record of a render slave.

    Creating an instance registers it in
    ``netrender.model.RenderSlave._slave_map`` under its ``id``.
    """

    def __init__(self, name, address, stats):
        super().__init__()

        # The id is the MD5 hex digest of repr(name) followed by repr(address),
        # so the same name/address pair always maps to the same id.
        identity = repr(name) + repr(address)
        self.id = hashlib.md5(identity.encode('utf8')).hexdigest()

        self.name = name
        self.address = address
        self.stats = stats
        self.last_seen = time.time()

        # Nothing is assigned to a freshly created slave.
        self.job = None
        self.job_frames = []

        registry = netrender.model.RenderSlave._slave_map
        registry[self.id] = self

    def seen(self):
        """Record the current time as the moment this slave was last seen."""
        self.last_seen = time.time()
38
class MRenderJob(netrender.model.RenderJob):
    """Master-side render job: its frames, the files it needs and the
    bookkeeping (credits, dispatch times, status) kept by the master.
    """

    def __init__(self, job_id, name, files, chunks = 1, priority = 1, credits = 100.0, blacklist = None):
        """Create a job in the ``JOB_WAITING`` state with no frames.

        job_id    -- identifier stored as ``id``
        name      -- job name
        files     -- iterable of ``(path, start, end)`` triples; each one is
                     wrapped in an ``MRenderFile`` and indexed by path in
                     ``files_map``
        chunks    -- maximum number of frames handed out by ``getFrames``
        priority  -- stored as given
        credits   -- starting credit balance
        blacklist -- list of slave ids; ``None`` (the default) means a new,
                     empty list private to this job
        """
        super().__init__()
        self.id = job_id
        self.name = name
        self.files = files
        self.frames = []
        self.chunks = chunks
        self.priority = priority
        self.credits = credits
        # The blacklist is appended to after construction, so every job must
        # own its list: a shared default list would leak blacklisted slaves
        # from one job into all the others.
        self.blacklist = [] if blacklist is None else blacklist
        self.last_dispatched = time.time()

        # special server properties
        self.usage = 0.0
        self.last_update = 0
        self.save_path = ""
        self.files_map = {path: MRenderFile(path, start, end) for path, start, end in files}
        self.status = JOB_WAITING

    def save(self):
        """Write ``repr(self.serialize())`` to ``<save_path>job.txt``.

        Does nothing while ``save_path`` is empty.
        """
        if not self.save_path:
            return

        # The context manager closes the file even if serialising or writing fails.
        with open(self.save_path + "job.txt", "w") as f:
            f.write(repr(self.serialize()))

    def testStart(self):
        """Start the job if every file in ``files_map`` exists.

        Returns ``False`` as soon as one file is missing (the remaining files
        are not tested); otherwise calls ``start()`` and returns ``True``.
        """
        for f in self.files_map.values():
            if not f.test():
                return False

        self.start()
        return True

    def testFinished(self):
        """Mark the job ``JOB_FINISHED`` when no frame is queued or dispatched.

        A job without frames is considered finished as well.
        """
        pending = any(f.status in (QUEUED, DISPATCHED) for f in self.frames)
        if not pending:
            self.status = JOB_FINISHED

    def start(self):
        """Move the job to the ``JOB_QUEUED`` state."""
        self.status = JOB_QUEUED

    def update(self):
        """Add one credit per elapsed minute and stamp ``last_update``.

        On the first update (``last_update == 0``) the elapsed time is
        measured from ``last_dispatched``, afterwards from the previous update.
        """
        # Sample the clock once so the credited interval and the stored
        # timestamp refer to the same instant.
        now = time.time()
        since = self.last_dispatched if self.last_update == 0 else self.last_update

        self.credits += (now - since) / 60
        self.last_update = now

    def addLog(self, frames):
        """Point every listed frame at one shared log file.

        The log name is the zero-padded frame numbers joined with ``_`` plus
        ``.log``, placed under ``save_path``. Frame numbers for which
        ``self[number]`` is falsy are skipped.
        """
        log_name = "_".join("%04d" % f for f in frames) + ".log"
        log_path = self.save_path + log_name

        for number in frames:
            frame = self[number]
            if frame:
                frame.log_path = log_path

    def addFrame(self, frame_number):
        """Append a new ``MRenderFrame`` for ``frame_number`` and return it."""
        frame = MRenderFrame(frame_number)
        self.frames.append(frame)
        return frame

    def reset(self, all):
        """Call ``reset(all)`` on every frame of the job."""
        for f in self.frames:
            f.reset(all)

    def getFrames(self):
        """Return the next queued frames, stopping once ``chunks`` are collected.

        Each selected frame costs one credit and refreshes ``last_dispatched``.
        The status of the frames is not changed here.
        """
        frames = []
        for f in self.frames:
            if f.status == QUEUED:
                self.credits -= 1 # cost of one frame
                self.last_dispatched = time.time()
                frames.append(f)
                if len(frames) >= self.chunks:
                    break

        return frames
120
class MRenderFrame(netrender.model.RenderFrame):
    """Master-side state of a single frame of a job."""

    def __init__(self, frame):
        super().__init__()
        self.number = frame
        self.log_path = None
        # A new frame is queued, unassigned, with no render time recorded.
        self.slave, self.time, self.status = None, 0, QUEUED

    def reset(self, all):
        """Put the frame back in the queue.

        With a truthy ``all`` the frame is always reset; otherwise only a
        frame whose status is ``ERROR`` is reset. ``log_path`` is left as is.
        """
        if not all and self.status != ERROR:
            return

        self.slave, self.time, self.status = None, 0, QUEUED
135
136
137 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
138 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
139 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
140 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
141
class RenderHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler implementing the render master's protocol.

    Each ``do_<METHOD>`` entry point prints the request path and hands the
    request to a private per-route helper. Request parameters travel in HTTP
    headers (``job-id``, ``job-frame``, ``slave-id``, ...); structured
    payloads are exchanged as the ``repr()`` of Python values. Unknown jobs,
    frames, files and slaves are answered with ``NO_CONTENT``.
    """

    def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"):
        """Send the status line, the ``Content-type`` header and ``headers``.

        code    -- HTTP status code
        headers -- mapping of extra header names to values
        content -- value of the ``Content-type`` header
        """
        # NOTE: the shared default dict is only read here, never modified.
        self.send_response(code)
        self.send_header("Content-type", content)

        for key, value in headers.items():
            self.send_header(key, value)

        self.end_headers()

    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # shared helpers

    def _send_file(self, path):
        """Send a default (OK) head followed by the content of ``path``."""
        # The file is opened before the head is sent, so a missing file never
        # produces an OK status line; it is closed even if the copy fails.
        with open(path, 'rb') as f:
            self.send_head()
            shutil.copyfileobj(f, self.wfile)

    def _write_repr(self, message):
        """Write ``repr(message)`` to the response body, encoded as UTF-8."""
        self.wfile.write(bytes(repr(message), encoding='utf8'))

    def _read_message(self):
        """Read ``content-length`` bytes of request body and evaluate them."""
        length = int(self.headers['content-length'])
        # SECURITY: eval() executes whatever expression the peer sends. This
        # is only acceptable on a trusted network; a literal-only parser
        # should replace it once the wire format is confirmed to be literals.
        return eval(str(self.rfile.read(length), encoding='utf8'))

    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # HEAD

    def do_HEAD(self):
        """Answer ``HEAD /status``: OK if the job (and frame) exist.

        Without a ``job-id`` header the answer is always OK. Other paths get
        no response from this method.
        """
        print(self.path)

        if self.path != "/status":
            return

        job_id = self.headers.get('job-id', "")
        job_frame = int(self.headers.get('job-frame', -1))

        if job_id:
            print("status:", job_id, "\n")

            job = self.server.getJobByID(job_id)
            if not job:
                # no such job id
                self.send_head(http.client.NO_CONTENT)
                return

            if job_frame != -1 and not job[job_frame]:
                # no such frame
                self.send_head(http.client.NO_CONTENT)
                return

        self.send_head()

    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # GET

    def do_GET(self):
        """Dispatch a GET request to its route helper.

        Paths without a helper are handed to ``netrender.master_html.get``.
        """
        print(self.path)

        routes = {
            "/version": self._get_version,
            "/render": self._get_render,
            "/log": self._get_log,
            "/status": self._get_status,
            "/job": self._get_job,
            "/file": self._get_file,
            "/slave": self._get_slave,
        }

        route = routes.get(self.path)
        if route is None:
            # hand over the rest to the html section
            netrender.master_html.get(self)
        else:
            route()

    def _get_version(self):
        """GET /version: send ``VERSION`` as the response body."""
        self.send_head()
        self.server.stats("", "New client connection")
        self.wfile.write(VERSION)

    def _get_render(self):
        """GET /render: send a finished frame, or report why it is not ready.

        ACCEPTED while the frame is queued or dispatched, PARTIAL_CONTENT if
        it failed, otherwise the ``.exr`` file stored under the job's
        ``save_path``.
        """
        job_id = self.headers['job-id']
        job_frame = int(self.headers['job-frame'])
        print("render:", job_id, job_frame)

        job = self.server.getJobByID(job_id)
        frame = job[job_frame] if job else None

        if not frame:
            # no such job id, or no such frame
            self.send_head(http.client.NO_CONTENT)
            return

        if frame.status in (QUEUED, DISPATCHED):
            self.send_head(http.client.ACCEPTED)
        elif frame.status == DONE:
            self.server.stats("", "Sending result back to client")
            self._send_file(job.save_path + "%04d" % job_frame + ".exr")
        elif frame.status == ERROR:
            self.send_head(http.client.PARTIAL_CONTENT)

    def _get_log(self):
        """GET /log: send a frame's log file, or PROCESSING if not ready."""
        job_id = self.headers['job-id']
        job_frame = int(self.headers['job-frame'])
        print("log:", job_id, job_frame)

        job = self.server.getJobByID(job_id)
        frame = job[job_frame] if job else None

        if not frame:
            # no such job id, or no such frame
            self.send_head(http.client.NO_CONTENT)
            return

        if not frame.log_path or frame.status in (QUEUED, DISPATCHED):
            self.send_head(http.client.PROCESSING)
        else:
            self.server.stats("", "Sending log back to client")
            self._send_file(frame.log_path)

    def _get_status(self):
        """GET /status: send serialized state of a frame, a job or all jobs.

        With ``job-id`` and ``job-frame`` headers the frame is sent, with only
        ``job-id`` the job, and without either a list of every job.
        """
        job_id = self.headers.get('job-id', "")
        job_frame = int(self.headers.get('job-frame', -1))

        if job_id:
            print("status:", job_id, "\n")

            job = self.server.getJobByID(job_id)
            if not job:
                # no such job id
                self.send_head(http.client.NO_CONTENT)
                return

            if job_frame != -1:
                frame = job[job_frame]
                if not frame:
                    # no such frame
                    self.send_head(http.client.NO_CONTENT)
                    return
                message = frame.serialize()
            else:
                message = job.serialize()
        else: # status of all jobs
            message = [job.serialize() for job in self.server]

        self.send_head()
        self._write_repr(message)

    def _get_job(self):
        """GET /job: hand the requesting slave its next frames, if any.

        The frames are marked DISPATCHED and recorded on the slave before the
        serialized job is sent. ACCEPTED means no work is available.
        """
        self.server.update()

        slave_id = self.headers['slave-id']
        print("slave-id", slave_id)

        slave = self.server.updateSlave(slave_id)
        if not slave: # invalid slave id
            self.send_head(http.client.NO_CONTENT)
            return

        job, frames = self.server.getNewJob(slave_id)
        if not (job and frames):
            # no job available, return error code
            self.send_head(http.client.ACCEPTED)
            return

        for f in frames:
            print("dispatch", f.number)
            f.status = DISPATCHED
            f.slave = slave

        slave.job = job
        slave.job_frames = [f.number for f in frames]

        self.send_head(headers={"job-id": job.id})
        self._write_repr(job.serialize(frames))

        self.server.stats("", "Sending job frame to render node")

    def _get_file(self):
        """GET /file: send one of a job's files to a known slave."""
        slave_id = self.headers['slave-id']

        slave = self.server.updateSlave(slave_id)
        if not slave: # invalid slave id
            self.send_head(http.client.NO_CONTENT)
            return

        job_id = self.headers['job-id']
        job_file = self.headers['job-file']
        print("job:", job_id, "\n")
        print("file:", job_file, "\n")

        job = self.server.getJobByID(job_id)
        render_file = job.files_map.get(job_file, None) if job else None

        if not render_file:
            # no such job id, or no such file
            self.send_head(http.client.NO_CONTENT)
            return

        self.server.stats("", "Sending file to render node")
        self._send_file(render_file.filepath)

    def _get_slave(self):
        """GET /slave: send the serialized list of ``self.server.slaves``."""
        message = [slave.serialize() for slave in self.server.slaves]

        self.send_head()
        self._write_repr(message)

    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # POST

    def do_POST(self):
        """Dispatch a POST request; paths without a helper get no response."""
        print(self.path)

        routes = {
            "/job": self._post_job,
            "/cancel": self._post_cancel,
            "/reset": self._post_reset,
            "/slave": self._post_slave,
            "/log": self._post_log,
        }

        route = routes.get(self.path)
        if route is not None:
            route()

    def _post_job(self):
        """POST /job: register a new job described by the request body.

        The new id is returned in the ``job-id`` header. The status is OK if
        the job could start right away and ACCEPTED if files are missing.
        """
        print("posting job info")
        self.server.stats("", "Receiving job")

        job_info = netrender.model.RenderJob.materialize(self._read_message())

        job_id = self.server.nextJobID()

        print(job_info.files)

        job = MRenderJob(job_id, job_info.name, job_info.files, chunks = job_info.chunks, priority = job_info.priority, blacklist = job_info.blacklist)

        for frame in job_info.frames:
            job.addFrame(frame.number)

        self.server.addJob(job)

        headers = {"job-id": job_id}

        if job.testStart():
            self.send_head(headers=headers)
        else:
            self.send_head(http.client.ACCEPTED, headers=headers)

    def _post_cancel(self):
        """POST /cancel: remove one job, or every job without ``job-id``."""
        job_id = self.headers.get('job-id', "")
        if job_id:
            print("cancel:", job_id, "\n")
            self.server.removeJob(job_id)
        else: # cancel all jobs
            self.server.clear()

        self.send_head()

    def _post_reset(self):
        """POST /reset: requeue one frame, or a whole job without ``job-frame``.

        The ``reset-all`` header selects whether every frame is reset or only
        those that ended in error.
        """
        job_id = self.headers.get('job-id', "")
        job_frame = int(self.headers.get('job-frame', "-1"))
        # The header is text: bool("False") is True, so the value has to be
        # compared against the accepted spellings of "true" instead.
        reset_all = self.headers.get('reset-all', "False").strip().lower() in ("true", "1", "yes")

        job = self.server.getJobByID(job_id)
        if not job: # job not found
            self.send_head(http.client.NO_CONTENT)
            return

        if job_frame != -1:
            frame = job[job_frame]
            if not frame:
                # no such frame
                self.send_head(http.client.NO_CONTENT)
                return
            frame.reset(reset_all)
        else:
            job.reset(reset_all)

        self.send_head()

    def _post_slave(self):
        """POST /slave: register a slave; its id is returned as ``slave-id``."""
        slave_info = netrender.model.RenderSlave.materialize(self._read_message())

        slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)

        self.send_head(headers = {"slave-id": slave_id})

    def _post_log(self):
        """POST /log: attach a log file path to the frames named in the body."""
        slave_id = self.headers['slave-id']

        slave = self.server.updateSlave(slave_id)
        if not slave: # invalid slave id
            self.send_head(http.client.NO_CONTENT)
            return

        log_info = netrender.model.LogFile.materialize(self._read_message())

        job = self.server.getJobByID(log_info.job_id)
        if not job:
            # no such job id
            self.send_head(http.client.NO_CONTENT)
            return

        job.addLog(log_info.frames)
        self.send_head(http.client.OK)

    # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
    # PUT

    def do_PUT(self):
        """Dispatch a PUT request; paths without a helper get no response."""
        print(self.path)

        routes = {
            "/file": self._put_file,
            "/render": self._put_render,
            "/log": self._put_log,
        }

        route = routes.get(self.path)
        if route is not None:
            route()

    def _put_file(self):
        """PUT /file: store an uploaded job file under the job's ``save_path``.

        The status is OK if the job could start afterwards and ACCEPTED if
        files are still missing.
        """
        print("writing blend file")
        self.server.stats("", "Receiving job")

        length = int(self.headers['content-length'])
        job_id = self.headers['job-id']
        job_file = self.headers['job-file']

        job = self.server.getJobByID(job_id)
        render_file = job.files_map.get(job_file, None) if job else None

        if not render_file:
            # job not found, or invalid file
            self.send_head(http.client.NO_CONTENT)
            return

        main_file = job.files[0][0] # filename of the first file
        main_path, main_name = os.path.split(main_file)

        if job_file != main_file:
            file_path = prefixPath(job.save_path, job_file, main_path)
        else:
            file_path = job.save_path + main_name

        # add same temp file + renames as slave
        with open(file_path, "wb") as f:
            f.write(self.rfile.read(length))

        render_file.filepath = file_path # set the new path

        if job.testStart():
            self.send_head(http.client.OK)
        else:
            self.send_head(http.client.ACCEPTED)

    def _put_render(self):
        """PUT /render: record the outcome of a frame rendered by a slave.

        A DONE result carries the image in the request body, which is stored
        as ``<save_path><frame>.exr``. An ERROR result blacklists the slave
        for this job.
        """
        print("writing result file")
        self.server.stats("", "Receiving render result")

        slave_id = self.headers['slave-id']

        slave = self.server.updateSlave(slave_id)
        if not slave: # invalid slave id
            self.send_head(http.client.NO_CONTENT)
            return

        job_id = self.headers['job-id']

        job = self.server.getJobByID(job_id)
        if not job: # job not found
            self.send_head(http.client.NO_CONTENT)
            return

        job_frame = int(self.headers['job-frame'])
        job_result = int(self.headers['job-result'])
        job_time = float(self.headers['job-time'])

        frame = job[job_frame]
        if not frame:
            # no such frame: answer like the other routes instead of failing
            # on a None frame further down
            self.send_head(http.client.NO_CONTENT)
            return

        if job_result == DONE:
            length = int(self.headers['content-length'])
            with open(job.save_path + "%04d" % job_frame + ".exr", 'wb') as f:
                f.write(self.rfile.read(length))
        elif job_result == ERROR:
            # blacklist slave on this job on error
            job.blacklist.append(slave.id)

        # The frame may no longer be listed on the slave record (for example
        # a repeated result); list.remove would raise in that case.
        if job_frame in slave.job_frames:
            slave.job_frames.remove(job_frame)
        if not slave.job_frames:
            slave.job = None

        frame.status = job_result
        frame.time = job_time

        job.testFinished()

        self.server.updateSlave(self.headers['slave-id'])

        self.send_head()

    def _put_log(self):
        """PUT /log: append the request body to a frame's log file."""
        print("writing log file")
        self.server.stats("", "Receiving log file")

        job_id = self.headers['job-id']

        job = self.server.getJobByID(job_id)
        if not job: # job not found
            self.send_head(http.client.NO_CONTENT)
            return

        job_frame = int(self.headers['job-frame'])

        frame = job[job_frame]
        if not (frame and frame.log_path): # frame not found
            self.send_head(http.client.NO_CONTENT)
            return

        length = int(self.headers['content-length'])
        with open(frame.log_path, 'ab') as f:
            f.write(self.rfile.read(length))

        self.server.updateSlave(self.headers['slave-id'])

        self.send_head()
590
class RenderMasterServer(http.server.HTTPServer):
        """HTTP server holding the master's job queue and its known slaves.

        Jobs and slaves are each stored twice: in a list (iteration order)
        and in a dict keyed by id (lookup).  Every method that adds or
        removes an entry keeps both containers in sync.
        """

        def __init__(self, address, handler_class, path):
                """Create the server and its on-disk storage directory.

                address       -- server address handed to HTTPServer
                handler_class -- request handler class handed to HTTPServer
                path          -- base directory; it is concatenated without
                                 inserting a separator, so it is expected to
                                 end with one
                """
                super().__init__(address, handler_class)
                self.jobs = []
                self.jobs_map = {}
                self.slaves = []
                self.slaves_map = {}
                self.job_id = 0
                # one storage directory per master process
                self.path = path + "master_" + str(os.getpid()) + os.sep

                # in minutes: timeoutSlaves() divides the elapsed seconds by 60
                self.slave_timeout = 2

                # the first updateUsage() call does not decay previous usage
                self.first_usage = True

                self.balancer = netrender.balancing.Balancer()
                self.balancer.addRule(netrender.balancing.RatingCredit())
                self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
                self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves))
                self.balancer.addPriority(netrender.balancing.NewJobPriority())
                self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2))

                if not os.path.exists(self.path):
                        os.mkdir(self.path)

        def nextJobID(self):
                """Return a new job id: an incrementing counter, as a string."""
                self.job_id += 1
                return str(self.job_id)

        def addSlave(self, name, address, stats):
                """Register a new slave and return its id."""
                slave = MRenderSlave(name, address, stats)
                self.slaves.append(slave)
                self.slaves_map[slave.id] = slave

                return slave.id

        def removeSlave(self, slave):
                """Unregister `slave` (a slave object, not an id)."""
                self.slaves.remove(slave)
                self.slaves_map.pop(slave.id)

        def getSlave(self, slave_id):
                """Return the slave registered under `slave_id`, or None."""
                return self.slaves_map.get(slave_id, None)

        def updateSlave(self, slave_id):
                """Refresh the last-seen time of a slave.

                Returns the slave, or None when `slave_id` is unknown.
                """
                slave = self.getSlave(slave_id)
                if slave:
                        slave.seen()

                return slave

        def timeoutSlaves(self):
                """Drop slaves not seen for more than `slave_timeout` minutes.

                Frames still assigned to a dropped slave are marked ERROR.
                """
                now = time.time()

                # collect first: removeSlave() mutates self.slaves, which must
                # not happen while that list is being iterated
                expired = [slave for slave in self.slaves if (now - slave.last_seen) / 60 > self.slave_timeout]

                for slave in expired:
                        if slave.job:
                                for f in slave.job_frames:
                                        slave.job[f].status = ERROR

                        self.removeSlave(slave)

        def updateUsage(self):
                """Update the cluster usage share of every job.

                On the first call each busy slave adds 1 / countSlaves() to
                its job's usage.  On later calls every job's usage is halved
                first and busy slaves add 0.5 / countSlaves(), so older
                samples weigh less and less.
                """
                m = 1.0

                if not self.first_usage:
                        for job in self.jobs:
                                job.usage *= 0.5

                        m = 0.5
                else:
                        self.first_usage = False

                if self.slaves:
                        slave_usage = m / self.countSlaves()

                        for slave in self.slaves:
                                if slave.job:
                                        slave.job.usage += slave_usage

        def clear(self):
                """Remove every job from the server."""
                # iterate over a copy: removeJob() mutates self.jobs
                for job in self.jobs[:]:
                        # removeJob() looks jobs up by id, not by object
                        self.removeJob(job.id)

        def update(self):
                """Update every job, then let the balancer process the job list."""
                for job in self.jobs:
                        job.update()
                self.balancer.balance(self.jobs)

        def countJobs(self, status = JOB_QUEUED):
                """Return the number of jobs whose status equals `status`."""
                return sum(1 for j in self.jobs if j.status == status)

        def countSlaves(self):
                """Return the number of registered slaves."""
                return len(self.slaves)

        def removeJob(self, id):
                """Remove the job registered under `id` and release its slaves.

                Unknown ids are ignored.
                """
                # default of None so that an unknown id reaches the guard
                # below instead of raising KeyError
                job = self.jobs_map.pop(id, None)

                if job:
                        self.jobs.remove(job)

                        # slaves working on the removed job become idle
                        for slave in self.slaves:
                                if slave.job == job:
                                        slave.job = None
                                        slave.job_frames = []

        def addJob(self, job):
                """Register `job`, create its directory and save it.

                `job.id` has to be a string (see nextJobID), it becomes part
                of the directory name.
                """
                self.jobs.append(job)
                self.jobs_map[job.id] = job

                # create job directory
                job.save_path = self.path + "job_" + job.id + os.sep
                if not os.path.exists(job.save_path):
                        os.mkdir(job.save_path)

                job.save()

        def getJobByID(self, id):
                """Return the job registered under `id`, or None."""
                return self.jobs_map.get(id, None)

        def __iter__(self):
                """Iterate over the jobs in list order."""
                for job in self.jobs:
                        yield job

        def getNewJob(self, slave_id):
                """Pick work for the slave identified by `slave_id`.

                Returns (job, frames) for the first job in list order that no
                balancer exception applies to and that has not blacklisted
                the slave, or (None, None) when there is no such job.
                """
                for job in self.jobs:
                        if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist:
                                return job, job.getFrames()

                return None, None
733
def runMaster(address, broadcast, path, update_stats, test_break):
        """Run the render master until `test_break()` returns true.

        address      -- (host, port) the HTTP server listens on; the port is
                        also what gets announced when broadcasting
        broadcast    -- when true, periodically announce the port over UDP
        path         -- base directory handed to RenderMasterServer
        update_stats -- callable stored on the server as `stats`
        test_break   -- callable polled once per loop; a true result stops
                        the master
        """
        maintenance_interval = 10 # seconds between maintenance passes
        broadcast_port = 8000 # UDP port the announcement is sent to

        httpd = RenderMasterServer(address, RenderHandler, path)
        # handle_request() gives up after this many seconds, which keeps
        # test_break() polled regularly even when no request comes in
        httpd.timeout = 1
        httpd.stats = update_stats

        s = None

        try:
                if broadcast:
                        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

                # the timer is needed whether or not broadcasting is enabled
                start_time = time.time()

                while not test_break():
                        httpd.handle_request()

                        if time.time() - start_time >= maintenance_interval:
                                httpd.timeoutSlaves()

                                httpd.updateUsage()

                                if broadcast:
                                        print("broadcasting address")
                                        s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', broadcast_port))

                                # restart the timer on every pass, not only after a broadcast
                                start_time = time.time()
        finally:
                # release both sockets, also when the loop ends with an exception
                if s is not None:
                        s.close()
                httpd.server_close()