netrender: first draft for process jobs, to be able to run arbitrary commands on...
[blender.git] / release / scripts / io / netrender / slave.py
1 import sys, os, platform
2 import http, http.client, http.server, urllib
3 import subprocess, time
4
5 from netrender.utils import *
6 import netrender.model
7
8 CANCEL_POLL_SPEED = 2
9 MAX_TIMEOUT = 10
10 INCREMENT_TIMEOUT = 1
11
12 if platform.system() == 'Windows' and platform.version() >= '5': # Error mode is only available on Win2k or higher, that's version 5
13         import ctypes
14         def SetErrorMode():
15                 val = ctypes.windll.kernel32.SetErrorMode(0x0002)
16                 ctypes.windll.kernel32.SetErrorMode(val | 0x0002)
17                 return val
18         
19         def RestoreErrorMode(val):
20                 ctypes.windll.kernel32.SetErrorMode(val)
21 else:
22         def SetErrorMode():
23                 return 0
24                 
25         def RestoreErrorMode(val):
26                 pass
27
28 def slave_Info():
29         sysname, nodename, release, version, machine, processor = platform.uname()
30         slave = netrender.model.RenderSlave()
31         slave.name = nodename
32         slave.stats = sysname + " " + release + " " + machine + " " + processor
33         return slave
34
35 def testCancel(conn, job_id, frame_number):
36                 conn.request("HEAD", "/status", headers={"job-id":job_id, "job-frame": str(frame_number)})
37                 response = conn.getresponse()
38                 
39                 # cancelled if job isn't found anymore
40                 if response.status == http.client.NO_CONTENT:
41                         return True
42                 else:
43                         return False
44
45 def testFile(conn, job_id, slave_id, JOB_PREFIX, file_path, main_path = None):
46         job_full_path = prefixPath(JOB_PREFIX, file_path, main_path)
47         
48         if not os.path.exists(job_full_path):
49                 temp_path = JOB_PREFIX + "slave.temp.blend"
50                 conn.request("GET", "/file", headers={"job-id": job_id, "slave-id":slave_id, "job-file":file_path})
51                 response = conn.getresponse()
52                 
53                 if response.status != http.client.OK:
54                         return None # file for job not returned by server, need to return an error code to server
55                 
56                 f = open(temp_path, "wb")
57                 buf = response.read(1024)
58                 
59                 while buf:
60                         f.write(buf)
61                         buf = response.read(1024)
62                 
63                 f.close()
64                 
65                 os.renames(temp_path, job_full_path)
66                 
67         return job_full_path
68
69
70 def render_slave(engine, scene):
71         netsettings = scene.network_render
72         timeout = 1
73         
74         engine.update_stats("", "Network render node initiation")
75         
76         conn = clientConnection(scene)
77         
78         if conn:
79                 conn.request("POST", "/slave", repr(slave_Info().serialize()))
80                 response = conn.getresponse()
81                 
82                 slave_id = response.getheader("slave-id")
83                 
84                 NODE_PREFIX = netsettings.path + "slave_" + slave_id + os.sep
85                 if not os.path.exists(NODE_PREFIX):
86                         os.mkdir(NODE_PREFIX)
87         
88                 while not engine.test_break():
89                         
90                         conn.request("GET", "/job", headers={"slave-id":slave_id})
91                         response = conn.getresponse()
92                         
93                         if response.status == http.client.OK:
94                                 timeout = 1 # reset timeout on new job
95                                 
96                                 job = netrender.model.RenderJob.materialize(eval(str(response.read(), encoding='utf8')))
97                                 
98                                 JOB_PREFIX = NODE_PREFIX + "job_" + job.id + os.sep
99                                 if not os.path.exists(JOB_PREFIX):
100                                         os.mkdir(JOB_PREFIX)
101                                 
102                                 
103                                 if job.type == netrender.model.JOB_BLENDER:
104                                         job_path = job.files[0][0] # data in files have format (path, start, end)
105                                         main_path, main_file = os.path.split(job_path)
106                                         
107                                         job_full_path = testFile(conn, job.id, slave_id, JOB_PREFIX, job_path)
108                                         print("Fullpath", job_full_path)
109                                         print("File:", main_file, "and %i other files" % (len(job.files) - 1,))
110                                         engine.update_stats("", "Render File", main_file, "for job", job.id)
111                                         
112                                         for file_path, start, end in job.files[1:]:
113                                                 print("\t", file_path)
114                                                 testFile(conn, job.id, slave_id, JOB_PREFIX, file_path, main_path)
115
116                                 # announce log to master
117                                 logfile = netrender.model.LogFile(job.id, [frame.number for frame in job.frames])
118                                 conn.request("POST", "/log", bytes(repr(logfile.serialize()), encoding='utf8'), headers={"slave-id":slave_id})
119                                 response = conn.getresponse()
120                                 
121                                 
122                                 first_frame = job.frames[0].number
123                                         
124                                 # start render
125                                 start_t = time.time()
126                                         
127                                 if job.type == netrender.model.JOB_BLENDER:
128                                         frame_args = []
129                                         
130                                         for frame in job.frames:
131                                                 print("frame", frame.number)
132                                                 frame_args += ["-f", str(frame.number)]
133                                         
134                                         val = SetErrorMode()
135                                         process = subprocess.Popen([sys.argv[0], "-b", job_full_path, "-o", JOB_PREFIX + "######", "-E", "BLENDER_RENDER", "-F", "MULTILAYER"] + frame_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
136                                         RestoreErrorMode(val)
137                                 elif job.type == netrender.model.JOB_PROCESS:
138                                         command = job.frames[0].command
139                                         val = SetErrorMode()
140                                         process = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
141                                         RestoreErrorMode(val)
142                                 
143                                 headers = {"job-id":job.id, "slave-id":slave_id}
144                                 
145                                 cancelled = False
146                                 stdout = bytes()
147                                 run_t = time.time()
148                                 while process.poll() == None and not cancelled:
149                                         stdout += process.stdout.read(32)
150                                         current_t = time.time()
151                                         cancelled = engine.test_break()
152                                         if current_t - run_t > CANCEL_POLL_SPEED:
153                                                 
154                                                 # update logs if needed
155                                                 if stdout:
156                                                         # (only need to update on one frame, they are linked
157                                                         headers["job-frame"] = str(first_frame)
158                                                         conn.request("PUT", "/log", stdout, headers=headers)
159                                                         response = conn.getresponse()
160                                                         
161                                                         stdout = bytes()
162                                                 
163                                                 run_t = current_t
164                                                 if testCancel(conn, job.id, first_frame):
165                                                         cancelled = True
166                                 
167                                 # read leftovers if needed
168                                 stdout += process.stdout.read()
169                                 
170                                 if cancelled:
171                                         # kill process if needed
172                                         if process.poll() == None:
173                                                 process.terminate()
174                                         continue # to next frame
175                                 
176                                 total_t = time.time() - start_t
177                                 
178                                 avg_t = total_t / len(job.frames)
179                                 
180                                 status = process.returncode
181                                 
182                                 print("status", status)
183                                 
184                                 # flush the rest of the logs
185                                 if stdout:
186                                         # (only need to update on one frame, they are linked
187                                         headers["job-frame"] = str(first_frame)
188                                         conn.request("PUT", "/log", stdout, headers=headers)
189                                         response = conn.getresponse()
190                                 
191                                 headers = {"job-id":job.id, "slave-id":slave_id, "job-time":str(avg_t)}
192                                 
193                                 if status == 0: # non zero status is error
194                                         headers["job-result"] = str(DONE)
195                                         for frame in job.frames:
196                                                 headers["job-frame"] = str(frame.number)
197                                                 
198                                                 if job.type == netrender.model.JOB_BLENDER:
199                                                         # send image back to server
200                                                         f = open(JOB_PREFIX + "%06d" % frame.number + ".exr", 'rb')
201                                                         conn.request("PUT", "/render", f, headers=headers)
202                                                         f.close()
203                                                         response = conn.getresponse()
204                                                 elif job.type == netrender.model.JOB_PROCESS:
205                                                         conn.request("PUT", "/render", headers=headers)
206                                                         response = conn.getresponse()
207                                 else:
208                                         headers["job-result"] = str(ERROR)
209                                         for frame in job.frames:
210                                                 headers["job-frame"] = str(frame.number)
211                                                 # send error result back to server
212                                                 conn.request("PUT", "/render", headers=headers)
213                                                 response = conn.getresponse()
214                         else:
215                                 if timeout < MAX_TIMEOUT:
216                                         timeout += INCREMENT_TIMEOUT
217                                 
218                                 for i in range(timeout):
219                                         time.sleep(1)
220                                         if engine.test_break():
221                                                 conn.close()
222                                                 return
223                         
224                 conn.close()