1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_log_rpc/log_service.h"
16
17 #include <array>
18 #include <cstdint>
19 #include <limits>
20
21 #include "pw_assert/check.h"
22 #include "pw_bytes/endian.h"
23 #include "pw_containers/vector.h"
24 #include "pw_log/log.h"
25 #include "pw_log/proto/log.pwpb.h"
26 #include "pw_log/proto_utils.h"
27 #include "pw_log_rpc/log_filter.h"
28 #include "pw_log_rpc_private/test_utils.h"
29 #include "pw_log_tokenized/metadata.h"
30 #include "pw_protobuf/bytes_utils.h"
31 #include "pw_protobuf/decoder.h"
32 #include "pw_result/result.h"
33 #include "pw_rpc/channel.h"
34 #include "pw_rpc/raw/fake_channel_output.h"
35 #include "pw_rpc/raw/test_method_context.h"
36 #include "pw_sync/mutex.h"
37 #include "pw_unit_test/framework.h"
38
39 namespace pw::log_rpc {
40 namespace {
41
42 using log::pw_rpc::raw::Logs;
43
44 namespace FilterRule = log::pwpb::FilterRule;
45
46 #define LOG_SERVICE_METHOD_CONTEXT \
47 PW_RAW_TEST_METHOD_CONTEXT(LogService, Listen, 10)
48
49 constexpr size_t kMaxMessageSize = 50;
50 constexpr size_t kMaxLogEntrySize =
51 RpcLogDrain::kMinEntryBufferSize + kMaxMessageSize;
52 static_assert(RpcLogDrain::kMinEntryBufferSize < kMaxLogEntrySize);
53 constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10;
54 constexpr size_t kMaxDrains = 3;
55 constexpr char kMessage[] = "message";
56 // A message small enough to fit encoded in
57 // LogServiceTest::entry_encode_buffer_ but large enough to not fit in
58 // LogServiceTest::small_buffer_.
59 constexpr char kLongMessage[] =
60 "This is a long log message that will be dropped.";
61 static_assert(sizeof(kLongMessage) < kMaxMessageSize);
62 static_assert(sizeof(kLongMessage) + RpcLogDrain::kMinEntrySizeWithoutPayload >
63 RpcLogDrain::kMinEntryBufferSize);
64 std::array<std::byte, 1> rpc_request_buffer;
65 const std::array<std::byte, 5> kSampleThread = {std::byte('M'),
66 std::byte('0'),
67 std::byte('L'),
68 std::byte('O'),
69 std::byte('G')};
70 constexpr auto kSampleMetadata =
71 log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, __LINE__>();
72 constexpr auto kDropMessageMetadata =
73 log_tokenized::Metadata::Set<0, 0, 0, 0>();
74 constexpr int64_t kSampleTimestamp = 1000;
75
76 // `LogServiceTest` sets up a logging environment for testing with a
77 // `MultiSink` for log entries, and multiple `RpcLogDrain`s for consuming such
78 // log entries. It includes methods to add log entries to the `MultiSink`, and
79 // buffers for encoding and retrieving log entries. Tests can choose how many
80 // entries to add to the multisink, and which drain to use.
81 class LogServiceTest : public ::testing::Test {
82 public:
LogServiceTest()83 LogServiceTest() : multisink_(multisink_buffer_), drain_map_(drains_) {
84 for (auto& drain : drain_map_.drains()) {
85 multisink_.AttachDrain(drain);
86 }
87 }
88
AddLogEntries(size_t log_count,std::string_view message,log_tokenized::Metadata metadata,int64_t timestamp,ConstByteSpan thread)89 void AddLogEntries(size_t log_count,
90 std::string_view message,
91 log_tokenized::Metadata metadata,
92 int64_t timestamp,
93 ConstByteSpan thread) {
94 for (size_t i = 0; i < log_count; ++i) {
95 ASSERT_TRUE(AddLogEntry(message, metadata, timestamp, thread).ok());
96 }
97 }
98
AddLogEntry(std::string_view message,log_tokenized::Metadata metadata,int64_t timestamp,ConstByteSpan thread)99 StatusWithSize AddLogEntry(std::string_view message,
100 log_tokenized::Metadata metadata,
101 int64_t timestamp,
102 ConstByteSpan thread) {
103 Result<ConstByteSpan> encoded_log_result =
104 log::EncodeTokenizedLog(metadata,
105 as_bytes(span<const char>(message)),
106 timestamp,
107 thread,
108 entry_encode_buffer_);
109 PW_TRY_WITH_SIZE(encoded_log_result.status());
110 multisink_.HandleEntry(encoded_log_result.value());
111 return StatusWithSize(encoded_log_result.value().size());
112 }
113
114 protected:
115 std::array<std::byte, kMultiSinkBufferSize> multisink_buffer_ = {};
116 multisink::MultiSink multisink_;
117 std::array<std::byte, kMaxLogEntrySize> entry_encode_buffer_ = {};
118 static constexpr size_t kMaxFilterRules = 4;
119 std::array<Filter::Rule, kMaxFilterRules> rules1_ = {};
120 std::array<Filter::Rule, kMaxFilterRules> rules2_ = {};
121 std::array<Filter::Rule, kMaxFilterRules> rules3_ = {};
122 static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1_{
123 std::byte(65), std::byte(66), std::byte(67), std::byte(0)};
124 static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2_{
125 std::byte(68), std::byte(69), std::byte(70), std::byte(0)};
126 static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id3_{
127 std::byte(71), std::byte(72), std::byte(73), std::byte(0)};
128 std::array<Filter, kMaxDrains> filters_ = {
129 Filter(filter_id1_, rules1_),
130 Filter(filter_id2_, rules2_),
131 Filter(filter_id3_, rules3_),
132 };
133
134 // Drain Buffers
135 std::array<std::byte, kMaxLogEntrySize> drain_buffer1_ = {};
136 std::array<std::byte, kMaxLogEntrySize> drain_buffer2_ = {};
137 std::array<std::byte, RpcLogDrain::kMinEntryBufferSize> small_buffer_ = {};
138 static constexpr uint32_t kIgnoreWriterErrorsDrainId = 1;
139 static constexpr uint32_t kCloseWriterOnErrorDrainId = 2;
140 static constexpr uint32_t kSmallBufferDrainId = 3;
141 sync::Mutex shared_mutex_;
142 std::array<RpcLogDrain, kMaxDrains> drains_{
143 RpcLogDrain(kIgnoreWriterErrorsDrainId,
144 drain_buffer1_,
145 shared_mutex_,
146 RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
147 &filters_[0]),
148 RpcLogDrain(kCloseWriterOnErrorDrainId,
149 drain_buffer2_,
150 shared_mutex_,
151 RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
152 &filters_[1]),
153 RpcLogDrain(kSmallBufferDrainId,
154 small_buffer_,
155 shared_mutex_,
156 RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
157 &filters_[2]),
158 };
159 RpcLogDrainMap drain_map_;
160
161 std::array<std::byte, 128> encoding_buffer_ = {};
162 };
163
// Checks which drains have a writer before and after Listen is called, and
// what happens when Listen is called again on an open or a closed stream.
TEST_F(LogServiceTest, AssignWriter) {
  // No RPC has been made yet, so no drain has a writer to flush to.
  for (auto& unopened_drain : drain_map_.drains()) {
    EXPECT_EQ(unopened_drain.Flush(encoding_buffer_), Status::Unavailable());
  }

  // Direct the call at the first drain (ID 1).
  RpcLogDrain& target_drain = drains_[0];
  const uint32_t target_channel_id = target_drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT first_call(drain_map_);
  first_call.set_channel_id(target_channel_id);

  // The RPC assigns the drain's writer, so flushing now succeeds.
  first_call.call(rpc_request_buffer);
  EXPECT_EQ(target_drain.Flush(encoding_buffer_), OkStatus());

  // Every other drain must still be without a writer.
  for (auto& other_drain : drain_map_.drains()) {
    if (other_drain.channel_id() == target_channel_id) {
      continue;
    }
    EXPECT_EQ(other_drain.Flush(encoding_buffer_), Status::Unavailable());
  }

  // A second call while the stream is ongoing must leave the active drain's
  // writer untouched; the second call completes without any responses.
  LOG_SERVICE_METHOD_CONTEXT second_call(drain_map_);
  second_call.set_channel_id(target_channel_id);
  second_call.call(rpc_request_buffer);
  EXPECT_EQ(target_drain.Flush(encoding_buffer_), OkStatus());
  ASSERT_TRUE(second_call.done());
  EXPECT_EQ(second_call.responses().size(), 0u);

  // Once the stream is closed, a new writer may be assigned.
  ASSERT_EQ(target_drain.Close(), OkStatus());
  LOG_SERVICE_METHOD_CONTEXT third_call(drain_map_);
  third_call.set_channel_id(target_channel_id);
  third_call.call(rpc_request_buffer);
  EXPECT_EQ(target_drain.Flush(encoding_buffer_), OkStatus());
  ASSERT_FALSE(third_call.done());
  EXPECT_EQ(third_call.responses().size(), 0u);
  EXPECT_EQ(target_drain.Close(), OkStatus());
}
206
// Streams ten sample entries through the third drain and checks that the call
// only finishes once the drain is closed.
TEST_F(LogServiceTest, StartAndEndStream) {
  RpcLogDrain& stream_drain = drains_[2];
  const uint32_t stream_channel_id = stream_drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(stream_channel_id);

  // Populate the multisink before the stream is requested.
  constexpr size_t total_entries = 10;
  AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp, {});

  // Start the stream and push the pending entries out.
  context.call(rpc_request_buffer);
  EXPECT_EQ(stream_drain.Flush(encoding_buffer_), OkStatus());

  // The call stays open until the drain closes the stream.
  ASSERT_FALSE(context.done());
  EXPECT_EQ(OkStatus(), stream_drain.Close());
  ASSERT_TRUE(context.done());

  EXPECT_EQ(context.status(), OkStatus());
  // Entries are packed, so expect one or more responses.
  EXPECT_GE(context.responses().size(), 1u);

  // Build the expected entries: every one is the same sample entry with an
  // empty thread.
  const TestLogEntry sample_entry{
      .metadata = kSampleMetadata,
      .timestamp = kSampleTimestamp,
      .tokenized_data = as_bytes(span(std::string_view(kMessage))),
      .thread = {}};
  Vector<TestLogEntry, total_entries> expected_messages;
  for (size_t count = 0; count < total_entries; ++count) {
    expected_messages.push_back(sample_entry);
  }

  // Check each response against the expected entries.
  size_t entries_found = 0;
  uint32_t drop_count_found = 0;
  for (auto& response : context.responses()) {
    protobuf::Decoder response_decoder(response);
    VerifyLogEntries(response_decoder,
                     expected_messages,
                     entries_found,
                     entries_found,
                     drop_count_found);
  }
  EXPECT_EQ(entries_found, total_entries);
  EXPECT_EQ(drop_count_found, 0u);
}
252
// Reports a drop to the multisink in between sample entries and checks that
// the stream carries a drop-count entry at that position.
TEST_F(LogServiceTest, HandleDropped) {
  RpcLogDrain& stream_drain = drains_[0];
  const uint32_t stream_channel_id = stream_drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(stream_channel_id);

  constexpr size_t total_entries = 5;
  constexpr size_t entries_before_drop = 1;
  constexpr uint32_t total_drop_count = 2;
  constexpr size_t entries_after_drop = total_entries - entries_before_drop;

  // Entries, then a reported drop, then the remaining entries.
  AddLogEntries(entries_before_drop,
                kMessage,
                kSampleMetadata,
                kSampleTimestamp,
                kSampleThread);
  multisink_.HandleDropped(total_drop_count);
  AddLogEntries(entries_after_drop,
                kMessage,
                kSampleMetadata,
                kSampleTimestamp,
                kSampleThread);

  // Start the stream, flush it and close it.
  context.call(rpc_request_buffer);
  EXPECT_EQ(stream_drain.Flush(encoding_buffer_), OkStatus());
  EXPECT_EQ(OkStatus(), stream_drain.Close());
  ASSERT_EQ(context.status(), OkStatus());
  // Entries are packed, so expect one or more responses.
  ASSERT_GE(context.responses().size(), 1u);

  // Expected stream contents: the sample entries with one drop-count entry
  // (ingress error message) where the drop was reported.
  const TestLogEntry sample_entry{
      .metadata = kSampleMetadata,
      .timestamp = kSampleTimestamp,
      .tokenized_data = as_bytes(span(std::string_view(kMessage))),
      .thread = kSampleThread};
  Vector<TestLogEntry, total_entries + 1> expected_messages;
  for (size_t count = 0; count < entries_before_drop; ++count) {
    expected_messages.push_back(sample_entry);
  }
  expected_messages.push_back(
      {.metadata = kDropMessageMetadata,
       .dropped = total_drop_count,
       .tokenized_data =
           as_bytes(span(std::string_view(RpcLogDrain::kIngressErrorMessage))),
       .thread = {}});
  for (size_t count = entries_before_drop; count < total_entries; ++count) {
    expected_messages.push_back(sample_entry);
  }

  // Check each response against the expected entries.
  size_t entries_found = 0;
  uint32_t drop_count_found = 0;
  for (auto& response : context.responses()) {
    protobuf::Decoder response_decoder(response);
    VerifyLogEntries(response_decoder,
                     expected_messages,
                     entries_found,
                     entries_found,
                     drop_count_found);
  }
  EXPECT_EQ(entries_found, total_entries);
  EXPECT_EQ(drop_count_found, total_drop_count);
}
322
// Interleaves reported drops with entries that the drain's filter removes, and
// checks that the drops are reported as one combined drop-count entry ahead of
// the single entry that passes the filter.
TEST_F(LogServiceTest, HandleDroppedBetweenFilteredOutLogs) {
  RpcLogDrain& stream_drain = drains_[0];
  const uint32_t stream_channel_id = stream_drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(stream_channel_id);
  // Filter rule: drop everything at INFO level or above, so only DEBUG logs
  // are kept.
  rules1_[0].action = Filter::Rule::Action::kDrop;
  rules1_[0].level_greater_than_or_equal = FilterRule::Level::INFO_LEVEL;

  constexpr size_t total_entries = 5;
  constexpr uint32_t total_drop_count = total_entries - 1;

  // Each INFO entry (filtered out later) is followed by one reported drop.
  for (uint32_t reported = 0; reported < total_drop_count; ++reported) {
    ASSERT_EQ(
        OkStatus(),
        AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp, kSampleThread)
            .status());
    multisink_.HandleDropped(1);
  }
  // Finish with a DEBUG entry, which the filter keeps.
  constexpr auto debug_metadata =
      log_tokenized::Metadata::Set<PW_LOG_LEVEL_DEBUG, 0, 0, __LINE__>();
  ASSERT_EQ(
      OkStatus(),
      AddLogEntry(kMessage, debug_metadata, kSampleTimestamp, kSampleThread)
          .status());

  // Start the stream, flush it and close it.
  context.call(rpc_request_buffer);
  EXPECT_EQ(stream_drain.Flush(encoding_buffer_), OkStatus());
  EXPECT_EQ(OkStatus(), stream_drain.Close());
  ASSERT_EQ(context.status(), OkStatus());
  // Entries are packed, so expect one or more responses.
  ASSERT_GE(context.responses().size(), 1u);

  // Expected stream: one drop-count entry carrying the sum of all drops, then
  // the DEBUG entry.
  Vector<TestLogEntry, 2> expected_messages;
  expected_messages.push_back(
      {.metadata = kDropMessageMetadata,
       .dropped = total_drop_count,
       .tokenized_data =
           as_bytes(span(std::string_view(RpcLogDrain::kIngressErrorMessage))),
       .thread = {}});
  expected_messages.push_back(
      {.metadata = debug_metadata,
       .timestamp = kSampleTimestamp,
       .tokenized_data = as_bytes(span(std::string_view(kMessage))),
       .thread = kSampleThread});

  // Check each response against the expected entries.
  size_t entries_found = 0;
  uint32_t drop_count_found = 0;
  for (auto& response : context.responses()) {
    protobuf::Decoder response_decoder(response);
    VerifyLogEntries(response_decoder,
                     expected_messages,
                     entries_found,
                     entries_found,
                     drop_count_found);
  }
  EXPECT_EQ(entries_found, 1u);
  EXPECT_EQ(drop_count_found, total_drop_count);
}
386
// Feeds the small-buffer drain entries that are too long for its entry buffer,
// followed by one that fits, and checks the resulting drop-count entry.
TEST_F(LogServiceTest, HandleSmallLogEntryBuffer) {
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(kSmallBufferDrainId);
  auto drain_lookup = drain_map_.GetDrainFromChannelId(kSmallBufferDrainId);
  ASSERT_TRUE(drain_lookup.ok());

  // All entries but the last are too long for the drain's log entry buffer.
  // The final short entry is needed because drop count messages are only sent
  // when a log entry can be sent.
  constexpr size_t total_entries = 5;
  constexpr uint32_t total_drop_count = total_entries - 1;
  AddLogEntries(total_drop_count,
                kLongMessage,
                kSampleMetadata,
                kSampleTimestamp,
                kSampleThread);
  EXPECT_EQ(
      OkStatus(),
      AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp, kSampleThread)
          .status());

  // Start the stream, flush it and close it.
  context.call(rpc_request_buffer);
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());
  EXPECT_EQ(drain_lookup.value()->Close(), OkStatus());
  ASSERT_EQ(context.status(), OkStatus());
  ASSERT_EQ(context.responses().size(), 1u);

  // Expected stream: a single drop-count entry (small stack buffer error
  // message) with the full drop count, then the one entry that fit.
  Vector<TestLogEntry, total_entries + 1> expected_messages;
  expected_messages.push_back(
      {.metadata = kDropMessageMetadata,
       .dropped = total_drop_count,
       .tokenized_data = as_bytes(
           span(std::string_view(RpcLogDrain::kSmallStackBufferErrorMessage))),
       .thread = {}});
  expected_messages.push_back(
      {.metadata = kSampleMetadata,
       .timestamp = kSampleTimestamp,
       .tokenized_data = as_bytes(span(std::string_view(kMessage))),
       .thread = kSampleThread});

  // Check each response against the expected entries.
  size_t entries_found = 0;
  uint32_t drop_count_found = 0;
  for (auto& response : context.responses()) {
    protobuf::Decoder response_decoder(response);
    VerifyLogEntries(response_decoder,
                     expected_messages,
                     entries_found,
                     entries_found,
                     drop_count_found);
  }
  EXPECT_EQ(entries_found, 1u);
  EXPECT_EQ(drop_count_found, total_drop_count);
}
443
// Opens and closes a stream on a drain that was detached from the multisink
// and checks that no responses are produced for it.
TEST_F(LogServiceTest, FlushDrainWithoutMultisink) {
  // Detach the first drain before any entries are added.
  RpcLogDrain& orphan_drain = drains_[0];
  multisink_.DetachDrain(orphan_drain);
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(orphan_drain.channel_id());

  // Entries added now go to the multisink the drain is no longer attached to.
  constexpr size_t total_entries = 5;
  AddLogEntries(total_entries,
                kMessage,
                kSampleMetadata,
                kSampleTimestamp,
                kSampleThread);

  // Start the stream and close it right away; it must end cleanly and empty.
  context.call(rpc_request_buffer);
  EXPECT_EQ(orphan_drain.Close(), OkStatus());
  ASSERT_EQ(context.status(), OkStatus());
  EXPECT_EQ(context.responses().size(), 0u);
}
463
// Sends one entry whose metadata fields and timestamp are all at their maximum
// values and checks that it is streamed back unchanged.
TEST_F(LogServiceTest, LargeLogEntry) {
  // Module, flags and line use every available bit; the timestamp is the
  // largest int64_t.
  const TestLogEntry large_entry{
      .metadata =
          log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN,
                                       (1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1,
                                       (1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1,
                                       (1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>(),
      .timestamp = std::numeric_limits<int64_t>::max(),
      .tokenized_data = as_bytes(span(kMessage)),
      .thread = kSampleThread,
  };

  // Encode the entry by hand, field by field, and give it to the multisink.
  log::pwpb::LogEntry::MemoryEncoder entry_encoder(entry_encode_buffer_);
  ASSERT_EQ(entry_encoder.WriteMessage(large_entry.tokenized_data),
            OkStatus());
  // The line number sits above the level bits in the packed line_level field.
  const auto packed_line_level =
      (large_entry.metadata.level() & PW_LOG_LEVEL_BITMASK) |
      ((large_entry.metadata.line_number() << PW_LOG_LEVEL_BITS) &
       ~PW_LOG_LEVEL_BITMASK);
  ASSERT_EQ(entry_encoder.WriteLineLevel(packed_line_level), OkStatus());
  ASSERT_EQ(entry_encoder.WriteFlags(large_entry.metadata.flags()),
            OkStatus());
  ASSERT_EQ(entry_encoder.WriteTimestamp(large_entry.timestamp), OkStatus());
  // The module is written as little-endian bytes.
  const uint32_t module_le =
      bytes::ConvertOrderTo(endian::little, large_entry.metadata.module());
  ASSERT_EQ(entry_encoder.WriteModule(as_bytes(span(&module_le, 1))),
            OkStatus());
  ASSERT_EQ(entry_encoder.WriteThread(large_entry.thread), OkStatus());
  ASSERT_EQ(entry_encoder.status(), OkStatus());
  multisink_.HandleEntry(entry_encoder);

  // Stream the entry through the first drain.
  RpcLogDrain& stream_drain = drains_[0];
  const uint32_t stream_channel_id = stream_drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(stream_channel_id);
  context.call(rpc_request_buffer);
  ASSERT_EQ(stream_drain.Flush(encoding_buffer_), OkStatus());
  EXPECT_EQ(OkStatus(), stream_drain.Close());
  ASSERT_EQ(context.status(), OkStatus());
  ASSERT_EQ(context.responses().size(), 1u);

  // Unwrap the first field of the single response and compare the entry it
  // holds against the expected one.
  protobuf::Decoder response_decoder(context.responses()[0]);
  ASSERT_TRUE(response_decoder.Next().ok());
  ConstByteSpan encoded_entry;
  EXPECT_TRUE(response_decoder.ReadBytes(&encoded_entry).ok());
  protobuf::Decoder streamed_entry_decoder(encoded_entry);
  uint32_t drop_count = 0;
  VerifyLogEntry(streamed_entry_decoder, large_entry, drop_count);
  EXPECT_EQ(drop_count, 0u);
}
515
// Interrupts a log stream on the drain that closes its stream on writer
// errors, then reopens the stream with a new writer and checks that the
// entries lost to the failed send are reported through a drop-count entry
// before the remaining entries.
TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
  const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId;
  auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
  ASSERT_TRUE(drain.ok());

  LogService log_service(drain_map_);
  // The channel output's packet capacity is tied to `max_packets` so that the
  // packet counts derived from it below stay consistent with the output.
  const size_t max_packets = 10;
  rpc::RawFakeChannelOutput<max_packets, 512> output;
  rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
  rpc::Server server(span(&channel, 1));

  // Add as many entries as needed to have multiple packets sent. The first
  // entry is added on its own to learn the encoded entry size.
  StatusWithSize status =
      AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp, kSampleThread);
  ASSERT_TRUE(status.ok());

  const uint32_t max_messages_per_response =
      encoding_buffer_.size() / status.size();
  // Send fewer packets than the max to avoid crashes.
  const uint32_t packets_sent = max_packets / 2;
  const size_t total_entries = packets_sent * max_messages_per_response;
  const size_t max_entries = 50;
  // Check we can test all these entries.
  ASSERT_GE(max_entries, total_entries);
  // One entry was already added above.
  AddLogEntries(total_entries - 1,
                kMessage,
                kSampleMetadata,
                kSampleTimestamp,
                kSampleThread);

  // Interrupt the log stream with an error after this many successful sends.
  const uint32_t successful_packets_sent = packets_sent / 2;
  output.set_send_status(Status::Unavailable(), successful_packets_sent);

  // Request logs.
  rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
      server, drain_channel_id, log_service);
  EXPECT_EQ(drain.value()->Open(writer), OkStatus());
  // This drain closes on errors.
  EXPECT_EQ(drain.value()->Flush(encoding_buffer_), Status::Aborted());
  EXPECT_TRUE(output.done());

  // Make sure not all packets were sent.
  ASSERT_EQ(output.payloads<Logs::Listen>().size(), successful_packets_sent);

  // Verify data in responses.
  Vector<TestLogEntry, max_entries> expected_messages;
  for (size_t i = 0; i < total_entries; ++i) {
    expected_messages.push_back(
        {.metadata = kSampleMetadata,
         .timestamp = kSampleTimestamp,
         .tokenized_data = as_bytes(span(std::string_view(kMessage))),
         .thread = kSampleThread});
  }
  size_t entries_found = 0;
  uint32_t drop_count_found = 0;
  for (auto& response : output.payloads<Logs::Listen>()) {
    protobuf::Decoder entry_decoder(response);
    VerifyLogEntries(entry_decoder,
                     expected_messages,
                     entries_found,
                     entries_found,
                     drop_count_found);
  }

  // Verify that not all the entries were sent.
  EXPECT_LT(entries_found, total_entries);
  // The drain closes on errors, thus the drop count is reported on the next
  // call to Flush.
  EXPECT_EQ(drop_count_found, 0u);

  // Reset channel output and resume log stream with a new writer.
  output.clear();
  writer = rpc::RawServerWriter::Open<Logs::Listen>(
      server, drain_channel_id, log_service);
  EXPECT_EQ(drain.value()->Open(writer), OkStatus());
  EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());

  // One full packet was dropped. Since all messages are the same length,
  // there are entries_found / successful_packets_sent per packet.
  const uint32_t total_drop_count = entries_found / successful_packets_sent;
  Vector<TestLogEntry, max_entries> expected_messages_after_reset;
  expected_messages_after_reset.push_back(
      {.metadata = kDropMessageMetadata,
       .dropped = total_drop_count,
       .tokenized_data =
           as_bytes(span(std::string_view(RpcLogDrain::kWriterErrorMessage))),
       .thread = {}});

  const uint32_t remaining_entries = total_entries - total_drop_count;
  for (size_t i = 0; i < remaining_entries; ++i) {
    expected_messages_after_reset.push_back(
        {.metadata = kSampleMetadata,
         .timestamp = kSampleTimestamp,
         .tokenized_data = as_bytes(span(std::string_view(kMessage))),
         .thread = kSampleThread});
  }

  size_t entries_found_after_reset = 0;
  for (auto& response : output.payloads<Logs::Listen>()) {
    protobuf::Decoder entry_decoder(response);
    // Sequence IDs continue after the entries already received plus the
    // dropped ones.
    uint32_t expected_sequence_id =
        entries_found + entries_found_after_reset + total_drop_count;
    VerifyLogEntries(entry_decoder,
                     expected_messages_after_reset,
                     expected_sequence_id,
                     entries_found_after_reset,
                     drop_count_found);
  }
  EXPECT_EQ(entries_found + entries_found_after_reset, remaining_entries);
  EXPECT_EQ(drop_count_found, total_drop_count);
}
627
// Interrupts a log stream on the drain that ignores writer errors and checks
// that the stream stays open, that the packets which did go out are intact,
// and that the drain does not report the lost entries itself.
TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
  const uint32_t drain_channel_id = kIgnoreWriterErrorsDrainId;
  auto drain_lookup = drain_map_.GetDrainFromChannelId(drain_channel_id);
  ASSERT_TRUE(drain_lookup.ok());

  LogService log_service(drain_map_);
  const size_t max_packets = 20;
  rpc::RawFakeChannelOutput<max_packets, 512> output;
  rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
  rpc::Server server(span(&channel, 1));

  // Add one entry first to learn its encoded size, then enough additional
  // entries to require several packets.
  StatusWithSize first_entry =
      AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp, kSampleThread);
  ASSERT_TRUE(first_entry.ok());

  const uint32_t max_messages_per_response =
      encoding_buffer_.size() / first_entry.size();
  // Stay below the output's packet capacity to avoid crashes.
  const uint32_t packets_sent = 4;
  const size_t total_entries = packets_sent * max_messages_per_response;
  const size_t max_entries = 50;
  // The expected-entries vector below must be able to hold them all.
  ASSERT_GT(max_entries, total_entries);
  AddLogEntries(total_entries - 1,
                kMessage,
                kSampleMetadata,
                kSampleTimestamp,
                kSampleThread);

  // Make the channel output fail with UNAVAILABLE part-way through.
  const uint32_t error_on_packet_count = packets_sent / 2;
  output.set_send_status(Status::Unavailable(), error_on_packet_count);

  // Open the stream. Because this drain ignores writer errors, Flush()
  // succeeds and the stream stays open.
  rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
      server, drain_channel_id, log_service);
  EXPECT_EQ(drain_lookup.value()->Open(writer), OkStatus());
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());
  EXPECT_FALSE(output.done());

  // At least one packet must have gone out.
  ASSERT_GT(output.payloads<Logs::Listen>().size(), 0u);

  // Count what arrived; it must be fewer entries than were added.
  size_t entries_found = 0;
  for (auto& response : output.payloads<Logs::Listen>()) {
    protobuf::Decoder counting_decoder(response);
    entries_found += CountLogEntries(counting_decoder);
  }
  ASSERT_LT(entries_found, total_entries);

  // Check the contents of the entries that did arrive.
  const uint32_t total_drop_count = total_entries - entries_found;
  Vector<TestLogEntry, max_entries> expected_messages;
  for (size_t count = 0; count < entries_found; ++count) {
    expected_messages.push_back(
        {.metadata = kSampleMetadata,
         .timestamp = kSampleTimestamp,
         .tokenized_data = as_bytes(span(std::string_view(kMessage))),
         .thread = kSampleThread});
  }

  entries_found = 0;
  uint32_t drop_count_found = 0;
  size_t packet_index = 0;
  // Packets sent before the error: sequence IDs are contiguous.
  while (packet_index < error_on_packet_count) {
    protobuf::Decoder packet_decoder(
        output.payloads<Logs::Listen>()[packet_index]);
    VerifyLogEntries(packet_decoder,
                     expected_messages,
                     entries_found,
                     entries_found,
                     drop_count_found);
    ++packet_index;
  }
  // Packets sent after the error: sequence IDs skip the lost entries.
  while (packet_index < output.payloads<Logs::Listen>().size()) {
    protobuf::Decoder packet_decoder(
        output.payloads<Logs::Listen>()[packet_index]);
    VerifyLogEntries(packet_decoder,
                     expected_messages,
                     entries_found + total_drop_count,
                     entries_found,
                     drop_count_found);
    ++packet_index;
  }
  // This drain ignores errors and thus doesn't report drops on its own.
  EXPECT_EQ(drop_count_found, 0u);

  // Flushing again while the output keeps failing neither fails nor closes
  // the stream, and no new packets show up.
  const size_t packet_count_before =
      output.payloads<Logs::Listen>().size();
  output.set_send_status(Status::Unavailable());
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());
  EXPECT_FALSE(output.done());
  ASSERT_EQ(output.payloads<Logs::Listen>().size(), packet_count_before);

  // After resetting the output, closing the drain finishes the stream.
  output.clear();
  EXPECT_EQ(drain_lookup.value()->Close(), OkStatus());
  EXPECT_TRUE(output.done());
}
727
// Adds entries that differ in level, flags, module and thread, installs four
// filter rules on the second drain, and checks that exactly the four expected
// entries are streamed.
TEST_F(LogServiceTest, FilterLogs) {
  const uint32_t module = 0xcafe;
  const uint32_t flags = 0x02;
  const uint32_t line_number = 100;
  const std::array<std::byte, 3> kNewThread = {
      std::byte('A'), std::byte('P'), std::byte('P')};
  const std::array<std::byte, 3> kInvalidThread = {
      std::byte('C'), std::byte('D'), std::byte('C')};

  // Metadata for each entry, in the order the entries are added.
  const auto debug_metadata = log_tokenized::Metadata::
      Set<PW_LOG_LEVEL_DEBUG, module, flags, line_number>();
  const auto info_metadata = log_tokenized::Metadata::
      Set<PW_LOG_LEVEL_INFO, module, flags, line_number>();
  const auto warn_metadata = log_tokenized::Metadata::
      Set<PW_LOG_LEVEL_WARN, module, flags, line_number>();
  const auto error_metadata = log_tokenized::Metadata::
      Set<PW_LOG_LEVEL_ERROR, module, flags, line_number>();
  const auto different_flags_metadata = log_tokenized::Metadata::
      Set<PW_LOG_LEVEL_ERROR, module, 0x01, line_number>();
  const auto different_module_metadata = log_tokenized::Metadata::
      Set<PW_LOG_LEVEL_ERROR, 0xabcd, flags, line_number>();
  const auto second_info_metadata = log_tokenized::Metadata::
      Set<PW_LOG_LEVEL_INFO, module, flags, line_number>();
  const auto invalid_thread_metadata = log_tokenized::Metadata::
      Set<PW_LOG_LEVEL_INFO, module, flags, line_number>();

  // Adds one kMessage entry with the sample timestamp; true on success.
  const auto add_entry = [this](log_tokenized::Metadata entry_metadata,
                                ConstByteSpan entry_thread) {
    return AddLogEntry(
               kMessage, entry_metadata, kSampleTimestamp, entry_thread)
        .ok();
  };
  ASSERT_TRUE(add_entry(debug_metadata, kSampleThread));
  ASSERT_TRUE(add_entry(info_metadata, kSampleThread));
  ASSERT_TRUE(add_entry(warn_metadata, kSampleThread));
  ASSERT_TRUE(add_entry(error_metadata, kNewThread));
  ASSERT_TRUE(add_entry(different_flags_metadata, kSampleThread));
  ASSERT_TRUE(add_entry(different_module_metadata, kSampleThread));
  ASSERT_TRUE(add_entry(second_info_metadata, kNewThread));
  ASSERT_TRUE(add_entry(invalid_thread_metadata, kInvalidThread));

  // The entries expected to pass the filter, in stream order.
  Vector<TestLogEntry, 4> expected_messages{
      {.metadata = info_metadata,
       .timestamp = kSampleTimestamp,
       .tokenized_data = as_bytes(span(std::string_view(kMessage))),
       .thread = kSampleThread},
      {.metadata = warn_metadata,
       .timestamp = kSampleTimestamp,
       .tokenized_data = as_bytes(span(std::string_view(kMessage))),
       .thread = kSampleThread},
      {.metadata = error_metadata,
       .timestamp = kSampleTimestamp,
       .tokenized_data = as_bytes(span(std::string_view(kMessage))),
       .thread = kNewThread},
      {.metadata = second_info_metadata,
       .timestamp = kSampleTimestamp,
       .tokenized_data = as_bytes(span(std::string_view(kMessage))),
       .thread = kNewThread},
  };

  // Install the rules on the filter used by drains_[1], starting from a clean
  // rule set.
  RpcLogDrain& filtered_drain = drains_[1];
  for (auto& stale_rule : rules2_) {
    stale_rule = {};
  }
  const auto module_bytes =
      bytes::CopyInOrder<uint32_t>(endian::little, module);
  // Rule 0: keep INFO+ with matching flags and module from the sample thread.
  rules2_[0] = {
      .action = Filter::Rule::Action::kKeep,
      .level_greater_than_or_equal = FilterRule::Level::INFO_LEVEL,
      .any_flags_set = flags,
      .module_equals{module_bytes.begin(), module_bytes.end()},
      .thread_equals{kSampleThread.begin(), kSampleThread.end()}};
  // Rule 1: keep DEBUG+ with matching flags and module from the new thread.
  rules2_[1] = {
      .action = Filter::Rule::Action::kKeep,
      .level_greater_than_or_equal = FilterRule::Level::DEBUG_LEVEL,
      .any_flags_set = flags,
      .module_equals{module_bytes.begin(), module_bytes.end()},
      .thread_equals{kNewThread.begin(), kNewThread.end()}};
  // Rule 2: drop rule with no level, flag, module or thread restrictions.
  rules2_[2] = {.action = Filter::Rule::Action::kDrop,
                .level_greater_than_or_equal = FilterRule::Level::ANY_LEVEL,
                .any_flags_set = 0,
                .module_equals{},
                .thread_equals{}};
  // Rule 3: keep INFO+ with matching flags and module from the new thread.
  rules2_[3] = {
      .action = Filter::Rule::Action::kKeep,
      .level_greater_than_or_equal = FilterRule::Level::INFO_LEVEL,
      .any_flags_set = flags,
      .module_equals{module_bytes.begin(), module_bytes.end()},
      .thread_equals{kNewThread.begin(), kNewThread.end()}};

  // Start the stream on the filtered drain and flush it.
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(filtered_drain.channel_id());
  context.call({});
  ASSERT_EQ(filtered_drain.Flush(encoding_buffer_), OkStatus());

  // Check each response against the expected entries.
  size_t entries_found = 0;
  uint32_t drop_count_found = 0;
  for (auto& response : context.responses()) {
    protobuf::Decoder response_decoder(response);
    VerifyLogEntries(response_decoder,
                     expected_messages,
                     entries_found,
                     entries_found,
                     drop_count_found);
  }
  EXPECT_EQ(entries_found, 4u);
  EXPECT_EQ(drop_count_found, 0u);
}
847
// Opens a stream on the close-on-error drain, flushes it, then opens it again
// with a fresh writer and checks that both Open() and Flush() still succeed.
TEST_F(LogServiceTest, ReopenClosedLogStreamWithAcquiredBuffer) {
  const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId;
  auto drain_lookup = drain_map_.GetDrainFromChannelId(drain_channel_id);
  ASSERT_TRUE(drain_lookup.ok());

  LogService log_service(drain_map_);
  rpc::RawFakeChannelOutput<10, 512> output;
  rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
  rpc::Server server(span(&channel, 1));

  // First stream: open a writer for Listen and flush through it.
  rpc::RawServerWriter stream_writer =
      rpc::RawServerWriter::Open<Logs::Listen>(
          server, drain_channel_id, log_service);
  EXPECT_EQ(drain_lookup.value()->Open(stream_writer), OkStatus());
  // This drain closes on errors.
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());

  // Second stream: replace the writer with a newly opened one and flush again.
  stream_writer = rpc::RawServerWriter::Open<Logs::Listen>(
      server, drain_channel_id, log_service);
  EXPECT_EQ(drain_lookup.value()->Open(stream_writer), OkStatus());
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());
}
871
872 } // namespace
873 } // namespace pw::log_rpc
874