1# -*- coding: utf-8 -*- 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Module for running cbuildbot stages in the background.""" 7 8from __future__ import print_function 9 10import collections 11import contextlib 12import ctypes 13import errno 14import functools 15import multiprocessing 16from multiprocessing.managers import SyncManager 17import os 18import signal 19import sys 20import time 21import traceback 22 23import six 24from six.moves import queue as Queue 25 26from autotest_lib.utils.frozen_chromite.lib import failures_lib 27from autotest_lib.utils.frozen_chromite.lib import results_lib 28from autotest_lib.utils.frozen_chromite.lib import cros_build_lib 29from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging 30from autotest_lib.utils.frozen_chromite.lib import osutils 31from autotest_lib.utils.frozen_chromite.lib import signals 32from autotest_lib.utils.frozen_chromite.lib import timeout_util 33 34 35_BUFSIZE = 1024 36 37 38class HackTimeoutSyncManager(SyncManager): 39 """Increase the process join timeout in SyncManager. 40 41 The timeout for the manager process to join in the core library is 42 too low. The process is often killed before shutting down properly, 43 resulting in temporary directories (pymp-xxx) not being cleaned 44 up. This class increases the default timeout. 45 """ 46 47 @staticmethod 48 def _finalize_manager(process, *args, **kwargs): 49 """Shutdown the manager process.""" 50 51 def _join(functor, *args, **kwargs): 52 timeout = kwargs.get('timeout') 53 if not timeout is None and timeout < 1: 54 kwargs['timeout'] = 1 55 56 functor(*args, **kwargs) 57 58 process.join = functools.partial(_join, process.join) 59 SyncManager._finalize_manager(process, *args, **kwargs) 60 61 62def IgnoreSigintAndSigterm(): 63 """Ignores any future SIGINTs and SIGTERMs.""" 64 signal.signal(signal.SIGINT, signal.SIG_IGN) 65 signal.signal(signal.SIGTERM, signal.SIG_IGN) 66 67 68def Manager(): 69 """Create a background process for managing interprocess communication. 70 71 This manager wraps multiprocessing.Manager() and ensures that any sockets 72 created during initialization are created under the /tmp tree rather than in a 73 custom temp directory. This is needed because TMPDIR might be really long, and 74 named sockets are limited to 108 characters. 75 76 Examples: 77 with Manager() as manager: 78 queue = manager.Queue() 79 ... 80 81 Returns: 82 The return value of multiprocessing.Manager() 83 """ 84 # Use a short directory in /tmp. Do not use /tmp directly to keep these 85 # temperary files together and because certain environments do not like too 86 # many top-level paths in /tmp (see crbug.com/945523). 87 # Make it mode 1777 to mirror /tmp, so that we don't have failures when root 88 # calls parallel first, and some other user calls it later. 89 tmp_dir = '/tmp/chromite.parallel.%d' % os.geteuid() 90 osutils.SafeMakedirs(tmp_dir, mode=0o1777) 91 old_tempdir_value, old_tempdir_env = osutils.SetGlobalTempDir(tmp_dir) 92 try: 93 m = HackTimeoutSyncManager() 94 # SyncManager doesn't handle KeyboardInterrupt exceptions well; pipes get 95 # broken and E_NOENT or E_PIPE errors are thrown from various places. We 96 # can just ignore SIGINT in the SyncManager and things will close properly 97 # when the enclosing with-statement exits. 98 m.start(IgnoreSigintAndSigterm) 99 return m 100 finally: 101 osutils.SetGlobalTempDir(old_tempdir_value, old_tempdir_env) 102 103 104class BackgroundFailure(failures_lib.CompoundFailure): 105 """Exception to show a step failed while running in a background process.""" 106 107 108class ProcessExitTimeout(Exception): 109 """Raised if a process cannot exit within the timeout.""" 110 111 112class ProcessUnexpectedExit(Exception): 113 """Raised if a process exits unexpectedly.""" 114 115 116class ProcessSilentTimeout(Exception): 117 """Raised when there is no output for a prolonged period of time.""" 118 119 120class UnexpectedException(Exception): 121 """Raised when exception occurs at an unexpected place.""" 122 123 124class _BackgroundTask(multiprocessing.Process): 125 """Run a task in the background. 126 127 This task may be the 'Run' function from a buildbot stage or just a plain 128 function. It will be run in the background. Output from this task is saved 129 to a temporary file and is printed when the 'Wait' function is called. 130 """ 131 132 # The time we give Python to startup and exit. 133 STARTUP_TIMEOUT = 60 * 5 134 EXIT_TIMEOUT = 60 * 10 135 136 # The time we allow processes to be silent. This is in place so that we 137 # eventually catch hanging processes, and print the remainder of our output. 138 # Do not increase this. Instead, adjust your program to print regular progress 139 # updates, so that cbuildbot (and buildbot) can know that it has not hung. 140 SILENT_TIMEOUT = 60 * 145 141 142 # The amount by which we reduce the SILENT_TIMEOUT every time we launch 143 # a subprocess. This helps ensure that children get a chance to enforce the 144 # SILENT_TIMEOUT prior to the parents enforcing it. 145 SILENT_TIMEOUT_STEP = 30 146 MINIMUM_SILENT_TIMEOUT = 60 * 135 147 148 # The time before terminating or killing a task. 149 SIGTERM_TIMEOUT = 30 150 SIGKILL_TIMEOUT = 60 151 152 # How long we allow debug commands to run (so we don't hang will trying to 153 # recover from a hang). 154 DEBUG_CMD_TIMEOUT = 60 155 156 # Interval we check for updates from print statements. 157 PRINT_INTERVAL = 1 158 159 def __init__(self, task, queue, semaphore=None, task_args=None, 160 task_kwargs=None): 161 """Create a new _BackgroundTask object. 162 163 If semaphore is supplied, it will be acquired for the duration of the 164 steps that are run in the background. This can be used to limit the 165 number of simultaneous parallel tasks. 166 167 Args: 168 task: The task (a functor) to run in the background. 169 queue: A queue to be used for managing communication between the parent 170 and child process. This queue must be valid for the length of the 171 life of the child process, until the parent has collected its status. 172 semaphore: The lock to hold while |task| runs. 173 task_args: A list of args to pass to the |task|. 174 task_kwargs: A dict of optional args to pass to the |task|. 175 """ 176 multiprocessing.Process.__init__(self) 177 self._task = task 178 self._queue = queue 179 self._semaphore = semaphore 180 self._started = multiprocessing.Event() 181 self._killing = multiprocessing.Event() 182 self._output = None 183 self._parent_pid = None 184 self._task_args = task_args if task_args else () 185 self._task_kwargs = task_kwargs if task_kwargs else {} 186 187 def _WaitForStartup(self): 188 # TODO(davidjames): Use python-2.7 syntax to simplify this. 189 self._started.wait(self.STARTUP_TIMEOUT) 190 msg = 'Process failed to start in %d seconds' % self.STARTUP_TIMEOUT 191 assert self._started.is_set(), msg 192 193 @classmethod 194 def _DebugRunCommand(cls, cmd, **kwargs): 195 """Swallow any exception run raises. 196 197 Since these commands are for purely informational purposes, we don't 198 random issues causing the bot to die. 199 200 Returns: 201 Stdout on success 202 """ 203 log_level = kwargs['debug_level'] 204 try: 205 with timeout_util.Timeout(cls.DEBUG_CMD_TIMEOUT): 206 return cros_build_lib.run(cmd, **kwargs).output 207 except (cros_build_lib.RunCommandError, timeout_util.TimeoutError) as e: 208 logging.log(log_level, 'Running %s failed: %s', cmd[0], str(e)) 209 return '' 210 211 # Debug commands to run in gdb. A class member so tests can stub it out. 212 GDB_COMMANDS = ( 213 'info proc all', 214 'info threads', 215 'thread apply all py-list', 216 'thread apply all py-bt', 217 'thread apply all bt', 218 'detach', 219 ) 220 221 @classmethod 222 def _DumpDebugPid(cls, log_level, pid): 223 """Dump debug info about the hanging |pid|.""" 224 pid = str(pid) 225 commands = ( 226 ('pstree', '-Apals', pid), 227 ('lsof', '-p', pid), 228 ) 229 for cmd in commands: 230 cls._DebugRunCommand(cmd, debug_level=log_level, check=False, 231 log_output=True, encoding='utf-8') 232 233 stdin = '\n'.join(['echo \\n>>> %s\\n\n%s' % (x, x) 234 for x in cls.GDB_COMMANDS]) 235 cmd = ('gdb', '--nx', '-q', '-p', pid, '-ex', 'set prompt',) 236 cls._DebugRunCommand(cmd, debug_level=log_level, check=False, 237 log_output=True, input=stdin, encoding='utf-8') 238 239 def Kill(self, sig, log_level, first=False): 240 """Kill process with signal, ignoring if the process is dead. 241 242 Args: 243 sig: Signal to send. 244 log_level: The log level of log messages. 245 first: Whether this is the first signal we've sent. 246 """ 247 self._killing.set() 248 self._WaitForStartup() 249 if logging.getLogger().isEnabledFor(log_level): 250 # Dump debug information about the hanging process. 251 logging.log(log_level, 'Killing %r (sig=%r %s)', self.pid, sig, 252 signals.StrSignal(sig)) 253 254 if first: 255 ppid = str(self.pid) 256 output = self._DebugRunCommand( 257 ('pgrep', '-P', ppid), debug_level=log_level, print_cmd=False, 258 check=False, capture_output=True) 259 for pid in [ppid] + output.splitlines(): 260 self._DumpDebugPid(log_level, pid) 261 262 try: 263 os.kill(self.pid, sig) 264 except OSError as ex: 265 if ex.errno != errno.ESRCH: 266 raise 267 268 def Cleanup(self, silent=False): 269 """Wait for a process to exit.""" 270 if os.getpid() != self._parent_pid or self._output is None: 271 return 272 try: 273 # Print output from subprocess. 274 if not silent and logging.getLogger().isEnabledFor(logging.DEBUG): 275 with open(self._output.name, 'r') as f: 276 for line in f: 277 logging.debug(line.rstrip('\n')) 278 finally: 279 # Clean up our temporary file. 280 osutils.SafeUnlink(self._output.name) 281 self._output.close() 282 self._output = None 283 284 def Wait(self): 285 """Wait for the task to complete. 286 287 Output from the task is printed as it runs. 288 289 If an exception occurs, return a string containing the traceback. 290 """ 291 try: 292 # Flush stdout and stderr to be sure no output is interleaved. 293 sys.stdout.flush() 294 sys.stderr.flush() 295 296 # File position pointers are shared across processes, so we must open 297 # our own file descriptor to ensure output is not lost. 298 self._WaitForStartup() 299 silent_death_time = time.time() + self.SILENT_TIMEOUT 300 results = [] 301 with open(self._output.name, 'r') as output: 302 pos = 0 303 running, exited_cleanly, task_errors, run_errors = (True, False, [], []) 304 while running: 305 # Check whether the process is still alive. 306 running = self.is_alive() 307 308 try: 309 errors, results = \ 310 self._queue.get(True, self.PRINT_INTERVAL) 311 if errors: 312 task_errors.extend(errors) 313 314 running = False 315 exited_cleanly = True 316 except Queue.Empty: 317 pass 318 319 if not running: 320 # Wait for the process to actually exit. If the child doesn't exit 321 # in a timely fashion, kill it. 322 self.join(self.EXIT_TIMEOUT) 323 if self.exitcode is None: 324 msg = '%r hung for %r seconds' % (self, self.EXIT_TIMEOUT) 325 run_errors.extend( 326 failures_lib.CreateExceptInfo(ProcessExitTimeout(msg), '')) 327 self._KillChildren([self]) 328 elif not exited_cleanly: 329 msg = ('%r exited unexpectedly with code %s' 330 % (self, self.exitcode)) 331 run_errors.extend( 332 failures_lib.CreateExceptInfo(ProcessUnexpectedExit(msg), '')) 333 334 # Read output from process. 335 output.seek(pos) 336 buf = output.read(_BUFSIZE) 337 338 if buf: 339 silent_death_time = time.time() + self.SILENT_TIMEOUT 340 elif running and time.time() > silent_death_time: 341 msg = ('No output from %r for %r seconds' % 342 (self, self.SILENT_TIMEOUT)) 343 run_errors.extend( 344 failures_lib.CreateExceptInfo(ProcessSilentTimeout(msg), '')) 345 self._KillChildren([self]) 346 347 # Read remaining output from the process. 348 output.seek(pos) 349 buf = output.read(_BUFSIZE) 350 running = False 351 352 # Print output so far. 353 while buf: 354 sys.stdout.write(buf) 355 pos += len(buf) 356 if len(buf) < _BUFSIZE: 357 break 358 buf = output.read(_BUFSIZE) 359 360 # Print error messages if anything exceptional occurred. 361 if run_errors: 362 logging.PrintBuildbotStepFailure() 363 traceback.print_stack() 364 logging.warning('\n'.join(x.str for x in run_errors if x)) 365 logging.info('\n'.join(x.str for x in task_errors if x)) 366 367 sys.stdout.flush() 368 sys.stderr.flush() 369 370 # Propagate any results. 371 for result in results: 372 results_lib.Results.Record(*result) 373 374 finally: 375 self.Cleanup(silent=True) 376 377 # If an error occurred, return it. 378 return run_errors + task_errors 379 380 def start(self): 381 """Invoke multiprocessing.Process.start after flushing output/err.""" 382 if self.SILENT_TIMEOUT < self.MINIMUM_SILENT_TIMEOUT: 383 raise AssertionError('Maximum recursion depth exceeded in %r' % self) 384 385 sys.stdout.flush() 386 sys.stderr.flush() 387 tmp_dir = '/tmp/chromite.parallel.%d' % os.geteuid() 388 osutils.SafeMakedirs(tmp_dir, mode=0o1777) 389 self._output = cros_build_lib.UnbufferedNamedTemporaryFile( 390 delete=False, dir=tmp_dir, prefix='chromite-parallel-') 391 self._parent_pid = os.getpid() 392 return multiprocessing.Process.start(self) 393 394 def run(self): 395 """Run the list of steps.""" 396 if self._semaphore is not None: 397 self._semaphore.acquire() 398 399 errors = failures_lib.CreateExceptInfo( 400 UnexpectedException('Unexpected exception in %r' % self), '') 401 pid = os.getpid() 402 try: 403 errors = self._Run() 404 finally: 405 if not self._killing.is_set() and os.getpid() == pid: 406 results = results_lib.Results.Get() 407 self._queue.put((errors, results)) 408 if self._semaphore is not None: 409 self._semaphore.release() 410 411 def _Run(self): 412 """Internal method for running the list of steps.""" 413 # Register a handler for a signal that is rarely used. 414 def trigger_bt(_sig_num, frame): 415 logging.error('pre-kill notification (SIGXCPU); traceback:\n%s', 416 ''.join(traceback.format_stack(frame))) 417 signal.signal(signal.SIGXCPU, trigger_bt) 418 419 sys.stdout.flush() 420 sys.stderr.flush() 421 errors = [] 422 # Send all output to a named temporary file. 423 with open(self._output.name, 'wb', 0) as output: 424 # Back up sys.std{err,out}. These aren't used, but we keep a copy so 425 # that they aren't garbage collected. We intentionally don't restore 426 # the old stdout and stderr at the end, because we want shutdown errors 427 # to also be sent to the same log file. 428 _orig_stdout, _orig_stderr = sys.stdout, sys.stderr 429 430 # Replace std{out,err} with unbuffered file objects. 431 os.dup2(output.fileno(), sys.__stdout__.fileno()) 432 os.dup2(output.fileno(), sys.__stderr__.fileno()) 433 # The API of these funcs changed between versions. 434 if sys.version_info.major < 3: 435 sys.stdout = os.fdopen(sys.__stdout__.fileno(), 'w', 0) 436 sys.stderr = os.fdopen(sys.__stderr__.fileno(), 'w', 0) 437 else: 438 sys.stdout = os.fdopen(sys.__stdout__.fileno(), 'w', closefd=False) 439 sys.stderr = os.fdopen(sys.__stderr__.fileno(), 'w', closefd=False) 440 441 try: 442 self._started.set() 443 results_lib.Results.Clear() 444 445 # Reduce the silent timeout by the prescribed amount. 446 cls = self.__class__ 447 cls.SILENT_TIMEOUT -= cls.SILENT_TIMEOUT_STEP 448 449 # Actually launch the task. 450 self._task(*self._task_args, **self._task_kwargs) 451 except failures_lib.StepFailure as ex: 452 errors.extend(failures_lib.CreateExceptInfo( 453 ex, traceback.format_exc())) 454 except BaseException as ex: 455 errors.extend(failures_lib.CreateExceptInfo( 456 ex, traceback.format_exc())) 457 if self._killing.is_set(): 458 traceback.print_exc() 459 finally: 460 sys.stdout.flush() 461 sys.stderr.flush() 462 463 return errors 464 465 @classmethod 466 def _KillChildren(cls, bg_tasks, log_level=logging.WARNING): 467 """Kill a deque of background tasks. 468 469 This is needed to prevent hangs in the case where child processes refuse 470 to exit. 471 472 Args: 473 bg_tasks: A list filled with _BackgroundTask objects. 474 log_level: The log level of log messages. 475 """ 476 logging.log(log_level, 'Killing tasks: %r', bg_tasks) 477 siglist = ( 478 (signal.SIGXCPU, cls.SIGTERM_TIMEOUT), 479 (signal.SIGTERM, cls.SIGKILL_TIMEOUT), 480 (signal.SIGKILL, None), 481 ) 482 first = True 483 for sig, timeout in siglist: 484 # Send signal to all tasks. 485 for task in bg_tasks: 486 task.Kill(sig, log_level, first) 487 first = False 488 489 # Wait for all tasks to exit, if requested. 490 if timeout is None: 491 for task in bg_tasks: 492 task.join() 493 task.Cleanup() 494 break 495 496 # Wait until timeout expires. 497 end_time = time.time() + timeout 498 while bg_tasks: 499 time_left = end_time - time.time() 500 if time_left <= 0: 501 break 502 task = bg_tasks[-1] 503 task.join(time_left) 504 if task.exitcode is not None: 505 task.Cleanup() 506 bg_tasks.pop() 507 508 @classmethod 509 @contextlib.contextmanager 510 def ParallelTasks(cls, steps, max_parallel=None, halt_on_error=False): 511 """Run a list of functions in parallel. 512 513 This function launches the provided functions in the background, yields, 514 and then waits for the functions to exit. 515 516 The output from the functions is saved to a temporary file and printed as if 517 they were run in sequence. 518 519 If exceptions occur in the steps, we join together the tracebacks and print 520 them after all parallel tasks have finished running. Further, a 521 BackgroundFailure is raised with full stack traces of all exceptions. 522 523 Args: 524 steps: A list of functions to run. 525 max_parallel: The maximum number of simultaneous tasks to run in parallel. 526 By default, run all tasks in parallel. 527 halt_on_error: After the first exception occurs, halt any running steps, 528 and squelch any further output, including any exceptions that might 529 occur. 530 """ 531 532 semaphore = None 533 if max_parallel is not None: 534 semaphore = multiprocessing.Semaphore(max_parallel) 535 536 # First, start all the steps. 537 with Manager() as manager: 538 bg_tasks = collections.deque() 539 for step in steps: 540 task = cls(step, queue=manager.Queue(), semaphore=semaphore) 541 task.start() 542 bg_tasks.append(task) 543 544 foreground_except = None 545 try: 546 yield 547 except BaseException: 548 foreground_except = sys.exc_info() 549 finally: 550 errors = [] 551 skip_bg_wait = halt_on_error and foreground_except is not None 552 # Wait for each step to complete. 553 while not skip_bg_wait and bg_tasks: 554 task = bg_tasks.popleft() 555 task_errors = task.Wait() 556 if task_errors: 557 errors.extend(task_errors) 558 if halt_on_error: 559 break 560 561 # If there are still tasks left, kill them. 562 if bg_tasks: 563 cls._KillChildren(bg_tasks, log_level=logging.DEBUG) 564 565 # Propagate any exceptions; foreground exceptions take precedence. 566 if foreground_except is not None: 567 # contextlib ignores caught exceptions unless explicitly re-raised. 568 six.reraise(foreground_except[0], foreground_except[1], 569 foreground_except[2]) 570 if errors: 571 raise BackgroundFailure(exc_infos=errors) 572 573 @staticmethod 574 def TaskRunner(queue, task, onexit=None, task_args=None, task_kwargs=None): 575 """Run task(*input) for each input in the queue. 576 577 Returns when it encounters an _AllTasksComplete object on the queue. 578 If exceptions occur, save them off and re-raise them as a 579 BackgroundFailure once we've finished processing the items in the queue. 580 581 Args: 582 queue: A queue of tasks to run. Add tasks to this queue, and they will 583 be run. 584 task: Function to run on each queued input. 585 onexit: Function to run after all inputs are processed. 586 task_args: A list of args to pass to the |task|. 587 task_kwargs: A dict of optional args to pass to the |task|. 588 """ 589 if task_args is None: 590 task_args = [] 591 elif not isinstance(task_args, list): 592 task_args = list(task_args) 593 if task_kwargs is None: 594 task_kwargs = {} 595 596 errors = [] 597 while True: 598 # Wait for a new item to show up on the queue. This is a blocking wait, 599 # so if there's nothing to do, we just sit here. 600 x = queue.get() 601 if isinstance(x, _AllTasksComplete): 602 # All tasks are complete, so we should exit. 603 break 604 elif not isinstance(x, list): 605 x = task_args + list(x) 606 else: 607 x = task_args + x 608 609 # If no tasks failed yet, process the remaining tasks. 610 if not errors: 611 try: 612 task(*x, **task_kwargs) 613 except BaseException as ex: 614 errors.extend( 615 failures_lib.CreateExceptInfo(ex, traceback.format_exc())) 616 617 # Run exit handlers. 618 if onexit: 619 onexit() 620 621 # Propagate any exceptions. 622 if errors: 623 raise BackgroundFailure(exc_infos=errors) 624 625 626def RunParallelSteps(steps, max_parallel=None, halt_on_error=False, 627 return_values=False): 628 """Run a list of functions in parallel. 629 630 This function blocks until all steps are completed. 631 632 The output from the functions is saved to a temporary file and printed as if 633 they were run in sequence. 634 635 If exceptions occur in the steps, we join together the tracebacks and print 636 them after all parallel tasks have finished running. Further, a 637 BackgroundFailure is raised with full stack traces of all exceptions. 638 639 Examples: 640 # This snippet will execute in parallel: 641 # somefunc() 642 # anotherfunc() 643 # funcfunc() 644 steps = [somefunc, anotherfunc, funcfunc] 645 RunParallelSteps(steps) 646 # Blocks until all calls have completed. 647 648 Args: 649 steps: A list of functions to run. 650 max_parallel: The maximum number of simultaneous tasks to run in parallel. 651 By default, run all tasks in parallel. 652 halt_on_error: After the first exception occurs, halt any running steps, 653 and squelch any further output, including any exceptions that might occur. 654 return_values: If set to True, RunParallelSteps returns a list containing 655 the return values of the steps. Defaults to False. 656 657 Returns: 658 If |return_values| is True, the function will return a list containing the 659 return values of the steps. 660 """ 661 def ReturnWrapper(queue, fn): 662 """Put the return value of |fn| into |queue|.""" 663 queue.put(fn()) 664 665 full_steps = [] 666 queues = [] 667 with cros_build_lib.ContextManagerStack() as stack: 668 if return_values: 669 # We use a managed queue here, because the child process will wait for the 670 # queue(pipe) to be flushed (i.e., when items are read from the queue) 671 # before exiting, and with a regular queue this may result in hangs for 672 # large return values. But with a managed queue, the manager process will 673 # read the items and hold on to them until the managed queue goes out of 674 # scope and is cleaned up. 675 manager = stack.Add(Manager) 676 for step in steps: 677 queue = manager.Queue() 678 queues.append(queue) 679 full_steps.append(functools.partial(ReturnWrapper, queue, step)) 680 else: 681 full_steps = steps 682 683 with _BackgroundTask.ParallelTasks(full_steps, max_parallel=max_parallel, 684 halt_on_error=halt_on_error): 685 pass 686 687 if return_values: 688 return [queue.get_nowait() for queue in queues] 689 690 691class _AllTasksComplete(object): 692 """Sentinel object to indicate that all tasks are complete.""" 693 694 695@contextlib.contextmanager 696def BackgroundTaskRunner(task, *args, **kwargs): 697 """Run the specified task on each queued input in a pool of processes. 698 699 This context manager starts a set of workers in the background, who each 700 wait for input on the specified queue. For each input on the queue, these 701 workers run task(*args + *input, **kwargs). Note that certain kwargs will 702 not pass through to the task (see Args below for the list). 703 704 The output from these tasks is saved to a temporary file. When control 705 returns to the context manager, the background output is printed in order, 706 as if the tasks were run in sequence. 707 708 If exceptions occur in the steps, we join together the tracebacks and print 709 them after all parallel tasks have finished running. Further, a 710 BackgroundFailure is raised with full stack traces of all exceptions. 711 712 Examples: 713 # This will run somefunc(1, 'small', 'cow', foo='bar') in the background 714 # as soon as data is added to the queue (i.e. queue.put() is called). 715 716 def somefunc(arg1, arg2, arg3, foo=None): 717 ... 718 719 with BackgroundTaskRunner(somefunc, 1, foo='bar') as queue: 720 ... do random stuff ... 721 queue.put(['small', 'cow']) 722 ... do more random stuff while somefunc() runs ... 723 # Exiting the with statement will block until all calls have completed. 724 725 Args: 726 task: Function to run on each queued input. 727 queue: A queue of tasks to run. Add tasks to this queue, and they will 728 be run in the background. If None, one will be created on the fly. 729 processes: Number of processes to launch. 730 onexit: Function to run in each background process after all inputs are 731 processed. 732 halt_on_error: After the first exception occurs, halt any running steps, and 733 squelch any further output, including any exceptions that might occur. 734 Halts on exceptions in any of the background processes, or in the 735 foreground process using the BackgroundTaskRunner. 736 """ 737 738 queue = kwargs.pop('queue', None) 739 processes = kwargs.pop('processes', None) 740 onexit = kwargs.pop('onexit', None) 741 halt_on_error = kwargs.pop('halt_on_error', False) 742 743 with cros_build_lib.ContextManagerStack() as stack: 744 if queue is None: 745 manager = stack.Add(Manager) 746 queue = manager.Queue() 747 748 if not processes: 749 processes = multiprocessing.cpu_count() 750 751 child = functools.partial(_BackgroundTask.TaskRunner, queue, task, 752 onexit=onexit, task_args=args, 753 task_kwargs=kwargs) 754 steps = [child] * processes 755 with _BackgroundTask.ParallelTasks(steps, halt_on_error=halt_on_error): 756 try: 757 yield queue 758 finally: 759 for _ in range(processes): 760 queue.put(_AllTasksComplete()) 761 762 763def RunTasksInProcessPool(task, inputs, processes=None, onexit=None): 764 """Run the specified function with each supplied input in a pool of processes. 765 766 This function runs task(*x) for x in inputs in a pool of processes. This 767 function blocks until all tasks are completed. 768 769 The output from these tasks is saved to a temporary file. When control 770 returns to the context manager, the background output is printed in order, 771 as if the tasks were run in sequence. 772 773 If exceptions occur in the steps, we join together the tracebacks and print 774 them after all parallel tasks have finished running. Further, a 775 BackgroundFailure is raised with full stack traces of all exceptions. 776 777 Examples: 778 # This snippet will execute in parallel: 779 # somefunc('hi', 'fat', 'code') 780 # somefunc('foo', 'bar', 'cow') 781 782 def somefunc(arg1, arg2, arg3): 783 ... 784 ... 785 inputs = [ 786 ['hi', 'fat', 'code'], 787 ['foo', 'bar', 'cow'], 788 ] 789 RunTasksInProcessPool(somefunc, inputs) 790 # Blocks until all calls have completed. 791 792 Args: 793 task: Function to run on each input. 794 inputs: List of inputs. 795 processes: Number of processes, at most, to launch. 796 onexit: Function to run in each background process after all inputs are 797 processed. 798 799 Returns: 800 Returns a list containing the return values of the task for each input. 801 """ 802 if not processes: 803 # - Use >=16 processes by default, in case it's a network-bound operation. 804 # - Try to use all of the CPUs, in case it's a CPU-bound operation. 805 processes = min(max(16, multiprocessing.cpu_count()), len(inputs)) 806 807 with Manager() as manager: 808 # Set up output queue. 809 out_queue = manager.Queue() 810 fn = lambda idx, task_args: out_queue.put((idx, task(*task_args))) 811 812 # Micro-optimization: Setup the queue so that BackgroundTaskRunner 813 # doesn't have to set up another Manager process. 814 queue = manager.Queue() 815 816 with BackgroundTaskRunner(fn, queue=queue, processes=processes, 817 onexit=onexit) as queue: 818 for idx, input_args in enumerate(inputs): 819 queue.put((idx, input_args)) 820 821 return [x[1] for x in sorted(out_queue.get() for _ in range(len(inputs)))] 822 823 824PR_SET_PDEATHSIG = 1 825 826 827def ExitWithParent(sig=signal.SIGHUP): 828 """Sets this process to receive |sig| when the parent dies. 829 830 Note: this uses libc, so it only works on linux. 831 832 Args: 833 sig: Signal to recieve. Defaults to SIGHUP. 834 835 Returns: 836 Whether we were successful in setting the deathsignal flag 837 """ 838 libc_name = ctypes.util.find_library('c') 839 if not libc_name: 840 return False 841 try: 842 libc = ctypes.CDLL(libc_name) 843 libc.prctl(PR_SET_PDEATHSIG, sig) 844 return True 845 # We might not be able to load the library (OSError), or prctl might be 846 # missing (AttributeError) 847 except (OSError, AttributeError): 848 return False 849