netrender: only one log file for each chunk
authorMartin Poirier <theeth@yahoo.com>
Fri, 18 Sep 2009 03:29:50 +0000 (03:29 +0000)
committerMartin Poirier <theeth@yahoo.com>
Fri, 18 Sep 2009 03:29:50 +0000 (03:29 +0000)
release/io/netrender/master.py
release/io/netrender/model.py
release/io/netrender/slave.py

index 8f59ef37069fe509dc892790e94f1e33ba6cdfab..58af47d6240c9c51fd90e4463e70be7ee972008d 100644 (file)
@@ -82,6 +82,15 @@ class MRenderJob(netrender.model.RenderJob):
                self.credits += (time.time() - self.last_dispatched) / 60
                self.last_dispatched = time.time()
        
+       def addLog(self, frames):
+               log_name = "_".join(("%04d" % f for f in frames)) + ".log"
+               log_path = self.save_path + log_name
+               
+               for number in frames:
+                       frame = self[number]
+                       if frame:
+                               frame.log_path = log_path
+       
        def addFrame(self, frame_number):
                frame = MRenderFrame(frame_number)
                self.frames.append(frame)
@@ -117,6 +126,7 @@ class MRenderFrame(netrender.model.RenderFrame):
                self.slave = None
                self.time = 0
                self.status = QUEUED
+               self.log_path = None
                
        def reset(self, all):
                if all or self.status == ERROR:
@@ -222,11 +232,11 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
                                frame = job[job_frame]
                                
                                if frame:
-                                       if frame.status in (QUEUED, DISPATCHED):
+                                       if not frame.log_path or frame.status in (QUEUED, DISPATCHED):
                                                self.send_head(http.client.PROCESSING)
                                        else:
                                                self.server.stats("", "Sending log back to client")
-                                               f = open(job.save_path + "%04d" % job_frame + ".log", 'rb')
+                                               f = open(frame.log_path, 'rb')
                                                
                                                self.send_head()
                                                
@@ -420,7 +430,27 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
                        slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)
                        
                        self.send_head(headers = {"slave-id": slave_id})
-       
+               # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
+               elif self.path == "log":
+                       slave_id = self.headers['slave-id']
+                       
+                       slave = self.server.updateSlave(slave_id)
+                       
+                       if slave: # only if slave id is valid
+                               length = int(self.headers['content-length'])
+                               
+                               log_info = netrender.model.LogFile.materialize(eval(str(self.rfile.read(length), encoding='utf8')))
+                               
+                               job = self.server.getJobByID(log_info.job_id)
+                               
+                               if job:
+                                       job.addLog(log_info.frames)
+                                       self.send_head(http.client.OK)
+                               else:
+                                       # no such job id
+                                       self.send_head(http.client.NO_CONTENT)
+                       else: # invalid slave id
+                               self.send_head(http.client.NO_CONTENT)  
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
        # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
        # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
@@ -526,19 +556,24 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
                        job = self.server.getJobByID(job_id)
                        
                        if job:
-                               length = int(self.headers['content-length'])
                                job_frame = int(self.headers['job-frame'])
                                
-                               buf = self.rfile.read(length)
-                               f = open(job.save_path + "%04d" % job_frame + ".log", 'ab')
-                               f.write(buf)
-                               f.close()
-                                       
-                               del buf
-                               
-                               self.server.updateSlave(self.headers['slave-id'])
+                               frame = job[job_frame]
                                
-                               self.send_head()
+                               if frame and frame.log_path:
+                                       length = int(self.headers['content-length'])
+                                       buf = self.rfile.read(length)
+                                       f = open(frame.log_path, 'ab')
+                                       f.write(buf)
+                                       f.close()
+                                               
+                                       del buf
+                                       
+                                       self.server.updateSlave(self.headers['slave-id'])
+                                       
+                                       self.send_head()
+                               else: # frame not found
+                                       self.send_head(http.client.NO_CONTENT)
                        else: # job not found
                                self.send_head(http.client.NO_CONTENT)
 
index 7803ad034a748415e38a853a9feb6b14ef6baa13..924493fd34a699bcfdae07c0e0c08a8859a8e59c 100644 (file)
@@ -4,6 +4,28 @@ import subprocess, shutil, time, hashlib
 
 from netrender.utils import *
 
+class LogFile:
+       def __init__(self, job_id = 0, frames = []):
+               self.job_id = job_id
+               self.frames = frames
+       
+       def serialize(self):
+               return  {
+                                                       "job_id": self.job_id,
+                                                       "frames": self.frames
+                                               }
+       
+       @staticmethod
+       def materialize(data):
+               if not data:
+                       return None
+               
+               logfile = LogFile()
+               logfile.job_id = data["job_id"]
+               logfile.frames = data["frames"]
+               
+               return logfile
+
 class RenderSlave:
        _slave_map = {}
        
index 1f4ef3a3616cdd51b6d361fb4318aef485178270..ecdbf69591acd4e61f935201eaa228259ee50052 100644 (file)
@@ -117,8 +117,14 @@ def render_slave(engine, scene):
                                        print("frame", frame.number)
                                        frame_args += ["-f", str(frame.number)]
                                
+                               # announce log to master
+                               logfile = netrender.model.LogFile(job.id, [frame.number for frame in job.frames])
+                               conn.request("POST", "log", bytes(repr(logfile.serialize()), encoding='utf8'), headers={"slave-id":slave_id})
+                               response = conn.getresponse()
                                
+                               first_frame = job.frames[0].number
                                
+                               # start render
                                start_t = time.time()
                                
                                val = SetErrorMode()
@@ -136,13 +142,14 @@ def render_slave(engine, scene):
                                        cancelled = engine.test_break()
                                        if current_t - run_t > CANCEL_POLL_SPEED:
                                                
-                                               # update logs. Eventually, it should support one log file for many frames
-                                               for frame in job.frames:
-                                                       headers["job-frame"] = str(frame.number)
+                                               # update logs if needed
+                                               if stdout:
+                                                       # (only need to update on one frame, they are linked
+                                                       headers["job-frame"] = str(first_frame)
                                                        conn.request("PUT", "log", stdout, headers=headers)
                                                        response = conn.getresponse()
-                                               
-                                               stdout = bytes()
+                                                       
+                                                       stdout = bytes()
                                                
                                                run_t = current_t
                                                if testCancel(conn, job.id):
@@ -164,10 +171,10 @@ def render_slave(engine, scene):
                                
                                # flush the rest of the logs
                                if stdout:
-                                       for frame in job.frames:
-                                               headers["job-frame"] = str(frame.number)
-                                               conn.request("PUT", "log", stdout, headers=headers)
-                                               response = conn.getresponse()
+                                       # (only need to update on one frame, they are linked
+                                       headers["job-frame"] = str(first_frame)
+                                       conn.request("PUT", "log", stdout, headers=headers)
+                                       response = conn.getresponse()
                                
                                headers = {"job-id":job.id, "slave-id":slave_id, "job-time":str(avg_t)}