# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Provides patch utilities for PATCHES.json file handling."""

import collections
import contextlib
import dataclasses
import json
from pathlib import Path
import re
import subprocess
import sys
from typing import (
    Any,
    Callable,
    Dict,
    IO,
    Iterable,
    List,
    Optional,
    Tuple,
    Union,
)

import atomic_write_file


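# Example lines each pattern below is expected to match (illustrative):
#   APPLIED_RE:              "Applying: Add foo to bar (#3)"
#   CHECKED_FILE_RE:         "checking file clang/lib/Foo.cpp"
#   HUNK_FAILED_RE:          "Hunk #2 FAILED at 10."
#   HUNK_HEADER_RE:          "@@ -4,2 +8,3 @@"
#   HUNK_END_RE:             "--"
#   PATCH_SUBFILE_HEADER_RE: "+++ b/clang/lib/Foo.cpp"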
APPLIED_RE = re.compile(r"^Applying: (.+) \(#(\d+)\)$")
CHECKED_FILE_RE = re.compile(r"^checking file\s+(.*)$")
HUNK_FAILED_RE = re.compile(r"^Hunk #(\d+) FAILED at.*")
HUNK_HEADER_RE = re.compile(r"^@@\s+-(\d+),(\d+)\s+\+(\d+),(\d+)\s+@@")
HUNK_END_RE = re.compile(r"^--\s*$")
PATCH_SUBFILE_HEADER_RE = re.compile(r"^\+\+\+ [ab]/(.*)$")

CHROMEOS_PATCHES_JSON_PACKAGES = (
    "dev-util/lldb-server",
    "sys-devel/llvm",
    "sys-libs/compiler-rt",
    "sys-libs/libcxx",
    "sys-libs/llvm-libunwind",
    "sys-libs/scudo",
)


@dataclasses.dataclass
class Hunk:
    """Represents a patch Hunk."""

    hunk_id: int
    """Hunk ID for the current file."""
    orig_start: int
    orig_hunk_len: int
    patch_start: int
    patch_hunk_len: int
    patch_hunk_lineno_begin: int
    patch_hunk_lineno_end: Optional[int]


def parse_patch_stream(patch_stream: IO[str]) -> Dict[str, List[Hunk]]:
    """Parse a patch file-like into Hunks.

    Args:
        patch_stream: An IO stream formatted like a git patch file.

    Returns:
        A dictionary mapping filenames to lists of Hunks present
        in the patch stream.
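
    Example (illustrative; 'foo.patch' is a hypothetical patch file):
        >>> with open('foo.patch', encoding='utf-8') as f:
        ...     hunks_by_file = parse_patch_stream(f)
        >>> # e.g. {'clang/lib/Foo.cpp': [Hunk(hunk_id=1, ...)], ...}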
70    """
71
72    current_filepath = None
73    current_hunk_id = 0
74    current_hunk = None
75    out = collections.defaultdict(list)
76    for lineno, line in enumerate(patch_stream.readlines()):
77        subfile_header = PATCH_SUBFILE_HEADER_RE.match(line)
78        if subfile_header:
79            current_filepath = subfile_header.group(1)
80            if not current_filepath:
81                raise RuntimeError("Could not get file header in patch stream")
82            # Need to reset the hunk id, as it's per-file.
83            current_hunk_id = 0
84            continue
85        hunk_header = HUNK_HEADER_RE.match(line)
86        if hunk_header:
87            if not current_filepath:
88                raise RuntimeError(
89                    "Parsed hunk before file header in patch stream"
90                )
91            if current_hunk:
92                # Already parsing a hunk
93                current_hunk.patch_hunk_lineno_end = lineno
94            current_hunk_id += 1
95            current_hunk = Hunk(
96                hunk_id=current_hunk_id,
97                orig_start=int(hunk_header.group(1)),
98                orig_hunk_len=int(hunk_header.group(2)),
99                patch_start=int(hunk_header.group(3)),
100                patch_hunk_len=int(hunk_header.group(4)),
101                patch_hunk_lineno_begin=lineno + 1,
102                patch_hunk_lineno_end=None,
103            )
104            out[current_filepath].append(current_hunk)
105            continue
106        if current_hunk and HUNK_END_RE.match(line):
107            current_hunk.patch_hunk_lineno_end = lineno
108    return out
109
110
111def parse_failed_patch_output(text: str) -> Dict[str, List[int]]:
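    """Parse the output of a failed patch application.

    Scans for "checking file <path>" and "Hunk #<n> FAILED at ..." lines
    (see CHECKED_FILE_RE and HUNK_FAILED_RE), as well as
    "Applying: <title> (#<n>)" lines (see APPLIED_RE).

    Returns:
        A dict mapping each file name (or patch title) to the list of
        hunk IDs which failed to apply.
    """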
    current_file = None
    failed_hunks = collections.defaultdict(list)
    for eline in text.split("\n"):
        checked_file_match = CHECKED_FILE_RE.match(eline)
        if checked_file_match:
            current_file = checked_file_match.group(1)
            continue
        failed_match = HUNK_FAILED_RE.match(eline)
        if failed_match:
            if not current_file:
                raise ValueError("Input stream was not parsable")
            hunk_id = int(failed_match.group(1))
            failed_hunks[current_file].append(hunk_id)
        else:
            failed_applied_patches = APPLIED_RE.match(eline)
            if failed_applied_patches:
                current_file = failed_applied_patches.group(1)
                hunk_id = int(failed_applied_patches.group(2))
                failed_hunks[current_file].append(hunk_id)
    return failed_hunks


@dataclasses.dataclass(frozen=True)
class PatchResult:
    """Result of a patch application."""

    succeeded: bool
    failed_hunks: Dict[str, List[Hunk]] = dataclasses.field(
        default_factory=dict
    )

    def __bool__(self):
        return self.succeeded

    def failure_info(self) -> str:
        """Return a human-readable description of the failed hunks."""
        if self.succeeded:
            return ""
        s = ""
        for file, hunks in self.failed_hunks.items():
            s += f"{file}:\n"
            for h in hunks:
                s += (
                    f"Lines {h.orig_start} to "
                    f"{h.orig_start + h.orig_hunk_len}\n"
                )
            s += "--------------------\n"
        return s


def git_apply(patch_path: Path) -> List[Union[str, Path]]:
    """Build the command to apply a patch file with 'git apply'."""
    return ["git", "apply", patch_path]


def git_am(patch_path: Path) -> List[Union[str, Path]]:
    """Build the command to apply a patch file with 'git am'."""
    return ["git", "am", "--3way", patch_path]


def gnu_patch(root_dir: Path, patch_path: Path) -> List[Union[str, Path]]:
    """Build the command to apply a patch file with GNU 'patch'."""
    return [
        "patch",
        "-d",
        root_dir.absolute(),
        "-f",
        "-E",
        "-p1",
        "--no-backup-if-mismatch",
        "-i",
        patch_path,
    ]
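
# Note: the three helpers above only build argv lists; they are the valid
# values for the `patch_cmd` callback that PatchEntry.apply() and
# test_apply() accept and ultimately run via subprocess.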


@dataclasses.dataclass
class PatchEntry:
    """Object mapping of an entry of PATCHES.json."""

    workdir: Path
    """Storage location for the patches."""
    metadata: Optional[Dict[str, Any]]
    platforms: Optional[List[str]]
    rel_patch_path: str
    version_range: Optional[Dict[str, Optional[int]]]
    verify_workdir: bool = True
    """If False, don't verify that the workdir exists. Used for testing."""
    _parsed_hunks = None

    def __post_init__(self):
        if self.verify_workdir and not self.workdir.is_dir():
            raise ValueError(f"workdir {self.workdir} is not a directory")

    @classmethod
    def from_dict(cls, workdir: Path, data: Dict[str, Any]):
        """Instantiate from a dictionary.

        The dictionary must have at least the following key:
        {
            'rel_patch_path': '<relative patch path to workdir>',
        }
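
        A fuller PATCHES.json entry might look like this (values are
        illustrative):
        {
            'metadata': {'title': 'Add fix for foo'},
            'platforms': ['chromiumos'],
            'rel_patch_path': 'cherry/fix-foo.patch',
            'version_range': {'from': 500000, 'until': 500100},
        }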

        Returns:
            A new PatchEntry.
        """
        return cls(
            workdir,
            data.get("metadata"),
            data.get("platforms"),
            data["rel_patch_path"],
            data.get("version_range"),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this entry back into a PATCHES.json dictionary."""
        # We sort the metadata so that it doesn't matter
        # how it was passed to patch_utils.
        if self.metadata is None:
            sorted_metadata = None
        else:
            sorted_metadata = dict(
                sorted(self.metadata.items(), key=lambda x: x[0])
            )
        out: Dict[str, Any] = {
            "metadata": sorted_metadata,
        }
        if self.platforms:
            # To match patch_sync, platforms is only serialized when
            # it is non-empty and non-null.
            out["platforms"] = sorted(self.platforms)
        out.update(
            {
                "rel_patch_path": self.rel_patch_path,
                "version_range": self.version_range,
            }
        )
        return out

    def parsed_hunks(self) -> Dict[str, List[Hunk]]:
        """Return the Hunks of this patch, parsed and cached per file."""
        # Minor caching here because IO is slow.
        if not self._parsed_hunks:
            with self.patch_path().open(encoding="utf-8") as f:
                self._parsed_hunks = parse_patch_stream(f)
        return self._parsed_hunks

    def patch_path(self) -> Path:
        return self.workdir / self.rel_patch_path

    def can_patch_version(self, svn_version: int) -> bool:
        """Is this patch meant to apply to `svn_version`?"""
        # Sometimes the key is there, but it's set to None.
        if not self.version_range:
            return True
        from_v = self.version_range.get("from") or 0
        until_v = self.version_range.get("until")
        if until_v is None:
            until_v = sys.maxsize
        return from_v <= svn_version < until_v

    def apply(
        self,
        root_dir: Path,
        patch_cmd: Optional[Callable] = None,
        extra_args: Optional[List[str]] = None,
    ) -> PatchResult:
        """Apply a patch to a given directory."""
        # Cmd to apply a patch in the src unpack path.
        abs_patch_path = self.patch_path().absolute()
        if not abs_patch_path.is_file():
            raise RuntimeError(
                f"Cannot apply: patch {abs_patch_path} is not a file"
            )

        if not patch_cmd or patch_cmd is gnu_patch:
            cmd = gnu_patch(root_dir, abs_patch_path) + (extra_args or [])
        else:
            cmd = patch_cmd(abs_patch_path) + (extra_args or [])

        try:
            subprocess.run(
                cmd, encoding="utf-8", check=True, stdout=subprocess.PIPE
            )
        except subprocess.CalledProcessError as e:
            parsed_hunks = self.parsed_hunks()
            failed_hunks_id_dict = parse_failed_patch_output(e.stdout)
            failed_hunks = {}
            if patch_cmd is gnu_patch:
                for path, failed_hunk_ids in failed_hunks_id_dict.items():
                    hunks_for_file = parsed_hunks[path]
                    failed_hunks[path] = [
                        hunk
                        for hunk in hunks_for_file
                        if hunk.hunk_id in failed_hunk_ids
                    ]
            elif failed_hunks_id_dict:
                # git am/git apply failures aren't mapped to individual
                # hunks here, so report every parsed hunk as failed.
                failed_hunks = parsed_hunks

            return PatchResult(succeeded=False, failed_hunks=failed_hunks)
        return PatchResult(succeeded=True)

    def test_apply(
        self, root_dir: Path, patch_cmd: Optional[Callable] = None
    ) -> PatchResult:
        """Dry run applying a patch to a given directory.

        When using gnu_patch, this will pass --dry-run.
        When using git_am or git_apply, this will instead
        use git_apply with --summary.
        """
        if patch_cmd is git_am or patch_cmd is git_apply:
            # There is no dry-run option for git am,
            # so we use git apply for the test.
            return self.apply(root_dir, git_apply, ["--summary"])
        if patch_cmd is gnu_patch or patch_cmd is None:
            return self.apply(root_dir, patch_cmd, ["--dry-run"])
        raise ValueError(f"No such patch command: {patch_cmd.__name__}.")

    def title(self) -> str:
        """Return the patch title from metadata, if any."""
        if not self.metadata:
            return ""
        return self.metadata.get("title", "")


def patch_applies_after(
    version_range: Optional[Dict[str, Optional[int]]], svn_version: int
) -> bool:
    """Does this patch apply after `svn_version`?"""
    if not version_range:
        return True
    until = version_range.get("until")
    before_svn_version = until is not None and svn_version > until
    return not before_svn_version


@dataclasses.dataclass(frozen=True)
class PatchInfo:
    """Holds info for a round of patch applications."""

    # str types are legacy. Patch lists should probably be PatchEntries.
    applied_patches: List[PatchEntry]
    failed_patches: List[PatchEntry]
    # Can be deleted once legacy code is removed.
    non_applicable_patches: List[PatchEntry]
    # Can be deleted once legacy code is removed.
    disabled_patches: List[str]
    # Can be deleted once legacy code is removed.
    removed_patches: List[str]
    # Can be deleted once legacy code is removed.
    modified_metadata: Optional[str]

    def _asdict(self):
        return dataclasses.asdict(self)


def json_to_patch_entries(workdir: Path, json_fd: IO[str]) -> List[PatchEntry]:
    """Convert a JSON IO object to a List[PatchEntry].

    Examples:
        >>> f = open('PATCHES.json')
        >>> patch_entries = json_to_patch_entries(Path(), f)
    """
    return [PatchEntry.from_dict(workdir, d) for d in json.load(json_fd)]


def json_str_to_patch_entries(workdir: Path, json_str: str) -> List[PatchEntry]:
    """Convert a JSON string to a List[PatchEntry].

    Examples:
        >>> f = open('PATCHES.json').read()
        >>> patch_entries = json_str_to_patch_entries(Path(), f)
    """
    return [PatchEntry.from_dict(workdir, d) for d in json.loads(json_str)]


def _print_failed_patch(pe: PatchEntry, failed_hunks: Dict[str, List[Hunk]]):
    """Print information about a single failing PatchEntry.

    Args:
        pe: A PatchEntry that failed.
        failed_hunks: Hunks for pe which failed, as a dict:
          filepath: [Hunk...]
    """
    print(f"Could not apply {pe.rel_patch_path}: {pe.title()}", file=sys.stderr)
    for fp, hunks in failed_hunks.items():
        print(f"{fp}:", file=sys.stderr)
        for h in hunks:
            print(
                f"- {pe.rel_patch_path} "
                f"l:{h.patch_hunk_lineno_begin}...{h.patch_hunk_lineno_end}",
                file=sys.stderr,
            )


def apply_all_from_json(
    svn_version: int,
    llvm_src_dir: Path,
    patches_json_fp: Path,
    patch_cmd: Optional[Callable] = None,
    continue_on_failure: bool = False,
) -> PatchInfo:
    """Attempt to apply some patches to a given LLVM source tree.

    This relies on a PATCHES.json file as the primary way
    the patches are specified and applied.

    Args:
        svn_version: LLVM Subversion revision to patch.
        llvm_src_dir: llvm-project root-level source directory to patch.
        patches_json_fp: Filepath to the PATCHES.json file.
        patch_cmd: The function to use when actually applying the patch.
        continue_on_failure: Skip any patches which failed to apply,
          rather than throw an Exception.
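
    Example (illustrative paths and revision number):
        >>> patch_info = apply_all_from_json(
        ...     516547,
        ...     Path('/path/to/llvm-project'),
        ...     Path('/path/to/PATCHES.json'),
        ... )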
424    """
425    with patches_json_fp.open(encoding="utf-8") as f:
426        patches = json_to_patch_entries(patches_json_fp.parent, f)
427    skipped_patches = []
428    failed_patches = []
429    applied_patches = []
430    for pe in patches:
431        applied, failed_hunks = apply_single_patch_entry(
432            svn_version, llvm_src_dir, pe, patch_cmd
433        )
434        if applied:
435            applied_patches.append(pe)
436            continue
437        if failed_hunks is not None:
438            if continue_on_failure:
439                failed_patches.append(pe)
440                continue
441            else:
442                _print_failed_patch(pe, failed_hunks)
443                raise RuntimeError(
444                    "failed to apply patch " f"{pe.patch_path()}: {pe.title()}"
445                )
446        # Didn't apply, didn't fail, it was skipped.
447        skipped_patches.append(pe)
448    return PatchInfo(
449        non_applicable_patches=skipped_patches,
450        applied_patches=applied_patches,
451        failed_patches=failed_patches,
452        disabled_patches=[],
453        removed_patches=[],
454        modified_metadata=None,
455    )
456
457
458def apply_single_patch_entry(
459    svn_version: int,
460    llvm_src_dir: Path,
461    pe: PatchEntry,
462    patch_cmd: Optional[Callable] = None,
463    ignore_version_range: bool = False,
464) -> Tuple[bool, Optional[Dict[str, List[Hunk]]]]:
465    """Try to apply a single PatchEntry object.
466
467    Returns:
468        Tuple where the first element indicates whether the patch applied, and
469        the second element is a faild hunk mapping from file name to lists of
470        hunks (if the patch didn't apply).
471    """
472    # Don't apply patches outside of the version range.
473    if not ignore_version_range and not pe.can_patch_version(svn_version):
474        return False, None
475    # Test first to avoid making changes.
476    test_application = pe.test_apply(llvm_src_dir, patch_cmd)
477    if not test_application:
478        return False, test_application.failed_hunks
479    # Now actually make changes.
480    application_result = pe.apply(llvm_src_dir, patch_cmd)
481    if not application_result:
482        # This should be very rare/impossible.
483        return False, application_result.failed_hunks
484    return True, None
485
486
487def is_git_dirty(git_root_dir: Path) -> bool:
488    """Return whether the given git directory has uncommitted changes."""
    if not git_root_dir.is_dir():
        raise ValueError(f"git_root_dir {git_root_dir} is not a directory")
    cmd = ["git", "ls-files", "-m", "--other", "--exclude-standard"]
    return (
        subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            check=True,
            cwd=git_root_dir,
            encoding="utf-8",
        ).stdout
        != ""
    )


def clean_src_tree(src_path: Path):
    """Clean the source tree at 'src_path' of any uncommitted changes."""

    reset_src_tree_cmd = ["git", "-C", src_path, "reset", "HEAD", "--hard"]

    subprocess.run(reset_src_tree_cmd, check=True)

    clean_src_tree_cmd = ["git", "-C", src_path, "clean", "-fd"]

    subprocess.run(clean_src_tree_cmd, check=True)


@contextlib.contextmanager
def git_clean_context(git_root_dir: Path):
    """Clean up the git directory when the context exits."""
    if is_git_dirty(git_root_dir):
        raise RuntimeError("Cannot set up clean context; git_root_dir is dirty")
    try:
        yield
    finally:
        clean_src_tree(git_root_dir)


def _write_json_changes(
    patches: List[Dict[str, Any]], file_io: IO[str], indent_len=2
):
    """Write JSON changes to a file; does not acquire a new file lock."""
    json.dump(patches, file_io, indent=indent_len, separators=(",", ": "))
    # Need to add a newline, as json.dump omits it.
    file_io.write("\n")


def predict_indent(patches_lines: List[str]) -> int:
    """Given file lines, predict and return the indentation unit (spaces)."""
    indents = [len(x) - len(x.lstrip(" ")) for x in patches_lines]
    if all(x % 4 == 0 for x in indents):
        return 4
    if all(x % 2 == 0 for x in indents):
        return 2
    if all(x == 0 for x in indents):
        return 0
    return 1


def update_version_ranges(
    svn_version: int,
    llvm_src_dir: Path,
    patches_json_fp: Path,
    patch_cmd: Optional[Callable] = None,
) -> PatchInfo:
    """Reduce the version ranges of failing patches.

    Patches which fail to apply will have their 'version_range.until'
    field reduced to the passed-in svn_version.

    Modifies the contents of patches_json_fp.

    Args:
        svn_version: LLVM revision number.
        llvm_src_dir: llvm-project directory path.
        patches_json_fp: Filepath to the PATCHES.json file.
        patch_cmd: The function to use when applying the patch.

    Returns:
        PatchInfo for applied and disabled patches.
    """
    with patches_json_fp.open(encoding="utf-8") as f:
        contents = f.read()
    indent_len = predict_indent(contents.splitlines())
    patch_entries = json_str_to_patch_entries(
        patches_json_fp.parent,
        contents,
    )
    modified_entries, applied_patches = update_version_ranges_with_entries(
        svn_version, llvm_src_dir, patch_entries, patch_cmd
    )
    with atomic_write_file.atomic_write(patches_json_fp, encoding="utf-8") as f:
        _write_json_changes(
            [p.to_dict() for p in patch_entries], f, indent_len=indent_len
        )
    for entry in modified_entries:
        print(
            f"Stopped applying {entry.rel_patch_path} ({entry.title()}) "
            f"for r{svn_version}"
        )
    return PatchInfo(
        non_applicable_patches=[],
        applied_patches=applied_patches,
        failed_patches=[],
        disabled_patches=[p.rel_patch_path for p in modified_entries],
        removed_patches=[],
        modified_metadata=str(patches_json_fp) if modified_entries else None,
    )


def update_version_ranges_with_entries(
    svn_version: int,
    llvm_src_dir: Path,
    patch_entries: Iterable[PatchEntry],
    patch_cmd: Optional[Callable] = None,
) -> Tuple[List[PatchEntry], List[PatchEntry]]:
    """Testable helper for update_version_ranges.

    Args:
        svn_version: LLVM revision number.
        llvm_src_dir: llvm-project directory path.
        patch_entries: PatchEntry objects to modify.
        patch_cmd: The function to use when actually applying the patch.

    Returns:
        Tuple of (modified entries, applied patches)

    Post:
        Modifies patch_entries in place.
    """
    modified_entries: List[PatchEntry] = []
    applied_patches: List[PatchEntry] = []
    active_patches = (
        pe for pe in patch_entries if pe.can_patch_version(svn_version)
    )
    with git_clean_context(llvm_src_dir):
        for pe in active_patches:
            test_result = pe.test_apply(llvm_src_dir, patch_cmd)
            if not test_result:
                if pe.version_range is None:
                    pe.version_range = {}
                pe.version_range["until"] = svn_version
                modified_entries.append(pe)
            else:
                # We have to actually apply the patch so that future patches
                # will stack properly.
                if not pe.apply(llvm_src_dir, patch_cmd).succeeded:
                    raise RuntimeError(
                        "Could not apply patch that dry ran successfully"
                    )
                applied_patches.append(pe)

    return modified_entries, applied_patches


def remove_old_patches(svn_version: int, patches_json: Path) -> List[Path]:
    """Remove patches that no longer apply and never will again.

    A patch is considered "old" when patch_applies_after() returns False
    for its version_range at the given svn_version.

    Args:
        svn_version: LLVM SVN version.
        patches_json: Location of PATCHES.json.

    Returns:
        A list of all patch paths removed from PATCHES.json.
    """
    contents = patches_json.read_text(encoding="utf-8")
    indent_len = predict_indent(contents.splitlines())

    still_new = []
    removed_patches = []
    patches_parent = patches_json.parent
    for entry in json.loads(contents):
        if patch_applies_after(entry.get("version_range"), svn_version):
            still_new.append(entry)
        else:
            removed_patches.append(patches_parent / entry["rel_patch_path"])

    with atomic_write_file.atomic_write(patches_json, encoding="utf-8") as f:
        _write_json_changes(still_new, f, indent_len=indent_len)

    return removed_patches
673