xref: /aosp_15_r20/external/pigweed/pw_presubmit/py/pw_presubmit/tools.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""General purpose tools for running presubmit checks."""
15
16import collections.abc
17from collections import Counter, defaultdict
18import logging
19import os
20from pathlib import Path
21import shlex
22import subprocess
23from typing import (
24    Any,
25    Iterable,
26    Iterator,
27    Sequence,
28    Pattern,
29)
30
31from pw_cli.plural import plural
32from pw_cli.tool_runner import ToolRunner
33from pw_presubmit.presubmit_context import PRESUBMIT_CONTEXT
34
35_LOG: logging.Logger = logging.getLogger(__name__)
36
37
def make_box(section_alignments: Sequence[str]) -> str:
    """Builds a ``str.format`` template for a three-row box.

    One column is produced per entry in ``section_alignments``; each entry is
    the alignment character(s) placed in the format spec of that column's
    middle-row cell.

    The returned template expects these fields when formatted:
      - positional 0-3: top row (left edge, fill, column separator, right edge)
      - positional 4-6: middle row (left edge, column separator, right edge)
      - positional 7-10: bottom row (left edge, fill, column separator,
        right edge)
      - keyword ``section<N>`` and ``width<N>`` for each column N, counting
        from 1.
    """
    top_cells = []
    mid_cells = []
    bot_cells = []

    # Columns are numbered from 1 in the template's keyword field names.
    for number, alignment in enumerate(section_alignments, start=1):
        top_cells.append('{1:{1}^{width%d}}' % number)
        mid_cells.append(
            '{section%d:%s{width%d}}' % (number, alignment, number)
        )
        bot_cells.append('{8:{8}^{width%d}}' % number)

    top_row = '{0}' + '{2}'.join(top_cells) + '{3}\n'
    mid_row = '{4}' + '{5}'.join(mid_cells) + '{6}\n'
    bot_row = '{7}' + '{9}'.join(bot_cells) + '{10}'
    return top_row + mid_row + bot_row
60
61
def file_summary(
    paths: Iterable[Path],
    levels: int = 2,
    max_lines: int = 12,
    max_types: int = 3,
    pad: str = ' ',
    pad_start: str = ' ',
    pad_end: str = ' ',
) -> list[str]:
    """Produces one summary line per directory, listing file-type counts.

    Args:
      paths: the files to summarize.
      levels: selects the grouping directory; each file is grouped under the
        entry ``levels`` positions from the end of its ``parents`` sequence,
        or under its immediate parent when it has fewer ancestors than that.
      max_lines: maximum number of lines returned. When there are more
        groups, the ``max_lines - 1`` groups with the most files are kept and
        the rest are merged into a single trailing "others" line.
      max_types: how many of the most common extensions are listed per line;
        any remaining files are reported together as "other".
      pad: fill character used to pad the directory column.
      pad_start: text placed right after the directory separator.
      pad_end: text placed between the padded directory column and the
        file count.

    Returns:
      The formatted summary lines.
    """
    # Tally suffixes per grouping directory.
    per_dir: dict[Any, Counter] = defaultdict(Counter)
    for file_path in paths:
        ancestors = file_path.parents
        group = ancestors[max(len(ancestors) - levels, 0)]
        per_dir[group][file_path.suffix] += 1

    rows: list[tuple[Any, Counter]]
    if len(per_dir) <= max_lines:
        rows = sorted(per_dir.items())
    else:
        # Too many groups: keep the largest ones and merge the remainder.
        by_size = sorted(
            per_dir.items(), key=lambda entry: -sum(entry[1].values())
        )
        kept = max_lines - 1
        remainder = by_size[kept:]
        rows = sorted(by_size[:kept])

        merged: Counter = Counter()
        for _, suffix_counts in remainder:
            merged = merged + suffix_counts
        rows.append((f'({plural(remainder, "other")})', merged))

    # Width of the directory column, including separator and pad_start.
    width = len(pad_start)
    if rows:
        width += max(len(str(label)) + len(os.sep) for label, _ in rows)

    lines = []
    for label, suffix_counts in rows:
        total = sum(suffix_counts.values())
        # Files without an extension count toward the total but are never
        # listed as their own type.
        suffix_counts.pop('', None)

        types = ''
        if suffix_counts:
            shown = suffix_counts.most_common(max_types)
            remaining = total - sum(count for _, count in shown)
            if remaining:
                shown.append(('other', remaining))
            listing = ', '.join(f'{count} {ext}' for ext, count in shown)
            types = ' (' + listing + ')'

        prefix = f'{label}{os.sep}{pad_start}'.ljust(width, pad)
        lines.append(f'{prefix}{pad_end}{plural(total, "file")}{types}')

    return lines
121
122
def relative_paths(paths: Iterable[Path], start: Path) -> Iterable[Path]:
    """Lazily yields each path rewritten relative to ``start``.

    The conversion uses ``os.path.relpath``, and each result is wrapped in a
    ``Path``.
    """
    yield from (Path(os.path.relpath(entry, start)) for entry in paths)
127
128
129def exclude_paths(
130    exclusions: Iterable[Pattern[str]],
131    paths: Iterable[Path],
132    relative_to: Path | None = None,
133) -> Iterable[Path]:
134    """Excludes paths based on a series of regular expressions."""
135    if relative_to:
136
137        def relpath(path):
138            return Path(os.path.relpath(path, relative_to))
139
140    else:
141
142        def relpath(path):
143            return path
144
145    for path in paths:
146        if not any(e.search(relpath(path).as_posix()) for e in exclusions):
147            yield path
148
149
def _truncate(value, length: int = 60) -> str:
    """Returns str(value), shortened with a '[...]' marker if too long.

    Strings longer than ``length`` are cut to ``length - 5`` characters and
    suffixed with '[...]'; shorter strings are returned unchanged.
    """
    text = str(value)
    if len(text) <= length:
        return text
    return text[: length - 5] + '[...]'


def format_command(args: Sequence, kwargs: dict) -> tuple[str, str]:
    """Renders a command and its keyword arguments as display strings.

    Args:
      args: the command's arguments; each is converted with ``str`` and
        shell-quoted.
      kwargs: keyword arguments to describe; values are truncated for display.

    Returns:
      A ``(attributes, command)`` tuple: the sorted ``key=value`` pairs joined
      with ', ', and the quoted arguments joined with spaces.
    """
    described = []
    for key, value in sorted(kwargs.items()):
        described.append(f'{key}={_truncate(value)}')

    quoted = map(shlex.quote, map(str, args))
    return ', '.join(described), ' '.join(quoted)
158
159
def log_run(
    args, ignore_dry_run: bool = False, **kwargs
) -> subprocess.CompletedProcess:
    """Logs a command at debug level, then runs it with subprocess.run.

    Accepts the same arguments as subprocess.run, plus ``ignore_dry_run``.

    When a presubmit context is active and ``ignore_dry_run`` is False, the
    command is recorded on the context. If that context is also in dry-run
    mode, nothing is executed and an empty, successful CompletedProcess is
    returned instead.
    """
    ctx = PRESUBMIT_CONTEXT.get()
    if ctx and not ignore_dry_run:
        # Record the command args for the pw build presubmit runner.
        ctx.append_check_command(*args, **kwargs)

        if ctx.dry_run:
            # Dry-run: report success with empty output, run nothing.
            return subprocess.CompletedProcess(
                '', 0, stdout=b'', stderr=b''
            )

    attributes, command = format_command(args, kwargs)
    _LOG.debug('[COMMAND] %s\n%s', attributes, command)
    return subprocess.run(args, **kwargs)
184
185
class PresubmitToolRunner(ToolRunner):
    """ToolRunner implementation that launches tools through `log_run()`."""

    @staticmethod
    def _custom_args() -> Iterable[str]:
        """Returns the name of the extra keyword accepted by `_run_tool()`."""
        return ['pw_presubmit_ignore_dry_run']

    def _run_tool(
        self, tool: str, args, pw_presubmit_ignore_dry_run=False, **kwargs
    ) -> subprocess.CompletedProcess:
        """Runs `tool` with `args` as a subprocess via `log_run()`.

        `pw_presubmit_ignore_dry_run` is forwarded as `log_run()`'s
        `ignore_dry_run` argument; remaining keyword arguments are passed
        through unchanged.
        """
        command = [tool, *args]
        return log_run(
            command,
            ignore_dry_run=pw_presubmit_ignore_dry_run,
            **kwargs,
        )
202
203
def flatten(*items) -> Iterator:
    """Lazily yields the leaf values of arbitrarily nested iterables.

    Every argument that is iterable is descended into recursively; anything
    else is yielded as-is. str, bytes, and bytearray values are treated as
    leaves and yielded whole rather than being split into elements.
    """
    atomic = (str, bytes, bytearray)

    for entry in items:
        if isinstance(entry, atomic) or not isinstance(
            entry, collections.abc.Iterable
        ):
            yield entry
            continue

        yield from flatten(*entry)
217            yield item
218