1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #pragma once 16 17 #include <algorithm> 18 #include <array> 19 #include <cstdint> 20 #include <limits> 21 #include <optional> 22 #include <string_view> 23 24 #include "pw_assert/assert.h" 25 #include "pw_bytes/span.h" 26 #include "pw_chrono/system_clock.h" 27 #include "pw_function/function.h" 28 #include "pw_log/proto/log.pwpb.h" 29 #include "pw_log_rpc/internal/config.h" 30 #include "pw_log_rpc/log_filter.h" 31 #include "pw_multisink/multisink.h" 32 #include "pw_protobuf/serialized_size.h" 33 #include "pw_result/result.h" 34 #include "pw_rpc/raw/server_reader_writer.h" 35 #include "pw_status/status.h" 36 #include "pw_sync/lock_annotations.h" 37 #include "pw_sync/mutex.h" 38 39 namespace pw::log_rpc { 40 41 // RpcLogDrain matches a MultiSink::Drain with with an RPC channel's writer. A 42 // RPC channel ID identifies this drain. The user must attach this drain 43 // to a MultiSink that returns a log::pwpb::LogEntry, and provide a buffer large 44 // enough to hold the largest log::pwpb::LogEntry transmittable. The user must 45 // call Flush(), which, on every call, packs as many log::pwpb::LogEntry items 46 // as possible into a log::pwpb::LogEntries message, writes the message to the 47 // provided writer, then repeats the process until there are no more entries in 48 // the MultiSink or the writer failed to write the outgoing package and 49 // error_handling is set to `kCloseStreamOnWriterError`. When error_handling is 50 // `kIgnoreWriterErrors` the drain will continue to retrieve log entries out of 51 // the MultiSink and attempt to send them out ignoring the writer errors without 52 // sending a drop count. 53 // Note: the error handling and drop count reporting might change in the future. 54 // Log filtering is done using the rules of the Filter provided if any. 55 class RpcLogDrain : public multisink::MultiSink::Drain { 56 public: 57 // Dictates how to handle server writer errors. 58 enum class LogDrainErrorHandling { 59 kIgnoreWriterErrors, 60 kCloseStreamOnWriterError, 61 }; 62 63 // The minimum buffer size, without the message payload or module sizes, 64 // needed to retrieve a log::pwpb::LogEntry from the attached MultiSink. The 65 // user must account for the max message size to avoid log entry drops. The 66 // dropped field is not accounted since a dropped message has all other fields 67 // unset. 68 static constexpr size_t kMinEntrySizeWithoutPayload = 69 protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kMessage, 0) + 70 protobuf::SizeOfFieldUint32(log::pwpb::LogEntry::Fields::kLineLevel) + 71 protobuf::SizeOfFieldUint32(log::pwpb::LogEntry::Fields::kFlags) + 72 protobuf::SizeOfFieldInt64(log::pwpb::LogEntry::Fields::kTimestamp) + 73 protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kModule, 0) + 74 protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kFile, 0) + 75 protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kThread, 0); 76 77 // Error messages sent when logs are dropped. 78 static constexpr std::string_view kIngressErrorMessage{ 79 PW_LOG_RPC_INGRESS_ERROR_MSG}; 80 static constexpr std::string_view kSlowDrainErrorMessage{ 81 PW_LOG_RPC_SLOW_DRAIN_MSG}; 82 static constexpr std::string_view kSmallOutboundBufferErrorMessage{ 83 PW_LOG_RPC_SMALL_OUTBOUND_BUFFER_MSG}; 84 static constexpr std::string_view kSmallStackBufferErrorMessage{ 85 PW_LOG_RPC_SMALL_STACK_BUFFER_MSG}; 86 static constexpr std::string_view kWriterErrorMessage{ 87 PW_LOG_RPC_WRITER_ERROR_MSG}; 88 // The smallest entry buffer must fit the largest error message, or a typical 89 // token size (4B), whichever is largest. 90 static constexpr size_t kLargestErrorMessageOrTokenSize = 91 std::max({size_t(4), 92 kIngressErrorMessage.size(), 93 kSlowDrainErrorMessage.size(), 94 kSmallOutboundBufferErrorMessage.size(), 95 kSmallStackBufferErrorMessage.size(), 96 kWriterErrorMessage.size()}); 97 static constexpr size_t kMinEntryBufferSize = 98 kMinEntrySizeWithoutPayload + sizeof(kLargestErrorMessageOrTokenSize); 99 100 // When encoding LogEntry in LogEntries, there are kLogEntriesEncodeFrameSize 101 // bytes added to the encoded LogEntry. This constant and kMinEntryBufferSize 102 // can be used to calculate the minimum RPC ChannelOutput buffer size. 103 static constexpr size_t kLogEntriesEncodeFrameSize = 104 protobuf::TagSizeBytes(log::pwpb::LogEntries::Fields::kEntries) + 105 protobuf::kMaxSizeOfLength + 106 protobuf::SizeOfFieldUint32( 107 log::pwpb::LogEntries::Fields::kFirstEntrySequenceId); 108 109 // Creates a closed log stream with a writer that can be set at a later time. 110 // The provided buffer must be large enough to hold the largest transmittable 111 // log::pwpb::LogEntry or a drop count message at the very least. The user can 112 // choose to provide a unique mutex for the drain, or share it to save RAM as 113 // long as they are aware of contengency issues. 114 RpcLogDrain( 115 const uint32_t channel_id, 116 ByteSpan log_entry_buffer, 117 sync::Mutex& mutex, 118 LogDrainErrorHandling error_handling, 119 Filter* filter = nullptr, 120 size_t max_bundles_per_trickle = std::numeric_limits<size_t>::max(), 121 pw::chrono::SystemClock::duration trickle_delay = 122 chrono::SystemClock::duration::zero()) channel_id_(channel_id)123 : channel_id_(channel_id), 124 error_handling_(error_handling), 125 server_writer_(), 126 log_entry_buffer_(log_entry_buffer), 127 drop_count_ingress_error_(0), 128 drop_count_slow_drain_(0), 129 drop_count_small_outbound_buffer_(0), 130 drop_count_small_stack_buffer_(0), 131 drop_count_writer_error_(0), 132 mutex_(mutex), 133 filter_(filter), 134 sequence_id_(0), 135 max_bundles_per_trickle_(max_bundles_per_trickle), 136 trickle_delay_(trickle_delay), 137 no_writes_until_(chrono::SystemClock::now()), 138 on_open_callback_(nullptr) { 139 PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize); 140 } 141 142 // Not copyable. 143 RpcLogDrain(const RpcLogDrain&) = delete; 144 RpcLogDrain& operator=(const RpcLogDrain&) = delete; 145 146 // Configures the drain with a new open server writer if the current one is 147 // not open. 148 // 149 // Return values: 150 // OK - Successfully set the new open writer. 151 // FAILED_PRECONDITION - The given writer is not open. 152 // ALREADY_EXISTS - an open writer is already set. 153 Status Open(rpc::RawServerWriter& writer) PW_LOCKS_EXCLUDED(mutex_); 154 155 // Accesses log entries and sends them via the writer. Expected to be called 156 // frequently to avoid log drops. If the writer fails to send a packet with 157 // multiple log entries, the entries are dropped and a drop message with the 158 // count is sent. When error_handling is kCloseStreamOnWriterError, the stream 159 // will automatically be closed and Flush will return the writer error. 160 // 161 // Precondition: the drain must be attached to a MultiSink. 162 // 163 // Return values: 164 // OK - all entries were consumed. 165 // ABORTED - there was an error writing the packet, and error_handling equals 166 // `kCloseStreamOnWriterError`. 167 Status Flush(ByteSpan encoding_buffer) PW_LOCKS_EXCLUDED(mutex_); 168 169 // Writes entries as dictated by this drain's rate limiting configuration. 170 // 171 // Returns: 172 // A minimum wait duration before Trickle() will be ready to write more logs 173 // If no duration is returned, this drain is caught up. 174 std::optional<pw::chrono::SystemClock::duration> Trickle( 175 ByteSpan encoding_buffer) PW_LOCKS_EXCLUDED(mutex_); 176 177 // Ends RPC log stream without flushing. 178 // 179 // Return values: 180 // OK - successfully closed the server writer. 181 // FAILED_PRECONDITION - The given writer is not open. 182 // Errors from the underlying writer send packet. 183 Status Close() PW_LOCKS_EXCLUDED(mutex_); 184 channel_id()185 uint32_t channel_id() const { return channel_id_; } 186 max_bundles_per_trickle()187 size_t max_bundles_per_trickle() const { return max_bundles_per_trickle_; } set_max_bundles_per_trickle(size_t max_num_entries)188 void set_max_bundles_per_trickle(size_t max_num_entries) { 189 max_bundles_per_trickle_ = max_num_entries; 190 } 191 trickle_delay()192 chrono::SystemClock::duration trickle_delay() const { return trickle_delay_; } set_trickle_delay(chrono::SystemClock::duration trickle_delay)193 void set_trickle_delay(chrono::SystemClock::duration trickle_delay) { 194 trickle_delay_ = trickle_delay; 195 } 196 197 // Stores a function that is called when Open() is successful. Pass nulltpr to 198 // clear it. This is useful in cases where the owner of the drain needs to be 199 // notified that the drain was opened. set_on_open_callback(pw::Function<void ()> && callback)200 void set_on_open_callback(pw::Function<void()>&& callback) { 201 on_open_callback_ = std::move(callback); 202 } 203 204 private: 205 enum class LogDrainState { 206 kCaughtUp, 207 kMoreEntriesRemaining, 208 }; 209 210 LogDrainState SendLogs(size_t max_num_bundles, 211 ByteSpan encoding_buffer, 212 Status& encoding_status) PW_LOCKS_EXCLUDED(mutex_); 213 214 // Fills the outgoing buffer with as many entries as possible. 215 LogDrainState EncodeOutgoingPacket( 216 log::pwpb::LogEntries::MemoryEncoder& encoder, 217 uint32_t& packed_entry_count_out) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 218 219 const uint32_t channel_id_; 220 const LogDrainErrorHandling error_handling_; 221 rpc::RawServerWriter server_writer_ PW_GUARDED_BY(mutex_); 222 const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_); 223 uint32_t drop_count_ingress_error_ PW_GUARDED_BY(mutex_); 224 uint32_t drop_count_slow_drain_ PW_GUARDED_BY(mutex_); 225 uint32_t drop_count_small_outbound_buffer_ PW_GUARDED_BY(mutex_); 226 uint32_t drop_count_small_stack_buffer_ PW_GUARDED_BY(mutex_); 227 uint32_t drop_count_writer_error_ PW_GUARDED_BY(mutex_); 228 sync::Mutex& mutex_; 229 Filter* filter_; 230 uint32_t sequence_id_; 231 size_t max_bundles_per_trickle_; 232 pw::chrono::SystemClock::duration trickle_delay_; 233 pw::chrono::SystemClock::time_point no_writes_until_; 234 pw::Function<void()> on_open_callback_; 235 }; 236 237 } // namespace pw::log_rpc 238