xref: /aosp_15_r20/external/pigweed/pw_log_rpc/log_service_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_log_rpc/log_service.h"
16 
17 #include <array>
18 #include <cstdint>
19 #include <limits>
20 
21 #include "pw_assert/check.h"
22 #include "pw_bytes/endian.h"
23 #include "pw_containers/vector.h"
24 #include "pw_log/log.h"
25 #include "pw_log/proto/log.pwpb.h"
26 #include "pw_log/proto_utils.h"
27 #include "pw_log_rpc/log_filter.h"
28 #include "pw_log_rpc_private/test_utils.h"
29 #include "pw_log_tokenized/metadata.h"
30 #include "pw_protobuf/bytes_utils.h"
31 #include "pw_protobuf/decoder.h"
32 #include "pw_result/result.h"
33 #include "pw_rpc/channel.h"
34 #include "pw_rpc/raw/fake_channel_output.h"
35 #include "pw_rpc/raw/test_method_context.h"
36 #include "pw_sync/mutex.h"
37 #include "pw_unit_test/framework.h"
38 
39 namespace pw::log_rpc {
40 namespace {
41 
42 using log::pw_rpc::raw::Logs;
43 
44 namespace FilterRule = log::pwpb::FilterRule;
45 
46 #define LOG_SERVICE_METHOD_CONTEXT \
47   PW_RAW_TEST_METHOD_CONTEXT(LogService, Listen, 10)
48 
49 constexpr size_t kMaxMessageSize = 50;
50 constexpr size_t kMaxLogEntrySize =
51     RpcLogDrain::kMinEntryBufferSize + kMaxMessageSize;
52 static_assert(RpcLogDrain::kMinEntryBufferSize < kMaxLogEntrySize);
53 constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10;
54 constexpr size_t kMaxDrains = 3;
55 constexpr char kMessage[] = "message";
56 // A message small enough to fit encoded in
57 // LogServiceTest::entry_encode_buffer_ but large enough to not fit in
58 // LogServiceTest::small_buffer_.
59 constexpr char kLongMessage[] =
60     "This is a long log message that will be dropped.";
61 static_assert(sizeof(kLongMessage) < kMaxMessageSize);
62 static_assert(sizeof(kLongMessage) + RpcLogDrain::kMinEntrySizeWithoutPayload >
63               RpcLogDrain::kMinEntryBufferSize);
64 std::array<std::byte, 1> rpc_request_buffer;
65 const std::array<std::byte, 5> kSampleThread = {std::byte('M'),
66                                                 std::byte('0'),
67                                                 std::byte('L'),
68                                                 std::byte('O'),
69                                                 std::byte('G')};
70 constexpr auto kSampleMetadata =
71     log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, __LINE__>();
72 constexpr auto kDropMessageMetadata =
73     log_tokenized::Metadata::Set<0, 0, 0, 0>();
74 constexpr int64_t kSampleTimestamp = 1000;
75 
76 // `LogServiceTest` sets up a logging environment for testing with a
77 // `MultiSink` for log entries, and multiple `RpcLogDrain`s for consuming such
78 // log entries. It includes methods to add log entries to the `MultiSink`, and
79 // buffers for encoding and retrieving log entries. Tests can choose how many
80 // entries to add to the multisink, and which drain to use.
81 class LogServiceTest : public ::testing::Test {
82  public:
LogServiceTest()83   LogServiceTest() : multisink_(multisink_buffer_), drain_map_(drains_) {
84     for (auto& drain : drain_map_.drains()) {
85       multisink_.AttachDrain(drain);
86     }
87   }
88 
AddLogEntries(size_t log_count,std::string_view message,log_tokenized::Metadata metadata,int64_t timestamp,ConstByteSpan thread)89   void AddLogEntries(size_t log_count,
90                      std::string_view message,
91                      log_tokenized::Metadata metadata,
92                      int64_t timestamp,
93                      ConstByteSpan thread) {
94     for (size_t i = 0; i < log_count; ++i) {
95       ASSERT_TRUE(AddLogEntry(message, metadata, timestamp, thread).ok());
96     }
97   }
98 
AddLogEntry(std::string_view message,log_tokenized::Metadata metadata,int64_t timestamp,ConstByteSpan thread)99   StatusWithSize AddLogEntry(std::string_view message,
100                              log_tokenized::Metadata metadata,
101                              int64_t timestamp,
102                              ConstByteSpan thread) {
103     Result<ConstByteSpan> encoded_log_result =
104         log::EncodeTokenizedLog(metadata,
105                                 as_bytes(span<const char>(message)),
106                                 timestamp,
107                                 thread,
108                                 entry_encode_buffer_);
109     PW_TRY_WITH_SIZE(encoded_log_result.status());
110     multisink_.HandleEntry(encoded_log_result.value());
111     return StatusWithSize(encoded_log_result.value().size());
112   }
113 
114  protected:
115   std::array<std::byte, kMultiSinkBufferSize> multisink_buffer_ = {};
116   multisink::MultiSink multisink_;
117   std::array<std::byte, kMaxLogEntrySize> entry_encode_buffer_ = {};
118   static constexpr size_t kMaxFilterRules = 4;
119   std::array<Filter::Rule, kMaxFilterRules> rules1_ = {};
120   std::array<Filter::Rule, kMaxFilterRules> rules2_ = {};
121   std::array<Filter::Rule, kMaxFilterRules> rules3_ = {};
122   static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1_{
123       std::byte(65), std::byte(66), std::byte(67), std::byte(0)};
124   static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2_{
125       std::byte(68), std::byte(69), std::byte(70), std::byte(0)};
126   static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id3_{
127       std::byte(71), std::byte(72), std::byte(73), std::byte(0)};
128   std::array<Filter, kMaxDrains> filters_ = {
129       Filter(filter_id1_, rules1_),
130       Filter(filter_id2_, rules2_),
131       Filter(filter_id3_, rules3_),
132   };
133 
134   // Drain Buffers
135   std::array<std::byte, kMaxLogEntrySize> drain_buffer1_ = {};
136   std::array<std::byte, kMaxLogEntrySize> drain_buffer2_ = {};
137   std::array<std::byte, RpcLogDrain::kMinEntryBufferSize> small_buffer_ = {};
138   static constexpr uint32_t kIgnoreWriterErrorsDrainId = 1;
139   static constexpr uint32_t kCloseWriterOnErrorDrainId = 2;
140   static constexpr uint32_t kSmallBufferDrainId = 3;
141   sync::Mutex shared_mutex_;
142   std::array<RpcLogDrain, kMaxDrains> drains_{
143       RpcLogDrain(kIgnoreWriterErrorsDrainId,
144                   drain_buffer1_,
145                   shared_mutex_,
146                   RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
147                   &filters_[0]),
148       RpcLogDrain(kCloseWriterOnErrorDrainId,
149                   drain_buffer2_,
150                   shared_mutex_,
151                   RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
152                   &filters_[1]),
153       RpcLogDrain(kSmallBufferDrainId,
154                   small_buffer_,
155                   shared_mutex_,
156                   RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
157                   &filters_[2]),
158   };
159   RpcLogDrainMap drain_map_;
160 
161   std::array<std::byte, 128> encoding_buffer_ = {};
162 };
163 
TEST_F(LogServiceTest,AssignWriter)164 TEST_F(LogServiceTest, AssignWriter) {
165   // Drains don't have writers.
166   for (auto& drain : drain_map_.drains()) {
167     EXPECT_EQ(drain.Flush(encoding_buffer_), Status::Unavailable());
168   }
169 
170   // Create context directed to drain with ID 1.
171   RpcLogDrain& active_drain = drains_[0];
172   const uint32_t drain_channel_id = active_drain.channel_id();
173   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
174   context.set_channel_id(drain_channel_id);
175 
176   // Call RPC, which sets the drain's writer.
177   context.call(rpc_request_buffer);
178   EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
179 
180   // Other drains are still missing writers.
181   for (auto& drain : drain_map_.drains()) {
182     if (drain.channel_id() != drain_channel_id) {
183       EXPECT_EQ(drain.Flush(encoding_buffer_), Status::Unavailable());
184     }
185   }
186 
187   // Calling an ongoing log stream must not change the active drain's
188   // writer, and the second writer must not get any responses.
189   LOG_SERVICE_METHOD_CONTEXT second_call_context(drain_map_);
190   second_call_context.set_channel_id(drain_channel_id);
191   second_call_context.call(rpc_request_buffer);
192   EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
193   ASSERT_TRUE(second_call_context.done());
194   EXPECT_EQ(second_call_context.responses().size(), 0u);
195 
196   // Setting a new writer on a closed stream is allowed.
197   ASSERT_EQ(active_drain.Close(), OkStatus());
198   LOG_SERVICE_METHOD_CONTEXT third_call_context(drain_map_);
199   third_call_context.set_channel_id(drain_channel_id);
200   third_call_context.call(rpc_request_buffer);
201   EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
202   ASSERT_FALSE(third_call_context.done());
203   EXPECT_EQ(third_call_context.responses().size(), 0u);
204   EXPECT_EQ(active_drain.Close(), OkStatus());
205 }
206 
TEST_F(LogServiceTest,StartAndEndStream)207 TEST_F(LogServiceTest, StartAndEndStream) {
208   RpcLogDrain& active_drain = drains_[2];
209   const uint32_t drain_channel_id = active_drain.channel_id();
210   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
211   context.set_channel_id(drain_channel_id);
212 
213   // Add log entries.
214   const size_t total_entries = 10;
215   AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp, {});
216 
217   // Request logs.
218   context.call(rpc_request_buffer);
219   EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
220 
221   // Not done until the stream is finished.
222   ASSERT_FALSE(context.done());
223   EXPECT_EQ(OkStatus(), active_drain.Close());
224   ASSERT_TRUE(context.done());
225 
226   EXPECT_EQ(context.status(), OkStatus());
227   // There is at least 1 response with multiple log entries packed.
228   EXPECT_GE(context.responses().size(), 1u);
229 
230   // Verify data in responses.
231   Vector<TestLogEntry, total_entries> expected_messages;
232   for (size_t i = 0; i < total_entries; ++i) {
233     expected_messages.push_back(
234         {.metadata = kSampleMetadata,
235          .timestamp = kSampleTimestamp,
236          .tokenized_data = as_bytes(span(std::string_view(kMessage))),
237          .thread = {}});
238   }
239   size_t entries_found = 0;
240   uint32_t drop_count_found = 0;
241   for (auto& response : context.responses()) {
242     protobuf::Decoder entry_decoder(response);
243     VerifyLogEntries(entry_decoder,
244                      expected_messages,
245                      entries_found,
246                      entries_found,
247                      drop_count_found);
248   }
249   EXPECT_EQ(entries_found, total_entries);
250   EXPECT_EQ(drop_count_found, 0u);
251 }
252 
TEST_F(LogServiceTest,HandleDropped)253 TEST_F(LogServiceTest, HandleDropped) {
254   RpcLogDrain& active_drain = drains_[0];
255   const uint32_t drain_channel_id = active_drain.channel_id();
256   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
257   context.set_channel_id(drain_channel_id);
258 
259   // Add log entries.
260   const size_t total_entries = 5;
261   const size_t entries_before_drop = 1;
262   const uint32_t total_drop_count = 2;
263 
264   // Force a drop entry in between entries.
265   AddLogEntries(entries_before_drop,
266                 kMessage,
267                 kSampleMetadata,
268                 kSampleTimestamp,
269                 kSampleThread);
270   multisink_.HandleDropped(total_drop_count);
271   AddLogEntries(total_entries - entries_before_drop,
272                 kMessage,
273                 kSampleMetadata,
274                 kSampleTimestamp,
275                 kSampleThread);
276 
277   // Request logs.
278   context.call(rpc_request_buffer);
279   EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
280   EXPECT_EQ(OkStatus(), active_drain.Close());
281   ASSERT_EQ(context.status(), OkStatus());
282   // There is at least 1 response with multiple log entries packed.
283   ASSERT_GE(context.responses().size(), 1u);
284 
285   Vector<TestLogEntry, total_entries + 1> expected_messages;
286   size_t i = 0;
287   for (; i < entries_before_drop; ++i) {
288     expected_messages.push_back(
289         {.metadata = kSampleMetadata,
290          .timestamp = kSampleTimestamp,
291          .tokenized_data = as_bytes(span(std::string_view(kMessage))),
292          .thread = kSampleThread});
293   }
294   expected_messages.push_back(
295       {.metadata = kDropMessageMetadata,
296        .dropped = total_drop_count,
297        .tokenized_data =
298            as_bytes(span(std::string_view(RpcLogDrain::kIngressErrorMessage))),
299        .thread = {}});
300   for (; i < total_entries; ++i) {
301     expected_messages.push_back(
302         {.metadata = kSampleMetadata,
303          .timestamp = kSampleTimestamp,
304          .tokenized_data = as_bytes(span(std::string_view(kMessage))),
305          .thread = kSampleThread});
306   }
307 
308   // Verify data in responses.
309   size_t entries_found = 0;
310   uint32_t drop_count_found = 0;
311   for (auto& response : context.responses()) {
312     protobuf::Decoder entry_decoder(response);
313     VerifyLogEntries(entry_decoder,
314                      expected_messages,
315                      entries_found,
316                      entries_found,
317                      drop_count_found);
318   }
319   EXPECT_EQ(entries_found, total_entries);
320   EXPECT_EQ(drop_count_found, total_drop_count);
321 }
322 
TEST_F(LogServiceTest,HandleDroppedBetweenFilteredOutLogs)323 TEST_F(LogServiceTest, HandleDroppedBetweenFilteredOutLogs) {
324   RpcLogDrain& active_drain = drains_[0];
325   const uint32_t drain_channel_id = active_drain.channel_id();
326   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
327   context.set_channel_id(drain_channel_id);
328   // Set filter to drop INFO+ and keep DEBUG logs
329   rules1_[0].action = Filter::Rule::Action::kDrop;
330   rules1_[0].level_greater_than_or_equal = FilterRule::Level::INFO_LEVEL;
331 
332   // Add log entries.
333   const size_t total_entries = 5;
334   const uint32_t total_drop_count = total_entries - 1;
335 
336   // Force a drop entry in between entries that will be filtered out.
337   for (size_t i = 1; i < total_entries; ++i) {
338     ASSERT_EQ(
339         OkStatus(),
340         AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp, kSampleThread)
341             .status());
342     multisink_.HandleDropped(1);
343   }
344   // Add message that won't be filtered out.
345   constexpr auto metadata =
346       log_tokenized::Metadata::Set<PW_LOG_LEVEL_DEBUG, 0, 0, __LINE__>();
347   ASSERT_EQ(OkStatus(),
348             AddLogEntry(kMessage, metadata, kSampleTimestamp, kSampleThread)
349                 .status());
350 
351   // Request logs.
352   context.call(rpc_request_buffer);
353   EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
354   EXPECT_EQ(OkStatus(), active_drain.Close());
355   ASSERT_EQ(context.status(), OkStatus());
356   // There is at least 1 response with multiple log entries packed.
357   ASSERT_GE(context.responses().size(), 1u);
358 
359   Vector<TestLogEntry, 2> expected_messages;
360   expected_messages.push_back(
361       {.metadata = kDropMessageMetadata,
362        .dropped = total_drop_count,
363        .tokenized_data =
364            as_bytes(span(std::string_view(RpcLogDrain::kIngressErrorMessage))),
365        .thread = {}});
366   expected_messages.push_back(
367       {.metadata = metadata,
368        .timestamp = kSampleTimestamp,
369        .tokenized_data = as_bytes(span(std::string_view(kMessage))),
370        .thread = kSampleThread});
371 
372   // Verify data in responses.
373   size_t entries_found = 0;
374   uint32_t drop_count_found = 0;
375   for (auto& response : context.responses()) {
376     protobuf::Decoder entry_decoder(response);
377     VerifyLogEntries(entry_decoder,
378                      expected_messages,
379                      entries_found,
380                      entries_found,
381                      drop_count_found);
382   }
383   EXPECT_EQ(entries_found, 1u);
384   EXPECT_EQ(drop_count_found, total_drop_count);
385 }
386 
TEST_F(LogServiceTest,HandleSmallLogEntryBuffer)387 TEST_F(LogServiceTest, HandleSmallLogEntryBuffer) {
388   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
389   context.set_channel_id(kSmallBufferDrainId);
390   auto small_buffer_drain =
391       drain_map_.GetDrainFromChannelId(kSmallBufferDrainId);
392   ASSERT_TRUE(small_buffer_drain.ok());
393 
394   // Add long entries that don't fit the drain's log entry buffer, except for
395   // one, since drop count messages are only sent when a log entry can be sent.
396   const size_t total_entries = 5;
397   const uint32_t total_drop_count = total_entries - 1;
398   AddLogEntries(total_drop_count,
399                 kLongMessage,
400                 kSampleMetadata,
401                 kSampleTimestamp,
402                 kSampleThread);
403   EXPECT_EQ(
404       OkStatus(),
405       AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp, kSampleThread)
406           .status());
407 
408   // Request logs.
409   context.call(rpc_request_buffer);
410   EXPECT_EQ(small_buffer_drain.value()->Flush(encoding_buffer_), OkStatus());
411   EXPECT_EQ(small_buffer_drain.value()->Close(), OkStatus());
412   ASSERT_EQ(context.status(), OkStatus());
413   ASSERT_EQ(context.responses().size(), 1u);
414 
415   Vector<TestLogEntry, total_entries + 1> expected_messages;
416   expected_messages.push_back(
417       {.metadata = kDropMessageMetadata,
418        .dropped = total_drop_count,
419        .tokenized_data = as_bytes(
420            span(std::string_view(RpcLogDrain::kSmallStackBufferErrorMessage))),
421        .thread = {}});
422   expected_messages.push_back(
423       {.metadata = kSampleMetadata,
424        .timestamp = kSampleTimestamp,
425        .tokenized_data = as_bytes(span(std::string_view(kMessage))),
426        .thread = kSampleThread});
427 
428   // Expect one drop message with the total drop count, and the only message
429   // that fits the buffer.
430   size_t entries_found = 0;
431   uint32_t drop_count_found = 0;
432   for (auto& response : context.responses()) {
433     protobuf::Decoder entry_decoder(response);
434     VerifyLogEntries(entry_decoder,
435                      expected_messages,
436                      entries_found,
437                      entries_found,
438                      drop_count_found);
439   }
440   EXPECT_EQ(entries_found, 1u);
441   EXPECT_EQ(drop_count_found, total_drop_count);
442 }
443 
TEST_F(LogServiceTest,FlushDrainWithoutMultisink)444 TEST_F(LogServiceTest, FlushDrainWithoutMultisink) {
445   auto& detached_drain = drains_[0];
446   multisink_.DetachDrain(detached_drain);
447   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
448   context.set_channel_id(detached_drain.channel_id());
449 
450   // Add log entries.
451   const size_t total_entries = 5;
452   AddLogEntries(total_entries,
453                 kMessage,
454                 kSampleMetadata,
455                 kSampleTimestamp,
456                 kSampleThread);
457   // Request logs.
458   context.call(rpc_request_buffer);
459   EXPECT_EQ(detached_drain.Close(), OkStatus());
460   ASSERT_EQ(context.status(), OkStatus());
461   EXPECT_EQ(context.responses().size(), 0u);
462 }
463 
TEST_F(LogServiceTest,LargeLogEntry)464 TEST_F(LogServiceTest, LargeLogEntry) {
465   const TestLogEntry expected_entry{
466       .metadata =
467           log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN,
468                                        (1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1,
469                                        (1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1,
470                                        (1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>(),
471       .timestamp = std::numeric_limits<int64_t>::max(),
472       .tokenized_data = as_bytes(span(kMessage)),
473       .thread = kSampleThread,
474   };
475 
476   // Add entry to multisink.
477   log::pwpb::LogEntry::MemoryEncoder encoder(entry_encode_buffer_);
478   ASSERT_EQ(encoder.WriteMessage(expected_entry.tokenized_data), OkStatus());
479   ASSERT_EQ(encoder.WriteLineLevel(
480                 (expected_entry.metadata.level() & PW_LOG_LEVEL_BITMASK) |
481                 ((expected_entry.metadata.line_number() << PW_LOG_LEVEL_BITS) &
482                  ~PW_LOG_LEVEL_BITMASK)),
483             OkStatus());
484   ASSERT_EQ(encoder.WriteFlags(expected_entry.metadata.flags()), OkStatus());
485   ASSERT_EQ(encoder.WriteTimestamp(expected_entry.timestamp), OkStatus());
486   const uint32_t little_endian_module =
487       bytes::ConvertOrderTo(endian::little, expected_entry.metadata.module());
488   ASSERT_EQ(encoder.WriteModule(as_bytes(span(&little_endian_module, 1))),
489             OkStatus());
490   ASSERT_EQ(encoder.WriteThread(expected_entry.thread), OkStatus());
491   ASSERT_EQ(encoder.status(), OkStatus());
492   multisink_.HandleEntry(encoder);
493 
494   // Start log stream.
495   RpcLogDrain& active_drain = drains_[0];
496   const uint32_t drain_channel_id = active_drain.channel_id();
497   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
498   context.set_channel_id(drain_channel_id);
499   context.call(rpc_request_buffer);
500   ASSERT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
501   EXPECT_EQ(OkStatus(), active_drain.Close());
502   ASSERT_EQ(context.status(), OkStatus());
503   ASSERT_EQ(context.responses().size(), 1u);
504 
505   // Verify message.
506   protobuf::Decoder entries_decoder(context.responses()[0]);
507   ASSERT_TRUE(entries_decoder.Next().ok());
508   ConstByteSpan entry;
509   EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok());
510   protobuf::Decoder entry_decoder(entry);
511   uint32_t drop_count = 0;
512   VerifyLogEntry(entry_decoder, expected_entry, drop_count);
513   EXPECT_EQ(drop_count, 0u);
514 }
515 
TEST_F(LogServiceTest,InterruptedLogStreamSendsDropCount)516 TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
517   const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId;
518   auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
519   ASSERT_TRUE(drain.ok());
520 
521   LogService log_service(drain_map_);
522   const size_t max_packets = 10;
523   rpc::RawFakeChannelOutput<10, 512> output;
524   rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
525   rpc::Server server(span(&channel, 1));
526 
527   // Add as many entries needed to have multiple packets send.
528   StatusWithSize status =
529       AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp, kSampleThread);
530   ASSERT_TRUE(status.ok());
531 
532   const uint32_t max_messages_per_response =
533       encoding_buffer_.size() / status.size();
534   // Send less packets than the max to avoid crashes.
535   const uint32_t packets_sent = max_packets / 2;
536   const size_t total_entries = packets_sent * max_messages_per_response;
537   const size_t max_entries = 50;
538   // Check we can test all these entries.
539   ASSERT_GE(max_entries, total_entries);
540   AddLogEntries(total_entries - 1,
541                 kMessage,
542                 kSampleMetadata,
543                 kSampleTimestamp,
544                 kSampleThread);
545 
546   // Interrupt log stream with an error.
547   const uint32_t successful_packets_sent = packets_sent / 2;
548   output.set_send_status(Status::Unavailable(), successful_packets_sent);
549 
550   // Request logs.
551   rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
552       server, drain_channel_id, log_service);
553   EXPECT_EQ(drain.value()->Open(writer), OkStatus());
554   // This drain closes on errors.
555   EXPECT_EQ(drain.value()->Flush(encoding_buffer_), Status::Aborted());
556   EXPECT_TRUE(output.done());
557 
558   // Make sure not all packets were sent.
559   ASSERT_EQ(output.payloads<Logs::Listen>().size(), successful_packets_sent);
560 
561   // Verify data in responses.
562   Vector<TestLogEntry, max_entries> expected_messages;
563   for (size_t i = 0; i < total_entries; ++i) {
564     expected_messages.push_back(
565         {.metadata = kSampleMetadata,
566          .timestamp = kSampleTimestamp,
567          .tokenized_data = as_bytes(span(std::string_view(kMessage))),
568          .thread = kSampleThread});
569   }
570   size_t entries_found = 0;
571   uint32_t drop_count_found = 0;
572   for (auto& response : output.payloads<Logs::Listen>()) {
573     protobuf::Decoder entry_decoder(response);
574     VerifyLogEntries(entry_decoder,
575                      expected_messages,
576                      entries_found,
577                      entries_found,
578                      drop_count_found);
579   }
580 
581   // Verify that not all the entries were sent.
582   EXPECT_LT(entries_found, total_entries);
583   // The drain closes on errors, thus the drop count is reported on the next
584   // call to Flush.
585   EXPECT_EQ(drop_count_found, 0u);
586 
587   // Reset channel output and resume log stream with a new writer.
588   output.clear();
589   writer = rpc::RawServerWriter::Open<Logs::Listen>(
590       server, drain_channel_id, log_service);
591   EXPECT_EQ(drain.value()->Open(writer), OkStatus());
592   EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
593 
594   // One full packet was dropped. Since all messages are the same length,
595   // there are entries_found / successful_packets_sent per packet.
596   const uint32_t total_drop_count = entries_found / successful_packets_sent;
597   Vector<TestLogEntry, max_entries> expected_messages_after_reset;
598   expected_messages_after_reset.push_back(
599       {.metadata = kDropMessageMetadata,
600        .dropped = total_drop_count,
601        .tokenized_data =
602            as_bytes(span(std::string_view(RpcLogDrain::kWriterErrorMessage)))});
603 
604   const uint32_t remaining_entries = total_entries - total_drop_count;
605   for (size_t i = 0; i < remaining_entries; ++i) {
606     expected_messages_after_reset.push_back(
607         {.metadata = kSampleMetadata,
608          .timestamp = kSampleTimestamp,
609          .tokenized_data = as_bytes(span(std::string_view(kMessage))),
610          .thread = kSampleThread});
611   }
612 
613   size_t entries_found_after_reset = 0;
614   for (auto& response : output.payloads<Logs::Listen>()) {
615     protobuf::Decoder entry_decoder(response);
616     uint32_t expected_sequence_id =
617         entries_found + entries_found_after_reset + total_drop_count;
618     VerifyLogEntries(entry_decoder,
619                      expected_messages_after_reset,
620                      expected_sequence_id,
621                      entries_found_after_reset,
622                      drop_count_found);
623   }
624   EXPECT_EQ(entries_found + entries_found_after_reset, remaining_entries);
625   EXPECT_EQ(drop_count_found, total_drop_count);
626 }
627 
TEST_F(LogServiceTest,InterruptedLogStreamIgnoresErrors)628 TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
629   const uint32_t drain_channel_id = kIgnoreWriterErrorsDrainId;
630   auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
631   ASSERT_TRUE(drain.ok());
632 
633   LogService log_service(drain_map_);
634   const size_t max_packets = 20;
635   rpc::RawFakeChannelOutput<max_packets, 512> output;
636   rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
637   rpc::Server server(span(&channel, 1));
638 
639   // Add as many entries needed to have multiple packets send.
640   StatusWithSize status =
641       AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp, kSampleThread);
642   ASSERT_TRUE(status.ok());
643 
644   const uint32_t max_messages_per_response =
645       encoding_buffer_.size() / status.size();
646   // Send less packets than the max to avoid crashes.
647   const uint32_t packets_sent = 4;
648   const size_t total_entries = packets_sent * max_messages_per_response;
649   const size_t max_entries = 50;
650   // Check we can test all these entries.
651   ASSERT_GT(max_entries, total_entries);
652   AddLogEntries(total_entries - 1,
653                 kMessage,
654                 kSampleMetadata,
655                 kSampleTimestamp,
656                 kSampleThread);
657 
658   // Interrupt log stream with an error.
659   const uint32_t error_on_packet_count = packets_sent / 2;
660   output.set_send_status(Status::Unavailable(), error_on_packet_count);
661 
662   // Request logs.
663   rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
664       server, drain_channel_id, log_service);
665   EXPECT_EQ(drain.value()->Open(writer), OkStatus());
666   // This drain ignores errors.
667   EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
668   EXPECT_FALSE(output.done());
669 
670   // Make sure some packets were sent.
671   ASSERT_GT(output.payloads<Logs::Listen>().size(), 0u);
672 
673   // Verify that not all the entries were sent.
674   size_t entries_found = 0;
675   for (auto& response : output.payloads<Logs::Listen>()) {
676     protobuf::Decoder entry_decoder(response);
677     entries_found += CountLogEntries(entry_decoder);
678   }
679   ASSERT_LT(entries_found, total_entries);
680 
681   // Verify that all messages were sent.
682   const uint32_t total_drop_count = total_entries - entries_found;
683   Vector<TestLogEntry, max_entries> expected_messages;
684   for (size_t i = 0; i < entries_found; ++i) {
685     expected_messages.push_back(
686         {.metadata = kSampleMetadata,
687          .timestamp = kSampleTimestamp,
688          .tokenized_data = as_bytes(span(std::string_view(kMessage))),
689          .thread = kSampleThread});
690   }
691 
692   entries_found = 0;
693   uint32_t drop_count_found = 0;
694   uint32_t i = 0;
695   for (; i < error_on_packet_count; ++i) {
696     protobuf::Decoder entry_decoder(output.payloads<Logs::Listen>()[i]);
697     VerifyLogEntries(entry_decoder,
698                      expected_messages,
699                      entries_found,
700                      entries_found,
701                      drop_count_found);
702   }
703   for (; i < output.payloads<Logs::Listen>().size(); ++i) {
704     protobuf::Decoder entry_decoder(output.payloads<Logs::Listen>()[i]);
705     VerifyLogEntries(entry_decoder,
706                      expected_messages,
707                      entries_found + total_drop_count,
708                      entries_found,
709                      drop_count_found);
710   }
711   // This drain ignores errors and thus doesn't report drops on its own.
712   EXPECT_EQ(drop_count_found, 0u);
713 
714   // More calls to flush with errors will not affect this stubborn drain.
715   const size_t previous_stream_packet_count =
716       output.payloads<Logs::Listen>().size();
717   output.set_send_status(Status::Unavailable());
718   EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
719   EXPECT_FALSE(output.done());
720   ASSERT_EQ(output.payloads<Logs::Listen>().size(),
721             previous_stream_packet_count);
722 
723   output.clear();
724   EXPECT_EQ(drain.value()->Close(), OkStatus());
725   EXPECT_TRUE(output.done());
726 }
727 
TEST_F(LogServiceTest,FilterLogs)728 TEST_F(LogServiceTest, FilterLogs) {
729   // Add a variety of logs.
730   const uint32_t module = 0xcafe;
731   const uint32_t flags = 0x02;
732   const uint32_t line_number = 100;
733   const std::array<std::byte, 3> kNewThread = {
734       std::byte('A'), std::byte('P'), std::byte('P')};
735   const std::array<std::byte, 3> kInvalidThread = {
736       std::byte('C'), std::byte('D'), std::byte('C')};
737   const auto debug_metadata = log_tokenized::Metadata::
738       Set<PW_LOG_LEVEL_DEBUG, module, flags, line_number>();
739   ASSERT_TRUE(
740       AddLogEntry(kMessage, debug_metadata, kSampleTimestamp, kSampleThread)
741           .ok());
742   const auto info_metadata = log_tokenized::Metadata::
743       Set<PW_LOG_LEVEL_INFO, module, flags, line_number>();
744   ASSERT_TRUE(
745       AddLogEntry(kMessage, info_metadata, kSampleTimestamp, kSampleThread)
746           .ok());
747   const auto warn_metadata = log_tokenized::Metadata::
748       Set<PW_LOG_LEVEL_WARN, module, flags, line_number>();
749   ASSERT_TRUE(
750       AddLogEntry(kMessage, warn_metadata, kSampleTimestamp, kSampleThread)
751           .ok());
752   const auto error_metadata = log_tokenized::Metadata::
753       Set<PW_LOG_LEVEL_ERROR, module, flags, line_number>();
754   ASSERT_TRUE(
755       AddLogEntry(kMessage, error_metadata, kSampleTimestamp, kNewThread).ok());
756   const auto different_flags_metadata = log_tokenized::Metadata::
757       Set<PW_LOG_LEVEL_ERROR, module, 0x01, line_number>();
758   ASSERT_TRUE(
759       AddLogEntry(
760           kMessage, different_flags_metadata, kSampleTimestamp, kSampleThread)
761           .ok());
762   const auto different_module_metadata = log_tokenized::Metadata::
763       Set<PW_LOG_LEVEL_ERROR, 0xabcd, flags, line_number>();
764   ASSERT_TRUE(
765       AddLogEntry(
766           kMessage, different_module_metadata, kSampleTimestamp, kSampleThread)
767           .ok());
768   const auto second_info_metadata = log_tokenized::Metadata::
769       Set<PW_LOG_LEVEL_INFO, module, flags, line_number>();
770   ASSERT_TRUE(
771       AddLogEntry(kMessage, second_info_metadata, kSampleTimestamp, kNewThread)
772           .ok());
773   const auto metadata = log_tokenized::Metadata::
774       Set<PW_LOG_LEVEL_INFO, module, flags, line_number>();
775   ASSERT_TRUE(
776       AddLogEntry(kMessage, metadata, kSampleTimestamp, kInvalidThread).ok());
777 
778   Vector<TestLogEntry, 4> expected_messages{
779       {.metadata = info_metadata,
780        .timestamp = kSampleTimestamp,
781        .tokenized_data = as_bytes(span(std::string_view(kMessage))),
782        .thread = kSampleThread},
783       {.metadata = warn_metadata,
784        .timestamp = kSampleTimestamp,
785        .tokenized_data = as_bytes(span(std::string_view(kMessage))),
786        .thread = kSampleThread},
787       {.metadata = error_metadata,
788        .timestamp = kSampleTimestamp,
789        .tokenized_data = as_bytes(span(std::string_view(kMessage))),
790        .thread = kNewThread},
791       {.metadata = second_info_metadata,
792        .timestamp = kSampleTimestamp,
793        .tokenized_data = as_bytes(span(std::string_view(kMessage))),
794        .thread = kNewThread},
795   };
796 
797   // Set up filter rules for drain at drains_[1].
798   RpcLogDrain& drain = drains_[1];
799   for (auto& rule : rules2_) {
800     rule = {};
801   }
802   const auto module_little_endian =
803       bytes::CopyInOrder<uint32_t>(endian::little, module);
804   rules2_[0] = {
805       .action = Filter::Rule::Action::kKeep,
806       .level_greater_than_or_equal = FilterRule::Level::INFO_LEVEL,
807       .any_flags_set = flags,
808       .module_equals{module_little_endian.begin(), module_little_endian.end()},
809       .thread_equals{kSampleThread.begin(), kSampleThread.end()}};
810   rules2_[1] = {
811       .action = Filter::Rule::Action::kKeep,
812       .level_greater_than_or_equal = FilterRule::Level::DEBUG_LEVEL,
813       .any_flags_set = flags,
814       .module_equals{module_little_endian.begin(), module_little_endian.end()},
815       .thread_equals{kNewThread.begin(), kNewThread.end()}};
816   rules2_[2] = {.action = Filter::Rule::Action::kDrop,
817                 .level_greater_than_or_equal = FilterRule::Level::ANY_LEVEL,
818                 .any_flags_set = 0,
819                 .module_equals{},
820                 .thread_equals{}};
821   rules2_[3] = {
822       .action = Filter::Rule::Action::kKeep,
823       .level_greater_than_or_equal = FilterRule::Level::INFO_LEVEL,
824       .any_flags_set = flags,
825       .module_equals{module_little_endian.begin(), module_little_endian.end()},
826       .thread_equals{kNewThread.begin(), kNewThread.end()}};
827 
828   // Request logs.
829   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
830   context.set_channel_id(drain.channel_id());
831   context.call({});
832   ASSERT_EQ(drain.Flush(encoding_buffer_), OkStatus());
833 
834   size_t entries_found = 0;
835   uint32_t drop_count_found = 0;
836   for (auto& response : context.responses()) {
837     protobuf::Decoder entry_decoder(response);
838     VerifyLogEntries(entry_decoder,
839                      expected_messages,
840                      entries_found,
841                      entries_found,
842                      drop_count_found);
843   }
844   EXPECT_EQ(entries_found, 4u);
845   EXPECT_EQ(drop_count_found, 0u);
846 }
847 
TEST_F(LogServiceTest,ReopenClosedLogStreamWithAcquiredBuffer)848 TEST_F(LogServiceTest, ReopenClosedLogStreamWithAcquiredBuffer) {
849   const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId;
850   auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
851   ASSERT_TRUE(drain.ok());
852 
853   LogService log_service(drain_map_);
854   rpc::RawFakeChannelOutput<10, 512> output;
855   rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
856   rpc::Server server(span(&channel, 1));
857 
858   // Request logs.
859   rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
860       server, drain_channel_id, log_service);
861   EXPECT_EQ(drain.value()->Open(writer), OkStatus());
862   // This drain closes on errors.
863   EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
864 
865   // Request log stream with a new writer.
866   writer = rpc::RawServerWriter::Open<Logs::Listen>(
867       server, drain_channel_id, log_service);
868   EXPECT_EQ(drain.value()->Open(writer), OkStatus());
869   EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
870 }
871 
872 }  // namespace
873 }  // namespace pw::log_rpc
874