# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Serial reader utilities."""

from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
import logging
import os
import platform
import select
import threading
import time
import socket
import subprocess
from typing import (
    Any,
    Callable,
    Iterable,
    Sequence,
    TypeVar,
)
import warnings

import serial

_LOG = logging.getLogger('pw_stream.stream_readers')
# One level below DEBUG; used for per-read byte dumps.
_VERBOSE = logging.DEBUG - 1

FrameTypeT = TypeVar('FrameTypeT')


class CancellableReader(ABC):
    """Wraps communication interfaces used for reading incoming data with the
    guarantee that the read request can be cancelled. Derived classes must
    implement the :py:func:`cancel_read()` method.

    Cancelling a read invalidates ongoing and future reads. The
    :py:func:`cancel_read()` method can only be called once.
    """

    def __init__(self, base_obj: Any, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Object that offers a ``read()`` method with optional args
                and kwargs.
            read_args: Arguments for ``base_obj.read()`` function.
            read_kwargs: Keyword arguments for ``base_obj.read()`` function.
        """
        self._base_obj = base_obj
        self._read_args = read_args
        self._read_kwargs = read_kwargs

    def __enter__(self) -> 'CancellableReader':
        return self

    def __exit__(self, *exc_info) -> None:
        """Cancels any pending read when leaving the ``with`` block."""
        self.cancel_read()

    def read(self) -> bytes:
        """Reads bytes that contain parts of or full RPC packets."""
        return self._base_obj.read(*self._read_args, **self._read_kwargs)

    @abstractmethod
    def cancel_read(self) -> None:
        """Cancels a blocking read request and all future reads.

        Can only be called once.
        """


class SelectableReader(CancellableReader):
    """
    Wraps interfaces that work with ``select()`` to signal when data is
    received.

    These interfaces must provide a ``fileno()`` method.
    WINDOWS ONLY: Only sockets that originate from WinSock can be wrapped. File
    objects are not acceptable.

    Cancellation is implemented with an internal pipe: ``cancel_read()`` writes
    a stop command to the write end, which wakes up a ``select()`` call that is
    waiting on the read end together with the wrapped object.
    """

    _STOP_CMD = b'STOP'

    def __init__(self, base_obj: Any, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Object that offers ``read()`` and ``fileno()`` methods.
            read_args: Arguments for ``base_obj.read()`` function.
            read_kwargs: Keyword arguments for ``base_obj.read()`` function.

        Raises:
            ValueError: On Windows, when ``base_obj`` is not a socket.
        """
        assert hasattr(base_obj, 'fileno')
        if platform.system() == 'Windows' and not isinstance(
            base_obj, socket.socket
        ):
            raise ValueError('Only socket objects are selectable on Windows')
        super().__init__(base_obj, *read_args, **read_kwargs)
        # A closed end of the pipe is marked by setting its descriptor to -1.
        self._cancel_signal_pipe_r_fd, self._cancel_signal_pipe_w_fd = os.pipe()
        # Held for the whole duration of a wait so the read end of the pipe is
        # never closed underneath a pending select().
        self._waiting_for_read_or_cancel_lock = threading.Lock()

    def __exit__(self, *exc_info) -> None:
        """Cancels reads and releases the read end of the cancel pipe."""
        self.cancel_read()
        with self._waiting_for_read_or_cancel_lock:
            # 0 is a valid file descriptor; only negative values mean closed.
            if self._cancel_signal_pipe_r_fd >= 0:
                os.close(self._cancel_signal_pipe_r_fd)
                self._cancel_signal_pipe_r_fd = -1

    def read(self) -> bytes:
        """Waits for data or cancellation; returns ``b''`` if not readable."""
        if self._wait_for_read_or_cancel():
            return super().read()
        return b''

    def _wait_for_read_or_cancel(self) -> bool:
        """Returns ``True`` when ready to read."""
        with self._waiting_for_read_or_cancel_lock:
            if self._base_obj.fileno() < 0 or self._cancel_signal_pipe_r_fd < 0:
                # The interface might've been closed already.
                return False
            ready_to_read, _, exception_list = select.select(
                [self._cancel_signal_pipe_r_fd, self._base_obj],
                [],
                [self._base_obj],
            )
            if self._cancel_signal_pipe_r_fd in ready_to_read:
                # A signal to stop the reading process was received.
                os.read(self._cancel_signal_pipe_r_fd, len(self._STOP_CMD))
                os.close(self._cancel_signal_pipe_r_fd)
                self._cancel_signal_pipe_r_fd = -1
                return False

            if exception_list:
                _LOG.error('Error reading interface')
                return False
            return True

    def cancel_read(self) -> None:
        """Signals a pending or future wait to stop; later calls are no-ops."""
        write_fd = self._cancel_signal_pipe_w_fd
        # 0 is a valid file descriptor; only negative values mean closed.
        if write_fd < 0:
            return
        self._cancel_signal_pipe_w_fd = -1
        try:
            os.write(write_fd, self._STOP_CMD)
        finally:
            # Release the descriptor even if the write fails.
            os.close(write_fd)


class SocketReader(SelectableReader):
    """Wraps a socket ``recv()`` function."""

    def __init__(self, base_obj: socket.socket, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Socket to receive data from.
            read_args: Arguments for ``base_obj.recv()`` function.
            read_kwargs: Keyword arguments for ``base_obj.recv()`` function.
        """
        super().__init__(base_obj, *read_args, **read_kwargs)

    def read(self) -> bytes:
        """Waits for data or cancellation; returns ``b''`` if not readable."""
        if self._wait_for_read_or_cancel():
            return self._base_obj.recv(*self._read_args, **self._read_kwargs)
        return b''

    def __exit__(self, *exc_info) -> None:
        """Cancels reads, releases the cancel pipe and closes the socket."""
        try:
            # The parent cancels the read and closes the read end of the
            # cancel pipe, which would otherwise stay open.
            super().__exit__(*exc_info)
        finally:
            self._base_obj.close()


class SerialReader(CancellableReader):
    """Wraps a :py:class:`serial.Serial` object."""

    def __init__(self, base_obj: serial.Serial, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Serial port object to read from.
            read_args: Arguments for ``base_obj.read()`` function.
            read_kwargs: Keyword arguments for ``base_obj.read()`` function.
        """
        super().__init__(base_obj, *read_args, **read_kwargs)

    def cancel_read(self) -> None:
        """Delegates cancellation to the wrapped serial object."""
        self._base_obj.cancel_read()

    def __exit__(self, *exc_info) -> None:
        """Cancels reads and closes the serial object."""
        self.cancel_read()
        self._base_obj.close()


class DataReaderAndExecutor:
    """Reads incoming bytes, data processor that delegates frame handling.

    Executing callbacks in a ``ThreadPoolExecutor`` decouples reading the input
    stream from handling the data. That way, if a handler function takes a
    long time or crashes, this reading thread is not interrupted.
    """

    def __init__(
        self,
        reader: CancellableReader,
        on_read_error: Callable[[Exception], None],
        data_processor: Callable[[bytes], Iterable[FrameTypeT]],
        frame_handler: Callable[[FrameTypeT], None],
        handler_threads: int | None = 1,
    ):
        """Creates the data reader and frame delegator.

        Args:
            reader: Reads incoming bytes from the given transport, blocks until
              data is available or an exception is raised. Otherwise the reader
              will exit.
            on_read_error: Called when there is an error reading incoming bytes.
            data_processor: Processes read bytes and returns a frame-like object
              that the frame_handler can process.
            frame_handler: Handles a received frame.
            handler_threads: The number of threads in the executor pool.
        """

        self._reader = reader
        self._on_read_error = on_read_error
        self._data_processor = data_processor
        self._frame_handler = frame_handler
        self._handler_threads = handler_threads

        self._reader_thread = threading.Thread(target=self._run)
        self._reader_thread_stop = threading.Event()

    def start(self) -> None:
        """Starts the reading process."""
        _LOG.debug('Starting read process')
        self._reader_thread_stop.clear()
        self._reader_thread.start()

    def stop(self) -> None:
        """Stops the reading process.

        This requests that the reading process stop and waits
        for the background thread to exit.
        """
        _LOG.debug('Stopping read process')
        self._reader_thread_stop.set()
        self._reader.cancel_read()
        if self._reader_thread.ident is None:
            # The thread was never started; joining it would raise
            # RuntimeError, and there is nothing to wait for.
            return
        self._reader_thread.join(30)
        if self._reader_thread.is_alive():
            warnings.warn(
                'Timed out waiting for read thread to terminate.\n'
                'Tip: Use a `CancellableReader` to cancel reads.'
            )

    def _run(self) -> None:
        """Reads raw data in a background thread."""
        with ThreadPoolExecutor(max_workers=self._handler_threads) as executor:
            while not self._reader_thread_stop.is_set():
                try:
                    data = self._reader.read()
                except Exception as exc:  # pylint: disable=broad-except
                    # Don't report the read error if the thread is stopping.
                    # The stream or device backing _read was likely closed,
                    # so errors are expected.
                    if not self._reader_thread_stop.is_set():
                        self._on_read_error(exc)
                    _LOG.debug(
                        'DataReaderAndExecutor thread exiting due to exception',
                        exc_info=exc,
                    )
                    return

                if not data:
                    continue

                _LOG.log(_VERBOSE, 'Read %2d B: %s', len(data), data)

                # Frames are handed to the pool so handlers never block reads.
                for frame in self._data_processor(data):
                    executor.submit(self._frame_handler, frame)


def _try_connect(port: int, attempts: int = 10) -> socket.socket:
    """Tries to connect to the specified port up to the given number of times.

    This is helpful when connecting to a process that was started by this
    script. The process may need time to start listening for connections, and
    that length of time can vary. This retries with a short delay rather than
    having to wait for the worst case delay every time.

    Args:
        port: Port on ``localhost`` to connect to.
        attempts: Maximum number of connection attempts.

    Returns:
        The connected socket.

    Raises:
        ConnectionRefusedError: If the last attempt was refused.
    """
    # Delay before each attempt; doubled after every refused connection.
    timeout_s = 0.001
    while True:
        time.sleep(timeout_s)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(('localhost', port))
        except ConnectionRefusedError:
            sock.close()
            attempts -= 1
            if attempts <= 0:
                raise
        except BaseException:
            # Any other failure is not retried, but must not leak the socket.
            sock.close()
            raise
        else:
            return sock

        timeout_s *= 2


class SocketSubprocess:
    """Executes a subprocess and connects to it with a socket."""

    def __init__(self, command: Sequence, port: int) -> None:
        """Starts ``command`` and connects to it on ``port``.

        Args:
            command: Command line passed to ``subprocess.Popen``.
            port: Port on ``localhost`` the subprocess is expected to listen on.
        """
        self._server_process = subprocess.Popen(command, stdin=subprocess.PIPE)
        self.stdin = self._server_process.stdin

        try:
            self.socket: socket.socket = _try_connect(port)
        except BaseException:
            # Don't leave the subprocess running if the connection failed.
            self._server_process.terminate()
            self._server_process.communicate()
            raise

    def close(self) -> None:
        """Closes the socket, then terminates and reaps the subprocess."""
        try:
            self.socket.close()
        finally:
            self._server_process.terminate()
            self._server_process.communicate()

    def __enter__(self) -> 'SocketSubprocess':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()