1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_log_rpc/rpc_log_drain.h"
16
17 #include <limits>
18 #include <mutex>
19 #include <optional>
20 #include <string_view>
21
22 #include "pw_assert/check.h"
23 #include "pw_chrono/system_clock.h"
24 #include "pw_log/proto/log.pwpb.h"
25 #include "pw_result/result.h"
26 #include "pw_rpc/raw/server_reader_writer.h"
27 #include "pw_span/span.h"
28 #include "pw_status/status.h"
29 #include "pw_status/try.h"
30
31 namespace pw::log_rpc {
32 namespace {
33
34 // Creates an encoded drop message on the provided buffer and adds it to the
35 // bulk log entries. Resets the drop count when successfull.
TryEncodeDropMessage(ByteSpan encoded_drop_message_buffer,std::string_view reason,uint32_t & drop_count,log::pwpb::LogEntries::MemoryEncoder & entries_encoder)36 void TryEncodeDropMessage(
37 ByteSpan encoded_drop_message_buffer,
38 std::string_view reason,
39 uint32_t& drop_count,
40 log::pwpb::LogEntries::MemoryEncoder& entries_encoder) {
41 // Encode drop count and reason, if any, in log proto.
42 log::pwpb::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer);
43 if (!reason.empty()) {
44 encoder.WriteMessage(as_bytes(span<const char>(reason))).IgnoreError();
45 }
46 encoder.WriteDropped(drop_count).IgnoreError();
47 if (!encoder.status().ok()) {
48 return;
49 }
50 // Add encoded drop message if fits in buffer.
51 ConstByteSpan drop_message(encoder);
52 if (drop_message.size() + RpcLogDrain::kLogEntriesEncodeFrameSize <
53 entries_encoder.ConservativeWriteLimit()) {
54 PW_CHECK_OK(entries_encoder.WriteBytes(
55 static_cast<uint32_t>(log::pwpb::LogEntries::Fields::kEntries),
56 drop_message));
57 drop_count = 0;
58 }
59 }
60
61 } // namespace
62
Open(rpc::RawServerWriter & writer)63 Status RpcLogDrain::Open(rpc::RawServerWriter& writer) {
64 if (!writer.active()) {
65 return Status::FailedPrecondition();
66 }
67 std::lock_guard lock(mutex_);
68 if (server_writer_.active()) {
69 return Status::AlreadyExists();
70 }
71 server_writer_ = std::move(writer);
72
73 // Set a callback to close the drain when RequestCompletion() is requested by
74 // the reader. This callback is only set and invoked if
75 // PW_RPC_COMPLETION_REQUEST_CALLBACK is enabled.
76 // TODO: b/274936558 - : Add unit tests to check that when this callback is
77 // invoked, the stream is closed gracefully without dropping logs.
78 server_writer_.set_on_completion_requested_if_enabled(
79 [this]() { Close().IgnoreError(); });
80
81 if (on_open_callback_ != nullptr) {
82 on_open_callback_();
83 }
84 return OkStatus();
85 }
86
Flush(ByteSpan encoding_buffer)87 Status RpcLogDrain::Flush(ByteSpan encoding_buffer) {
88 Status status;
89 SendLogs(std::numeric_limits<size_t>::max(), encoding_buffer, status);
90 return status;
91 }
92
Trickle(ByteSpan encoding_buffer)93 std::optional<chrono::SystemClock::duration> RpcLogDrain::Trickle(
94 ByteSpan encoding_buffer) {
95 chrono::SystemClock::time_point now = chrono::SystemClock::now();
96 // Called before drain is ready to send more logs. Ignore this request and
97 // remind the caller how much longer they'll need to wait.
98 if (no_writes_until_ > now) {
99 return no_writes_until_ - now;
100 }
101
102 Status encoding_status;
103 if (SendLogs(max_bundles_per_trickle_, encoding_buffer, encoding_status) ==
104 LogDrainState::kCaughtUp) {
105 return std::nullopt;
106 }
107
108 no_writes_until_ = chrono::SystemClock::TimePointAfterAtLeast(trickle_delay_);
109 return trickle_delay_;
110 }
111
SendLogs(size_t max_num_bundles,ByteSpan encoding_buffer,Status & encoding_status_out)112 RpcLogDrain::LogDrainState RpcLogDrain::SendLogs(size_t max_num_bundles,
113 ByteSpan encoding_buffer,
114 Status& encoding_status_out) {
115 PW_CHECK_NOTNULL(multisink_);
116
117 LogDrainState log_sink_state = LogDrainState::kMoreEntriesRemaining;
118 std::lock_guard lock(mutex_);
119 size_t sent_bundle_count = 0;
120 while (sent_bundle_count < max_num_bundles &&
121 log_sink_state != LogDrainState::kCaughtUp) {
122 if (!server_writer_.active()) {
123 encoding_status_out = Status::Unavailable();
124 // No reason to keep polling this drain until the writer is opened.
125 return LogDrainState::kCaughtUp;
126 }
127 log::pwpb::LogEntries::MemoryEncoder encoder(encoding_buffer);
128 uint32_t packed_entry_count = 0;
129 log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count);
130
131 // Avoid sending empty packets.
132 if (encoder.size() == 0) {
133 continue;
134 }
135
136 encoder.WriteFirstEntrySequenceId(sequence_id_)
137 .IgnoreError(); // TODO: b/242598609 - Handle Status properly
138 sequence_id_ += packed_entry_count;
139 const Status status = server_writer_.Write(encoder);
140 sent_bundle_count++;
141
142 if (!status.ok() &&
143 error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) {
144 // Only update this drop count when writer errors are not ignored.
145 drop_count_writer_error_ += packed_entry_count;
146 server_writer_.Finish().IgnoreError();
147 encoding_status_out = Status::Aborted();
148 return log_sink_state;
149 }
150 }
151 return log_sink_state;
152 }
153
EncodeOutgoingPacket(log::pwpb::LogEntries::MemoryEncoder & encoder,uint32_t & packed_entry_count_out)154 RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket(
155 log::pwpb::LogEntries::MemoryEncoder& encoder,
156 uint32_t& packed_entry_count_out) {
157 const size_t total_buffer_size = encoder.ConservativeWriteLimit();
158 do {
159 // Peek entry and get drop count from multisink.
160 uint32_t drop_count = 0;
161 uint32_t ingress_drop_count = 0;
162 Result<multisink::MultiSink::Drain::PeekedEntry> possible_entry =
163 PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count);
164 drop_count_ingress_error_ += ingress_drop_count;
165
166 // Check if the entry fits in the entry buffer.
167 if (possible_entry.status().IsResourceExhausted()) {
168 ++drop_count_small_stack_buffer_;
169 continue;
170 }
171
172 // Check if there are any entries left.
173 if (possible_entry.status().IsOutOfRange()) {
174 // Stash multisink's reported drop count that will be reported later with
175 // any other drop counts.
176 drop_count_slow_drain_ += drop_count;
177 return LogDrainState::kCaughtUp; // There are no more entries.
178 }
179
180 // At this point all expected errors have been handled.
181 PW_CHECK_OK(possible_entry.status());
182
183 // Check if the entry passes any set filter rules.
184 if (filter_ != nullptr &&
185 filter_->ShouldDropLog(possible_entry.value().entry())) {
186 // Add the drop count from the multisink peek, stored in `drop_count`, to
187 // the total drop count. Then drop the entry without counting it towards
188 // the total drop count. Drops will be reported later all together.
189 drop_count_slow_drain_ += drop_count;
190 PW_CHECK_OK(PopEntry(possible_entry.value()));
191 continue;
192 }
193
194 // Check if the entry fits in the encoder buffer by itself.
195 const size_t encoded_entry_size =
196 possible_entry.value().entry().size() + kLogEntriesEncodeFrameSize;
197 if (encoded_entry_size + kLogEntriesEncodeFrameSize > total_buffer_size) {
198 // Entry is larger than the entire available buffer.
199 ++drop_count_small_outbound_buffer_;
200 PW_CHECK_OK(PopEntry(possible_entry.value()));
201 continue;
202 }
203
204 // At this point, we have a valid entry that may fit in the encode buffer.
205 // Report any drop counts combined reusing the log_entry_buffer_ to encode a
206 // drop message.
207 drop_count_slow_drain_ += drop_count;
208 // Account for dropped entries too large for stack buffer, which PeekEntry()
209 // also reports.
210 drop_count_slow_drain_ -= drop_count_small_stack_buffer_;
211 bool log_entry_buffer_has_valid_entry = possible_entry.ok();
212 if (drop_count_slow_drain_ > 0) {
213 TryEncodeDropMessage(log_entry_buffer_,
214 std::string_view(kSlowDrainErrorMessage),
215 drop_count_slow_drain_,
216 encoder);
217 log_entry_buffer_has_valid_entry = false;
218 }
219 if (drop_count_ingress_error_ > 0) {
220 TryEncodeDropMessage(log_entry_buffer_,
221 std::string_view(kIngressErrorMessage),
222 drop_count_ingress_error_,
223 encoder);
224 log_entry_buffer_has_valid_entry = false;
225 }
226 if (drop_count_small_stack_buffer_ > 0) {
227 TryEncodeDropMessage(log_entry_buffer_,
228 std::string_view(kSmallStackBufferErrorMessage),
229 drop_count_small_stack_buffer_,
230 encoder);
231 log_entry_buffer_has_valid_entry = false;
232 }
233 if (drop_count_small_outbound_buffer_ > 0) {
234 TryEncodeDropMessage(log_entry_buffer_,
235 std::string_view(kSmallOutboundBufferErrorMessage),
236 drop_count_small_outbound_buffer_,
237 encoder);
238 log_entry_buffer_has_valid_entry = false;
239 }
240 if (drop_count_writer_error_ > 0) {
241 TryEncodeDropMessage(log_entry_buffer_,
242 std::string_view(kWriterErrorMessage),
243 drop_count_writer_error_,
244 encoder);
245 log_entry_buffer_has_valid_entry = false;
246 }
247 if (possible_entry.ok() && !log_entry_buffer_has_valid_entry) {
248 PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count)
249 .status());
250 }
251
252 // Check if the entry fits in the partially filled encoder buffer.
253 if (encoded_entry_size > encoder.ConservativeWriteLimit()) {
254 // Notify the caller there are more entries to send.
255 return LogDrainState::kMoreEntriesRemaining;
256 }
257
258 // Encode the entry and remove it from multisink.
259 PW_CHECK_OK(encoder.WriteBytes(
260 static_cast<uint32_t>(log::pwpb::LogEntries::Fields::kEntries),
261 possible_entry.value().entry()));
262 PW_CHECK_OK(PopEntry(possible_entry.value()));
263 ++packed_entry_count_out;
264 } while (true);
265 }
266
Close()267 Status RpcLogDrain::Close() {
268 std::lock_guard lock(mutex_);
269 return server_writer_.Finish();
270 }
271
272 } // namespace pw::log_rpc
273