xref: /aosp_15_r20/external/pigweed/pw_transfer/transfer_thread_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
#include "pw_transfer/transfer_thread.h"

#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

#include "pw_assert/check.h"
#include "pw_bytes/array.h"
#include "pw_bytes/span.h"
#include "pw_rpc/raw/client_testing.h"
#include "pw_rpc/raw/test_method_context.h"
#include "pw_rpc/test_helpers.h"
#include "pw_status/status.h"
#include "pw_thread/thread.h"
#include "pw_thread_stl/options.h"
#include "pw_transfer/handler.h"
#include "pw_transfer/transfer.h"
#include "pw_transfer/transfer.raw_rpc.pb.h"
#include "pw_transfer_private/chunk_testing.h"
#include "pw_unit_test/framework.h"
31 
32 namespace pw::transfer::test {
33 namespace {
34 
35 using internal::Chunk;
36 
37 // Effectively unlimited timeout as these tests should never hit it.
38 constexpr chrono::SystemClock::duration kNeverTimeout =
39     std::chrono::seconds(60);
40 
41 // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()42 thread::Options& TransferThreadOptions() {
43   static thread::stl::Options options;
44   return options;
45 }
46 
47 class TransferThreadTest : public ::testing::Test {
48  public:
TransferThreadTest()49   TransferThreadTest()
50       : max_parameters_(chunk_buffer_.size(),
51                         chunk_buffer_.size(),
52                         cfg::kDefaultExtendWindowDivisor),
53         transfer_thread_(chunk_buffer_, encode_buffer_),
54         system_thread_(TransferThreadOptions(), transfer_thread_),
55         ctx_(transfer_thread_, 512) {}
56 
~TransferThreadTest()57   ~TransferThreadTest() override {
58     transfer_thread_.Terminate();
59     system_thread_.join();
60   }
61 
62  protected:
63   std::array<std::byte, 64> chunk_buffer_;
64   std::array<std::byte, 64> encode_buffer_;
65 
66   rpc::RawClientTestContext<> rpc_client_context_;
67   internal::TransferParameters max_parameters_;
68 
69   transfer::Thread<1, 1> transfer_thread_;
70   pw::Thread system_thread_;
71   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
72 };
73 
74 class SimpleReadTransfer final : public ReadOnlyHandler {
75  public:
SimpleReadTransfer(uint32_t session_id,ConstByteSpan data)76   SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
77       : ReadOnlyHandler(session_id),
78         prepare_read_called(false),
79         finalize_read_called(false),
80         finalize_read_status(Status::Unknown()),
81         reader_(data) {}
82 
PrepareRead()83   Status PrepareRead() final {
84     PW_CHECK_OK(reader_.Seek(0));
85     set_reader(reader_);
86     prepare_read_called = true;
87     return OkStatus();
88   }
89 
FinalizeRead(Status status)90   void FinalizeRead(Status status) final {
91     finalize_read_called = true;
92     finalize_read_status = status;
93   }
94 
95   bool prepare_read_called;
96   bool finalize_read_called;
97   Status finalize_read_status;
98 
99  private:
100   stream::MemoryReader reader_;
101 };
102 
__anon2b14ee840202(size_t i) 103 constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
104 
// Starting a server transfer for a resource whose handler is registered
// should result in that handler's PrepareRead() being called.
TEST_F(TransferThreadTest, AddTransferHandler) {
  constexpr uint32_t kId = 3;

  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  SimpleReadTransfer read_handler(kId, kData);
  transfer_thread_.AddTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kLegacy,
                                       /*session_id=*/kId,
                                       /*resource_id=*/kId,
                                       {},
                                       max_parameters_,
                                       kNeverTimeout,
                                       3,
                                       10);

  // The start request is handled on the transfer thread; wait for it before
  // looking at the handler's flags.
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(read_handler.prepare_read_called);

  transfer_thread_.RemoveTransferHandler(read_handler);
}
128 
// Once a handler has been removed, starting a transfer for its resource must
// not call PrepareRead(); a NOT_FOUND status chunk is expected instead.
TEST_F(TransferThreadTest, RemoveTransferHandler) {
  constexpr uint32_t kId = 3;

  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  // Register the handler and take it straight back out again.
  SimpleReadTransfer read_handler(kId, kData);
  transfer_thread_.AddTransferHandler(read_handler);
  transfer_thread_.RemoveTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kLegacy,
                                       /*session_id=*/kId,
                                       /*resource_id=*/kId,
                                       {},
                                       max_parameters_,
                                       kNeverTimeout,
                                       3,
                                       10);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(read_handler.prepare_read_called);

  // Exactly one response is expected: the status chunk for the failed start.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  auto response_chunk = DecodeChunk(ctx_.response());
  EXPECT_EQ(response_chunk.session_id(), kId);
  ASSERT_TRUE(response_chunk.status().has_value());
  EXPECT_EQ(response_chunk.status().value(), Status::NotFound());

  transfer_thread_.RemoveTransferHandler(read_handler);
}
159 
// Starting a transfer with a parameters chunk that requests a window ending at
// offset 16 with a maximum chunk size of 8 should produce two responses: the
// first 8 bytes of kData at offset 0, then the next 8 bytes at offset 8.
TEST_F(TransferThreadTest, ProcessChunk_SendsWindow) {
  auto reader_writer = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(reader_writer, [](ConstByteSpan) {});

  SimpleReadTransfer handler(3, kData);
  transfer_thread_.AddTransferHandler(handler);

  // Start the transfer from inside WaitForPackets, which presumably blocks
  // until the two expected packets have reached the RPC output.
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    transfer_thread_.StartServerTransfer(
        internal::TransferType::kTransmit,
        ProtocolVersion::kLegacy,
        3,
        3,
        EncodeChunk(
            Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
                .set_session_id(3)
                .set_window_end_offset(16)
                .set_max_chunk_size_bytes(8)
                .set_offset(0)),
        max_parameters_,
        kNeverTimeout,
        3,
        10);
  });

  ASSERT_EQ(ctx_.total_responses(), 2u);

  auto chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(chunk.session_id(), 3u);
  EXPECT_EQ(chunk.offset(), 0u);
  const auto first_payload = chunk.payload();
  EXPECT_EQ(first_payload.size(), 8u);
  // Compare against an explicitly bounded slice of kData. The four-iterator
  // std::equal checks the lengths as well as the contents, so a payload of
  // the wrong size fails the expectation instead of reading beyond kData.
  EXPECT_TRUE(std::equal(first_payload.begin(),
                         first_payload.end(),
                         kData.begin(),
                         kData.begin() + 8));

  chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(chunk.session_id(), 3u);
  EXPECT_EQ(chunk.offset(), 8u);
  const auto second_payload = chunk.payload();
  EXPECT_EQ(second_payload.size(), 8u);
  EXPECT_TRUE(std::equal(second_payload.begin(),
                         second_payload.end(),
                         kData.begin() + 8,
                         kData.begin() + 16));

  transfer_thread_.RemoveTransferHandler(handler);
}
205 
TEST_F(TransferThreadTest,StartTransferExhausted_Server)206 TEST_F(TransferThreadTest, StartTransferExhausted_Server) {
207   auto reader_writer = ctx_.reader_writer();
208   transfer_thread_.SetServerReadStream(reader_writer, [](ConstByteSpan) {});
209 
210   SimpleReadTransfer handler3(3, kData);
211   SimpleReadTransfer handler4(4, kData);
212   transfer_thread_.AddTransferHandler(handler3);
213   transfer_thread_.AddTransferHandler(handler4);
214 
215   transfer_thread_.StartServerTransfer(
216       internal::TransferType::kTransmit,
217       ProtocolVersion::kLegacy,
218       3,
219       3,
220       EncodeChunk(
221           Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
222               .set_session_id(3)
223               // Ensure only one chunk is sent as end offset equals max size.
224               .set_window_end_offset(16)
225               .set_max_chunk_size_bytes(16)
226               .set_offset(0)),
227       max_parameters_,
228       kNeverTimeout,
229       3,
230       10);
231   transfer_thread_.WaitUntilEventIsProcessed();
232 
233   // First transfer starts correctly.
234   EXPECT_TRUE(handler3.prepare_read_called);
235   EXPECT_FALSE(handler4.prepare_read_called);
236   ASSERT_EQ(ctx_.total_responses(), 1u);
237 
238   // Try to start a simultaneous transfer to resource 4, for which the thread
239   // does not have an available context.
240   transfer_thread_.StartServerTransfer(
241       internal::TransferType::kTransmit,
242       ProtocolVersion::kLegacy,
243       4,
244       4,
245       EncodeChunk(
246           Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
247               .set_session_id(4)
248               // Ensure only one chunk is sent as end offset equals max size.
249               .set_window_end_offset(16)
250               .set_max_chunk_size_bytes(16)
251               .set_offset(0)),
252       max_parameters_,
253       kNeverTimeout,
254       3,
255       10);
256   transfer_thread_.WaitUntilEventIsProcessed();
257 
258   EXPECT_FALSE(handler4.prepare_read_called);
259 
260   ASSERT_EQ(ctx_.total_responses(), 2u);
261   auto chunk = DecodeChunk(ctx_.response());
262   EXPECT_EQ(chunk.session_id(), 4u);
263   ASSERT_TRUE(chunk.status().has_value());
264   EXPECT_EQ(chunk.status().value(), Status::ResourceExhausted());
265 
266   transfer_thread_.RemoveTransferHandler(handler3);
267   transfer_thread_.RemoveTransferHandler(handler4);
268 }
269 
// With one client transfer already active, starting a second one should
// complete immediately with RESOURCE_EXHAUSTED while the first transfer's
// completion callback stays uncalled.
TEST_F(TransferThreadTest, StartTransferExhausted_Client) {
  rpc::RawClientReaderWriter client_stream = pw_rpc::raw::Transfer::Read(
      rpc_client_context_.client(), rpc_client_context_.channel().id());
  transfer_thread_.SetClientReadStream(client_stream, [](ConstByteSpan) {});

  // Each completion callback overwrites its status; UNKNOWN therefore means
  // "callback has not run".
  Status status3 = Status::Unknown();
  Status status4 = Status::Unknown();

  stream::MemoryWriterBuffer<16> buffer3;
  stream::MemoryWriterBuffer<16> buffer4;

  transfer_thread_.StartClientTransfer(
      internal::TransferType::kReceive,
      ProtocolVersion::kLegacy,
      /*resource_id=*/3,
      /*handle_id=*/27,
      &buffer3,
      max_parameters_,
      [&status3](Status result) { status3 = result; },
      kNeverTimeout,
      kNeverTimeout,
      3,
      10);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Neither transfer has completed yet.
  EXPECT_EQ(status3, Status::Unknown());
  EXPECT_EQ(status4, Status::Unknown());

  // Attempt a simultaneous transfer on resource 4. The thread does not have
  // an available context for it.
  transfer_thread_.StartClientTransfer(
      internal::TransferType::kReceive,
      ProtocolVersion::kLegacy,
      /*resource_id=*/4,
      /*handle_id=*/27,
      &buffer4,
      max_parameters_,
      [&status4](Status result) { status4 = result; },
      kNeverTimeout,
      kNeverTimeout,
      3,
      10);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the second transfer has completed, and it failed.
  EXPECT_EQ(status3, Status::Unknown());
  EXPECT_EQ(status4, Status::ResourceExhausted());

  transfer_thread_.EndClientTransfer(3, Status::Cancelled());
  transfer_thread_.EndClientTransfer(4, Status::Cancelled());
}
320 
// A version-two transfer started for a resource with no registered handler
// should be answered with a NOT_FOUND status chunk that is identified by the
// session ID and carries no resource ID.
TEST_F(TransferThreadTest, VersionTwo_NoHandler) {
  constexpr uint32_t kSessionId = 421;
  constexpr uint32_t kResourceId = 7;

  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  // Register a handler for a different resource and remove it again, leaving
  // the thread without any handlers.
  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);
  transfer_thread_.RemoveTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kVersionTwo,
                                       /*session_id=*/kSessionId,
                                       /*resource_id=*/kResourceId,
                                       {},
                                       max_parameters_,
                                       kNeverTimeout,
                                       3,
                                       10);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(read_handler.prepare_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // The identifier extracted from the raw response is the session ID.
  Result<Chunk::Identifier> identifier =
      Chunk::ExtractIdentifier(ctx_.response());
  ASSERT_TRUE(identifier.ok());
  EXPECT_EQ(identifier->value(), kSessionId);

  auto response_chunk = DecodeChunk(ctx_.response());
  EXPECT_EQ(response_chunk.session_id(), kSessionId);
  EXPECT_FALSE(response_chunk.resource_id().has_value());
  ASSERT_TRUE(response_chunk.status().has_value());
  EXPECT_EQ(response_chunk.status().value(), Status::NotFound());

  transfer_thread_.RemoveTransferHandler(read_handler);
}
355 
// Replacing the server read stream while a transfer is active should end that
// transfer: the handler's FinalizeRead() is expected to be called with
// ABORTED.
TEST_F(TransferThreadTest, SetStream_TerminatesActiveTransfers) {
  auto reader_writer = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(reader_writer, [](ConstByteSpan) {});

  SimpleReadTransfer handler(3, kData);
  transfer_thread_.AddTransferHandler(handler);

  // Request a single 8-byte chunk (window end offset equals the maximum chunk
  // size).
  transfer_thread_.StartServerTransfer(
      internal::TransferType::kTransmit,
      ProtocolVersion::kLegacy,
      3,
      3,
      EncodeChunk(
          Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
              .set_session_id(3)
              .set_window_end_offset(8)
              .set_max_chunk_size_bytes(8)
              .set_offset(0)),
      max_parameters_,
      kNeverTimeout,
      3,
      10);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The transfer is in progress and has not been finalized yet.
  EXPECT_FALSE(handler.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  auto chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 3u);
  EXPECT_EQ(chunk.offset(), 0u);
  const auto payload = chunk.payload();
  EXPECT_EQ(payload.size(), 8u);
  // Compare against an explicitly bounded slice of kData. The four-iterator
  // std::equal checks the lengths as well as the contents, so a payload of
  // the wrong size fails the expectation instead of reading beyond kData.
  EXPECT_TRUE(std::equal(
      payload.begin(), payload.end(), kData.begin(), kData.begin() + 8));

  // Install a new stream while the transfer is still active.
  auto new_reader_writer = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(new_reader_writer, [](ConstByteSpan) {});
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler.finalize_read_called);
  EXPECT_EQ(handler.finalize_read_status, Status::Aborted());

  transfer_thread_.RemoveTransferHandler(handler);
}
400 
401 }  // namespace
402 }  // namespace pw::transfer::test
403