#!/usr/bin/env python
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Launcher and Connector for mock emulator"""

import json
import os
from pathlib import Path
from typing import Any
import time

from pw_emu.core import (
    Connector,
    Launcher,
    InvalidProperty,
    InvalidPropertyPath,
)

# mock emulator script
_mock_emu = [
    'python',
    os.path.join(Path(os.path.dirname(__file__)).resolve(), 'mock_emu.py'),
]


def wait_for_file_size(
    path: os.PathLike | str, size: int, timeout: int = 5
) -> None:
    """Polls until `path` exists and holds at least `size` bytes.

    Both phases (waiting for the file to appear, then waiting for it to
    grow) share a single deadline of `timeout` seconds and poll every 0.1
    seconds.

    Args:
        path: File to watch.
        size: Minimum size, in bytes, to wait for.
        timeout: Total time to wait, in seconds.

    Raises:
        FileNotFoundError: The file did not appear before the deadline.

    If the file exists but is still smaller than `size` when the deadline
    passes, the function returns without raising; callers get whatever has
    been written so far.
    """
    deadline = time.monotonic() + timeout

    while not os.path.exists(path):
        if time.monotonic() > deadline:
            # Fail with an explicit message rather than relying on the size
            # check below to trip over the missing file.
            raise FileNotFoundError(
                f'{os.fspath(path)} was not created within {timeout}s'
            )
        time.sleep(0.1)

    while os.path.getsize(path) < size:
        if time.monotonic() > deadline:
            # Best effort: give up waiting, but do not fail.
            return
        time.sleep(0.1)


def _read_channel_file(wdir: Path, name: str) -> str:
    """Waits for the file `name` in `wdir` to be populated and returns its text."""
    path = os.path.join(wdir, name)
    wait_for_file_size(path, 5, 5)
    with open(path, 'r') as file:
        return file.read()


class MockEmuLauncher(Launcher):
    """Launcher for mock emulator"""

    def __init__(
        self,
        config_path: Path,
    ):
        super().__init__('mock-emu', config_path)
        self._wdir: Path | None = None
        self.log = True

    def _pre_start(
        self,
        target: str,
        file: Path | None = None,
        pause: bool = False,
        debug: bool = False,
        args: str | None = None,
    ) -> list[str]:
        """Builds the argument list used to run the mock emulator script.

        The result is the `_mock_emu` command, followed by one option per
        channel enabled in the emulator configuration (plus `--working-dir`
        when at least one channel is enabled), followed by the literal
        argument 'starting mock emulator'.

        When the target configuration has a 'pre-start-cmds' entry, two
        fixed channels ('test_subst_tcp' and 'test_subst_pty') are also
        registered with the handles.
        NOTE(review): presumably these exist so tests can exercise channel
        substitution in pre-start commands -- confirm against the tests.

        `target`, `file`, `pause`, `debug` and `args` are accepted but not
        used by this implementation.
        """
        channels: list[str] = []

        if self._config.get_target(['pre-start-cmds']):
            self._handles.add_channel_tcp('test_subst_tcp', 'localhost', 1234)
            self._handles.add_channel_pty('test_subst_pty', 'pty-path')

        if self._config.get_emu(['gdb_channel']):
            channels += ['--tcp-channel', 'gdb']
        if self._config.get_emu(['tcp_channel']):
            channels += ['--tcp-channel', 'tcp']
        if self._config.get_emu(['pty_channel']):
            channels += ['--pty-channel', 'pty']

        # The working directory option is only passed when some channel is
        # enabled.
        if channels:
            channels += ['--working-dir', str(self._wdir)]

        return _mock_emu + channels + ['starting mock emulator']

    def _post_start(self) -> None:
        """Registers the enabled channels using files in the working dir.

        For each channel enabled in the emulator configuration, reads the
        file named after the channel ('gdb', 'tcp' or 'pty') from the
        working directory. The 'gdb' and 'tcp' files are parsed as a TCP
        port on localhost; the 'pty' file content is used as the pty path.

        Does nothing when no working directory is set.
        """
        if not self._wdir:
            return

        if self._config.get_emu(['gdb_channel']):
            port = int(_read_channel_file(self._wdir, 'gdb'))
            self._handles.add_channel_tcp('gdb', 'localhost', port)

        if self._config.get_emu(['tcp_channel']):
            port = int(_read_channel_file(self._wdir, 'tcp'))
            self._handles.add_channel_tcp('tcp', 'localhost', port)

        if self._config.get_emu(['pty_channel']):
            pty_path = _read_channel_file(self._wdir, 'pty')
            self._handles.add_channel_pty('pty', pty_path)

    def _get_connector(self, wdir: Path) -> Connector:
        """Returns a `MockEmuConnector` for the given working directory."""
        return MockEmuConnector(wdir)


class MockEmuConnector(Connector):
    """Connector for mock emulator"""

    # Default properties. This dict is shared by all instances, so it must
    # never be mutated in place; instances rebind `self._props` instead.
    _props = {
        'path1': {
            'prop1': 'val1',
        }
    }

    def reset(self) -> None:
        """Creates (or touches) the 'reset' file in the working directory."""
        Path(os.path.join(self._wdir, 'reset')).touch()

    def cont(self) -> None:
        """Creates (or touches) the 'cont' file in the working directory."""
        Path(os.path.join(self._wdir, 'cont')).touch()

    def _check_property(self, path: str, prop: str) -> None:
        """Raises if `path` or `prop` is not present in the current properties.

        Membership is tested by key, so properties holding falsy values
        (such as an empty string) are still considered valid.
        """
        if path not in self._props:
            raise InvalidPropertyPath(path)
        if prop not in self._props[path]:
            raise InvalidProperty(path, prop)

    def list_properties(self, path: str) -> list[Any]:
        """Returns the property names available under `path`.

        Raises:
            InvalidPropertyPath: `path` is not a known property path.
        """
        try:
            return list(self._props[path].keys())
        except KeyError as err:
            raise InvalidPropertyPath(path) from err

    def set_property(self, path: str, prop: str, value: str) -> None:
        """Sets an existing property and saves all properties to props.json.

        Raises:
            InvalidPropertyPath: `path` is not a known property path.
            InvalidProperty: `prop` does not exist under `path`.
        """
        self._check_property(path, prop)

        # Rebind an updated copy on the instance instead of mutating in
        # place, so the class-level defaults are not changed for other
        # connector instances.
        updated = dict(self._props)
        updated[path] = {**updated[path], prop: value}
        self._props = updated

        with open(os.path.join(self._wdir, 'props.json'), 'w') as file:
            json.dump(self._props, file)

    def get_property(self, path: str, prop: str) -> Any:
        """Returns the value of a property.

        Properties are reloaded from props.json in the working directory
        when that file can be opened; otherwise the current in-memory
        values are used.

        Raises:
            InvalidPropertyPath: `path` is not a known property path.
            InvalidProperty: `prop` does not exist under `path`.
        """
        try:
            with open(os.path.join(self._wdir, 'props.json'), 'r') as file:
                self._props = json.load(file)
        except OSError:
            # No readable saved properties; keep the in-memory values.
            pass

        self._check_property(path, prop)
        return self._props[path][prop]