1# Copyright 2020 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""General purpose tools for running presubmit checks.""" 15 16import collections.abc 17from collections import Counter, defaultdict 18import logging 19import os 20from pathlib import Path 21import shlex 22import subprocess 23from typing import ( 24 Any, 25 Iterable, 26 Iterator, 27 Sequence, 28 Pattern, 29) 30 31from pw_cli.plural import plural 32from pw_cli.tool_runner import ToolRunner 33from pw_presubmit.presubmit_context import PRESUBMIT_CONTEXT 34 35_LOG: logging.Logger = logging.getLogger(__name__) 36 37 38def make_box(section_alignments: Sequence[str]) -> str: 39 indices = [i + 1 for i in range(len(section_alignments))] 40 top_sections = '{2}'.join('{1:{1}^{width%d}}' % i for i in indices) 41 mid_sections = '{5}'.join( 42 '{section%d:%s{width%d}}' % (i, section_alignments[i - 1], i) 43 for i in indices 44 ) 45 bot_sections = '{9}'.join('{8:{8}^{width%d}}' % i for i in indices) 46 47 return ''.join( 48 [ 49 '{0}', 50 *top_sections, 51 '{3}\n', 52 '{4}', 53 *mid_sections, 54 '{6}\n', 55 '{7}', 56 *bot_sections, 57 '{10}', 58 ] 59 ) 60 61 62def file_summary( 63 paths: Iterable[Path], 64 levels: int = 2, 65 max_lines: int = 12, 66 max_types: int = 3, 67 pad: str = ' ', 68 pad_start: str = ' ', 69 pad_end: str = ' ', 70) -> list[str]: 71 """Summarizes a list of files by the file types in each directory.""" 72 73 # Count the file types in each directory. 74 all_counts: dict[Any, Counter] = defaultdict(Counter) 75 76 for path in paths: 77 parent = path.parents[max(len(path.parents) - levels, 0)] 78 all_counts[parent][path.suffix] += 1 79 80 # If there are too many lines, condense directories with the fewest files. 81 if len(all_counts) > max_lines: 82 counts = sorted( 83 all_counts.items(), key=lambda item: -sum(item[1].values()) 84 ) 85 counts, others = ( 86 sorted(counts[: max_lines - 1]), 87 counts[max_lines - 1 :], 88 ) 89 counts.append( 90 ( 91 f'({plural(others, "other")})', 92 sum((c for _, c in others), Counter()), 93 ) 94 ) 95 else: 96 counts = sorted(all_counts.items()) 97 98 width = max(len(str(d)) + len(os.sep) for d, _ in counts) if counts else 0 99 width += len(pad_start) 100 101 # Prepare the output. 102 output = [] 103 for path, files in counts: 104 total = sum(files.values()) 105 del files[''] # Never display no-extension files individually. 106 107 if files: 108 extensions = files.most_common(max_types) 109 other_extensions = total - sum(count for _, count in extensions) 110 if other_extensions: 111 extensions.append(('other', other_extensions)) 112 113 types = ' (' + ', '.join(f'{c} {e}' for e, c in extensions) + ')' 114 else: 115 types = '' 116 117 root = f'{path}{os.sep}{pad_start}'.ljust(width, pad) 118 output.append(f'{root}{pad_end}{plural(total, "file")}{types}') 119 120 return output 121 122 123def relative_paths(paths: Iterable[Path], start: Path) -> Iterable[Path]: 124 """Returns relative Paths calculated with os.path.relpath.""" 125 for path in paths: 126 yield Path(os.path.relpath(path, start)) 127 128 129def exclude_paths( 130 exclusions: Iterable[Pattern[str]], 131 paths: Iterable[Path], 132 relative_to: Path | None = None, 133) -> Iterable[Path]: 134 """Excludes paths based on a series of regular expressions.""" 135 if relative_to: 136 137 def relpath(path): 138 return Path(os.path.relpath(path, relative_to)) 139 140 else: 141 142 def relpath(path): 143 return path 144 145 for path in paths: 146 if not any(e.search(relpath(path).as_posix()) for e in exclusions): 147 yield path 148 149 150def _truncate(value, length: int = 60) -> str: 151 value = str(value) 152 return (value[: length - 5] + '[...]') if len(value) > length else value 153 154 155def format_command(args: Sequence, kwargs: dict) -> tuple[str, str]: 156 attr = ', '.join(f'{k}={_truncate(v)}' for k, v in sorted(kwargs.items())) 157 return attr, ' '.join(shlex.quote(str(arg)) for arg in args) 158 159 160def log_run( 161 args, ignore_dry_run: bool = False, **kwargs 162) -> subprocess.CompletedProcess: 163 """Logs a command then runs it with subprocess.run. 164 165 Takes the same arguments as subprocess.run. The command is only executed if 166 dry-run is not enabled. 167 """ 168 ctx = PRESUBMIT_CONTEXT.get() 169 if ctx: 170 # Save the subprocess command args for pw build presubmit runner. 171 if not ignore_dry_run: 172 ctx.append_check_command(*args, **kwargs) 173 if ctx.dry_run and not ignore_dry_run: 174 # Return an empty CompletedProcess without actually running anything 175 # if dry-run mode is on. 176 empty_proc: subprocess.CompletedProcess = ( 177 subprocess.CompletedProcess('', 0) 178 ) 179 empty_proc.stdout = b'' 180 empty_proc.stderr = b'' 181 return empty_proc 182 _LOG.debug('[COMMAND] %s\n%s', *format_command(args, kwargs)) 183 return subprocess.run(args, **kwargs) 184 185 186class PresubmitToolRunner(ToolRunner): 187 """A simple ToolRunner that runs a process via `log_run()`.""" 188 189 @staticmethod 190 def _custom_args() -> Iterable[str]: 191 return ['pw_presubmit_ignore_dry_run'] 192 193 def _run_tool( 194 self, tool: str, args, pw_presubmit_ignore_dry_run=False, **kwargs 195 ) -> subprocess.CompletedProcess: 196 """Run the requested tool as a subprocess.""" 197 return log_run( 198 [tool, *args], 199 **kwargs, 200 ignore_dry_run=pw_presubmit_ignore_dry_run, 201 ) 202 203 204def flatten(*items) -> Iterator: 205 """Yields items from a series of items and nested iterables. 206 207 This function is used to flatten arbitrarily nested lists. str and bytes 208 are kept intact. 209 """ 210 211 for item in items: 212 if isinstance(item, collections.abc.Iterable) and not isinstance( 213 item, (str, bytes, bytearray) 214 ): 215 yield from flatten(*item) 216 else: 217 yield item 218