xref: /aosp_15_r20/external/pigweed/pw_stream/py/pw_stream/stream_readers.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2024 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
"""Cancellable stream readers for sockets and serial ports, plus helpers."""
15
16from abc import ABC, abstractmethod
17from concurrent.futures import ThreadPoolExecutor
18import logging
19import os
20import platform
21import select
22import threading
23import time
24import socket
25import subprocess
26from typing import (
27    Any,
28    Callable,
29    Iterable,
30    Sequence,
31    TypeVar,
32)
33import warnings
34
35import serial
36
# Module-level logger shared by everything in this file.
_LOG = logging.getLogger('pw_stream.stream_readers')
# Custom log level one step below DEBUG; used when logging the raw bytes read.
_VERBOSE = logging.DEBUG - 1

# Type of the frame objects produced by a data processor and consumed by a
# frame handler.
FrameTypeT = TypeVar('FrameTypeT')
41
42
class CancellableReader(ABC):
    """Base class for readers whose blocking reads can be interrupted.

    Wraps a communication interface that is used for reading incoming data
    and guarantees that a read request can be cancelled. Subclasses must
    provide the :py:func:`cancel_read()` method.

    Once a read is cancelled, ongoing and future reads are invalid. The
    :py:func:`cancel_read()` method can only be called once.
    """

    def __init__(self, base_obj: Any, *read_args, **read_kwargs):
        """Stores the wrapped object and the arguments used for each read.

        Args:
            base_obj: Object that offers a ``read()`` method with optional args
                and kwargs.
            read_args: Arguments for ``base_obj.read()`` function.
            read_kwargs: Keyword arguments for ``base_obj.read()`` function.
        """
        self._base_obj, self._read_args, self._read_kwargs = (
            base_obj,
            read_args,
            read_kwargs,
        )

    @abstractmethod
    def cancel_read(self) -> None:
        """Cancels a blocking read request and all future reads.

        Can only be called once.
        """

    def read(self) -> bytes:
        """Reads bytes that contain parts of or full RPC packets."""
        read_fn = self._base_obj.read
        return read_fn(*self._read_args, **self._read_kwargs)

    def __enter__(self) -> 'CancellableReader':
        """Returns this reader so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, *exc_info) -> None:
        """Cancels reading when the ``with`` block is left."""
        self.cancel_read()
81
class SelectableReader(CancellableReader):
    """
    Wraps interfaces that work with ``select()`` to signal when data is
    received.

    These interfaces must provide a ``fileno()`` method.
    WINDOWS ONLY: Only sockets that originate from WinSock can be wrapped. File
    objects are not acceptable.

    Cancellation is implemented with an internal pipe: ``cancel_read()``
    writes a stop command to the pipe, and ``read()`` waits in ``select()`` on
    both the wrapped object and the pipe. A pipe descriptor is set to ``-1``
    once it has been closed.
    """

    _STOP_CMD = b'STOP'

    def __init__(self, base_obj: Any, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Object that offers ``fileno()`` and ``read()`` methods.
            read_args: Arguments for ``base_obj.read()`` function.
            read_kwargs: Keyword arguments for ``base_obj.read()`` function.

        Raises:
            ValueError: On Windows, if ``base_obj`` is not a socket.
        """
        assert hasattr(base_obj, 'fileno')
        if platform.system() == 'Windows' and not isinstance(
            base_obj, socket.socket
        ):
            raise ValueError('Only socket objects are selectable on Windows')
        super().__init__(base_obj, *read_args, **read_kwargs)
        self._cancel_signal_pipe_r_fd, self._cancel_signal_pipe_w_fd = os.pipe()
        # Held for the whole wait in select() so that the read end of the pipe
        # is never closed while another thread is selecting on it.
        self._waiting_for_read_or_cancel_lock = threading.Lock()

    def __exit__(self, *exc_info) -> None:
        """Cancels reading and releases the read end of the cancel pipe."""
        self.cancel_read()
        with self._waiting_for_read_or_cancel_lock:
            # 0 is a valid file descriptor, so only negative values mean
            # "already closed".
            if self._cancel_signal_pipe_r_fd >= 0:
                os.close(self._cancel_signal_pipe_r_fd)
                self._cancel_signal_pipe_r_fd = -1

    def read(self) -> bytes:
        """Reads from the wrapped object, or returns ``b''`` if not ready.

        Returns:
            The bytes returned by the wrapped object's ``read()``, or ``b''``
            when the wait ended without the wrapped object being readable.
        """
        if self._wait_for_read_or_cancel():
            return super().read()
        return b''

    def _wait_for_read_or_cancel(self) -> bool:
        """Returns ``True`` when ready to read.

        Returns ``False`` when either descriptor is already closed, when the
        stop command was received, or when ``select()`` reports an exceptional
        condition on the wrapped object.
        """
        with self._waiting_for_read_or_cancel_lock:
            if self._base_obj.fileno() < 0 or self._cancel_signal_pipe_r_fd < 0:
                # The interface might've been closed already.
                return False
            ready_to_read, _, exception_list = select.select(
                [self._cancel_signal_pipe_r_fd, self._base_obj],
                [],
                [self._base_obj],
            )
            if self._cancel_signal_pipe_r_fd in ready_to_read:
                # A signal to stop the reading process was received. Drain it
                # and close the read end; later calls return False right away.
                os.read(self._cancel_signal_pipe_r_fd, len(self._STOP_CMD))
                os.close(self._cancel_signal_pipe_r_fd)
                self._cancel_signal_pipe_r_fd = -1
                return False

            if exception_list:
                _LOG.error('Error reading interface')
                return False
        return True

    def cancel_read(self) -> None:
        """Signals a pending or future ``read()`` to stop.

        Writes the stop command to the cancel pipe and closes its write end.
        Later calls are no-ops because the write end is already closed.
        """
        # 0 is a valid file descriptor, so only negative values mean "closed".
        if self._cancel_signal_pipe_w_fd >= 0:
            try:
                os.write(self._cancel_signal_pipe_w_fd, self._STOP_CMD)
            finally:
                # Close the write end even if the write failed so the
                # descriptor is not leaked.
                os.close(self._cancel_signal_pipe_w_fd)
                self._cancel_signal_pipe_w_fd = -1
145
class SocketReader(SelectableReader):
    """Wraps a socket ``recv()`` function."""

    def __init__(self, base_obj: socket.socket, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Socket to receive data from.
            read_args: Arguments for ``base_obj.recv()`` function.
            read_kwargs: Keyword arguments for ``base_obj.recv()`` function.
        """
        super().__init__(base_obj, *read_args, **read_kwargs)

    def read(self) -> bytes:
        """Receives bytes from the socket, or returns ``b''`` if not ready.

        Returns:
            The bytes returned by ``recv()``, or ``b''`` when the wait ended
            without the socket being readable.
        """
        if self._wait_for_read_or_cancel():
            return self._base_obj.recv(*self._read_args, **self._read_kwargs)
        return b''

    def __exit__(self, *exc_info) -> None:
        """Cancels reading, releases the cancel pipe and closes the socket."""
        try:
            # Delegate to the parent so the read end of the cancel pipe is
            # closed too; calling only cancel_read() would leak it whenever no
            # read consumed the stop command.
            super().__exit__(*exc_info)
        finally:
            self._base_obj.close()
161
class SerialReader(CancellableReader):
    """Cancellable reader backed by a :py:class:`serial.Serial` object."""

    def __init__(self, base_obj: serial.Serial, *read_args, **read_kwargs):
        """
        Args:
            base_obj: Serial port object to read from.
            read_args: Arguments for ``base_obj.read()`` function.
            read_kwargs: Keyword arguments for ``base_obj.read()`` function.
        """
        super().__init__(base_obj, *read_args, **read_kwargs)

    def __exit__(self, *exc_info) -> None:
        """Cancels any read and then closes the serial object."""
        self.cancel_read()
        port = self._base_obj
        port.close()

    def cancel_read(self) -> None:
        """Forwards the cancellation to the serial object's ``cancel_read()``."""
        port = self._base_obj
        port.cancel_read()
175
class DataReaderAndExecutor:
    """Pumps bytes from a reader into frame handlers run on a thread pool.

    A background thread repeatedly reads from the reader, turns the bytes into
    frames with the data processor, and submits every frame to a
    ``ThreadPoolExecutor``. Running the handlers in the pool decouples reading
    the input stream from handling the data: if a handler function takes a
    long time or crashes, the reading thread is not interrupted.
    """

    def __init__(
        self,
        reader: CancellableReader,
        on_read_error: Callable[[Exception], None],
        data_processor: Callable[[bytes], Iterable[FrameTypeT]],
        frame_handler: Callable[[FrameTypeT], None],
        handler_threads: int | None = 1,
    ):
        """Creates the data reader and frame delegator.

        Args:
            reader: Reads incoming bytes from the given transport, blocks until
              data is available or an exception is raised. Otherwise the reader
              will exit.
            on_read_error: Called when there is an error reading incoming bytes.
            data_processor: Processes read bytes and returns a frame-like object
              that the frame_handler can process.
            frame_handler: Handles a received frame.
            handler_threads: The number of threads in the executor pool.
        """
        self._handler_threads = handler_threads
        self._frame_handler = frame_handler
        self._data_processor = data_processor
        self._on_read_error = on_read_error
        self._reader = reader

        # Set by stop() to make the background loop exit.
        self._reader_thread_stop = threading.Event()
        self._reader_thread = threading.Thread(target=self._run)

    def start(self) -> None:
        """Starts the reading process."""
        _LOG.debug('Starting read process')
        self._reader_thread_stop.clear()
        self._reader_thread.start()

    def stop(self) -> None:
        """Stops the reading process.

        Requests that the reading process stop, cancels the pending read and
        waits up to 30 seconds for the background thread to exit. A warning is
        issued if the thread is still alive after that.
        """
        _LOG.debug('Stopping read process')
        self._reader_thread_stop.set()
        self._reader.cancel_read()

        reader_thread = self._reader_thread
        reader_thread.join(30)
        if not reader_thread.is_alive():
            return
        warnings.warn(
            'Timed out waiting for read thread to terminate.\n'
            'Tip: Use a `CancellableReader` to cancel reads.'
        )

    def _run(self) -> None:
        """Reads raw data in a background thread."""
        stop_requested = self._reader_thread_stop
        with ThreadPoolExecutor(max_workers=self._handler_threads) as pool:
            while not stop_requested.is_set():
                try:
                    chunk = self._reader.read()
                except Exception as error:  # pylint: disable=broad-except
                    # Errors are expected while stopping, because the stream
                    # or device behind the reader was likely closed; only
                    # report errors that happen during normal operation.
                    if not stop_requested.is_set():
                        self._on_read_error(error)
                    _LOG.debug(
                        'DataReaderAndExecutor thread exiting due to exception',
                        exc_info=error,
                    )
                    return

                if chunk:
                    _LOG.log(_VERBOSE, 'Read %2d B: %s', len(chunk), chunk)

                    # Hand every decoded frame to the pool so that handlers
                    # never block this loop.
                    for frame in self._data_processor(chunk):
                        pool.submit(self._frame_handler, frame)
261
262
def _try_connect(port: int, attempts: int = 10) -> socket.socket:
    """Tries to connect to the specified port up to the given number of times.

    This is helpful when connecting to a process that was started by this
    script. The process may need time to start listening for connections, and
    that length of time can vary. This retries with a short delay rather than
    having to wait for the worst case delay every time.

    Args:
        port: TCP port on ``localhost`` to connect to.
        attempts: Maximum number of connection attempts. At least one attempt
            is always made.

    Returns:
        The connected socket.

    Raises:
        ConnectionRefusedError: If the final attempt is refused.
    """
    timeout_s = 0.001
    while True:
        # The delay starts at 1 ms and doubles after every refused attempt.
        time.sleep(timeout_s)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(('localhost', port))
        except ConnectionRefusedError:
            sock.close()
            attempts -= 1
            if attempts <= 0:
                raise
        except BaseException:
            # Any other failure is not retried; close the socket before
            # propagating so the descriptor is not leaked.
            sock.close()
            raise
        else:
            return sock

        timeout_s *= 2
286
287
class SocketSubprocess:
    """Launches a subprocess and opens a socket connection to it."""

    def __init__(self, command: Sequence, port: int) -> None:
        """Starts ``command`` and connects to it on ``port``.

        Args:
            command: Command line used to start the subprocess.
            port: Port on which the subprocess is expected to accept
                connections.

        Raises:
            Whatever the connection attempt raises; the subprocess is
            terminated before the exception propagates.
        """
        server = subprocess.Popen(command, stdin=subprocess.PIPE)
        self._server_process = server
        self.stdin = server.stdin

        try:
            self.socket: socket.socket = _try_connect(port)
        except BaseException:
            # Connecting failed, so do not leave the subprocess running.
            self._server_process.terminate()
            self._server_process.communicate()
            raise

    def __enter__(self) -> 'SocketSubprocess':
        """Returns this object so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Closes the socket and stops the subprocess."""
        self.close()

    def close(self) -> None:
        """Closes the socket, then terminates and waits for the subprocess.

        The subprocess is terminated even if closing the socket raises.
        """
        try:
            self.socket.close()
        finally:
            server = self._server_process
            server.terminate()
            server.communicate()
314