xref: /aosp_15_r20/external/angle/build/android/fast_local_dev_server.py (revision 8975f5c5ed3d1c378011245431ada316dfb6f244)
1#!/usr/bin/env python3
2# Copyright 2021 The Chromium Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Creates an server to offload non-critical-path GN targets."""
6
7from __future__ import annotations
8
9import argparse
10import collections
11import contextlib
12import datetime
13import json
14import os
15import pathlib
16import re
17import shutil
18import socket
19import subprocess
20import sys
21import threading
22import traceback
23import time
24from typing import Callable, Dict, List, Optional, Tuple, IO
25
26sys.path.append(os.path.join(os.path.dirname(__file__), 'gyp'))
27from util import server_utils
28
29_SOCKET_TIMEOUT = 300  # seconds
30
31_LOGFILES = {}
32_LOGFILE_NAME = 'buildserver.log'
33_MAX_LOGFILES = 6
34
35FIRST_LOG_LINE = '#### Start of log for build_id = {build_id} ####\n'
36BUILD_ID_RE = re.compile(r'^#### .*build_id = (?P<build_id>.+) ####')
37
38
39def log(msg: str, quiet: bool = False):
40  if quiet:
41    return
42  # Ensure we start our message on a new line.
43  print('\n' + msg)
44
45
46def set_status(msg: str, *, quiet: bool = False, build_id: str = None):
47  prefix = f'[{TaskStats.prefix()}] '
48  # if message is specific to a build then also output to its logfile.
49  if build_id:
50    log_to_file(f'{prefix}{msg}', build_id=build_id)
51
52  # No need to also output to the terminal if quiet.
53  if quiet:
54    return
55  # Shrink the message (leaving a 2-char prefix and use the rest of the room
56  # for the suffix) according to terminal size so it is always one line.
57  width = shutil.get_terminal_size().columns
58  max_msg_width = width - len(prefix)
59  if len(msg) > max_msg_width:
60    length_to_show = max_msg_width - 5  # Account for ellipsis and header.
61    msg = f'{msg[:2]}...{msg[-length_to_show:]}'
62  # \r to return the carriage to the beginning of line.
63  # \033[K to replace the normal \n to erase until the end of the line.
64  # Avoid the default line ending so the next \r overwrites the same line just
65  #     like ninja's output.
66  print(f'\r{prefix}{msg}\033[K', end='', flush=True)
67
68
69def log_to_file(message: str, build_id: str):
70  logfile = _LOGFILES.get(build_id)
71  print(message, file=logfile, flush=True)
72
73
74def _exception_hook(exctype: type, exc: Exception, tb):
75  # Output uncaught exceptions to all live terminals
76  BuildManager.broadcast(''.join(traceback.format_exception(exctype, exc, tb)))
77  # Cancel all pending tasks cleanly (i.e. delete stamp files if necessary).
78  TaskManager.deactivate()
79  sys.__excepthook__(exctype, exc, tb)
80
81
82def create_logfile(build_id, outdir):
83  if logfile := _LOGFILES.get(build_id, None):
84    return logfile
85
86  outdir = pathlib.Path(outdir)
87  latest_logfile = outdir / f'{_LOGFILE_NAME}.0'
88
89  if latest_logfile.exists():
90    with latest_logfile.open('rt') as f:
91      first_line = f.readline()
92      if log_build_id := BUILD_ID_RE.search(first_line):
93        # If the newest logfile on disk is referencing the same build we are
94        # currently processing, we probably crashed previously and we should
95        # pick up where we left off in the same logfile.
96        if log_build_id.group('build_id') == build_id:
97          _LOGFILES[build_id] = latest_logfile.open('at')
98          return _LOGFILES[build_id]
99
100  # Do the logfile name shift.
101  filenames = os.listdir(outdir)
102  logfiles = {f for f in filenames if f.startswith(_LOGFILE_NAME)}
103  for idx in reversed(range(_MAX_LOGFILES)):
104    current_name = f'{_LOGFILE_NAME}.{idx}'
105    next_name = f'{_LOGFILE_NAME}.{idx+1}'
106    if current_name in logfiles:
107      shutil.move(os.path.join(outdir, current_name),
108                  os.path.join(outdir, next_name))
109
110  # Create a new 0th logfile.
111  logfile = latest_logfile.open('wt')
112  _LOGFILES[build_id] = logfile
113  logfile.write(FIRST_LOG_LINE.format(build_id=build_id))
114  logfile.flush()
115  return logfile
116
117
118class TaskStats:
119  """Class to keep track of aggregate stats for all tasks across threads."""
120  _num_processes = 0
121  _completed_tasks = 0
122  _total_tasks = 0
123  _total_task_count_per_build = collections.defaultdict(int)
124  _completed_task_count_per_build = collections.defaultdict(int)
125  _running_processes_count_per_build = collections.defaultdict(int)
126  _lock = threading.Lock()
127
128  @classmethod
129  def no_running_processes(cls):
130    with cls._lock:
131      return cls._num_processes == 0
132
133  @classmethod
134  def add_task(cls, build_id: str):
135    with cls._lock:
136      cls._total_tasks += 1
137      cls._total_task_count_per_build[build_id] += 1
138
139  @classmethod
140  def add_process(cls, build_id: str):
141    with cls._lock:
142      cls._num_processes += 1
143      cls._running_processes_count_per_build[build_id] += 1
144
145  @classmethod
146  def remove_process(cls, build_id: str):
147    with cls._lock:
148      cls._num_processes -= 1
149      cls._running_processes_count_per_build[build_id] -= 1
150
151  @classmethod
152  def complete_task(cls, build_id: str):
153    with cls._lock:
154      cls._completed_tasks += 1
155      cls._completed_task_count_per_build[build_id] += 1
156
157  @classmethod
158  def num_pending_tasks(cls, build_id: str = None):
159    with cls._lock:
160      if build_id:
161        return cls._total_task_count_per_build[
162            build_id] - cls._completed_task_count_per_build[build_id]
163      return cls._total_tasks - cls._completed_tasks
164
165  @classmethod
166  def num_completed_tasks(cls, build_id: str = None):
167    with cls._lock:
168      if build_id:
169        return cls._completed_task_count_per_build[build_id]
170      return cls._completed_tasks
171
172  @classmethod
173  def prefix(cls, build_id: str = None):
174    # Ninja's prefix is: [205 processes, 6/734 @ 6.5/s : 0.922s ]
175    # Time taken and task completion rate are not important for the build server
176    # since it is always running in the background and uses idle priority for
177    # its tasks.
178    with cls._lock:
179      if build_id:
180        _num_processes = cls._running_processes_count_per_build[build_id]
181        _completed_tasks = cls._completed_task_count_per_build[build_id]
182        _total_tasks = cls._total_task_count_per_build[build_id]
183      else:
184        _num_processes = cls._num_processes
185        _completed_tasks = cls._completed_tasks
186        _total_tasks = cls._total_tasks
187      word = 'process' if _num_processes == 1 else 'processes'
188      return (f'{_num_processes} {word}, '
189              f'{_completed_tasks}/{_total_tasks}')
190
191
192def check_pid_alive(pid: int):
193  try:
194    os.kill(pid, 0)
195  except OSError:
196    return False
197  return True
198
199
200class BuildManager:
201  _live_builders: dict[str, int] = dict()
202  _build_ttys: dict[str, IO[str]] = dict()
203  _lock = threading.RLock()
204
205  @classmethod
206  def register_builder(cls, build_id, builder_pid):
207    with cls._lock:
208      cls._live_builders[build_id] = int(builder_pid)
209
210  @classmethod
211  def register_tty(cls, build_id, tty):
212    with cls._lock:
213      cls._build_ttys[build_id] = tty
214
215  @classmethod
216  def get_live_builds(cls):
217    with cls._lock:
218      for build_id, builder_pid in list(cls._live_builders.items()):
219        if not check_pid_alive(builder_pid):
220          del cls._live_builders[build_id]
221      return list(cls._live_builders.keys())
222
223  @classmethod
224  def broadcast(cls, msg: str):
225    seen = set()
226    with cls._lock:
227      for tty in cls._build_ttys.values():
228        # Do not output to the same tty multiple times. Use st_ino and st_dev to
229        # compare open file descriptors.
230        st = os.stat(tty.fileno())
231        key = (st.st_ino, st.st_dev)
232        if key in seen:
233          continue
234        seen.add(key)
235        try:
236          tty.write(msg + '\n')
237          tty.flush()
238        except BrokenPipeError:
239          pass
240
241  @classmethod
242  def has_live_builds(cls):
243    return bool(cls.get_live_builds())
244
245
246class TaskManager:
247  """Class to encapsulate a threadsafe queue and handle deactivating it."""
248  _queue: collections.deque[Task] = collections.deque()
249  _deactivated = False
250  _lock = threading.RLock()
251
252  @classmethod
253  def add_task(cls, task: Task, options):
254    assert not cls._deactivated
255    TaskStats.add_task(build_id=task.build_id)
256    with cls._lock:
257      cls._queue.appendleft(task)
258    set_status(f'QUEUED {task.name}',
259               quiet=options.quiet,
260               build_id=task.build_id)
261    cls._maybe_start_tasks()
262
263  @classmethod
264  def deactivate(cls):
265    cls._deactivated = True
266    with cls._lock:
267      while cls._queue:
268        task = cls._queue.pop()
269        task.terminate()
270
271  @classmethod
272  def cancel_build(cls, build_id):
273    terminated_tasks = []
274    with cls._lock:
275      for task in cls._queue:
276        if task.build_id == build_id:
277          task.terminate()
278          terminated_tasks.append(task)
279      for task in terminated_tasks:
280        cls._queue.remove(task)
281
282  @staticmethod
283  # pylint: disable=inconsistent-return-statements
284  def _num_running_processes():
285    with open('/proc/stat') as f:
286      for line in f:
287        if line.startswith('procs_running'):
288          return int(line.rstrip().split()[1])
289    assert False, 'Could not read /proc/stat'
290
291  @classmethod
292  def _maybe_start_tasks(cls):
293    if cls._deactivated:
294      return
295    # Include load avg so that a small dip in the number of currently running
296    # processes will not cause new tasks to be started while the overall load is
297    # heavy.
298    cur_load = max(cls._num_running_processes(), os.getloadavg()[0])
299    num_started = 0
300    # Always start a task if we don't have any running, so that all tasks are
301    # eventually finished. Try starting up tasks when the overall load is light.
302    # Limit to at most 2 new tasks to prevent ramping up too fast. There is a
303    # chance where multiple threads call _maybe_start_tasks and each gets to
304    # spawn up to 2 new tasks, but since the only downside is some build tasks
305    # get worked on earlier rather than later, it is not worth mitigating.
306    while num_started < 2 and (TaskStats.no_running_processes()
307                               or num_started + cur_load < os.cpu_count()):
308      with cls._lock:
309        try:
310          next_task = cls._queue.pop()
311        except IndexError:
312          return
313      num_started += next_task.start(cls._maybe_start_tasks)
314
315
316# TODO(wnwen): Break this into Request (encapsulating what ninja sends) and Task
317#              when a Request starts to be run. This would eliminate ambiguity
318#              about when and whether _proc/_thread are initialized.
319class Task:
320  """Class to represent one task and operations on it."""
321
322  def __init__(self, name: str, cwd: str, cmd: List[str], tty: IO[str],
323               stamp_file: str, build_id: str, remote_print: bool, options):
324    self.name = name
325    self.cwd = cwd
326    self.cmd = cmd
327    self.stamp_file = stamp_file
328    self.tty = tty
329    self.build_id = build_id
330    self.remote_print = remote_print
331    self.options = options
332    self._terminated = False
333    self._replaced = False
334    self._lock = threading.RLock()
335    self._proc: Optional[subprocess.Popen] = None
336    self._thread: Optional[threading.Thread] = None
337    self._delete_stamp_thread: Optional[threading.Thread] = None
338    self._return_code: Optional[int] = None
339
340  @property
341  def key(self):
342    return (self.cwd, self.name)
343
344  def __eq__(self, other):
345    return self.key == other.key and self.build_id == other.build_id
346
347  def start(self, on_complete_callback: Callable[[], None]) -> int:
348    """Starts the task if it has not already been terminated.
349
350    Returns the number of processes that have been started. This is called at
351    most once when the task is popped off the task queue."""
352
353    # The environment variable forces the script to actually run in order to
354    # avoid infinite recursion.
355    env = os.environ.copy()
356    env[server_utils.BUILD_SERVER_ENV_VARIABLE] = '1'
357
358    with self._lock:
359      if self._terminated:
360        return 0
361
362      # Use os.nice(19) to ensure the lowest priority (idle) for these analysis
363      # tasks since we want to avoid slowing down the actual build.
364      # TODO(wnwen): Use ionice to reduce resource consumption.
365      TaskStats.add_process(self.build_id)
366      set_status(f'STARTING {self.name}',
367                 quiet=self.options.quiet,
368                 build_id=self.build_id)
369      # This use of preexec_fn is sufficiently simple, just one os.nice call.
370      # pylint: disable=subprocess-popen-preexec-fn
371      self._proc = subprocess.Popen(
372          self.cmd,
373          stdout=subprocess.PIPE,
374          stderr=subprocess.STDOUT,
375          cwd=self.cwd,
376          env=env,
377          text=True,
378          preexec_fn=lambda: os.nice(19),
379      )
380      self._thread = threading.Thread(
381          target=self._complete_when_process_finishes,
382          args=(on_complete_callback, ))
383      self._thread.start()
384      return 1
385
386  def terminate(self, replaced=False):
387    """Can be called multiple times to cancel and ignore the task's output."""
388
389    with self._lock:
390      if self._terminated:
391        return
392      self._terminated = True
393      self._replaced = replaced
394
395    # It is safe to access _proc and _thread outside of _lock since they are
396    # only changed by self.start holding _lock when self._terminate is false.
397    # Since we have just set self._terminate to true inside of _lock, we know
398    # that neither _proc nor _thread will be changed from this point onwards.
399    if self._proc:
400      self._proc.terminate()
401      self._proc.wait()
402    # Ensure that self._complete is called either by the thread or by us.
403    if self._thread:
404      self._thread.join()
405    else:
406      self._complete()
407
408  def _complete_when_process_finishes(self,
409                                      on_complete_callback: Callable[[], None]):
410    assert self._proc
411    # We know Popen.communicate will return a str and not a byte since it is
412    # constructed with text=True.
413    stdout: str = self._proc.communicate()[0]
414    self._return_code = self._proc.returncode
415    TaskStats.remove_process(build_id=self.build_id)
416    self._complete(stdout)
417    on_complete_callback()
418
419  def _complete(self, stdout: str = ''):
420    """Update the user and ninja after the task has run or been terminated.
421
422    This method should only be run once per task. Avoid modifying the task so
423    that this method does not need locking."""
424
425    delete_stamp = False
426    status_string = 'FINISHED'
427    if self._terminated:
428      status_string = 'TERMINATED'
429      # When tasks are replaced, avoid deleting the stamp file, context:
430      # https://issuetracker.google.com/301961827.
431      if not self._replaced:
432        delete_stamp = True
433    elif stdout or self._return_code != 0:
434      status_string = 'FAILED'
435      delete_stamp = True
436      preamble = [
437          f'FAILED: {self.name}',
438          f'Return code: {self._return_code}',
439          'CMD: ' + ' '.join(self.cmd),
440          'STDOUT:',
441      ]
442
443      message = '\n'.join(preamble + [stdout])
444      log_to_file(message, build_id=self.build_id)
445      log(message, quiet=self.options.quiet)
446      if self.remote_print:
447        # Add emoji to show that output is from the build server.
448        preamble = [f'⏩ {line}' for line in preamble]
449        remote_message = '\n'.join(preamble + [stdout])
450        # Add a new line at start of message to clearly delineate from previous
451        # output/text already on the remote tty we are printing to.
452        self.tty.write(f'\n{remote_message}')
453        self.tty.flush()
454    set_status(f'{status_string} {self.name}',
455               quiet=self.options.quiet,
456               build_id=self.build_id)
457    if delete_stamp:
458      # Force siso to consider failed targets as dirty.
459      try:
460        os.unlink(os.path.join(self.cwd, self.stamp_file))
461      except FileNotFoundError:
462        pass
463    else:
464      # We do not care about the action writing a too new mtime. Siso only cares
465      # about the mtime that is recorded in its database at the time the
466      # original action finished.
467      pass
468    TaskStats.complete_task(build_id=self.build_id)
469
470
471def _handle_add_task(data, current_tasks: Dict[Tuple[str, str], Task], options):
472  """Handle messages of type ADD_TASK."""
473  build_id = data['build_id']
474  task_outdir = data['cwd']
475
476  is_experimental = data.get('experimental', False)
477  tty = None
478  if is_experimental:
479    tty = open(data['tty'], 'wt')
480    BuildManager.register_tty(build_id, tty)
481
482  # Make sure a logfile for the build_id exists.
483  create_logfile(build_id, task_outdir)
484
485  new_task = Task(name=data['name'],
486                  cwd=task_outdir,
487                  cmd=data['cmd'],
488                  tty=tty,
489                  build_id=build_id,
490                  remote_print=is_experimental,
491                  stamp_file=data['stamp_file'],
492                  options=options)
493  existing_task = current_tasks.get(new_task.key)
494  if existing_task:
495    existing_task.terminate(replaced=True)
496  current_tasks[new_task.key] = new_task
497
498  TaskManager.add_task(new_task, options)
499
500
501def _handle_query_build(data, connection: socket.socket):
502  """Handle messages of type QUERY_BUILD."""
503  build_id = data['build_id']
504  pending_tasks = TaskStats.num_pending_tasks(build_id)
505  completed_tasks = TaskStats.num_completed_tasks(build_id)
506  response = {
507      'build_id': build_id,
508      'completed_tasks': completed_tasks,
509      'pending_tasks': pending_tasks,
510  }
511  try:
512    with connection:
513      server_utils.SendMessage(connection, json.dumps(response).encode('utf8'))
514  except BrokenPipeError:
515    # We should not die because the client died.
516    pass
517
518
519def _handle_heartbeat(connection: socket.socket):
520  """Handle messages of type POLL_HEARTBEAT."""
521  try:
522    with connection:
523      server_utils.SendMessage(connection,
524                               json.dumps({
525                                   'status': 'OK'
526                               }).encode('utf8'))
527  except BrokenPipeError:
528    # We should not die because the client died.
529    pass
530
531
532def _handle_register_builder(data):
533  """Handle messages of type REGISTER_BUILDER."""
534  build_id = data['build_id']
535  builder_pid = data['builder_pid']
536  BuildManager.register_builder(build_id, builder_pid)
537
538
539def _handle_cancel_build(data):
540  """Handle messages of type CANCEL_BUILD."""
541  build_id = data['build_id']
542  TaskManager.cancel_build(build_id)
543
544
545def _listen_for_request_data(sock: socket.socket):
546  """Helper to encapsulate getting a new message."""
547  while True:
548    conn = sock.accept()[0]
549    message_bytes = server_utils.ReceiveMessage(conn)
550    if message_bytes:
551      yield json.loads(message_bytes), conn
552
553
554def _process_requests(sock: socket.socket, options):
555  """Main loop for build server receiving request messages."""
556  # Since dicts in python can contain anything, explicitly type tasks to help
557  # make static type checking more useful.
558  tasks: Dict[Tuple[str, str], Task] = {}
559  log(
560      'READY... Remember to set android_static_analysis="build_server" in '
561      'args.gn files',
562      quiet=options.quiet)
563  # pylint: disable=too-many-nested-blocks
564  try:
565    while True:
566      try:
567        for data, connection in _listen_for_request_data(sock):
568          message_type = data.get('message_type', server_utils.ADD_TASK)
569          if message_type == server_utils.POLL_HEARTBEAT:
570            _handle_heartbeat(connection)
571          if message_type == server_utils.ADD_TASK:
572            connection.close()
573            _handle_add_task(data, tasks, options)
574          if message_type == server_utils.QUERY_BUILD:
575            _handle_query_build(data, connection)
576          if message_type == server_utils.REGISTER_BUILDER:
577            connection.close()
578            _handle_register_builder(data)
579          if message_type == server_utils.CANCEL_BUILD:
580            connection.close()
581            _handle_cancel_build(data)
582      except TimeoutError:
583        # If we have not received a new task in a while and do not have any
584        # pending tasks or running builds, then exit. Otherwise keep waiting.
585        if (TaskStats.num_pending_tasks() == 0
586            and not BuildManager.has_live_builds() and options.exit_on_idle):
587          break
588      except KeyboardInterrupt:
589        break
590  finally:
591    log('STOPPING SERVER...', quiet=options.quiet)
592    # Gracefully shut down the task manager, terminating all queued tasks.
593    TaskManager.deactivate()
594    # Terminate all currently running tasks.
595    for task in tasks.values():
596      task.terminate()
597    log('STOPPED', quiet=options.quiet)
598
599
600def query_build_info(build_id):
601  """Communicates with the main server to query build info."""
602  with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
603    sock.connect(server_utils.SOCKET_ADDRESS)
604    sock.settimeout(3)
605    server_utils.SendMessage(
606        sock,
607        json.dumps({
608            'message_type': server_utils.QUERY_BUILD,
609            'build_id': build_id,
610        }).encode('utf8'))
611    response_bytes = server_utils.ReceiveMessage(sock)
612    return json.loads(response_bytes)
613
614
615def _wait_for_build(build_id):
616  """Comunicates with the main server waiting for a build to complete."""
617  start_time = datetime.datetime.now()
618  while True:
619    build_info = query_build_info(build_id)
620    pending_tasks = build_info['pending_tasks']
621
622    if pending_tasks == 0:
623      print(f'\nAll tasks completed for build_id: {build_id}.')
624      return 0
625
626    current_time = datetime.datetime.now()
627    duration = current_time - start_time
628    print(f'\rWaiting for {pending_tasks} tasks [{str(duration)}]\033[K',
629          end='',
630          flush=True)
631    time.sleep(1)
632
633
634def _check_if_running():
635  """Communicates with the main server to make sure its running."""
636  with socket.socket(socket.AF_UNIX) as sock:
637    try:
638      sock.connect(server_utils.SOCKET_ADDRESS)
639    except socket.error:
640      print('Build server is not running and '
641            'android_static_analysis="build_server" is set.\nPlease run '
642            'this command in a separate terminal:\n\n'
643            '$ build/android/fast_local_dev_server.py\n')
644      return 1
645    else:
646      return 0
647
648
649def _send_message_and_close(message_dict):
650  with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
651    sock.connect(server_utils.SOCKET_ADDRESS)
652    sock.settimeout(3)
653    server_utils.SendMessage(sock, json.dumps(message_dict).encode('utf8'))
654
655
656def _send_cancel_build(build_id):
657  _send_message_and_close({
658      'message_type': server_utils.CANCEL_BUILD,
659      'build_id': build_id,
660  })
661  return 0
662
663
664def _register_builder(build_id, builder_pid):
665  for _attempt in range(3):
666    try:
667      _send_message_and_close({
668          'message_type': server_utils.REGISTER_BUILDER,
669          'build_id': build_id,
670          'builder_pid': builder_pid,
671      })
672      return 0
673    except socket.error:
674      time.sleep(0.05)
675  print(f'Failed to register builer for build_id={build_id}.')
676  return 1
677
678
679def _wait_for_task_requests(args):
680  with socket.socket(socket.AF_UNIX) as sock:
681    sock.settimeout(_SOCKET_TIMEOUT)
682    try:
683      sock.bind(server_utils.SOCKET_ADDRESS)
684    except socket.error as e:
685      # errno 98 is Address already in use
686      if e.errno == 98:
687        print('fast_local_dev_server.py is already running.')
688        return 1
689      raise
690    sock.listen()
691    _process_requests(sock, args)
692  return 0
693
694
695def main():
696  parser = argparse.ArgumentParser(description=__doc__)
697  parser.add_argument(
698      '--fail-if-not-running',
699      action='store_true',
700      help='Used by GN to fail fast if the build server is not running.')
701  parser.add_argument(
702      '--exit-on-idle',
703      action='store_true',
704      help='Server started on demand. Exit when all tasks run out.')
705  parser.add_argument('--quiet',
706                      action='store_true',
707                      help='Do not output status updates.')
708  parser.add_argument('--wait-for-build',
709                      metavar='BUILD_ID',
710                      help='Wait for build server to finish with all tasks '
711                      'for BUILD_ID and output any pending messages.')
712  parser.add_argument(
713      '--register-build-id',
714      metavar='BUILD_ID',
715      help='Inform the build server that a new build has started.')
716  parser.add_argument('--builder-pid',
717                      help='Builder process\'s pid for build BUILD_ID.')
718  parser.add_argument('--cancel-build',
719                      metavar='BUILD_ID',
720                      help='Cancel all pending and running tasks for BUILD_ID.')
721  args = parser.parse_args()
722  if args.fail_if_not_running:
723    return _check_if_running()
724  if args.wait_for_build:
725    return _wait_for_build(args.wait_for_build)
726  if args.register_build_id:
727    return _register_builder(args.register_build_id, args.builder_pid)
728  if args.cancel_build:
729    return _send_cancel_build(args.cancel_build)
730  return _wait_for_task_requests(args)
731
732
733if __name__ == '__main__':
734  sys.excepthook = _exception_hook
735  sys.exit(main())
736