xref: /aosp_15_r20/external/pigweed/pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2023 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14
15"""Utils to decode logs."""
16
17import logging
18import warnings
19
20from pw_log.log_decoder import LogStreamDecoder
21from pw_log.proto import log_pb2
22import pw_rpc
23import pw_status
24
25_LOG = logging.getLogger(__name__)
26
27
28class LogStreamHandler:
29    """Handles an RPC Log Stream.
30
31    Args:
32        rpcs: RPC services to request RPC Log Streams.
33        decoder: LogStreamDecoder
34    """
35
36    def __init__(
37        self, rpcs: pw_rpc.client.Services, decoder: LogStreamDecoder
38    ) -> None:
39        self.rpcs = rpcs
40        self._decoder = decoder
41
42    def listen_to_logs(self) -> None:
43        warnings.warn(
44            'listen_to_logs is deprecated; call start_logging() instead',
45            DeprecationWarning,
46        )
47        self.start_logging()
48
49    def start_logging(self) -> None:
50        """Requests logs to be streamed over the pw.log.Logs.Listen RPC."""
51        self.rpcs.pw.log.Logs.Listen.invoke(
52            on_next=self._on_log_entries,
53            on_completed=lambda _, status: self.handle_log_stream_completed(
54                status
55            ),
56            on_error=lambda _, error: self.handle_log_stream_error(error),
57        )
58
59    def _on_log_entries(self, _, log_entries_proto: log_pb2.LogEntries) -> None:
60        self._decoder.parse_log_entries_proto(log_entries_proto)
61
62    def handle_log_stream_error(self, error: pw_status.Status) -> None:
63        """Resets the log stream RPC on error to avoid losing logs.
64
65        Override this function to change default behavior.
66        """
67        _LOG.error(
68            'Log stream error: %s from source %s',
69            error,
70            self.source_name,
71        )
72        # Only re-request logs if the RPC was not cancelled by the client.
73        if error != pw_status.Status.CANCELLED:
74            self.start_logging()
75
76    def handle_log_stream_completed(self, status: pw_status.Status) -> None:
77        """Resets the log stream RPC on completed to avoid losing logs.
78
79        Override this function to change default behavior.
80        """
81        _LOG.debug(
82            'Log stream completed with status: %s for source: %s',
83            status,
84            self.source_name,
85        )
86        self.start_logging()
87
88    @property
89    def source_name(self) -> str:
90        return self._decoder.source_name
91