xref: /aosp_15_r20/external/pigweed/pw_log_rpc/public/pw_log_rpc/rpc_log_drain.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #pragma once
16 
17 #include <algorithm>
18 #include <array>
19 #include <cstdint>
20 #include <limits>
21 #include <optional>
22 #include <string_view>
23 
24 #include "pw_assert/assert.h"
25 #include "pw_bytes/span.h"
26 #include "pw_chrono/system_clock.h"
27 #include "pw_function/function.h"
28 #include "pw_log/proto/log.pwpb.h"
29 #include "pw_log_rpc/internal/config.h"
30 #include "pw_log_rpc/log_filter.h"
31 #include "pw_multisink/multisink.h"
32 #include "pw_protobuf/serialized_size.h"
33 #include "pw_result/result.h"
34 #include "pw_rpc/raw/server_reader_writer.h"
35 #include "pw_status/status.h"
36 #include "pw_sync/lock_annotations.h"
37 #include "pw_sync/mutex.h"
38 
39 namespace pw::log_rpc {
40 
41 // RpcLogDrain matches a MultiSink::Drain with with an RPC channel's writer. A
42 // RPC channel ID identifies this drain. The user must attach this drain
43 // to a MultiSink that returns a log::pwpb::LogEntry, and provide a buffer large
44 // enough to hold the largest log::pwpb::LogEntry transmittable. The user must
45 // call Flush(), which, on every call, packs as many log::pwpb::LogEntry items
46 // as possible into a log::pwpb::LogEntries message, writes the message to the
47 // provided writer, then repeats the process until there are no more entries in
48 // the MultiSink or the writer failed to write the outgoing package and
49 // error_handling is set to `kCloseStreamOnWriterError`. When error_handling is
50 // `kIgnoreWriterErrors` the drain will continue to retrieve log entries out of
51 // the MultiSink and attempt to send them out ignoring the writer errors without
52 // sending a drop count.
53 // Note: the error handling and drop count reporting might change in the future.
54 // Log filtering is done using the rules of the Filter provided if any.
55 class RpcLogDrain : public multisink::MultiSink::Drain {
56  public:
57   // Dictates how to handle server writer errors.
58   enum class LogDrainErrorHandling {
59     kIgnoreWriterErrors,
60     kCloseStreamOnWriterError,
61   };
62 
63   // The minimum buffer size, without the message payload or module sizes,
64   // needed to retrieve a log::pwpb::LogEntry from the attached MultiSink. The
65   // user must account for the max message size to avoid log entry drops. The
66   // dropped field is not accounted since a dropped message has all other fields
67   // unset.
68   static constexpr size_t kMinEntrySizeWithoutPayload =
69       protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kMessage, 0) +
70       protobuf::SizeOfFieldUint32(log::pwpb::LogEntry::Fields::kLineLevel) +
71       protobuf::SizeOfFieldUint32(log::pwpb::LogEntry::Fields::kFlags) +
72       protobuf::SizeOfFieldInt64(log::pwpb::LogEntry::Fields::kTimestamp) +
73       protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kModule, 0) +
74       protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kFile, 0) +
75       protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kThread, 0);
76 
77   // Error messages sent when logs are dropped.
78   static constexpr std::string_view kIngressErrorMessage{
79       PW_LOG_RPC_INGRESS_ERROR_MSG};
80   static constexpr std::string_view kSlowDrainErrorMessage{
81       PW_LOG_RPC_SLOW_DRAIN_MSG};
82   static constexpr std::string_view kSmallOutboundBufferErrorMessage{
83       PW_LOG_RPC_SMALL_OUTBOUND_BUFFER_MSG};
84   static constexpr std::string_view kSmallStackBufferErrorMessage{
85       PW_LOG_RPC_SMALL_STACK_BUFFER_MSG};
86   static constexpr std::string_view kWriterErrorMessage{
87       PW_LOG_RPC_WRITER_ERROR_MSG};
88   // The smallest entry buffer must fit the largest error message, or a typical
89   // token size (4B), whichever is largest.
90   static constexpr size_t kLargestErrorMessageOrTokenSize =
91       std::max({size_t(4),
92                 kIngressErrorMessage.size(),
93                 kSlowDrainErrorMessage.size(),
94                 kSmallOutboundBufferErrorMessage.size(),
95                 kSmallStackBufferErrorMessage.size(),
96                 kWriterErrorMessage.size()});
97   static constexpr size_t kMinEntryBufferSize =
98       kMinEntrySizeWithoutPayload + sizeof(kLargestErrorMessageOrTokenSize);
99 
100   // When encoding LogEntry in LogEntries, there are kLogEntriesEncodeFrameSize
101   // bytes added to the encoded LogEntry. This constant and kMinEntryBufferSize
102   // can be used to calculate the minimum RPC ChannelOutput buffer size.
103   static constexpr size_t kLogEntriesEncodeFrameSize =
104       protobuf::TagSizeBytes(log::pwpb::LogEntries::Fields::kEntries) +
105       protobuf::kMaxSizeOfLength +
106       protobuf::SizeOfFieldUint32(
107           log::pwpb::LogEntries::Fields::kFirstEntrySequenceId);
108 
109   // Creates a closed log stream with a writer that can be set at a later time.
110   // The provided buffer must be large enough to hold the largest transmittable
111   // log::pwpb::LogEntry or a drop count message at the very least. The user can
112   // choose to provide a unique mutex for the drain, or share it to save RAM as
113   // long as they are aware of contengency issues.
114   RpcLogDrain(
115       const uint32_t channel_id,
116       ByteSpan log_entry_buffer,
117       sync::Mutex& mutex,
118       LogDrainErrorHandling error_handling,
119       Filter* filter = nullptr,
120       size_t max_bundles_per_trickle = std::numeric_limits<size_t>::max(),
121       pw::chrono::SystemClock::duration trickle_delay =
122           chrono::SystemClock::duration::zero())
channel_id_(channel_id)123       : channel_id_(channel_id),
124         error_handling_(error_handling),
125         server_writer_(),
126         log_entry_buffer_(log_entry_buffer),
127         drop_count_ingress_error_(0),
128         drop_count_slow_drain_(0),
129         drop_count_small_outbound_buffer_(0),
130         drop_count_small_stack_buffer_(0),
131         drop_count_writer_error_(0),
132         mutex_(mutex),
133         filter_(filter),
134         sequence_id_(0),
135         max_bundles_per_trickle_(max_bundles_per_trickle),
136         trickle_delay_(trickle_delay),
137         no_writes_until_(chrono::SystemClock::now()),
138         on_open_callback_(nullptr) {
139     PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize);
140   }
141 
142   // Not copyable.
143   RpcLogDrain(const RpcLogDrain&) = delete;
144   RpcLogDrain& operator=(const RpcLogDrain&) = delete;
145 
146   // Configures the drain with a new open server writer if the current one is
147   // not open.
148   //
149   // Return values:
150   // OK - Successfully set the new open writer.
151   // FAILED_PRECONDITION - The given writer is not open.
152   // ALREADY_EXISTS - an open writer is already set.
153   Status Open(rpc::RawServerWriter& writer) PW_LOCKS_EXCLUDED(mutex_);
154 
155   // Accesses log entries and sends them via the writer. Expected to be called
156   // frequently to avoid log drops. If the writer fails to send a packet with
157   // multiple log entries, the entries are dropped and a drop message with the
158   // count is sent. When error_handling is kCloseStreamOnWriterError, the stream
159   // will automatically be closed and Flush will return the writer error.
160   //
161   // Precondition: the drain must be attached to a MultiSink.
162   //
163   // Return values:
164   // OK - all entries were consumed.
165   // ABORTED - there was an error writing the packet, and error_handling equals
166   // `kCloseStreamOnWriterError`.
167   Status Flush(ByteSpan encoding_buffer) PW_LOCKS_EXCLUDED(mutex_);
168 
169   // Writes entries as dictated by this drain's rate limiting configuration.
170   //
171   // Returns:
172   //   A minimum wait duration before Trickle() will be ready to write more logs
173   // If no duration is returned, this drain is caught up.
174   std::optional<pw::chrono::SystemClock::duration> Trickle(
175       ByteSpan encoding_buffer) PW_LOCKS_EXCLUDED(mutex_);
176 
177   // Ends RPC log stream without flushing.
178   //
179   // Return values:
180   // OK - successfully closed the server writer.
181   // FAILED_PRECONDITION - The given writer is not open.
182   // Errors from the underlying writer send packet.
183   Status Close() PW_LOCKS_EXCLUDED(mutex_);
184 
channel_id()185   uint32_t channel_id() const { return channel_id_; }
186 
max_bundles_per_trickle()187   size_t max_bundles_per_trickle() const { return max_bundles_per_trickle_; }
set_max_bundles_per_trickle(size_t max_num_entries)188   void set_max_bundles_per_trickle(size_t max_num_entries) {
189     max_bundles_per_trickle_ = max_num_entries;
190   }
191 
trickle_delay()192   chrono::SystemClock::duration trickle_delay() const { return trickle_delay_; }
set_trickle_delay(chrono::SystemClock::duration trickle_delay)193   void set_trickle_delay(chrono::SystemClock::duration trickle_delay) {
194     trickle_delay_ = trickle_delay;
195   }
196 
197   // Stores a function that is called when Open() is successful. Pass nulltpr to
198   // clear it. This is useful in cases where the owner of the drain needs to be
199   // notified that the drain was opened.
set_on_open_callback(pw::Function<void ()> && callback)200   void set_on_open_callback(pw::Function<void()>&& callback) {
201     on_open_callback_ = std::move(callback);
202   }
203 
204  private:
205   enum class LogDrainState {
206     kCaughtUp,
207     kMoreEntriesRemaining,
208   };
209 
210   LogDrainState SendLogs(size_t max_num_bundles,
211                          ByteSpan encoding_buffer,
212                          Status& encoding_status) PW_LOCKS_EXCLUDED(mutex_);
213 
214   // Fills the outgoing buffer with as many entries as possible.
215   LogDrainState EncodeOutgoingPacket(
216       log::pwpb::LogEntries::MemoryEncoder& encoder,
217       uint32_t& packed_entry_count_out) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
218 
219   const uint32_t channel_id_;
220   const LogDrainErrorHandling error_handling_;
221   rpc::RawServerWriter server_writer_ PW_GUARDED_BY(mutex_);
222   const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_);
223   uint32_t drop_count_ingress_error_ PW_GUARDED_BY(mutex_);
224   uint32_t drop_count_slow_drain_ PW_GUARDED_BY(mutex_);
225   uint32_t drop_count_small_outbound_buffer_ PW_GUARDED_BY(mutex_);
226   uint32_t drop_count_small_stack_buffer_ PW_GUARDED_BY(mutex_);
227   uint32_t drop_count_writer_error_ PW_GUARDED_BY(mutex_);
228   sync::Mutex& mutex_;
229   Filter* filter_;
230   uint32_t sequence_id_;
231   size_t max_bundles_per_trickle_;
232   pw::chrono::SystemClock::duration trickle_delay_;
233   pw::chrono::SystemClock::time_point no_writes_until_;
234   pw::Function<void()> on_open_callback_;
235 };
236 
237 }  // namespace pw::log_rpc
238