1# Lint as: python2, python3 2from __future__ import absolute_import 3from __future__ import division 4from __future__ import print_function 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007""" 7 8import sys, os, signal, time, six.moves.cPickle, logging 9 10from autotest_lib.client.common_lib import error, utils 11from autotest_lib.client.common_lib.cros import retry 12from six.moves import zip 13 14 15# entry points that use subcommand must set this to their logging manager 16# to get log redirection for subcommands 17logging_manager_object = None 18 19 20def parallel(tasklist, timeout=None, return_results=False): 21 """ 22 Run a set of predefined subcommands in parallel. 23 24 @param tasklist: A list of subcommand instances to execute. 25 @param timeout: Number of seconds after which the commands should timeout. 26 @param return_results: If True instead of an AutoServError being raised 27 on any error a list of the results|exceptions from the tasks is 28 returned. [default: False] 29 """ 30 run_error = False 31 for task in tasklist: 32 task.fork_start() 33 34 remaining_timeout = None 35 if timeout: 36 endtime = time.time() + timeout 37 38 results = [] 39 for task in tasklist: 40 if timeout: 41 remaining_timeout = max(endtime - time.time(), 1) 42 try: 43 status = task.fork_waitfor(timeout=remaining_timeout) 44 except error.AutoservSubcommandError: 45 run_error = True 46 else: 47 if status != 0: 48 run_error = True 49 50 results.append( 51 six.moves.cPickle.load(task.result_pickle, encoding='utf-8')) 52 if hasattr(task.result_pickle, 'close'): 53 task.result_pickle.close() 54 55 if return_results: 56 return results 57 elif run_error: 58 message = 'One or more subcommands failed:\n' 59 for task, result in zip(tasklist, results): 60 message += 'task: %s returned/raised: %r\n' % (task, result) 61 raise error.AutoservError(message) 62 63 64def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x), 65 log=True, timeout=None, return_results=False): 66 """ 67 Each element in the arglist used to create a subcommand object, 68 where that arg is used both as a subdir name, and a single argument 69 to pass to "function". 70 71 We create a subcommand object for each element in the list, 72 then execute those subcommand objects in parallel. 73 74 NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used. 75 76 @param function: A callable to run in parallel once per arg in arglist. 77 @param arglist: A list of arguments to be used one per subcommand 78 @param subdir_name_constructor: A function that returns a name for the 79 result sub-directory created per subcommand. 80 Signature is: 81 subdir_name_constructor(arg) 82 where arg is the argument passed to function. 83 @param log: If True, output will be written to output in a subdirectory 84 named after each subcommand's arg. 85 @param timeout: Number of seconds after which the commands should timeout. 86 @param return_results: If True instead of an AutoServError being raised 87 on any error a list of the results|exceptions from the function 88 called on each arg is returned. [default: False] 89 90 @returns None or a list of results/exceptions. 91 """ 92 if not arglist: 93 logging.warning('parallel_simple was called with an empty arglist, ' 94 'did you forget to pass in a list of machines?') 95 96 # Bypass the multithreading if only one machine. 97 if len(arglist) == 1: 98 arg = arglist[0] 99 if return_results: 100 try: 101 result = function(arg) 102 except Exception as e: 103 return [e] 104 return [result] 105 else: 106 function(arg) 107 return 108 109 subcommands = [] 110 for arg in arglist: 111 args = [arg] 112 subdir = subdir_name_constructor(arg) if log else None 113 subcommands.append(subcommand(function, args, subdir)) 114 return parallel(subcommands, timeout, return_results=return_results) 115 116 117class subcommand(object): 118 fork_hooks, join_hooks = [], [] 119 120 def __init__(self, func, args, subdir = None): 121 # func(args) - the subcommand to run 122 # subdir - the subdirectory to log results in 123 if subdir: 124 self.subdir = os.path.abspath(subdir) 125 if not os.path.exists(self.subdir): 126 os.mkdir(self.subdir) 127 self.debug = os.path.join(self.subdir, 'debug') 128 if not os.path.exists(self.debug): 129 os.mkdir(self.debug) 130 else: 131 self.subdir = None 132 self.debug = None 133 134 self.func = func 135 self.args = args 136 self.pid = None 137 self.returncode = None 138 139 140 def __str__(self): 141 return str('subcommand(func=%s, args=%s, subdir=%s)' % 142 (self.func, self.args, self.subdir)) 143 144 145 @classmethod 146 def register_fork_hook(cls, hook): 147 """ Register a function to be called from the child process after 148 forking. """ 149 cls.fork_hooks.append(hook) 150 151 152 @classmethod 153 def register_join_hook(cls, hook): 154 """ Register a function to be called when from the child process 155 just before the child process terminates (joins to the parent). """ 156 cls.join_hooks.append(hook) 157 158 159 def redirect_output(self): 160 if self.subdir and logging_manager_object: 161 tag = os.path.basename(self.subdir) 162 logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag) 163 164 165 def fork_start(self): 166 sys.stdout.flush() 167 sys.stderr.flush() 168 r, w = os.pipe() 169 self.returncode = None 170 self.pid = os.fork() 171 172 if self.pid: # I am the parent 173 os.close(w) 174 self.result_pickle = os.fdopen(r, 'rb') 175 return 176 else: 177 os.close(r) 178 179 # We are the child from this point on. Never return. 180 signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler 181 if self.subdir: 182 os.chdir(self.subdir) 183 self.redirect_output() 184 185 try: 186 for hook in self.fork_hooks: 187 hook(self) 188 result = self.func(*self.args) 189 os.write(w, six.moves.cPickle.dumps(result, six.moves.cPickle.HIGHEST_PROTOCOL)) 190 exit_code = 0 191 except Exception as e: 192 logging.exception('function failed') 193 exit_code = 1 194 os.write(w, six.moves.cPickle.dumps(e, six.moves.cPickle.HIGHEST_PROTOCOL)) 195 196 os.close(w) 197 198 try: 199 for hook in self.join_hooks: 200 hook(self) 201 finally: 202 sys.stdout.flush() 203 sys.stderr.flush() 204 os._exit(exit_code) 205 206 207 def _handle_exitstatus(self, sts): 208 """ 209 This is partially borrowed from subprocess.Popen. 210 """ 211 if os.WIFSIGNALED(sts): 212 self.returncode = -os.WTERMSIG(sts) 213 elif os.WIFEXITED(sts): 214 self.returncode = os.WEXITSTATUS(sts) 215 else: 216 # Should never happen 217 raise RuntimeError("Unknown child exit status!") 218 219 if self.returncode != 0: 220 print("subcommand failed pid %d" % self.pid) 221 print("%s" % (self.func,)) 222 print("rc=%d" % self.returncode) 223 print() 224 if self.debug: 225 stderr_file = os.path.join(self.debug, 'autoserv.stderr') 226 if os.path.exists(stderr_file): 227 for line in open(stderr_file).readlines(): 228 print(line, end=' ') 229 print("\n--------------------------------------------\n") 230 raise error.AutoservSubcommandError(self.func, self.returncode) 231 232 233 def poll(self): 234 """ 235 This is borrowed from subprocess.Popen. 236 """ 237 if self.returncode is None: 238 try: 239 pid, sts = os.waitpid(self.pid, os.WNOHANG) 240 if pid == self.pid: 241 self._handle_exitstatus(sts) 242 except os.error: 243 pass 244 return self.returncode 245 246 247 def wait(self): 248 """ 249 This is borrowed from subprocess.Popen. 250 """ 251 if self.returncode is None: 252 pid, sts = os.waitpid(self.pid, 0) 253 self._handle_exitstatus(sts) 254 return self.returncode 255 256 257 def fork_waitfor(self, timeout=None): 258 if not timeout: 259 return self.wait() 260 else: 261 _, result = retry.timeout(self.wait, timeout_sec=timeout) 262 263 if result is None: 264 utils.nuke_pid(self.pid) 265 print("subcommand failed pid %d" % self.pid) 266 print("%s" % (self.func,)) 267 print("timeout after %ds" % timeout) 268 print() 269 result = self.wait() 270 271 return result 272