# Copyright 2018 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from pathlib import Path import shutil from typing import Callable, Iterable, Union, List, Optional import os import re import sys from synthtool import _tracked_paths from synthtool.log import logger from synthtool import metadata PathOrStr = Union[str, Path] ListOfPathsOrStrs = Iterable[Union[str, Path]] class MissingSourceError(Exception): pass def _expand_paths(paths: ListOfPathsOrStrs, root: PathOrStr = None) -> Iterable[Path]: """Given a list of globs/paths, expands them into a flat sequence, expanding globs as necessary.""" if paths is None: return [] if isinstance(paths, (str, Path)): paths = [paths] if root is None: root = Path(".") # ensure root is a path root = Path(root) # record name of synth script so we don't try to do transforms on it synth_script_name = sys.argv[0] for path in paths: if isinstance(path, Path): if path.is_absolute(): anchor = Path(path.anchor) remainder = str(path.relative_to(path.anchor)) yield from anchor.glob(remainder) else: yield from root.glob(str(path)) else: yield from ( p for p in root.glob(path) if p.absolute() != Path(synth_script_name).absolute() ) def _filter_files(paths: Iterable[Path]) -> Iterable[Path]: """Returns only the paths that are files (no directories).""" return (path for path in paths if path.is_file() and os.access(path, os.W_OK)) def _merge_file( source_path: Path, dest_path: Path, merge: Callable[[str, str, Path], str] ): """ Writes to the destination the result of merging the source with the existing destination contents, using the given merge function. The merge function must take three arguments: the source contents, the old destination contents, and a Path to the file to be written. """ with source_path.open("r") as source_file: source_text = source_file.read() with dest_path.open("r+") as dest_file: dest_text = dest_file.read() final_text = merge(source_text, dest_text, dest_path) # use the source file's file permission mode os.chmod(dest_path, os.stat(source_path).st_mode) if final_text != dest_text: dest_file.seek(0) dest_file.write(final_text) dest_file.truncate() else: dest_path.touch() def _copy_dir_to_existing_dir( source: Path, destination: Path, excludes: ListOfPathsOrStrs = None, merge: Callable[[str, str, Path], str] = None, ) -> bool: """ copies files over existing files to an existing directory this function does not copy empty directories. Returns: True if any files were copied, False otherwise. """ copied = False if not excludes: excludes = [] for root, _, files in os.walk(source): for name in files: rel_path = str(Path(root).relative_to(source)) dest_dir = destination / rel_path dest_path = dest_dir / name exclude = [ e for e in excludes if ( Path(e) == _tracked_paths.relativize(root) or Path(e) == _tracked_paths.relativize(Path(root) / name) ) ] if not exclude: os.makedirs(str(dest_dir), exist_ok=True) source_path = Path(os.path.join(root, name)) if merge is not None and dest_path.is_file(): try: _merge_file(source_path, dest_path, merge) except Exception: logger.exception( "_merge_file failed for %s, fall back to copy", source_path, ) shutil.copy2(str(source_path), str(dest_path)) else: shutil.copy2(str(source_path), str(dest_path)) copied = True return copied def dont_overwrite( patterns: ListOfPathsOrStrs, ) -> Callable[[str, str, Path], str]: """Returns a merge function that doesn't overwrite the specified files. Pass the return value to move() or copy() to avoid overwriting existing files. """ def merge(source_text: str, destinaton_text: str, file_path: Path) -> str: for pattern in patterns: if file_path.match(str(pattern)): logger.debug(f"Preserving existing contents of {file_path}.") return destinaton_text return source_text return merge def move( sources: ListOfPathsOrStrs, destination: PathOrStr = None, excludes: ListOfPathsOrStrs = None, merge: Callable[[str, str, Path], str] = None, required: bool = False, ) -> bool: """ copy file(s) at source to current directory, preserving file mode. Args: sources (ListOfPathsOrStrs): Glob pattern(s) to copy destination (PathOrStr): Destination folder for copied files excludes (ListOfPathsOrStrs): Glob pattern(s) of files to skip merge (Callable[[str, str, Path], str]): Callback function for merging files if there is an existing file. required (bool): If required and no source files are copied, throws a MissingSourceError Returns: True if any files were copied, False otherwise. """ copied = False for excluded_pattern in excludes or []: metadata.add_pattern_excluded_during_copy(str(excluded_pattern)) for source in _expand_paths(sources): if destination is None: canonical_destination = _tracked_paths.relativize(source) else: canonical_destination = Path(destination) if excludes: excludes = [ _tracked_paths.relativize(e) for e in _expand_paths(excludes, source) ] else: excludes = [] if source.is_dir(): copied = copied or _copy_dir_to_existing_dir( source, canonical_destination, excludes=excludes, merge=merge ) elif source not in excludes: # copy individual file if merge is not None and canonical_destination.is_file(): try: _merge_file(source, canonical_destination, merge) except Exception: logger.exception( "_merge_file failed for %s, fall back to copy", source ) shutil.copy2(source, canonical_destination) else: shutil.copy2(source, canonical_destination) copied = True if not copied: if required: raise MissingSourceError( f"No files in sources {sources} were copied. Does the source " f"contain files?" ) else: logger.warning( f"No files in sources {sources} were copied. Does the source " f"contain files?" ) return copied def _replace_in_file(path, expr, replacement): try: with path.open("r+") as fh: return _replace_in_file_handle(fh, expr, replacement) except UnicodeDecodeError: pass # It's a binary file. Try again with a binary regular expression. flags = expr.flags & ~re.UNICODE expr = re.compile(expr.pattern.encode(), flags) with path.open("rb+") as fh: return _replace_in_file_handle(fh, expr, replacement.encode()) def _replace_in_file_handle(fh, expr, replacement): content = fh.read() content, count = expr.subn(replacement, content) # Don't bother writing the file if we didn't change # anything. if count: fh.seek(0) fh.write(content) fh.truncate() return count def replace( sources: ListOfPathsOrStrs, before: str, after: str, flags: int = re.MULTILINE ) -> int: """Replaces occurrences of before with after in all the given sources. Returns: The number of times the text was found and replaced across all files. """ expr = re.compile(before, flags=flags or 0) paths = _filter_files(_expand_paths(sources, ".")) if not paths: logger.warning(f"No files were found in sources {sources} for replace()") count_replaced = 0 for path in paths: replaced = _replace_in_file(path, expr, after) count_replaced += replaced if replaced: logger.info(f"Replaced {before!r} in {path}.") if not count_replaced: logger.warning( f"No replacements made in {sources} for pattern {before}, maybe " "replacement is no longer needed?" ) return count_replaced def get_staging_dirs( default_version: Optional[str] = None, staging_path: Optional[str] = None ) -> List[Path]: """Returns the list of directories, one per version, copied from https://github.com/googleapis/googleapis-gen. Will return in lexical sorting order with the exception of the default_version which will be last (if specified). Args: default_version: the default version of the API. The directory for this version will be the last item in the returned list if specified. staging_path: the path to the staging directory. Returns: the empty list if no file were copied. """ if staging_path: staging = Path(staging_path) else: staging = Path("owl-bot-staging") if staging.is_dir(): # Collect the subdirectories of the staging directory. versions = [v.name for v in staging.iterdir() if v.is_dir()] # Reorder the versions so the default version always comes last. versions = [v for v in versions if v != default_version] versions.sort() if default_version is not None: versions += [default_version] dirs = [staging / v for v in versions] for dir in dirs: _tracked_paths.add(dir) return dirs else: return [] def remove_staging_dirs(): """Removes all the staging directories.""" staging = Path("owl-bot-staging") if staging.is_dir(): shutil.rmtree(staging)