# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""Utils to decode logs."""

import logging
import warnings

from pw_log.log_decoder import LogStreamDecoder
from pw_log.proto import log_pb2
import pw_rpc
import pw_status

_LOG = logging.getLogger(__name__)


class LogStreamHandler:
    """Handles an RPC Log Stream.

    Requests logs over the pw.log.Logs.Listen RPC and forwards every received
    LogEntries proto to the decoder. By default the stream is re-requested
    when it completes or fails with anything other than CANCELLED.

    Args:
        rpcs: RPC services to request RPC Log Streams.
        decoder: LogStreamDecoder that parses each received LogEntries proto
            and provides the source name used in this handler's log messages.
    """

    def __init__(
        self, rpcs: pw_rpc.client.Services, decoder: LogStreamDecoder
    ) -> None:
        self.rpcs = rpcs
        self._decoder = decoder

    def listen_to_logs(self) -> None:
        """Deprecated alias for start_logging().

        Emits a DeprecationWarning and then calls start_logging().
        """
        warnings.warn(
            'listen_to_logs is deprecated; call start_logging() instead',
            DeprecationWarning,
            # Attribute the warning to the caller of this deprecated method
            # rather than to this wrapper, so the reported file/line is the
            # code that actually needs to be updated.
            stacklevel=2,
        )
        self.start_logging()

    def start_logging(self) -> None:
        """Requests logs to be streamed over the pw.log.Logs.Listen RPC."""
        # The first argument passed to each callback is not used by this
        # class; only the payload, status or error is forwarded.
        self.rpcs.pw.log.Logs.Listen.invoke(
            on_next=self._on_log_entries,
            on_completed=lambda _, status: self.handle_log_stream_completed(
                status
            ),
            on_error=lambda _, error: self.handle_log_stream_error(error),
        )

    def _on_log_entries(self, _, log_entries_proto: log_pb2.LogEntries) -> None:
        """Passes a received LogEntries proto to the decoder.

        Args:
            _: Unused first callback argument.
            log_entries_proto: The LogEntries message received on the stream.
        """
        self._decoder.parse_log_entries_proto(log_entries_proto)

    def handle_log_stream_error(self, error: pw_status.Status) -> None:
        """Resets the log stream RPC on error to avoid losing logs.

        Logs the error together with the decoder's source name, then requests
        the stream again unless the error is CANCELLED.

        Override this function to change default behavior.

        Args:
            error: Status reported by the RPC's on_error callback.
        """
        _LOG.error(
            'Log stream error: %s from source %s',
            error,
            self.source_name,
        )
        # Only re-request logs if the RPC was not cancelled by the client.
        if error != pw_status.Status.CANCELLED:
            self.start_logging()

    def handle_log_stream_completed(self, status: pw_status.Status) -> None:
        """Resets the log stream RPC on completed to avoid losing logs.

        Logs the completion status at debug level, then always requests the
        stream again.

        Override this function to change default behavior.

        Args:
            status: Status reported by the RPC's on_completed callback.
        """
        _LOG.debug(
            'Log stream completed with status: %s for source: %s',
            status,
            self.source_name,
        )
        self.start_logging()

    @property
    def source_name(self) -> str:
        """The source name reported by the underlying decoder."""
        return self._decoder.source_name