# Copyright (C) 2024 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """A module for background python log artifacts uploading.""" import argparse import functools from importlib import resources import logging import multiprocessing import os import pathlib import subprocess import sys from typing import Callable from atest import constants from atest.logstorage import logstorage_utils from atest.metrics import metrics from googleapiclient import errors from googleapiclient import http _ENABLE_ATEST_LOG_UPLOADING_ENV_KEY = 'ENABLE_ATEST_LOG_UPLOADING' class _SimpleUploadingClient: """A proxy class used to interact with the logstorage_utils module.""" def __init__(self, atest_run_id: str): self._atest_run_id = atest_run_id self._client = None self._client_legacy = None self._invocation_id = None self._workunit_id = None self._legacy_test_result_id = None self._invocation_data = None def initialize_invocation(self): """Initialize internal build clients and get invocation ID from AnTS.""" configuration = {} creds, self._invocation_data = logstorage_utils.do_upload_flow( configuration, {'atest_run_id': self._atest_run_id} ) self._client = logstorage_utils.BuildClient(creds) # Legacy test result ID is required when using AnTS' `testartifact` API # to upload test artifacts due to a limitation in the API, and we need # The legacy client to get the legacy ID. self._client_legacy = logstorage_utils.BuildClient( creds, api_version=constants.STORAGE_API_VERSION_LEGACY, url=constants.DISCOVERY_SERVICE_LEGACY, ) self._invocation_id = configuration[constants.INVOCATION_ID] self._workunit_id = configuration[constants.WORKUNIT_ID] self._legacy_test_result_id = ( self._client_legacy.client.testresult() .insert( buildId=self._invocation_data['primaryBuild']['buildId'], target=self._invocation_data['primaryBuild']['buildTarget'], attemptId='latest', body={ 'status': 'completePass', }, ) .execute()['id'] ) logging.debug( 'Initialized AnTS invocation: http://ab/%s', self._invocation_id ) def complete_invocation(self) -> None: """Set schedule state as complete to AnTS for the current invocation.""" self._invocation_data['schedulerState'] = 'completed' self._client.update_invocation(self._invocation_data) logging.debug( 'Finalized AnTS invocation: http://ab/%s', self._invocation_id ) def upload_artifact( self, resource_id: str, metadata: dict[str, str], artifact_path: pathlib.Path, num_of_retries, ) -> None: """Upload an artifact to AnTS with retries. Args: resource_id: The artifact's destination resource ID metadata: The metadata for the artifact. Invocation ID and work unit ID is not required in the input metadata dict as this method will add the values to it. artifact_path: The path of the artifact file num_of_retries: Number of retries when the upload request failed Raises: errors.HttpError: When the upload failed. """ metadata['invocationId'] = self._invocation_id metadata['workUnitId'] = self._workunit_id self._client.client.testartifact().update( resourceId=resource_id, invocationId=self._invocation_id, workUnitId=self._workunit_id, body=metadata, legacyTestResultId=self._legacy_test_result_id, media_body=http.MediaFileUpload(artifact_path), ).execute(num_retries=num_of_retries) class _LogUploadSession: """A class to handle log uploading to AnTS.""" def __init__( self, atest_run_id: str, upload_client: _SimpleUploadingClient = None ): self._upload_client = upload_client or _SimpleUploadingClient(atest_run_id) self._resource_ids = {} def __enter__(self): self._upload_client.initialize_invocation() return self def __exit__(self, exc_type, exc_val, exc_tb): self._upload_client.complete_invocation() @classmethod def _get_file_paths(cls, directory: pathlib.Path) -> list[pathlib.Path]: """Returns all the files under the given directory following symbolic links. Args: directory: The root directory path. Returns: A list of pathlib.Path objects representing the file paths. """ file_paths = [] with os.scandir(directory) as scan: for entry in scan: if entry.is_file(): file_paths.append(pathlib.Path(entry.path)) elif entry.is_dir(): file_paths.extend(cls._get_file_paths(entry)) return file_paths @staticmethod def _create_artifact_metadata(artifact_path: pathlib.Path) -> dict[str, str]: metadata = { 'name': artifact_path.name, } if artifact_path.suffix in ['.txt', '.log']: metadata['artifactType'] = 'HOST_LOG' metadata['contentType'] = 'text/plain' return metadata def upload_directory(self, artifacts_dir: pathlib.Path) -> None: """Upload all artifacts under a directory.""" logging.debug('Uploading artifact directory %s', artifacts_dir) for artifact_path in self._get_file_paths(artifacts_dir): self.upload_single_file(artifact_path) def upload_single_file(self, artifact_path: pathlib.Path) -> None: """Upload an single artifact.""" logging.debug('Uploading artifact path %s', artifact_path) file_upload_retires = 3 try: self._upload_client.upload_artifact( self._create_resource_id(artifact_path), self._create_artifact_metadata(artifact_path), artifact_path, file_upload_retires, ) except errors.HttpError as e: # Upload error may happen due to temporary network issue. We log down # an error but do stop the upload loop so that other files may gets # uploaded when the network recover. logging.error('Failed to upload file %s with error: %s', artifact_path, e) def _create_resource_id(self, artifact_path: pathlib.Path) -> str: """Create a unique resource id for a file. Args: artifact_path: artifact file path Returns: A unique resource ID derived from the file name. If the file name has appeared before, an extra string will be inserted between the file name stem and suffix to make it unique. """ count = self._resource_ids.get(artifact_path.name, 0) + 1 self._resource_ids[artifact_path.name] = count return ( artifact_path.name if count == 1 else f'{artifact_path.stem}_{count}{artifact_path.suffix}' ) @functools.cache def is_uploading_logs(gcert_checker: Callable[[], bool] = None) -> bool: """Determines whether log uploading is happening in the current run.""" if os.environ.get(_ENABLE_ATEST_LOG_UPLOADING_ENV_KEY, 'true').lower() in [ 'false', '0', ]: return False if not logstorage_utils.is_credential_available(): return False # Checks whether gcert is available and not about to expire. if gcert_checker is None: gcert_checker = ( lambda: subprocess.run( ['which', 'gcertstatus'], capture_output=True, check=False, ).returncode == 0 and subprocess.run( ['gcertstatus', '--check_remaining=6m'], capture_output=True, check=False, ).returncode == 0 ) return gcert_checker() def upload_logs_detached(logs_dir: pathlib.Path): """Upload logs to AnTS in a detached process.""" if not is_uploading_logs(): return assert logs_dir, 'artifacts_dir cannot be None.' assert logs_dir.as_posix(), 'The path of artifacts_dir should not be empty.' def _start_upload_process(): # We need to fock a background process instead of calling Popen with # start_new_session=True because we want to make sure the atest_log_uploader # resource binary is deleted after execution. if os.fork() != 0: return with resources.as_file( resources.files('atest').joinpath('atest_log_uploader') ) as uploader_path: # TODO: Explore whether it's possible to package the binary with # executable permission. os.chmod(uploader_path, 0o755) timeout = 60 * 60 * 24 # 1 day # We need to call atest_log_uploader as a binary so that the python # environment can be properly loaded. process = subprocess.run( [uploader_path.as_posix(), logs_dir.as_posix(), metrics.get_run_id()], timeout=timeout, capture_output=True, check=False, ) if process.returncode: logging.error('Failed to run log upload process: %s', process) proc = multiprocessing.Process(target=_start_upload_process) proc.start() proc.join() def _configure_logging(log_dir: str) -> None: """Configure the logger.""" log_fmat = '%(asctime)s %(filename)s:%(lineno)s:%(levelname)s: %(message)s' date_fmt = '%Y-%m-%d %H:%M:%S' log_path = os.path.join(log_dir, 'atest_log_uploader.log') logging.getLogger('').handlers = [] logging.basicConfig( filename=log_path, level=logging.DEBUG, format=log_fmat, datefmt=date_fmt ) def _redirect_stdout_stderr() -> None: """Redirect stdout and stderr to logger.""" class _StreamToLogger: def __init__(self, logger, log_level=logging.INFO): self._logger = logger self._log_level = log_level def write(self, buf): self._logger.log(self._log_level, buf) def flush(self): pass logger = logging.getLogger('') sys.stdout = _StreamToLogger(logger, logging.INFO) sys.stderr = _StreamToLogger(logger, logging.ERROR) def _main() -> None: """The main method to be executed when executing this module as a binary.""" arg_parser = argparse.ArgumentParser( description='Internal tool for uploading test artifacts to AnTS.', add_help=True, ) arg_parser.add_argument( 'artifacts_dir', help='Root directory of the test artifacts.' ) arg_parser.add_argument('atest_run_id', help='The Atest run ID.') args = arg_parser.parse_args() _configure_logging(args.artifacts_dir) _redirect_stdout_stderr() with _LogUploadSession(args.atest_run_id) as artifact_upload_session: artifact_upload_session.upload_directory(pathlib.Path(args.artifacts_dir)) if __name__ == '__main__': _main()