1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Run a group of subprocesses and then finish.""" 15 16import errno 17import logging 18import multiprocessing 19import os 20import platform 21import re 22import signal 23import subprocess 24import sys 25import tempfile 26import time 27 28# cpu cost measurement 29measure_cpu_costs = False 30 31_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count() 32# Maximum number of bytes of job's stdout that will be stored in the result. 33# Only last N bytes of stdout will be kept if the actual output longer. 34_MAX_RESULT_SIZE = 64 * 1024 35 36 37# NOTE: If you change this, please make sure to test reviewing the 38# github PR with http://reviewable.io, which is known to add UTF-8 39# characters to the PR description, which leak into the environment here 40# and cause failures. 41def strip_non_ascii_chars(s): 42 return "".join(c for c in s if ord(c) < 128) 43 44 45def sanitized_environment(env): 46 sanitized = {} 47 for key, value in list(env.items()): 48 sanitized[strip_non_ascii_chars(key)] = strip_non_ascii_chars(value) 49 return sanitized 50 51 52def platform_string(): 53 if platform.system() == "Windows": 54 return "windows" 55 elif platform.system()[:7] == "MSYS_NT": 56 return "windows" 57 elif platform.system() == "Darwin": 58 return "mac" 59 elif platform.system() == "Linux": 60 return "linux" 61 else: 62 return "posix" 63 64 65# setup a signal handler so that signal.pause registers 'something' 66# when a child finishes 67# not using futures and threading to avoid a dependency on subprocess32 68if platform_string() == "windows": 69 pass 70else: 71 72 def alarm_handler(unused_signum, unused_frame): 73 pass 74 75 signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None) 76 signal.signal(signal.SIGALRM, alarm_handler) 77 78_SUCCESS = object() 79_FAILURE = object() 80_RUNNING = object() 81_KILLED = object() 82 83_COLORS = { 84 "red": [31, 0], 85 "green": [32, 0], 86 "yellow": [33, 0], 87 "lightgray": [37, 0], 88 "gray": [30, 1], 89 "purple": [35, 0], 90 "cyan": [36, 0], 91} 92 93_BEGINNING_OF_LINE = "\x1b[0G" 94_CLEAR_LINE = "\x1b[2K" 95 96_TAG_COLOR = { 97 "FAILED": "red", 98 "FLAKE": "purple", 99 "TIMEOUT_FLAKE": "purple", 100 "WARNING": "yellow", 101 "TIMEOUT": "red", 102 "PASSED": "green", 103 "START": "gray", 104 "WAITING": "yellow", 105 "SUCCESS": "green", 106 "IDLE": "gray", 107 "SKIPPED": "cyan", 108} 109 110_FORMAT = "%(asctime)-15s %(message)s" 111logging.basicConfig(level=logging.INFO, format=_FORMAT) 112 113 114def eintr_be_gone(fn): 115 """Run fn until it doesn't stop because of EINTR""" 116 while True: 117 try: 118 return fn() 119 except IOError as e: 120 if e.errno != errno.EINTR: 121 raise 122 123 124def message(tag, msg, explanatory_text=None, do_newline=False): 125 if ( 126 message.old_tag == tag 127 and message.old_msg == msg 128 and not explanatory_text 129 ): 130 return 131 message.old_tag = tag 132 message.old_msg = msg 133 if explanatory_text: 134 if isinstance(explanatory_text, bytes): 135 explanatory_text = explanatory_text.decode("utf8", errors="replace") 136 while True: 137 try: 138 if platform_string() == "windows" or not sys.stdout.isatty(): 139 if explanatory_text: 140 logging.info(explanatory_text) 141 logging.info("%s: %s", tag, msg) 142 else: 143 sys.stdout.write( 144 "%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s" 145 % ( 146 _BEGINNING_OF_LINE, 147 _CLEAR_LINE, 148 "\n%s" % explanatory_text 149 if explanatory_text is not None 150 else "", 151 _COLORS[_TAG_COLOR[tag]][1], 152 _COLORS[_TAG_COLOR[tag]][0], 153 tag, 154 msg, 155 "\n" 156 if do_newline or explanatory_text is not None 157 else "", 158 ) 159 ) 160 sys.stdout.flush() 161 return 162 except IOError as e: 163 if e.errno != errno.EINTR: 164 raise 165 166 167message.old_tag = "" 168message.old_msg = "" 169 170 171def which(filename): 172 if "/" in filename: 173 return filename 174 for path in os.environ["PATH"].split(os.pathsep): 175 if os.path.exists(os.path.join(path, filename)): 176 return os.path.join(path, filename) 177 raise Exception("%s not found" % filename) 178 179 180class JobSpec(object): 181 """Specifies what to run for a job.""" 182 183 def __init__( 184 self, 185 cmdline, 186 shortname=None, 187 environ=None, 188 cwd=None, 189 shell=False, 190 timeout_seconds=5 * 60, 191 flake_retries=0, 192 timeout_retries=0, 193 kill_handler=None, 194 cpu_cost=1.0, 195 verbose_success=False, 196 logfilename=None, 197 ): 198 """ 199 Arguments: 200 cmdline: a list of arguments to pass as the command line 201 environ: a dictionary of environment variables to set in the child process 202 kill_handler: a handler that will be called whenever job.kill() is invoked 203 cpu_cost: number of cores per second this job needs 204 logfilename: use given file to store job's output, rather than using a temporary file 205 """ 206 if environ is None: 207 environ = {} 208 self.cmdline = cmdline 209 self.environ = environ 210 self.shortname = cmdline[0] if shortname is None else shortname 211 self.cwd = cwd 212 self.shell = shell 213 self.timeout_seconds = timeout_seconds 214 self.flake_retries = flake_retries 215 self.timeout_retries = timeout_retries 216 self.kill_handler = kill_handler 217 self.cpu_cost = cpu_cost 218 self.verbose_success = verbose_success 219 self.logfilename = logfilename 220 if ( 221 self.logfilename 222 and self.flake_retries != 0 223 and self.timeout_retries != 0 224 ): 225 # Forbidden to avoid overwriting the test log when retrying. 226 raise Exception( 227 "Cannot use custom logfile when retries are enabled" 228 ) 229 230 def identity(self): 231 return "%r %r" % (self.cmdline, self.environ) 232 233 def __hash__(self): 234 return hash(self.identity()) 235 236 def __cmp__(self, other): 237 return self.identity() == other.identity() 238 239 def __lt__(self, other): 240 return self.identity() < other.identity() 241 242 def __repr__(self): 243 return "JobSpec(shortname=%s, cmdline=%s)" % ( 244 self.shortname, 245 self.cmdline, 246 ) 247 248 def __str__(self): 249 return "%s: %s %s" % ( 250 self.shortname, 251 " ".join("%s=%s" % kv for kv in list(self.environ.items())), 252 " ".join(self.cmdline), 253 ) 254 255 256class JobResult(object): 257 def __init__(self): 258 self.state = "UNKNOWN" 259 self.returncode = -1 260 self.elapsed_time = 0 261 self.num_failures = 0 262 self.retries = 0 263 self.message = "" 264 self.cpu_estimated = 1 265 self.cpu_measured = 1 266 267 268def read_from_start(f): 269 f.seek(0) 270 return f.read() 271 272 273class Job(object): 274 """Manages one job.""" 275 276 def __init__( 277 self, spec, newline_on_success, travis, add_env, quiet_success=False 278 ): 279 self._spec = spec 280 self._newline_on_success = newline_on_success 281 self._travis = travis 282 self._add_env = add_env.copy() 283 self._retries = 0 284 self._timeout_retries = 0 285 self._suppress_failure_message = False 286 self._quiet_success = quiet_success 287 if not self._quiet_success: 288 message("START", spec.shortname, do_newline=self._travis) 289 self.result = JobResult() 290 self.start() 291 292 def GetSpec(self): 293 return self._spec 294 295 def start(self): 296 if self._spec.logfilename: 297 # make sure the log directory exists 298 logfile_dir = os.path.dirname( 299 os.path.abspath(self._spec.logfilename) 300 ) 301 if not os.path.exists(logfile_dir): 302 os.makedirs(logfile_dir) 303 self._logfile = open(self._spec.logfilename, "w+") 304 else: 305 # macOS: a series of quick os.unlink invocation might cause OS 306 # error during the creation of temporary file. By using 307 # NamedTemporaryFile, we defer the removal of file and directory. 308 self._logfile = tempfile.NamedTemporaryFile() 309 env = dict(os.environ) 310 env.update(self._spec.environ) 311 env.update(self._add_env) 312 env = sanitized_environment(env) 313 self._start = time.time() 314 cmdline = self._spec.cmdline 315 # The Unix time command is finicky when used with MSBuild, so we don't use it 316 # with jobs that run MSBuild. 317 global measure_cpu_costs 318 if measure_cpu_costs and not "vsprojects\\build" in cmdline[0]: 319 cmdline = ["time", "-p"] + cmdline 320 else: 321 measure_cpu_costs = False 322 try_start = lambda: subprocess.Popen( 323 args=cmdline, 324 stderr=subprocess.STDOUT, 325 stdout=self._logfile, 326 cwd=self._spec.cwd, 327 shell=self._spec.shell, 328 env=env, 329 ) 330 delay = 0.3 331 for i in range(0, 4): 332 try: 333 self._process = try_start() 334 break 335 except OSError: 336 message( 337 "WARNING", 338 "Failed to start %s, retrying in %f seconds" 339 % (self._spec.shortname, delay), 340 ) 341 time.sleep(delay) 342 delay *= 2 343 else: 344 self._process = try_start() 345 self._state = _RUNNING 346 347 def state(self): 348 """Poll current state of the job. Prints messages at completion.""" 349 350 def stdout(self=self): 351 stdout = read_from_start(self._logfile) 352 self.result.message = stdout[-_MAX_RESULT_SIZE:] 353 return stdout 354 355 if self._state == _RUNNING and self._process.poll() is not None: 356 elapsed = time.time() - self._start 357 self.result.elapsed_time = elapsed 358 if self._process.returncode != 0: 359 if self._retries < self._spec.flake_retries: 360 message( 361 "FLAKE", 362 "%s [ret=%d, pid=%d]" 363 % ( 364 self._spec.shortname, 365 self._process.returncode, 366 self._process.pid, 367 ), 368 stdout(), 369 do_newline=True, 370 ) 371 self._retries += 1 372 self.result.num_failures += 1 373 self.result.retries = self._timeout_retries + self._retries 374 # NOTE: job is restarted regardless of jobset's max_time setting 375 self.start() 376 else: 377 self._state = _FAILURE 378 if not self._suppress_failure_message: 379 message( 380 "FAILED", 381 "%s [ret=%d, pid=%d, time=%.1fsec]" 382 % ( 383 self._spec.shortname, 384 self._process.returncode, 385 self._process.pid, 386 elapsed, 387 ), 388 stdout(), 389 do_newline=True, 390 ) 391 self.result.state = "FAILED" 392 self.result.num_failures += 1 393 self.result.returncode = self._process.returncode 394 else: 395 self._state = _SUCCESS 396 measurement = "" 397 if measure_cpu_costs: 398 m = re.search( 399 r"real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)", 400 (stdout()).decode("utf8", errors="replace"), 401 ) 402 real = float(m.group(1)) 403 user = float(m.group(2)) 404 sys = float(m.group(3)) 405 if real > 0.5: 406 cores = (user + sys) / real 407 self.result.cpu_measured = float("%.01f" % cores) 408 self.result.cpu_estimated = float( 409 "%.01f" % self._spec.cpu_cost 410 ) 411 measurement = "; cpu_cost=%.01f; estimated=%.01f" % ( 412 self.result.cpu_measured, 413 self.result.cpu_estimated, 414 ) 415 if not self._quiet_success: 416 message( 417 "PASSED", 418 "%s [time=%.1fsec, retries=%d:%d%s]" 419 % ( 420 self._spec.shortname, 421 elapsed, 422 self._retries, 423 self._timeout_retries, 424 measurement, 425 ), 426 stdout() if self._spec.verbose_success else None, 427 do_newline=self._newline_on_success or self._travis, 428 ) 429 self.result.state = "PASSED" 430 elif ( 431 self._state == _RUNNING 432 and self._spec.timeout_seconds is not None 433 and time.time() - self._start > self._spec.timeout_seconds 434 ): 435 elapsed = time.time() - self._start 436 self.result.elapsed_time = elapsed 437 if self._timeout_retries < self._spec.timeout_retries: 438 message( 439 "TIMEOUT_FLAKE", 440 "%s [pid=%d]" % (self._spec.shortname, self._process.pid), 441 stdout(), 442 do_newline=True, 443 ) 444 self._timeout_retries += 1 445 self.result.num_failures += 1 446 self.result.retries = self._timeout_retries + self._retries 447 if self._spec.kill_handler: 448 self._spec.kill_handler(self) 449 self._process.terminate() 450 # NOTE: job is restarted regardless of jobset's max_time setting 451 self.start() 452 else: 453 message( 454 "TIMEOUT", 455 "%s [pid=%d, time=%.1fsec]" 456 % (self._spec.shortname, self._process.pid, elapsed), 457 stdout(), 458 do_newline=True, 459 ) 460 self.kill() 461 self.result.state = "TIMEOUT" 462 self.result.num_failures += 1 463 return self._state 464 465 def kill(self): 466 if self._state == _RUNNING: 467 self._state = _KILLED 468 if self._spec.kill_handler: 469 self._spec.kill_handler(self) 470 self._process.terminate() 471 472 def suppress_failure_message(self): 473 self._suppress_failure_message = True 474 475 476class Jobset(object): 477 """Manages one run of jobs.""" 478 479 def __init__( 480 self, 481 check_cancelled, 482 maxjobs, 483 maxjobs_cpu_agnostic, 484 newline_on_success, 485 travis, 486 stop_on_failure, 487 add_env, 488 quiet_success, 489 max_time, 490 ): 491 self._running = set() 492 self._check_cancelled = check_cancelled 493 self._cancelled = False 494 self._failures = 0 495 self._completed = 0 496 self._maxjobs = maxjobs 497 self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic 498 self._newline_on_success = newline_on_success 499 self._travis = travis 500 self._stop_on_failure = stop_on_failure 501 self._add_env = add_env 502 self._quiet_success = quiet_success 503 self._max_time = max_time 504 self.resultset = {} 505 self._remaining = None 506 self._start_time = time.time() 507 508 def set_remaining(self, remaining): 509 self._remaining = remaining 510 511 def get_num_failures(self): 512 return self._failures 513 514 def cpu_cost(self): 515 c = 0 516 for job in self._running: 517 c += job._spec.cpu_cost 518 return c 519 520 def start(self, spec): 521 """Start a job. Return True on success, False on failure.""" 522 while True: 523 if ( 524 self._max_time > 0 525 and time.time() - self._start_time > self._max_time 526 ): 527 skipped_job_result = JobResult() 528 skipped_job_result.state = "SKIPPED" 529 message("SKIPPED", spec.shortname, do_newline=True) 530 self.resultset[spec.shortname] = [skipped_job_result] 531 return True 532 if self.cancelled(): 533 return False 534 current_cpu_cost = self.cpu_cost() 535 if current_cpu_cost == 0: 536 break 537 if current_cpu_cost + spec.cpu_cost <= self._maxjobs: 538 if len(self._running) < self._maxjobs_cpu_agnostic: 539 break 540 self.reap(spec.shortname, spec.cpu_cost) 541 if self.cancelled(): 542 return False 543 job = Job( 544 spec, 545 self._newline_on_success, 546 self._travis, 547 self._add_env, 548 self._quiet_success, 549 ) 550 self._running.add(job) 551 if job.GetSpec().shortname not in self.resultset: 552 self.resultset[job.GetSpec().shortname] = [] 553 return True 554 555 def reap(self, waiting_for=None, waiting_for_cost=None): 556 """Collect the dead jobs.""" 557 while self._running: 558 dead = set() 559 for job in self._running: 560 st = eintr_be_gone(lambda: job.state()) 561 if st == _RUNNING: 562 continue 563 if st == _FAILURE or st == _KILLED: 564 self._failures += 1 565 if self._stop_on_failure: 566 self._cancelled = True 567 for job in self._running: 568 job.kill() 569 dead.add(job) 570 break 571 for job in dead: 572 self._completed += 1 573 if not self._quiet_success or job.result.state != "PASSED": 574 self.resultset[job.GetSpec().shortname].append(job.result) 575 self._running.remove(job) 576 if dead: 577 return 578 if not self._travis and platform_string() != "windows": 579 rstr = ( 580 "" 581 if self._remaining is None 582 else "%d queued, " % self._remaining 583 ) 584 if self._remaining is not None and self._completed > 0: 585 now = time.time() 586 sofar = now - self._start_time 587 remaining = ( 588 sofar 589 / self._completed 590 * (self._remaining + len(self._running)) 591 ) 592 rstr = "ETA %.1f sec; %s" % (remaining, rstr) 593 if waiting_for is not None: 594 wstr = " next: %s @ %.2f cpu" % ( 595 waiting_for, 596 waiting_for_cost, 597 ) 598 else: 599 wstr = "" 600 message( 601 "WAITING", 602 "%s%d jobs running, %d complete, %d failed (load %.2f)%s" 603 % ( 604 rstr, 605 len(self._running), 606 self._completed, 607 self._failures, 608 self.cpu_cost(), 609 wstr, 610 ), 611 ) 612 if platform_string() == "windows": 613 time.sleep(0.1) 614 else: 615 signal.alarm(10) 616 signal.pause() 617 618 def cancelled(self): 619 """Poll for cancellation.""" 620 if self._cancelled: 621 return True 622 if not self._check_cancelled(): 623 return False 624 for job in self._running: 625 job.kill() 626 self._cancelled = True 627 return True 628 629 def finish(self): 630 while self._running: 631 if self.cancelled(): 632 pass # poll cancellation 633 self.reap() 634 if platform_string() != "windows": 635 signal.alarm(0) 636 return not self.cancelled() and self._failures == 0 637 638 639def _never_cancelled(): 640 return False 641 642 643def tag_remaining(xs): 644 staging = [] 645 for x in xs: 646 staging.append(x) 647 if len(staging) > 5000: 648 yield (staging.pop(0), None) 649 n = len(staging) 650 for i, x in enumerate(staging): 651 yield (x, n - i - 1) 652 653 654def run( 655 cmdlines, 656 check_cancelled=_never_cancelled, 657 maxjobs=None, 658 maxjobs_cpu_agnostic=None, 659 newline_on_success=False, 660 travis=False, 661 infinite_runs=False, 662 stop_on_failure=False, 663 add_env={}, 664 skip_jobs=False, 665 quiet_success=False, 666 max_time=-1, 667): 668 if skip_jobs: 669 resultset = {} 670 skipped_job_result = JobResult() 671 skipped_job_result.state = "SKIPPED" 672 for job in cmdlines: 673 message("SKIPPED", job.shortname, do_newline=True) 674 resultset[job.shortname] = [skipped_job_result] 675 return 0, resultset 676 js = Jobset( 677 check_cancelled, 678 maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS, 679 maxjobs_cpu_agnostic 680 if maxjobs_cpu_agnostic is not None 681 else _DEFAULT_MAX_JOBS, 682 newline_on_success, 683 travis, 684 stop_on_failure, 685 add_env, 686 quiet_success, 687 max_time, 688 ) 689 for cmdline, remaining in tag_remaining(cmdlines): 690 if not js.start(cmdline): 691 break 692 if remaining is not None: 693 js.set_remaining(remaining) 694 js.finish() 695 return js.get_num_failures(), js.resultset 696