# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import getpass
import logging
import multiprocessing.connection
import os
import pathlib
import platform
import threading
import time

from atest.metrics import clearcut_client
from atest.proto import clientanalytics_pb2
from proto import edit_event_pb2
from watchdog.events import FileSystemEvent
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer

# Enum of the Clearcut log source defined under
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE = 2524
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100


class ClearcutEventHandler(PatternMatchingEventHandler):
  """Watchdog event handler that reports file edits to Clearcut.

  Events for hidden paths (any component below the monitored root starting
  with '.') and for files outside any git project are ignored. Directory
  events are excluded via ignore_directories=True.

  Accepted events are buffered. A timer thread is started on the first
  buffered event and sends the batch flush_interval_sec seconds later. A batch
  holding more than single_events_size_threshold events is replaced by a
  single aggregated event that carries only the edit count.
  """

  def __init__(
      self,
      path: str,
      flush_interval_sec: int,
      single_events_size_threshold: int,
      is_dry_run: bool = False,
      cclient: clearcut_client.Clearcut | None = None,
  ):
    """Creates the handler.

    Args:
      path: The root path being monitored. Event paths are interpreted
        relative to it.
      flush_interval_sec: Delay, in seconds, between buffering the first
        event of a batch and sending that batch.
      single_events_size_threshold: Maximum number of events per batch that
        are sent individually. Larger batches are sent as one aggregated
        event.
      is_dry_run: If True, batches are only logged locally and never handed
        to the Clearcut client.
      cclient: Clearcut client used to send events. If omitted, a new client
        for LOG_SOURCE is created.
    """
    super().__init__(patterns=["*"], ignore_directories=True)
    self.root_monitoring_path = path
    self.flush_interval_sec = flush_interval_sec
    self.single_events_size_threshold = single_events_size_threshold
    self.is_dry_run = is_dry_run
    self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)
    self.user_name = getpass.getuser()
    self.host_name = platform.node()
    self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")

    # List of (EditEvent proto, event time in seconds since the epoch).
    self.pending_events = []
    self._scheduled_log_thread = None
    # Guards pending_events and _scheduled_log_thread.
    self._pending_events_lock = threading.Lock()
    # Serializes complete batch sends, so flushall() cannot flush the client
    # while a timer-triggered send is still handing events to it.
    self._send_lock = threading.Lock()

  def on_moved(self, event: FileSystemEvent):
    """Records a move. Only the event's source path is reported."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)

  def on_created(self, event: FileSystemEvent):
    """Records a file creation."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.CREATE)

  def on_deleted(self, event: FileSystemEvent):
    """Records a file deletion."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.DELETE)

  def on_modified(self, event: FileSystemEvent):
    """Records a file modification."""
    self._log_edit_event(event, edit_event_pb2.EditEvent.MODIFY)

  def flushall(self):
    """Sends every pending event synchronously and flushes the Clearcut client.

    Any scheduled timer is cancelled first. If the timer has already fired,
    the send below waits (via _send_lock) for that in-flight send to finish.
    As a result, flush_events() runs only after every batch has been handed
    to the client.
    """
    logging.info("flushing all pending events.")
    # Take the timer reference under the lock, so the read does not race
    # with _log_edit_event scheduling a new timer.
    with self._pending_events_lock:
      scheduled_thread = self._scheduled_log_thread
      self._scheduled_log_thread = None
    if scheduled_thread:
      logging.info("canceling log thread")
      # cancel() only prevents a timer that has not fired yet. A callback
      # that is already running is covered by _send_lock.
      scheduled_thread.cancel()
    self._log_clearcut_events()
    self.cclient.flush_events()

  def _log_edit_event(
      self, event: FileSystemEvent, edit_type: edit_event_pb2.EditEvent.EditType
  ):
    """Buffers one edit event and schedules a batch send if none is pending.

    Any exception is logged and swallowed, so a failure on one event does not
    propagate to the caller (the watchdog observer).
    """
    try:
      event_time = time.time()
      file_path = pathlib.Path(event.src_path)

      if self._is_hidden_file(file_path):
        logging.debug("ignore hidden file: %s.", event.src_path)
        return

      if not self._is_under_git_project(file_path):
        logging.debug(
            "ignore file %s which does not belong to a git project",
            event.src_path,
        )
        return

      logging.info("%s: %s", event.event_type, event.src_path)

      event_proto = edit_event_pb2.EditEvent(
          user_name=self.user_name,
          host_name=self.host_name,
          source_root=self.source_root,
      )
      event_proto.single_edit_event.CopyFrom(
          edit_event_pb2.EditEvent.SingleEditEvent(
              file_path=event.src_path, edit_type=edit_type
          )
      )
      with self._pending_events_lock:
        self.pending_events.append((event_proto, event_time))
        # Only one timer per batch. Later events join the pending batch.
        if not self._scheduled_log_thread:
          logging.debug(
              "Scheduling thread to run in %d seconds", self.flush_interval_sec
          )
          self._scheduled_log_thread = threading.Timer(
              self.flush_interval_sec, self._log_clearcut_events
          )
          self._scheduled_log_thread.start()

    except Exception:
      logging.exception("Failed to log edit event.")

  def _relative_to_root(self, file_path: pathlib.Path) -> pathlib.Path:
    """Returns file_path relative to the monitored root.

    The root may be given in a different form than the event paths (for
    example a relative path, or one that passes through a symlink). If the
    plain comparison fails, both sides are resolved and the comparison is
    retried. Only the parent directory of file_path is resolved, so that a
    symlinked file itself is not followed out of the tree.

    Raises:
      ValueError: If file_path is not under the monitored root.
    """
    root_path = pathlib.Path(self.root_monitoring_path)
    try:
      return file_path.relative_to(root_path)
    except ValueError:
      resolved_path = file_path.parent.resolve() / file_path.name
      return resolved_path.relative_to(root_path.resolve())

  def _is_hidden_file(self, file_path: pathlib.Path) -> bool:
    """Returns True if any path component below the root starts with '.'."""
    return any(
        part.startswith(".") for part in self._relative_to_root(file_path).parts
    )

  def _is_under_git_project(self, file_path: pathlib.Path) -> bool:
    """Returns True if a directory on the path from the root to the file
    (the root itself included) contains a '.git' entry."""
    root_path = pathlib.Path(self.root_monitoring_path).resolve()
    return any(
        root_path.joinpath(parent, ".git").exists()
        for parent in self._relative_to_root(file_path).parents
    )

  def _log_clearcut_events(self):
    """Sends all buffered events to Clearcut as one batch.

    Runs on the timer thread started by _log_edit_event, and directly from
    flushall(). A batch larger than single_events_size_threshold is replaced
    by one aggregated event that uses the first buffered event's timestamp.
    """
    with self._send_lock:
      with self._pending_events_lock:
        self._scheduled_log_thread = None
        edit_events = self.pending_events
        self.pending_events = []

      pending_events_size = len(edit_events)
      if pending_events_size > self.single_events_size_threshold:
        logging.info(
            "got %d events in %d seconds, sending aggregated events instead",
            pending_events_size,
            self.flush_interval_sec,
        )
        aggregated_event_time = edit_events[0][1]
        aggregated_event_proto = edit_event_pb2.EditEvent(
            user_name=self.user_name,
            host_name=self.host_name,
            source_root=self.source_root,
        )
        aggregated_event_proto.aggregated_edit_event.CopyFrom(
            edit_event_pb2.EditEvent.AggregatedEditEvent(
                num_edits=pending_events_size
            )
        )
        edit_events = [(aggregated_event_proto, aggregated_event_time)]

      if self.is_dry_run:
        logging.info("Sent %d edit events in dry run.", len(edit_events))
        return

      for event_proto, event_time in edit_events:
        log_event = clientanalytics_pb2.LogEvent(
            event_time_ms=int(event_time * 1000),
            source_extension=event_proto.SerializeToString(),
        )
        self.cclient.log(log_event)

      logging.info("sent %d edit events", len(edit_events))


def start(
    path: str,
    is_dry_run: bool = False,
    flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
    single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
    cclient: clearcut_client.Clearcut | None = None,
    pipe_sender: multiprocessing.connection.Connection | None = None,
):
  """Method to start the edit monitor.

  This is the entry point to start the edit monitor as a subprocess of the
  daemon manager. After starting the observer it sleeps in a loop that only
  ends when an exception is raised. On the way out it flushes pending
  events, stops the observer and closes the pipe.

  Args:
    path: The root path to monitor.
    is_dry_run: If True, edit events are only logged locally, not sent.
    flush_interval_sec: Seconds to buffer events before sending a batch.
    single_events_size_threshold: Batches with more events than this are sent
      as a single aggregated event.
    cclient: The clearcut client to send the edit logs.
    pipe_sender: The sender end of the pipe used to communicate with the
      daemon manager. "Observer started." is sent once the observer is
      running, and the pipe is closed on exit.
  """
  event_handler = ClearcutEventHandler(
      path, flush_interval_sec, single_events_size_threshold, is_dry_run, cclient
  )

  observer = Observer()
  logging.info("Starting observer on path %s.", path)
  observer.schedule(event_handler, path, recursive=True)
  observer.start()
  logging.info("Observer started.")
  if pipe_sender:
    pipe_sender.send("Observer started.")

  try:
    while True:
      time.sleep(1)
  finally:
    # Nested finally blocks: a failure while flushing must still stop the
    # observer, and a failure stopping it must still close the pipe.
    try:
      event_handler.flushall()
    finally:
      try:
        observer.stop()
        observer.join()
      finally:
        if pipe_sender:
          pipe_sender.close()