1#!/usr/bin/env python3
2#
3# Copyright 2019 The ChromiumOS Authors
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Runs presubmit checks against a bundle of files."""
8
9import argparse
10import dataclasses
11import datetime
12import multiprocessing
13import multiprocessing.pool
14import os
15from pathlib import Path
16import re
17import shlex
18import shutil
19import subprocess
20import sys
21import textwrap
22import threading
23import traceback
24from typing import (
25    Dict,
26    Iterable,
27    List,
28    NamedTuple,
29    Optional,
30    Sequence,
31    Tuple,
32    Union,
33)
34
35
# This originally had many packages in it (notably scipy), but due to
# changes in how scipy is built, we can no longer install it in the
# chroot. See b/284489250.
#
# For type checking Python code, we also need mypy. This isn't
# listed here because (1) only very few files are actually type checked,
# so we don't pull the dependency in unless needed, and (2) mypy
# may be installed through other means than pip.
PIP_DEPENDENCIES = ("numpy",)
45
46
# Each checker represents an independent check that's done on our sources.
#
# They should:
#  - never write to stdout/stderr or read from stdin directly
#  - return either a CheckResult, or a list of [(subcheck_name, CheckResult)]
#  - ideally use thread_pool to check things concurrently
#    - though it's important to note that these *also* live on the threadpool
#      we've provided. It's the caller's responsibility to guarantee that at
#      least ${number_of_concurrently_running_checkers}+1 threads are present
#      in the pool. In other words, blocking on results from the provided
#      threadpool is OK.
#
# CheckResult fields:
#  - ok: whether the check passed
#  - output: human-readable text to show for the check (may be empty)
#  - autofix_commands: argv lists that can be run to fix reported problems
CheckResult = NamedTuple(
    "CheckResult",
    (
        ("ok", bool),
        ("output", str),
        ("autofix_commands", List[List[str]]),
    ),
)
66
67
# An argv-style command line: a sequence of strings and/or path-like objects.
Command = Sequence[Union[str, os.PathLike]]
# What a checker returns: either one CheckResult, or a list of
# (subcheck_name, CheckResult) pairs.
CheckResults = Union[List[Tuple[str, CheckResult]], CheckResult]
70
71
# The files and directories on which we run the mypy typechecker. The paths are
# relative to the root of the toolchain-utils repository. A directory entry
# covers every `.py` file located beneath it.
MYPY_CHECKED_PATHS = (
    "afdo_tools/update_kernel_afdo.py",
    "check_portable_toolchains.py",
    "cros_utils/bugs.py",
    "cros_utils/bugs_test.py",
    "cros_utils/tiny_render.py",
    "llvm_tools",
    "pgo_tools",
    "pgo_tools_rust/pgo_rust.py",
    "rust_tools",
    "toolchain_utils_githooks/check-presubmit.py",
)
86
87
def run_command_unchecked(
    command: Command,
    cwd: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
) -> Tuple[int, str]:
    """Runs `command` to completion without raising on a nonzero exit.

    stdin is connected to /dev/null, and stderr is folded into stdout. The
    combined stream is decoded as UTF-8, with undecodable bytes replaced.

    Args:
        command: the argv to execute.
        cwd: directory to run in; None means the current directory.
        env: environment for the child; None means inherit ours.

    Returns:
        A tuple of (exit code, combined stdout and stderr).
    """
    completed = subprocess.run(
        command,
        cwd=cwd,
        env=env,
        check=False,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        encoding="utf-8",
        errors="replace",
    )
    exit_code = completed.returncode
    combined_output = completed.stdout
    return exit_code, combined_output
106
107
def has_executable_on_path(exe: str) -> bool:
    """Reports whether `shutil.which` can resolve `exe` to an executable."""
    resolved = shutil.which(exe)
    return resolved is not None
111
112
def remove_deleted_files(files: Iterable[str]) -> List[str]:
    """Returns the entries of `files` that exist on disk, in input order."""
    surviving = []
    for candidate in files:
        if os.path.exists(candidate):
            surviving.append(candidate)
    return surviving
115
116
def is_file_executable(file_path: str) -> bool:
    """Reports whether the current user may execute `file_path`."""
    may_execute = os.access(file_path, os.X_OK)
    return may_execute
119
120
# As noted in our docs, some of our Python code depends on modules that sit in
# toolchain-utils/. Add that to PYTHONPATH to ensure that things like `cros
# lint` are kept happy.
def env_with_pythonpath(toolchain_utils_root: str) -> Dict[str, str]:
    """Returns a copy of os.environ with `toolchain_utils_root` on PYTHONPATH.

    If PYTHONPATH is already set, the root is appended after a ":";
    otherwise PYTHONPATH is set to the root alone. os.environ itself is
    left untouched.
    """
    env = dict(os.environ)
    existing = env.get("PYTHONPATH")
    if existing is None:
        env["PYTHONPATH"] = toolchain_utils_root
    else:
        env["PYTHONPATH"] = existing + ":" + toolchain_utils_root
    return env
131
132
@dataclasses.dataclass(frozen=True)
class MyPyInvocation:
    """Describes how to launch mypy: an argv prefix plus PYTHONPATH extras."""

    # The argv prefix that starts mypy; callers append flags and file names.
    command: List[str]
    # Extra PYTHONPATH entries, already in PYTHONPATH syntax so they can be
    # spliced into the env var as-is. Empty when nothing needs to be added.
    pythonpath_additions: str
141
142
def get_mypy() -> Optional[MyPyInvocation]:
    """Locates mypy, installing it through pip when inside the chroot.

    Lookup order:
        1. a `mypy` executable on $PATH;
        2. a mypy package that pip already knows about, which is run as
           `python3 -m mypy` with pip's reported install location added to
           PYTHONPATH;
        3. inside the chroot only: `pip install --user mypy`, followed by a
           second attempt at step 2.

    Returns:
        A MyPyInvocation describing how to run mypy, or None if mypy could
        not be located (including when pip itself is unavailable).
    """
    if has_executable_on_path("mypy"):
        return MyPyInvocation(command=["mypy"], pythonpath_additions="")

    pip = get_pip()
    if not pip:
        assert not is_in_chroot()
        return None

    def locate_pip_installed_mypy() -> Optional[MyPyInvocation]:
        exit_code, show_output = run_command_unchecked(pip + ["show", "mypy"])
        if exit_code:
            return None

        # `pip show` reports the site-packages directory as `Location: ...`.
        location = re.search(r"^Location: (.*)", show_output, re.MULTILINE)
        if not location:
            return None

        return MyPyInvocation(
            command=["python3", "-m", "mypy"],
            pythonpath_additions=location.group(1),
        )

    invocation = locate_pip_installed_mypy()
    if invocation:
        return invocation

    if not is_in_chroot():
        return None

    assert pip is not None
    subprocess.check_call(pip + ["install", "--user", "mypy"])
    return locate_pip_installed_mypy()
192
193
def get_pip() -> Optional[List[str]]:
    """Finds pip and returns a command to invoke it.

    If pip isn't importable, this first tries to bootstrap it with
    `ensurepip`.

    pip's availability is probed through `python3`, so the same interpreter
    name is used to bootstrap and to invoke pip. A bare `python` may be
    missing, or may refer to a different interpreter than the one probed.

    Returns:
        The argv prefix that runs pip, or None if pip still can't be
        imported after the bootstrap attempt.

    Raises:
        subprocess.CalledProcessError: if `ensurepip` exits unsuccessfully.
    """
    python = "python3"

    have_pip = can_import_py_module("pip")
    if not have_pip:
        print("Autoinstalling `pip`...")
        subprocess.check_call([python, "-m", "ensurepip"])
        have_pip = can_import_py_module("pip")

    if have_pip:
        return [python, "-m", "pip"]
    return None
211
212
def get_check_result_or_catch(
    task: multiprocessing.pool.ApplyResult,
) -> CheckResult:
    """Waits for `task` and converts any exception into a failed CheckResult.

    The task is expected to produce a CheckResult from get(). If get()
    raises instead, the formatted traceback becomes the output of a failing
    CheckResult with no autofix commands.
    """
    try:
        result = task.get()
    except Exception:
        failure_text = (
            "Check exited with an unexpected exception:\n%s"
            % traceback.format_exc()
        )
        return CheckResult(
            ok=False,
            output=failure_text,
            autofix_commands=[],
        )
    return result
229
230
def check_isort(
    toolchain_utils_root: str, python_files: Iterable[str]
) -> CheckResult:
    """Subchecker of check_py_format. Verifies import ordering with isort.

    Uses chromite's isort wrapper and settings file from their fixed
    location under /mnt/host/source. If either is absent, the check is
    skipped and reported as passing.

    Args:
        toolchain_utils_root: directory to run isort from.
        python_files: the Python files to check.

    Returns:
        A CheckResult; on failure with identifiable files, it carries one
        autofix command that re-runs isort on just those files.
    """
    chromite_dir = Path("/mnt/host/source/chromite")
    isort_binary = chromite_dir / "scripts" / "isort"
    isort_config = chromite_dir / ".isort.cfg"

    if not isort_binary.exists() or not isort_config.exists():
        return CheckResult(
            ok=True,
            output="isort not found; skipping",
            autofix_commands=[],
        )

    settings_flag = f"--settings-file={isort_config}"
    command = [str(isort_binary), "-c", settings_flag, *python_files]
    exit_code, stdout_and_stderr = run_command_unchecked(
        command, cwd=toolchain_utils_root
    )

    # A zero exit code means isort had nothing to complain about.
    if exit_code == 0:
        return CheckResult(
            ok=True,
            output="",
            autofix_commands=[],
        )

    complaint_re = re.compile(
        r"^ERROR: (.*) Imports are incorrectly sorted and/or formatted\.$"
    )
    complaint_matches = (
        complaint_re.match(line) for line in stdout_and_stderr.splitlines()
    )
    bad_files = [m.group(1).strip() for m in complaint_matches if m]

    if bad_files:
        fix_command = [str(isort_binary), settings_flag] + bad_files
        return CheckResult(
            ok=False,
            output="The following file(s) have formatting errors: %s"
            % bad_files,
            autofix_commands=[fix_command],
        )

    # isort failed, but not in a way we know how to attribute to files.
    return CheckResult(
        ok=False,
        output=f"`{shlex.join(command)}` failed; stdout/stderr:\n"
        f"{stdout_and_stderr}",
        autofix_commands=[],
    )
284
285
def check_black(
    toolchain_utils_root: str, black: Path, python_files: Iterable[str]
) -> CheckResult:
    """Subchecker of check_py_format. Verifies formatting with black.

    Args:
        toolchain_utils_root: directory to run black from.
        black: the black executable to invoke.
        python_files: the Python files to check.

    Returns:
        A CheckResult. When black only reports files it would reformat, the
        result carries an autofix command that reformats those files.
    """

    def failure(message: str) -> CheckResult:
        return CheckResult(ok=False, output=message, autofix_commands=[])

    # Folks have been bitten by accidentally using multiple formatter
    # versions in the past. This is an issue, since newer versions of
    # black may format things differently. Make the version obvious.
    version_command: Command = [black, "--version"]
    exit_code, stdout_and_stderr = run_command_unchecked(
        version_command, cwd=toolchain_utils_root
    )
    if exit_code:
        return failure(
            "Failed getting black version; "
            f"stdstreams: {stdout_and_stderr}"
        )
    black_version = stdout_and_stderr.strip()

    black_invocation: List[str] = [str(black), "--line-length=80"]
    check_command = [*black_invocation, "--check", *python_files]
    exit_code, stdout_and_stderr = run_command_unchecked(
        check_command, cwd=toolchain_utils_root
    )
    # A zero exit code means every file is already formatted.
    if exit_code == 0:
        return CheckResult(
            ok=True,
            output=f"Using {black_version!r}, no issues were found.",
            autofix_commands=[],
        )

    unparseable = f"Unparseable `black` output:\n{stdout_and_stderr}"

    # Output format looks something like:
    # f'{complaints}\nOh no!{emojis}\n{summary}'
    # Whittle it down to complaints.
    complaints, separator, _ = stdout_and_stderr.partition("\nOh no!")
    if not separator:
        return failure(unparseable)

    refmt_prefix = "would reformat "
    bad_files = []
    errors = []
    for raw_line in complaints.strip().splitlines():
        line = raw_line.strip()
        if line.startswith("error:"):
            errors.append(line)
        elif line.startswith(refmt_prefix):
            bad_files.append(line[len(refmt_prefix) :].strip())
        else:
            return failure(unparseable)

    # If black had internal errors that it could handle, print them out and
    # exit without an autofix.
    if errors:
        err_str = "\n".join(errors)
        return failure(
            f"Using {black_version!r} had the following errors:\n"
            f"{err_str}"
        )

    return CheckResult(
        ok=False,
        output=f"Using {black_version!r}, these file(s) have formatting "
        f"errors: {bad_files}",
        autofix_commands=[black_invocation + bad_files],
    )
366
367
def check_mypy(
    toolchain_utils_root: str,
    mypy: MyPyInvocation,
    files: Iterable[str],
) -> CheckResult:
    """Type-checks `files` with mypy.

    Args:
        toolchain_utils_root: directory to run mypy from; also added to
            PYTHONPATH for the mypy subprocess.
        mypy: how to invoke mypy.
        files: the files to type-check.

    Returns:
        A CheckResult whose output is mypy's own output followed by a
        one-line summary that includes the mypy version.
    """
    fixed_env = env_with_pythonpath(toolchain_utils_root)
    extra_pythonpath = mypy.pythonpath_additions
    if extra_pythonpath:
        fixed_env["PYTHONPATH"] = (
            f"{extra_pythonpath}:{fixed_env['PYTHONPATH']}"
        )

    def run_mypy(extra_args: List[str]) -> Tuple[int, str]:
        return run_command_unchecked(
            mypy.command + extra_args, cwd=toolchain_utils_root, env=fixed_env
        )

    # Show the version number, mainly for troubleshooting purposes.
    exit_code, output = run_mypy(["--version"])
    if exit_code:
        return CheckResult(
            ok=False,
            output=f"Failed getting mypy version; stdstreams: {output}",
            autofix_commands=[],
        )
    # Prefix the summary with the version information.
    prefix = f"Using {output.strip()}, "

    exit_code, output = run_mypy(["--follow-imports=silent"] + list(files))
    passed = exit_code == 0
    if passed:
        summary = f"{output}{prefix}checks passed"
    else:
        summary = f"{output}{prefix}type errors were found"
    return CheckResult(
        ok=passed,
        output=summary,
        autofix_commands=[],
    )
411
412
def check_python_file_headers(python_files: Iterable[str]) -> CheckResult:
    """Subchecker of check_py_format. Checks python #!s.

    A file should start with `#!` if and only if it is executable.

    Args:
        python_files: the Python files to inspect.

    Returns:
        A passing CheckResult if every file agrees with the rule above.
        Otherwise, a failing one that lists the offending files, with `sed`
        autofix commands that insert or delete the first line.
    """
    add_hashbang = []
    remove_hashbang = []

    for python_file in python_files:
        needs_hashbang = is_file_executable(python_file)
        # Sniff raw bytes: decoding the file as text just to look at its
        # first two characters would raise on files that contain bytes
        # which aren't valid UTF-8.
        with open(python_file, "rb") as f:
            has_hashbang = f.read(2) == b"#!"

        if needs_hashbang == has_hashbang:
            continue

        if needs_hashbang:
            add_hashbang.append(python_file)
        else:
            remove_hashbang.append(python_file)

    autofix = []
    output = []
    if add_hashbang:
        output.append(
            "The following files have no #!, but need one: %s" % add_hashbang
        )
        autofix.append(["sed", "-i", "1i#!/usr/bin/env python3"] + add_hashbang)

    if remove_hashbang:
        output.append(
            "The following files have a #!, but shouldn't: %s" % remove_hashbang
        )
        autofix.append(["sed", "-i", "1d"] + remove_hashbang)

    if not output:
        return CheckResult(
            ok=True,
            output="",
            autofix_commands=[],
        )
    return CheckResult(
        ok=False,
        output="\n".join(output),
        autofix_commands=autofix,
    )
455
456
def check_py_format(
    toolchain_utils_root: str,
    thread_pool: multiprocessing.pool.ThreadPool,
    files: Iterable[str],
) -> CheckResults:
    """Checks Python formatting: black, isort, and #! lines.

    Args:
        toolchain_utils_root: directory the formatters are run from.
        thread_pool: pool on which the subchecks are run concurrently.
        files: candidate files; deleted and non-`.py` files are ignored.

    Returns:
        A single CheckResult if black is unavailable or there is nothing to
        check; otherwise a list of (subcheck name, CheckResult) pairs.
    """
    black = "black"
    if not has_executable_on_path(black):
        return CheckResult(
            ok=False,
            output="black isn't available on your $PATH. Please either "
            "enter a chroot, or place depot_tools on your $PATH.",
            autofix_commands=[],
        )

    python_files = [f for f in remove_deleted_files(files) if f.endswith(".py")]
    if not python_files:
        return CheckResult(
            ok=True,
            output="no python files to check",
            autofix_commands=[],
        )

    # Submit everything before waiting on anything, so the subchecks overlap.
    black_task = thread_pool.apply_async(
        check_black, (toolchain_utils_root, black, python_files)
    )
    isort_task = thread_pool.apply_async(
        check_isort, (toolchain_utils_root, python_files)
    )
    headers_task = thread_pool.apply_async(
        check_python_file_headers, (python_files,)
    )

    pending = [
        ("check_black", black_task),
        ("check_isort", isort_task),
        ("check_file_headers", headers_task),
    ]
    results = []
    for name, task in pending:
        results.append((name, get_check_result_or_catch(task)))
    return results
499
500
def file_is_relative_to(file: Path, potential_parent: Path) -> bool:
    """file.is_relative_to(potential_parent), but for Python < 3.9."""
    try:
        file.relative_to(potential_parent)
    except ValueError:
        # relative_to() raises when `file` isn't beneath `potential_parent`.
        return False
    else:
        return True
508
509
def is_file_in_any_of(file: Path, files_and_dirs: List[Path]) -> bool:
    """Returns whether `files_and_dirs` encompasses `file`.

    An entry encompasses `file` if it is equal to `file`, or if it is a
    directory that `file` lives beneath.

    Args:
        file: the path to look for.
        files_and_dirs: the files and directories to search.
    """
    # This could technically be made sublinear, but it's running at most a few
    # dozen times on a `files_and_dirs` that's currently < 10 elems.
    for candidate in files_and_dirs:
        if file == candidate:
            return True
        if file_is_relative_to(file, candidate):
            return True
    return False
526
527
def check_py_types(
    toolchain_utils_root: str,
    thread_pool: multiprocessing.pool.ThreadPool,
    files: Iterable[str],
) -> CheckResults:
    """Runs static type checking for files covered by MYPY_CHECKED_PATHS.

    Args:
        toolchain_utils_root: root of the toolchain-utils checkout; the
            entries of MYPY_CHECKED_PATHS are resolved against it.
        thread_pool: pool on which mypy is run.
        files: candidate files. Only existing `.py` files covered by
            MYPY_CHECKED_PATHS are type-checked.

    Returns:
        A single CheckResult if there is nothing to check or mypy can't be
        found; otherwise a list holding the `check_mypy` subcheck result.
    """
    path_root = Path(toolchain_utils_root)
    check_locations = [path_root / x for x in MYPY_CHECKED_PATHS]
    # Like the other checks, skip files that no longer exist on disk: handing
    # a nonexistent path to mypy would only produce a spurious failure.
    to_check = [
        x
        for x in remove_deleted_files(files)
        if x.endswith(".py") and is_file_in_any_of(Path(x), check_locations)
    ]

    if not to_check:
        return CheckResult(
            ok=True,
            output="no python files to typecheck",
            autofix_commands=[],
        )

    mypy = get_mypy()
    if not mypy:
        return CheckResult(
            ok=False,
            output="mypy not found. Please either enter a chroot "
            "or install mypy",
            autofix_commands=[],
        )

    tasks = [
        (
            "check_mypy",
            thread_pool.apply_async(
                check_mypy, (toolchain_utils_root, mypy, to_check)
            ),
        ),
    ]
    return [(name, get_check_result_or_catch(task)) for name, task in tasks]
567
568
def find_chromeos_root_directory() -> Optional[str]:
    """Returns $CHROMEOS_ROOT_DIRECTORY, or None if it isn't set."""
    return os.environ.get("CHROMEOS_ROOT_DIRECTORY")
571
572
def check_cros_lint(
    toolchain_utils_root: str,
    thread_pool: multiprocessing.pool.ThreadPool,
    files: Iterable[str],
) -> CheckResults:
    """Runs `cros lint`, falling back to pylint/golint if it's unusable.

    `cros` is first looked up on $PATH, then under
    $CHROMEOS_ROOT_DIRECTORY/chromite/bin. If neither is usable, existing
    `.py` files are linted with pylint and existing `.go` files with golint.

    Args:
        toolchain_utils_root: directory to run linters from; also added to
            PYTHONPATH for them.
        thread_pool: pool on which the fallback linters are run.
        files: the files to lint.

    Returns:
        The `cros lint` CheckResult if `cros lint` could be run. Otherwise,
        a list of (linter name, CheckResult) pairs for the fallback linters,
        or a single passing CheckResult if no fallback linter applied. The
        fallback results carry a warning about the missing CrOS tree.
    """
    # `files` gets walked several times below; materialize it once so that a
    # one-shot iterable (e.g., a generator) isn't silently exhausted.
    files = list(files)

    fixed_env = env_with_pythonpath(toolchain_utils_root)

    # We have to support users who don't have a chroot. So we either run `cros
    # lint` (if it's been made available to us), or we try a mix of
    # pylint+golint.
    def try_run_cros_lint(cros_binary: str) -> Optional[CheckResult]:
        # If the binary doesn't exist, launching it raises instead of giving
        # us an exit code, which would skip the fallback linters entirely.
        if not has_executable_on_path(cros_binary):
            return None

        exit_code, output = run_command_unchecked(
            [cros_binary, "lint", "--"] + files,
            toolchain_utils_root,
            env=fixed_env,
        )

        # This is returned specifically if cros couldn't find the ChromeOS tree
        # root.
        if exit_code == 127:
            return None

        return CheckResult(
            ok=exit_code == 0,
            output=output,
            autofix_commands=[],
        )

    cros_lint = try_run_cros_lint("cros")
    if cros_lint is not None:
        return cros_lint

    cros_root = find_chromeos_root_directory()
    if cros_root:
        cros_lint = try_run_cros_lint(
            os.path.join(cros_root, "chromite/bin/cros")
        )
        if cros_lint is not None:
            return cros_lint

    tasks = []

    def check_result_from_command(command: List[str]) -> CheckResult:
        exit_code, output = run_command_unchecked(
            command, toolchain_utils_root, env=fixed_env
        )
        return CheckResult(
            ok=exit_code == 0,
            output=output,
            autofix_commands=[],
        )

    existing_files = remove_deleted_files(files)

    python_files = [f for f in existing_files if f.endswith(".py")]
    if python_files:

        def run_pylint() -> CheckResult:
            # pylint is required. Fail hard if it DNE.
            return check_result_from_command(["pylint"] + python_files)

        tasks.append(("pylint", thread_pool.apply_async(run_pylint)))

    go_files = [f for f in existing_files if f.endswith(".go")]
    if go_files:

        def run_golint() -> CheckResult:
            if has_executable_on_path("golint"):
                return check_result_from_command(
                    ["golint", "-set_exit_status"] + go_files
                )

            complaint = (
                "WARNING: go linting disabled. golint is not on your $PATH.\n"
                "Please either enter a chroot, or install go locally. "
                "Continuing."
            )
            return CheckResult(
                ok=True,
                output=complaint,
                autofix_commands=[],
            )

        tasks.append(("golint", thread_pool.apply_async(run_golint)))

    complaint = (
        "WARNING: No ChromeOS checkout detected, and no viable CrOS tree\n"
        "found; falling back to linting only python and go. If you have a\n"
        "ChromeOS checkout, please either develop from inside of the source\n"
        "tree, or set $CHROMEOS_ROOT_DIRECTORY to the root of it."
    )

    results = [(name, get_check_result_or_catch(task)) for name, task in tasks]
    if not results:
        return CheckResult(
            ok=True,
            output=complaint,
            autofix_commands=[],
        )

    # We need to complain _somewhere_.
    name, angry_result = results[0]
    angry_complaint = (complaint + "\n\n" + angry_result.output).strip()
    results[0] = (name, angry_result._replace(output=angry_complaint))
    return results
678
679
def check_go_format(
    toolchain_utils_root: str,
    _thread_pool: multiprocessing.pool.ThreadPool,
    files: Iterable[str],
) -> CheckResult:
    """Runs gofmt on files to check for style bugs.

    Args:
        toolchain_utils_root: directory to run gofmt from.
        _thread_pool: unused; present so all checkers share a signature.
        files: candidate files; deleted and non-`.go` files are ignored.

    Returns:
        A CheckResult. When gofmt lists misformatted files, the result
        carries a `gofmt -w` autofix command for those files.
    """
    gofmt = "gofmt"
    if not has_executable_on_path(gofmt):
        return CheckResult(
            ok=False,
            output="gofmt isn't available on your $PATH. Please either "
            "enter a chroot, or place your go bin/ directory on your $PATH.",
            autofix_commands=[],
        )

    go_files = [f for f in remove_deleted_files(files) if f.endswith(".go")]
    if not go_files:
        return CheckResult(
            ok=True,
            output="no go files to check",
            autofix_commands=[],
        )

    # `gofmt -l` prints the name of each file whose formatting differs.
    command = [gofmt, "-l"] + go_files
    exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root)

    if exit_code:
        return CheckResult(
            ok=False,
            output=f"{shlex.join(command)} failed; stdout/stderr:\n{output}",
            autofix_commands=[],
        )

    output = output.strip()
    if not output:
        return CheckResult(
            ok=True,
            output="",
            autofix_commands=[],
        )

    broken_files = [x.strip() for x in output.splitlines()]
    autofix = [gofmt, "-w"] + broken_files
    return CheckResult(
        ok=False,
        output="The following Go files have incorrect "
        "formatting: %s" % broken_files,
        autofix_commands=[autofix],
    )
725
726
def check_no_compiler_wrapper_changes(
    toolchain_utils_root: str,
    _thread_pool: multiprocessing.pool.ThreadPool,
    files: List[str],
) -> CheckResult:
    """Fails if any of `files` lives under compiler_wrapper/.

    Args:
        toolchain_utils_root: root of the toolchain-utils checkout.
        _thread_pool: unused; present so all checkers share a signature.
        files: the files being checked.
    """
    wrapper_dir_prefix = (
        os.path.join(toolchain_utils_root, "compiler_wrapper") + "/"
    )
    touches_wrapper = any(x.startswith(wrapper_dir_prefix) for x in files)
    if touches_wrapper:
        message = textwrap.dedent(
            """\
            Compiler wrapper changes should be made in chromiumos-overlay.
            If you're a CrOS toolchain maintainer, please make the change
            directly there now. If you're contributing as part of a downstream
            (e.g., the Android toolchain team), feel free to bypass this check
            and note to your reviewer that you received this message. They can
            review your CL and commit to the right plate for you. Thanks!
            """
        ).strip()
        return CheckResult(
            ok=False,
            output=message,
            autofix_commands=[],
        )

    return CheckResult(
        ok=True,
        output="no compiler_wrapper changes detected",
        autofix_commands=[],
    )
756
757
def check_tests(
    toolchain_utils_root: str,
    _thread_pool: multiprocessing.pool.ThreadPool,
    files: List[str],
) -> CheckResult:
    """Runs `run_tests_for.py` on `files` and reports whether it succeeded.

    Args:
        toolchain_utils_root: root of the toolchain-utils checkout, which is
            where `run_tests_for.py` lives and where it is run from.
        _thread_pool: unused; present so all checkers share a signature.
        files: the files to run tests for.
    """
    test_runner = os.path.join(toolchain_utils_root, "run_tests_for.py")
    command = [test_runner, "--"] + files
    exit_code, stdout_and_stderr = run_command_unchecked(
        command,
        toolchain_utils_root,
    )
    tests_passed = exit_code == 0
    return CheckResult(
        ok=tests_passed,
        output=stdout_and_stderr,
        autofix_commands=[],
    )
773
774
def detect_toolchain_utils_root() -> str:
    """Returns the absolute path of the directory two levels above this file.

    That is, the parent of the directory that contains this script.
    """
    this_file = os.path.abspath(__file__)
    containing_dir = os.path.dirname(this_file)
    return os.path.dirname(containing_dir)
777
778
def process_check_result(
    check_name: str,
    check_results: CheckResults,
    start_time: datetime.datetime,
) -> Tuple[bool, List[List[str]]]:
    """Prints a human-readable report for one check.

    Args:
        check_name: name of the check being reported.
        check_results: what the check returned; either one CheckResult, or
            a list of (subcheck name, CheckResult) pairs.
        start_time: when the check started; used to print its duration.

    Returns:
        A tuple of (whether the check and all of its subchecks passed, all
        autofix commands that the check produced).
    """
    indent = "  "

    def indent_block(text: str) -> str:
        return "\n".join(indent + line for line in text.split("\n"))

    def recommendation_for(commands: List[List[str]]) -> str:
        return (
            "Recommended command(s) to fix this: "
            f"{[shlex.join(x) for x in commands]}"
        )

    autofix_commands: List[List[str]]
    if isinstance(check_results, CheckResult):
        ok = check_results.ok
        output = check_results.output
        autofix_commands = check_results.autofix_commands
        if autofix_commands and not ok:
            recommendation = recommendation_for(autofix_commands)
            output = f"{output}\n{recommendation}" if output else recommendation
    else:
        autofix_commands = []
        sections = []
        for subname, sub_result in check_results:
            status = "succeeded" if sub_result.ok else "failed"
            lines = ["*** %s.%s %s" % (check_name, subname, status)]
            if sub_result.output:
                lines.append(indent_block(sub_result.output))
            if sub_result.autofix_commands and not sub_result.ok:
                lines.append(
                    indent_block(
                        recommendation_for(sub_result.autofix_commands)
                    )
                )

            sections.append("\n".join(lines))
            # Autofixes are collected from every subcheck, passing or not.
            autofix_commands.extend(sub_result.autofix_commands)

        ok = all(sub_result.ok for _, sub_result in check_results)
        output = "\n\n".join(sections)

    time_taken = datetime.datetime.now() - start_time
    headline = (
        "*** %s succeeded after %s" if ok else "*** %s failed after %s"
    )
    print(headline % (check_name, time_taken))

    if output:
        print(indent_block(output))

    print()
    return ok, autofix_commands
834
835
def try_autofix(
    all_autofix_commands: List[List[str]], toolchain_utils_root: str
) -> None:
    """Runs the given autofix commands, but only in a clean git tree.

    Nothing is run if there are no commands, if `git status` fails, or if
    `git status --porcelain` reports any changes.

    Args:
        all_autofix_commands: the commands to run, each as an argv list.
        toolchain_utils_root: the git checkout to run the commands in.
    """
    if not all_autofix_commands:
        return

    status_code, status_output = run_command_unchecked(
        ["git", "status", "--porcelain"], cwd=toolchain_utils_root
    )
    if status_code != 0:
        print("Autofix aborted: couldn't get toolchain-utils git status.")
        return

    if status_output.strip():
        # A clean repo makes checking/undoing autofix commands trivial. A dirty
        # one... less so. :)
        print("Git repo seems dirty; skipping autofix.")
        return

    successes = 0
    for command in all_autofix_commands:
        exit_code, output = run_command_unchecked(
            command, cwd=toolchain_utils_root
        )
        pretty_command = shlex.join(command)

        if exit_code:
            print(
                f"*** Autofix command `{pretty_command}` exited with "
                f"code {exit_code}; stdout/stderr:"
            )
            print(output)
            continue

        print(f"*** Autofix `{pretty_command}` succeeded")
        successes += 1

    if successes:
        print(
            "NOTE: Autofixes have been applied. Please check your tree, since "
            "some lints may now be fixed"
        )
877
878
def find_repo_root(base_dir: str) -> Optional[str]:
    """Finds the nearest directory at or above `base_dir` that has a `.repo`.

    Args:
        base_dir: where to start searching. Relative paths are resolved
            against the current working directory.

    Returns:
        The absolute path of the closest directory that contains a `.repo`
        directory, or None if there is none. The filesystem root itself is
        never considered.
    """
    current = os.path.abspath(base_dir)
    while True:
        parent = os.path.dirname(current)
        # dirname() of the top-most directory is that directory again.
        # Comparing against a literal "/" would never terminate for paths
        # that don't bottom out there (e.g., relative or empty ones).
        if parent == current:
            return None
        if os.path.isdir(os.path.join(current, ".repo")):
            return current
        current = parent
886
887
def is_in_chroot() -> bool:
    """Reports whether /etc/cros_chroot_version exists."""
    chroot_marker = "/etc/cros_chroot_version"
    return os.path.exists(chroot_marker)
890
891
def maybe_reexec_inside_chroot(
    autofix: bool, install_deps_only: bool, files: List[str]
) -> None:
    """Replaces this process with one inside the chroot, when possible.

    Returns normally if we're already in the chroot, or if entering it
    isn't possible (no repo checkout or $CHROMEOS_ROOT_DIRECTORY, or no
    `cros_sdk` on $PATH). Otherwise this execs `cros_sdk` and never returns.

    Args:
        autofix: whether autofix is enabled; if not, `--no_autofix` is
            forwarded.
        install_deps_only: whether to forward `--install_deps_only`.
        files: the files to check; rewritten to their in-chroot paths.
    """
    if is_in_chroot():
        return

    can_enter = True
    chdir_to = None
    toolchain_utils = detect_toolchain_utils_root()
    if find_repo_root(toolchain_utils) is None:
        chromeos_root_dir = find_chromeos_root_directory()
        if chromeos_root_dir is not None:
            chdir_to = chromeos_root_dir
        else:
            print(
                "Standalone toolchain-utils checkout detected; cannot enter "
                "chroot."
            )
            can_enter = False

    if not has_executable_on_path("cros_sdk"):
        print("No `cros_sdk` detected on $PATH; cannot enter chroot.")
        can_enter = False

    if not can_enter:
        print(
            "Giving up on entering the chroot; be warned that some presubmits "
            "may be broken."
        )
        return

    # We'll be changing ${PWD}, so make everything relative to toolchain-utils,
    # which resides at a well-known place inside of the chroot.
    chroot_toolchain_utils = "/mnt/host/source/src/third_party/toolchain-utils"

    def rebase_path(path: str) -> str:
        relative = os.path.relpath(path, toolchain_utils)
        return os.path.join(chroot_toolchain_utils, relative)

    forwarded_flags = []
    if not autofix:
        forwarded_flags.append("--no_autofix")
    if install_deps_only:
        forwarded_flags.append("--install_deps_only")

    args = [
        "cros_sdk",
        "--enter",
        "--",
        rebase_path(__file__),
        *forwarded_flags,
        *(rebase_path(x) for x in files),
    ]

    if chdir_to is not None:
        print(f"Attempting to enter the chroot for tree at {chdir_to}...")
        os.chdir(chdir_to)
    else:
        print("Attempting to enter the chroot...")
    os.execvp(args[0], args)
951
952
def can_import_py_module(module: str) -> bool:
    """Returns true if `import {module}` works in a `python3` subprocess."""
    probe_command = ["python3", "-c", f"import {module}"]
    # The probe's output is irrelevant; only its exit status matters.
    return (
        subprocess.call(
            probe_command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        == 0
    )
961
962
def ensure_pip_deps_installed() -> None:
    """Installs every entry of PIP_DEPENDENCIES via `pip install --user`.

    Does nothing when PIP_DEPENDENCIES is empty, so pip itself is only
    looked up when there is something to install.

    Raises:
        AssertionError: if `get_pip()` yields a falsy result.
        subprocess.CalledProcessError: if any `pip install` exits non-zero.
    """
    if PIP_DEPENDENCIES:
        pip = get_pip()
        assert pip, "pip not found and could not be installed"

        # Every invocation shares the same leading arguments; only the
        # package name varies.
        install_prefix = pip + ["install", "--user"]
        for dep in PIP_DEPENDENCIES:
            subprocess.check_call(install_prefix + [dep])
973
974
def main(argv: List[str]) -> int:
    """Parses `argv`, runs all presubmit checks, and returns an exit code.

    Flow:
      1. Parse flags. With no files and no --install_deps_only, there is
         nothing to do and 0 is returned immediately.
      2. Unless --no_enter_chroot was given, try to re-exec inside the
         chroot.
      3. Inside the chroot, install pip dependencies (and stop there if
         --install_deps_only was given).
      4. Run every check on a shared thread pool, then apply the collected
         autofix commands unless --no_autofix was given.

    Args:
        argv: Command-line arguments, excluding the program name.

    Returns:
        0 if every check passed (or there was nothing to do), 1 otherwise.
    """
    arg_parser = argparse.ArgumentParser(description=__doc__)
    arg_parser.add_argument(
        "--no_autofix",
        dest="autofix",
        action="store_false",
        help="Don't run any autofix commands.",
    )
    arg_parser.add_argument(
        "--no_enter_chroot",
        dest="enter_chroot",
        action="store_false",
        help="Prevent auto-entering the chroot if we're not already in it.",
    )
    arg_parser.add_argument(
        "--install_deps_only",
        action="store_true",
        help="""
        Only install dependencies that would be required if presubmits were
        being run, and quit. This skips all actual checking.
        """,
    )
    arg_parser.add_argument("files", nargs="*")
    opts = arg_parser.parse_args(argv)

    install_deps_only = opts.install_deps_only
    files = opts.files
    if not (files or install_deps_only):
        # Nothing to check and nothing to install.
        return 0

    if opts.enter_chroot:
        maybe_reexec_inside_chroot(opts.autofix, install_deps_only, files)

    # Dependencies are only installed inside the chroot; anyone passing
    # --no_enter_chroot from outside is responsible for providing them.
    if is_in_chroot():
        ensure_pip_deps_installed()
        if install_deps_only:
            print(
                "Dependency installation complete & --install_deps_only "
                "passed. Quit."
            )
            return 0
    elif install_deps_only:
        arg_parser.error(
            "--install_deps_only is meaningless if the chroot isn't entered"
        )

    files = list(map(os.path.abspath, files))

    # Each check's .__name__ is shown to the user, so keep these names
    # readable.
    checks = (
        check_cros_lint,
        check_py_format,
        check_py_types,
        check_go_format,
        check_tests,
        check_no_compiler_wrapper_changes,
    )

    toolchain_utils_root = detect_toolchain_utils_root()

    # NOTE: Checks run on this pool and may also block on work they submit to
    # it, so at least len(checks)+1 threads are required to avoid deadlock.
    # Doubling gives every check room to make progress at a decent rate.
    num_threads = max(multiprocessing.cpu_count(), 2 * len(checks))
    start_time = datetime.datetime.now()

    # Serializes the one print statement issued from worker threads.
    spawn_print_lock = threading.RLock()

    all_checks_ok = True
    all_autofix_commands = []
    with multiprocessing.pool.ThreadPool(num_threads) as pool:

        def run_check(check_fn):
            """Announces `check_fn`, runs it, and pairs its name and result."""
            check_name = check_fn.__name__
            with spawn_print_lock:
                print("*** Spawning %s" % check_name)
            return check_name, check_fn(toolchain_utils_root, pool, files)

        # Results are handled in completion order, not submission order.
        for finished_name, result in pool.imap_unordered(run_check, checks):
            ok, autofix_commands = process_check_result(
                finished_name, result, start_time
            )
            if not ok:
                all_checks_ok = False
            all_autofix_commands.extend(autofix_commands)

    # Autofixes are deferred until every check has finished, so that:
    # - they can't collide with checkers that are still running
    # - all failures have already been printed, in case an autofix itself
    #   fails
    if opts.autofix:
        try_autofix(all_autofix_commands, toolchain_utils_root)

    return 0 if all_checks_ok else 1
1073
1074
# Script entry point: forward the CLI arguments (minus the program name) to
# main() and use its return value as the process exit status.
if __name__ == "__main__":
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)
1077