# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module for running cbuildbot stages in the background."""

from __future__ import print_function

import collections
import contextlib
import ctypes
import ctypes.util
import errno
import functools
import multiprocessing
from multiprocessing.managers import SyncManager
import os
import signal
import sys
import time
import traceback

import six
from six.moves import queue as Queue

from autotest_lib.utils.frozen_chromite.lib import failures_lib
from autotest_lib.utils.frozen_chromite.lib import results_lib
from autotest_lib.utils.frozen_chromite.lib import cros_build_lib
from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
from autotest_lib.utils.frozen_chromite.lib import osutils
from autotest_lib.utils.frozen_chromite.lib import signals
from autotest_lib.utils.frozen_chromite.lib import timeout_util


_BUFSIZE = 1024


class HackTimeoutSyncManager(SyncManager):
  """Increase the process join timeout in SyncManager.

  The timeout for the manager process to join in the core library is
  too low. The process is often killed before shutting down properly,
  resulting in temporary directories (pymp-xxx) not being cleaned
  up. This class increases the default timeout.
  """

  @staticmethod
  def _finalize_manager(process, *args, **kwargs):
    """Shutdown the manager process."""

    def _join(functor, *args, **kwargs):
      timeout = kwargs.get('timeout')
      if timeout is not None and timeout < 1:
        kwargs['timeout'] = 1

      functor(*args, **kwargs)

    process.join = functools.partial(_join, process.join)
    SyncManager._finalize_manager(process, *args, **kwargs)


def IgnoreSigintAndSigterm():
  """Ignores any future SIGINTs and SIGTERMs."""
  signal.signal(signal.SIGINT, signal.SIG_IGN)
  signal.signal(signal.SIGTERM, signal.SIG_IGN)


def Manager():
  """Create a background process for managing interprocess communication.

  This manager wraps multiprocessing.Manager() and ensures that any sockets
  created during initialization are created under the /tmp tree rather than in a
  custom temp directory. This is needed because TMPDIR might be really long, and
  named sockets are limited to 108 characters.

  Examples:
    with Manager() as manager:
      queue = manager.Queue()
      ...

  Returns:
    The return value of multiprocessing.Manager()
  """
  # Use a short directory in /tmp. Do not use /tmp directly to keep these
  # temporary files together and because certain environments do not like too
  # many top-level paths in /tmp (see crbug.com/945523).
  # Make it mode 1777 to mirror /tmp, so that we don't have failures when root
  # calls parallel first, and some other user calls it later.
  tmp_dir = '/tmp/chromite.parallel.%d' % os.geteuid()
  osutils.SafeMakedirs(tmp_dir, mode=0o1777)
  old_tempdir_value, old_tempdir_env = osutils.SetGlobalTempDir(tmp_dir)
  try:
    m = HackTimeoutSyncManager()
    # SyncManager doesn't handle KeyboardInterrupt exceptions well; pipes get
    # broken and E_NOENT or E_PIPE errors are thrown from various places. We
    # can just ignore SIGINT in the SyncManager and things will close properly
    # when the enclosing with-statement exits.
    m.start(IgnoreSigintAndSigterm)
    return m
  finally:
    osutils.SetGlobalTempDir(old_tempdir_value, old_tempdir_env)


class BackgroundFailure(failures_lib.CompoundFailure):
  """Exception to show a step failed while running in a background process."""


class ProcessExitTimeout(Exception):
  """Raised if a process cannot exit within the timeout."""


class ProcessUnexpectedExit(Exception):
  """Raised if a process exits unexpectedly."""


class ProcessSilentTimeout(Exception):
  """Raised when there is no output for a prolonged period of time."""


class UnexpectedException(Exception):
  """Raised when an exception occurs in an unexpected place."""


class _BackgroundTask(multiprocessing.Process):
  """Run a task in the background.

  This task may be the 'Run' function from a buildbot stage or just a plain
  function. It will be run in the background. Output from this task is saved
  to a temporary file and is printed when the 'Wait' function is called.
  """

  # The time we give Python to startup and exit.
  STARTUP_TIMEOUT = 60 * 5
  EXIT_TIMEOUT = 60 * 10

  # The time we allow processes to be silent. This is in place so that we
  # eventually catch hanging processes, and print the remainder of our output.
  # Do not increase this. Instead, adjust your program to print regular progress
  # updates, so that cbuildbot (and buildbot) can know that it has not hung.
  SILENT_TIMEOUT = 60 * 145

  # The amount by which we reduce the SILENT_TIMEOUT every time we launch
  # a subprocess. This helps ensure that children get a chance to enforce the
  # SILENT_TIMEOUT prior to the parents enforcing it.
  SILENT_TIMEOUT_STEP = 30
  MINIMUM_SILENT_TIMEOUT = 60 * 135

  # The time before terminating or killing a task.
  SIGTERM_TIMEOUT = 30
  SIGKILL_TIMEOUT = 60

  # How long we allow debug commands to run (so we don't hang while trying to
  # recover from a hang).
  DEBUG_CMD_TIMEOUT = 60

  # Interval we check for updates from print statements.
  PRINT_INTERVAL = 1

  def __init__(self, task, queue, semaphore=None, task_args=None,
               task_kwargs=None):
    """Create a new _BackgroundTask object.

    If semaphore is supplied, it will be acquired for the duration of the
    steps that are run in the background. This can be used to limit the
    number of simultaneous parallel tasks.

    Args:
      task: The task (a functor) to run in the background.
      queue: A queue to be used for managing communication between the parent
        and child process. This queue must be valid for the length of the
        life of the child process, until the parent has collected its status.
      semaphore: The lock to hold while |task| runs.
      task_args: A list of args to pass to the |task|.
      task_kwargs: A dict of optional args to pass to the |task|.
    """
    multiprocessing.Process.__init__(self)
    self._task = task
    self._queue = queue
    self._semaphore = semaphore
    self._started = multiprocessing.Event()
    self._killing = multiprocessing.Event()
    self._output = None
    self._parent_pid = None
    self._task_args = task_args if task_args else ()
    self._task_kwargs = task_kwargs if task_kwargs else {}

  def _WaitForStartup(self):
    # TODO(davidjames): Use python-2.7 syntax to simplify this.
    self._started.wait(self.STARTUP_TIMEOUT)
    msg = 'Process failed to start in %d seconds' % self.STARTUP_TIMEOUT
    assert self._started.is_set(), msg

  @classmethod
  def _DebugRunCommand(cls, cmd, **kwargs):
    """Swallow any exception cros_build_lib.run raises.

    Since these commands are for purely informational purposes, we don't
    want random issues causing the bot to die.

    Returns:
      Stdout on success
    """
    log_level = kwargs['debug_level']
    try:
      with timeout_util.Timeout(cls.DEBUG_CMD_TIMEOUT):
        return cros_build_lib.run(cmd, **kwargs).output
    except (cros_build_lib.RunCommandError, timeout_util.TimeoutError) as e:
      logging.log(log_level, 'Running %s failed: %s', cmd[0], str(e))
      return ''

  # Debug commands to run in gdb.  A class member so tests can stub it out.
  GDB_COMMANDS = (
      'info proc all',
      'info threads',
      'thread apply all py-list',
      'thread apply all py-bt',
      'thread apply all bt',
      'detach',
  )

  @classmethod
  def _DumpDebugPid(cls, log_level, pid):
    """Dump debug info about the hanging |pid|."""
    pid = str(pid)
    commands = (
        ('pstree', '-Apals', pid),
        ('lsof', '-p', pid),
    )
    for cmd in commands:
      cls._DebugRunCommand(cmd, debug_level=log_level, check=False,
                           log_output=True, encoding='utf-8')

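    # Feed gdb a batch script on stdin: before each command, echo a
    # '>>> <command>' banner so the output below it can be attributed to it.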
    stdin = '\n'.join(['echo \\n>>> %s\\n\n%s' % (x, x)
                       for x in cls.GDB_COMMANDS])
    cmd = ('gdb', '--nx', '-q', '-p', pid, '-ex', 'set prompt',)
    cls._DebugRunCommand(cmd, debug_level=log_level, check=False,
                         log_output=True, input=stdin, encoding='utf-8')

  def Kill(self, sig, log_level, first=False):
    """Kill process with signal, ignoring if the process is dead.

    Args:
      sig: Signal to send.
      log_level: The log level of log messages.
      first: Whether this is the first signal we've sent.
    """
    self._killing.set()
    self._WaitForStartup()
    if logging.getLogger().isEnabledFor(log_level):
      # Dump debug information about the hanging process.
      logging.log(log_level, 'Killing %r (sig=%r %s)', self.pid, sig,
                  signals.StrSignal(sig))

      if first:
        ppid = str(self.pid)
        output = self._DebugRunCommand(
            ('pgrep', '-P', ppid), debug_level=log_level, print_cmd=False,
            check=False, capture_output=True)
        for pid in [ppid] + output.splitlines():
          self._DumpDebugPid(log_level, pid)

    try:
      os.kill(self.pid, sig)
    except OSError as ex:
      if ex.errno != errno.ESRCH:
        raise

  def Cleanup(self, silent=False):
    """Print the subprocess's output and clean up its temporary file."""
    if os.getpid() != self._parent_pid or self._output is None:
      return
    try:
      # Print output from subprocess.
      if not silent and logging.getLogger().isEnabledFor(logging.DEBUG):
        with open(self._output.name, 'r') as f:
          for line in f:
            logging.debug(line.rstrip('\n'))
    finally:
      # Clean up our temporary file.
      osutils.SafeUnlink(self._output.name)
      self._output.close()
      self._output = None

  def Wait(self):
    """Wait for the task to complete.

    Output from the task is printed as it runs.

    If exceptions occur, return a list of exception info tuples
    describing them.
    """
    try:
      # Flush stdout and stderr to be sure no output is interleaved.
      sys.stdout.flush()
      sys.stderr.flush()

      # File position pointers are shared across processes, so we must open
      # our own file descriptor to ensure output is not lost.
      self._WaitForStartup()
      silent_death_time = time.time() + self.SILENT_TIMEOUT
      results = []
      with open(self._output.name, 'r') as output:
        pos = 0
        running, exited_cleanly, task_errors, run_errors = (True, False, [], [])
        while running:
          # Check whether the process is still alive.
          running = self.is_alive()

          try:
            errors, results = self._queue.get(True, self.PRINT_INTERVAL)
            if errors:
              task_errors.extend(errors)

            running = False
            exited_cleanly = True
          except Queue.Empty:
            pass

          if not running:
            # Wait for the process to actually exit. If the child doesn't exit
            # in a timely fashion, kill it.
            self.join(self.EXIT_TIMEOUT)
            if self.exitcode is None:
              msg = '%r hung for %r seconds' % (self, self.EXIT_TIMEOUT)
              run_errors.extend(
                  failures_lib.CreateExceptInfo(ProcessExitTimeout(msg), ''))
              self._KillChildren([self])
            elif not exited_cleanly:
              msg = ('%r exited unexpectedly with code %s'
                     % (self, self.exitcode))
              run_errors.extend(
                  failures_lib.CreateExceptInfo(ProcessUnexpectedExit(msg), ''))

          # Read output from process.
          output.seek(pos)
          buf = output.read(_BUFSIZE)

          if buf:
            silent_death_time = time.time() + self.SILENT_TIMEOUT
          elif running and time.time() > silent_death_time:
            msg = ('No output from %r for %r seconds' %
                   (self, self.SILENT_TIMEOUT))
            run_errors.extend(
                failures_lib.CreateExceptInfo(ProcessSilentTimeout(msg), ''))
            self._KillChildren([self])

            # Read remaining output from the process.
            output.seek(pos)
            buf = output.read(_BUFSIZE)
            running = False

          # Print output so far.
          while buf:
            sys.stdout.write(buf)
            pos += len(buf)
            if len(buf) < _BUFSIZE:
              break
            buf = output.read(_BUFSIZE)

          # Print error messages if anything exceptional occurred.
          if run_errors:
            logging.PrintBuildbotStepFailure()
            traceback.print_stack()
            logging.warning('\n'.join(x.str for x in run_errors if x))
            logging.info('\n'.join(x.str for x in task_errors if x))

          sys.stdout.flush()
          sys.stderr.flush()

      # Propagate any results.
      for result in results:
        results_lib.Results.Record(*result)

    finally:
      self.Cleanup(silent=True)

    # If an error occurred, return it.
    return run_errors + task_errors

  def start(self):
    """Invoke multiprocessing.Process.start after flushing output/err."""
    if self.SILENT_TIMEOUT < self.MINIMUM_SILENT_TIMEOUT:
      raise AssertionError('Maximum recursion depth exceeded in %r' % self)

    sys.stdout.flush()
    sys.stderr.flush()
    tmp_dir = '/tmp/chromite.parallel.%d' % os.geteuid()
    osutils.SafeMakedirs(tmp_dir, mode=0o1777)
    self._output = cros_build_lib.UnbufferedNamedTemporaryFile(
        delete=False, dir=tmp_dir, prefix='chromite-parallel-')
    self._parent_pid = os.getpid()
    return multiprocessing.Process.start(self)

  def run(self):
    """Run the list of steps."""
    if self._semaphore is not None:
      self._semaphore.acquire()

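    # Pre-populate |errors| so that if _Run() dies without returning (e.g.
    # the process is killed mid-task), the parent still receives a failure.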
    errors = failures_lib.CreateExceptInfo(
        UnexpectedException('Unexpected exception in %r' % self), '')
    pid = os.getpid()
    try:
      errors = self._Run()
    finally:
      if not self._killing.is_set() and os.getpid() == pid:
        results = results_lib.Results.Get()
        self._queue.put((errors, results))
        if self._semaphore is not None:
          self._semaphore.release()

  def _Run(self):
    """Internal method for running the list of steps."""
    # Register a handler for SIGXCPU, a rarely used signal that _KillChildren
    # sends first so the task can log a traceback before being terminated.
    def trigger_bt(_sig_num, frame):
      logging.error('pre-kill notification (SIGXCPU); traceback:\n%s',
                    ''.join(traceback.format_stack(frame)))
    signal.signal(signal.SIGXCPU, trigger_bt)

    sys.stdout.flush()
    sys.stderr.flush()
    errors = []
    # Send all output to a named temporary file.
    with open(self._output.name, 'wb', 0) as output:
      # Back up sys.std{err,out}. These aren't used, but we keep a copy so
      # that they aren't garbage collected. We intentionally don't restore
      # the old stdout and stderr at the end, because we want shutdown errors
      # to also be sent to the same log file.
      _orig_stdout, _orig_stderr = sys.stdout, sys.stderr

      # Replace std{out,err} with unbuffered file objects.
      os.dup2(output.fileno(), sys.__stdout__.fileno())
      os.dup2(output.fileno(), sys.__stderr__.fileno())
      # The API of these funcs changed between versions.
      if sys.version_info.major < 3:
        sys.stdout = os.fdopen(sys.__stdout__.fileno(), 'w', 0)
        sys.stderr = os.fdopen(sys.__stderr__.fileno(), 'w', 0)
      else:
        sys.stdout = os.fdopen(sys.__stdout__.fileno(), 'w', closefd=False)
        sys.stderr = os.fdopen(sys.__stderr__.fileno(), 'w', closefd=False)

      try:
        self._started.set()
        results_lib.Results.Clear()

        # Reduce the silent timeout by the prescribed amount.
        cls = self.__class__
        cls.SILENT_TIMEOUT -= cls.SILENT_TIMEOUT_STEP

        # Actually launch the task.
        self._task(*self._task_args, **self._task_kwargs)
      except failures_lib.StepFailure as ex:
        errors.extend(failures_lib.CreateExceptInfo(
            ex, traceback.format_exc()))
      except BaseException as ex:
        errors.extend(failures_lib.CreateExceptInfo(
            ex, traceback.format_exc()))
        if self._killing.is_set():
          traceback.print_exc()
      finally:
        sys.stdout.flush()
        sys.stderr.flush()

    return errors

  @classmethod
  def _KillChildren(cls, bg_tasks, log_level=logging.WARNING):
    """Kill a deque of background tasks.

    This is needed to prevent hangs in the case where child processes refuse
    to exit.

    Args:
      bg_tasks: A deque (or list) of _BackgroundTask objects.
      log_level: The log level of log messages.
    """
    logging.log(log_level, 'Killing tasks: %r', bg_tasks)
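    # Escalate through the signals below: SIGXCPU first (handled by _Run() to
    # log a traceback), then SIGTERM, then SIGKILL; each timeout is how long
    # we wait before escalating to the next signal.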
    siglist = (
        (signal.SIGXCPU, cls.SIGTERM_TIMEOUT),
        (signal.SIGTERM, cls.SIGKILL_TIMEOUT),
        (signal.SIGKILL, None),
    )
    first = True
    for sig, timeout in siglist:
      # Send signal to all tasks.
      for task in bg_tasks:
        task.Kill(sig, log_level, first)
      first = False

      # Wait for all tasks to exit, if requested.
      if timeout is None:
        for task in bg_tasks:
          task.join()
          task.Cleanup()
        break

      # Wait until timeout expires.
      end_time = time.time() + timeout
      while bg_tasks:
        time_left = end_time - time.time()
        if time_left <= 0:
          break
        task = bg_tasks[-1]
        task.join(time_left)
        if task.exitcode is not None:
          task.Cleanup()
          bg_tasks.pop()

  @classmethod
  @contextlib.contextmanager
  def ParallelTasks(cls, steps, max_parallel=None, halt_on_error=False):
    """Run a list of functions in parallel.

    This function launches the provided functions in the background, yields,
    and then waits for the functions to exit.

    The output from the functions is saved to a temporary file and printed as if
    they were run in sequence.

    If exceptions occur in the steps, we join together the tracebacks and print
    them after all parallel tasks have finished running. Further, a
    BackgroundFailure is raised with full stack traces of all exceptions.

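    Examples:
      # Illustrative sketch only; step_one and step_two are hypothetical
      # zero-argument callables.
      with _BackgroundTask.ParallelTasks([step_one, step_two],
                                         max_parallel=2):
        ...  # do foreground work while the steps run in the background
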
    Args:
      steps: A list of functions to run.
      max_parallel: The maximum number of simultaneous tasks to run in parallel.
        By default, run all tasks in parallel.
      halt_on_error: After the first exception occurs, halt any running steps,
        and squelch any further output, including any exceptions that might
        occur.
    """

    semaphore = None
    if max_parallel is not None:
      semaphore = multiprocessing.Semaphore(max_parallel)

    # First, start all the steps.
    with Manager() as manager:
      bg_tasks = collections.deque()
      for step in steps:
        task = cls(step, queue=manager.Queue(), semaphore=semaphore)
        task.start()
        bg_tasks.append(task)

      foreground_except = None
      try:
        yield
      except BaseException:
        foreground_except = sys.exc_info()
      finally:
        errors = []
        skip_bg_wait = halt_on_error and foreground_except is not None
        # Wait for each step to complete.
        while not skip_bg_wait and bg_tasks:
          task = bg_tasks.popleft()
          task_errors = task.Wait()
          if task_errors:
            errors.extend(task_errors)
            if halt_on_error:
              break

        # If there are still tasks left, kill them.
        if bg_tasks:
          cls._KillChildren(bg_tasks, log_level=logging.DEBUG)

        # Propagate any exceptions; foreground exceptions take precedence.
        if foreground_except is not None:
          # contextlib ignores caught exceptions unless explicitly re-raised.
          six.reraise(foreground_except[0], foreground_except[1],
                      foreground_except[2])
        if errors:
          raise BackgroundFailure(exc_infos=errors)

  @staticmethod
  def TaskRunner(queue, task, onexit=None, task_args=None, task_kwargs=None):
    """Run task(*input) for each input in the queue.

    Returns when it encounters an _AllTasksComplete object on the queue.
    If exceptions occur, save them off and re-raise them as a
    BackgroundFailure once we've finished processing the items in the queue.

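    Examples:
      # Illustrative sketch of the queue protocol; |queue| and |task| are
      # assumed to exist. Each queued list becomes one task(*x) call, and an
      # _AllTasksComplete object tells the worker to exit.
      queue.put(['input-a'])
      queue.put(['input-b'])
      queue.put(_AllTasksComplete())
      _BackgroundTask.TaskRunner(queue, task)
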
    Args:
      queue: A queue of tasks to run. Add tasks to this queue, and they will
        be run.
      task: Function to run on each queued input.
      onexit: Function to run after all inputs are processed.
      task_args: A list of args to pass to the |task|.
      task_kwargs: A dict of optional args to pass to the |task|.
    """
    if task_args is None:
      task_args = []
    elif not isinstance(task_args, list):
      task_args = list(task_args)
    if task_kwargs is None:
      task_kwargs = {}

    errors = []
    while True:
      # Wait for a new item to show up on the queue. This is a blocking wait,
      # so if there's nothing to do, we just sit here.
      x = queue.get()
      if isinstance(x, _AllTasksComplete):
        # All tasks are complete, so we should exit.
        break
      elif not isinstance(x, list):
        x = task_args + list(x)
      else:
        x = task_args + x

      # If no tasks failed yet, process the remaining tasks.
      if not errors:
        try:
          task(*x, **task_kwargs)
        except BaseException as ex:
          errors.extend(
              failures_lib.CreateExceptInfo(ex, traceback.format_exc()))

    # Run exit handlers.
    if onexit:
      onexit()

    # Propagate any exceptions.
    if errors:
      raise BackgroundFailure(exc_infos=errors)


def RunParallelSteps(steps, max_parallel=None, halt_on_error=False,
                     return_values=False):
  """Run a list of functions in parallel.

  This function blocks until all steps are completed.

  The output from the functions is saved to a temporary file and printed as if
  they were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Examples:
    # This snippet will execute in parallel:
    #   somefunc()
    #   anotherfunc()
    #   funcfunc()
    steps = [somefunc, anotherfunc, funcfunc]
    RunParallelSteps(steps)
    # Blocks until all calls have completed.

  Args:
    steps: A list of functions to run.
    max_parallel: The maximum number of simultaneous tasks to run in parallel.
      By default, run all tasks in parallel.
    halt_on_error: After the first exception occurs, halt any running steps,
      and squelch any further output, including any exceptions that might occur.
    return_values: If set to True, RunParallelSteps returns a list containing
      the return values of the steps.  Defaults to False.

  Returns:
    If |return_values| is True, the function will return a list containing the
    return values of the steps.
  """
  def ReturnWrapper(queue, fn):
    """Put the return value of |fn| into |queue|."""
    queue.put(fn())

  full_steps = []
  queues = []
  with cros_build_lib.ContextManagerStack() as stack:
    if return_values:
      # We use a managed queue here, because the child process will wait for the
      # queue(pipe) to be flushed (i.e., when items are read from the queue)
      # before exiting, and with a regular queue this may result in hangs for
      # large return values.  But with a managed queue, the manager process will
      # read the items and hold on to them until the managed queue goes out of
      # scope and is cleaned up.
      manager = stack.Add(Manager)
      for step in steps:
        queue = manager.Queue()
        queues.append(queue)
        full_steps.append(functools.partial(ReturnWrapper, queue, step))
    else:
      full_steps = steps

    with _BackgroundTask.ParallelTasks(full_steps, max_parallel=max_parallel,
                                       halt_on_error=halt_on_error):
      pass

    if return_values:
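      # ParallelTasks has exited, so every step has finished and each queue
      # holds exactly one return value; get_nowait() will not block.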
      return [queue.get_nowait() for queue in queues]


class _AllTasksComplete(object):
  """Sentinel object to indicate that all tasks are complete."""


@contextlib.contextmanager
def BackgroundTaskRunner(task, *args, **kwargs):
  """Run the specified task on each queued input in a pool of processes.

  This context manager starts a set of workers in the background, which each
  wait for input on the specified queue. For each input on the queue, these
  workers run task(*(args + input), **kwargs). Note that certain kwargs will
  not pass through to the task (see Args below for the list).

  The output from these tasks is saved to a temporary file. When control
  returns to the context manager, the background output is printed in order,
  as if the tasks were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Examples:
    # This will run somefunc(1, 'small', 'cow', foo='bar') in the background
    # as soon as data is added to the queue (i.e. queue.put() is called).

    def somefunc(arg1, arg2, arg3, foo=None):
      ...

    with BackgroundTaskRunner(somefunc, 1, foo='bar') as queue:
      ... do random stuff ...
      queue.put(['small', 'cow'])
      ... do more random stuff while somefunc() runs ...
    # Exiting the with statement will block until all calls have completed.

  Args:
    task: Function to run on each queued input.
    queue: A queue of tasks to run. Add tasks to this queue, and they will
      be run in the background.  If None, one will be created on the fly.
    processes: Number of processes to launch.
    onexit: Function to run in each background process after all inputs are
      processed.
    halt_on_error: After the first exception occurs, halt any running steps, and
      squelch any further output, including any exceptions that might occur.
      Halts on exceptions in any of the background processes, or in the
      foreground process using the BackgroundTaskRunner.
  """

  queue = kwargs.pop('queue', None)
  processes = kwargs.pop('processes', None)
  onexit = kwargs.pop('onexit', None)
  halt_on_error = kwargs.pop('halt_on_error', False)

  with cros_build_lib.ContextManagerStack() as stack:
    if queue is None:
      manager = stack.Add(Manager)
      queue = manager.Queue()

    if not processes:
      processes = multiprocessing.cpu_count()

    child = functools.partial(_BackgroundTask.TaskRunner, queue, task,
                              onexit=onexit, task_args=args,
                              task_kwargs=kwargs)
    steps = [child] * processes
    with _BackgroundTask.ParallelTasks(steps, halt_on_error=halt_on_error):
      try:
        yield queue
      finally:
        for _ in range(processes):
          queue.put(_AllTasksComplete())


def RunTasksInProcessPool(task, inputs, processes=None, onexit=None):
  """Run the specified function with each supplied input in a pool of processes.

  This function runs task(*x) for x in inputs in a pool of processes. This
  function blocks until all tasks are completed.

  The output from these tasks is saved to a temporary file. Once all tasks
  have completed, the output is printed in order, as if the tasks were run
  in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Examples:
    # This snippet will execute in parallel:
    #   somefunc('hi', 'fat', 'code')
    #   somefunc('foo', 'bar', 'cow')

    def somefunc(arg1, arg2, arg3):
      ...
    ...
    inputs = [
      ['hi', 'fat', 'code'],
      ['foo', 'bar', 'cow'],
    ]
    RunTasksInProcessPool(somefunc, inputs)
    # Blocks until all calls have completed.

  Args:
    task: Function to run on each input.
    inputs: List of inputs.
    processes: Number of processes, at most, to launch.
    onexit: Function to run in each background process after all inputs are
      processed.

  Returns:
    Returns a list containing the return values of the task for each input.
  """
  if not processes:
    # - Use >=16 processes by default, in case it's a network-bound operation.
    # - Try to use all of the CPUs, in case it's a CPU-bound operation.
    processes = min(max(16, multiprocessing.cpu_count()), len(inputs))

  with Manager() as manager:
    # Set up output queue.
    out_queue = manager.Queue()
    fn = lambda idx, task_args: out_queue.put((idx, task(*task_args)))

    # Micro-optimization: Setup the queue so that BackgroundTaskRunner
    # doesn't have to set up another Manager process.
    queue = manager.Queue()

    with BackgroundTaskRunner(fn, queue=queue, processes=processes,
                              onexit=onexit) as queue:
      for idx, input_args in enumerate(inputs):
        queue.put((idx, input_args))
    return [x[1] for x in sorted(out_queue.get() for _ in range(len(inputs)))]


PR_SET_PDEATHSIG = 1


def ExitWithParent(sig=signal.SIGHUP):
  """Sets this process to receive |sig| when the parent dies.

  Note: this uses libc, so it only works on Linux.

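  Examples:
    # Illustrative sketch: in a freshly spawned child process, ask the
    # kernel to send us SIGHUP if the parent exits first (Linux only).
    if not ExitWithParent(signal.SIGHUP):
      logging.warning('Could not set the parent-death signal')
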
  Args:
    sig: Signal to receive. Defaults to SIGHUP.

  Returns:
    Whether we were successful in setting the death signal flag.
  """
  libc_name = ctypes.util.find_library('c')
  if not libc_name:
    return False
  try:
    libc = ctypes.CDLL(libc_name)
    libc.prctl(PR_SET_PDEATHSIG, sig)
    return True
  # We might not be able to load the library (OSError), or prctl might be
  # missing (AttributeError).
  except (OSError, AttributeError):
    return False