1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_transfer/transfer_thread.h"
16
17 #include "pw_assert/check.h"
18 #include "pw_bytes/array.h"
19 #include "pw_bytes/span.h"
20 #include "pw_rpc/raw/client_testing.h"
21 #include "pw_rpc/raw/test_method_context.h"
22 #include "pw_rpc/test_helpers.h"
23 #include "pw_status/status.h"
24 #include "pw_thread/thread.h"
25 #include "pw_thread_stl/options.h"
26 #include "pw_transfer/handler.h"
27 #include "pw_transfer/transfer.h"
28 #include "pw_transfer/transfer.raw_rpc.pb.h"
29 #include "pw_transfer_private/chunk_testing.h"
30 #include "pw_unit_test/framework.h"
31
32 namespace pw::transfer::test {
33 namespace {
34
35 using internal::Chunk;
36
37 // Effectively unlimited timeout as these tests should never hit it.
38 constexpr chrono::SystemClock::duration kNeverTimeout =
39 std::chrono::seconds(60);
40
41 // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()42 thread::Options& TransferThreadOptions() {
43 static thread::stl::Options options;
44 return options;
45 }
46
47 class TransferThreadTest : public ::testing::Test {
48 public:
TransferThreadTest()49 TransferThreadTest()
50 : max_parameters_(chunk_buffer_.size(),
51 chunk_buffer_.size(),
52 cfg::kDefaultExtendWindowDivisor),
53 transfer_thread_(chunk_buffer_, encode_buffer_),
54 system_thread_(TransferThreadOptions(), transfer_thread_),
55 ctx_(transfer_thread_, 512) {}
56
~TransferThreadTest()57 ~TransferThreadTest() override {
58 transfer_thread_.Terminate();
59 system_thread_.join();
60 }
61
62 protected:
63 std::array<std::byte, 64> chunk_buffer_;
64 std::array<std::byte, 64> encode_buffer_;
65
66 rpc::RawClientTestContext<> rpc_client_context_;
67 internal::TransferParameters max_parameters_;
68
69 transfer::Thread<1, 1> transfer_thread_;
70 pw::Thread system_thread_;
71 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
72 };
73
74 class SimpleReadTransfer final : public ReadOnlyHandler {
75 public:
SimpleReadTransfer(uint32_t session_id,ConstByteSpan data)76 SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
77 : ReadOnlyHandler(session_id),
78 prepare_read_called(false),
79 finalize_read_called(false),
80 finalize_read_status(Status::Unknown()),
81 reader_(data) {}
82
PrepareRead()83 Status PrepareRead() final {
84 PW_CHECK_OK(reader_.Seek(0));
85 set_reader(reader_);
86 prepare_read_called = true;
87 return OkStatus();
88 }
89
FinalizeRead(Status status)90 void FinalizeRead(Status status) final {
91 finalize_read_called = true;
92 finalize_read_status = status;
93 }
94
95 bool prepare_read_called;
96 bool finalize_read_called;
97 Status finalize_read_status;
98
99 private:
100 stream::MemoryReader reader_;
101 };
102
__anon2b14ee840202(size_t i) 103 constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
104
// A handler that has been added must have PrepareRead() invoked once a server
// transfer for its ID has been started and the start event processed.
TEST_F(TransferThreadTest, AddTransferHandler) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kLegacy,
                                       /*session_id=*/3,
                                       /*resource_id=*/3,
                                       {},
                                       max_parameters_,
                                       kNeverTimeout,
                                       3,
                                       10);
  // The flag below is only meaningful after the start event has been handled.
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(read_handler.prepare_read_called);

  transfer_thread_.RemoveTransferHandler(read_handler);
}
128
// After a handler has been removed, starting a transfer for its ID must not
// reach the handler; a single NOT_FOUND status chunk is expected instead.
TEST_F(TransferThreadTest, RemoveTransferHandler) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  // Register the handler and take it straight back out again.
  SimpleReadTransfer removed_handler(3, kData);
  transfer_thread_.AddTransferHandler(removed_handler);
  transfer_thread_.RemoveTransferHandler(removed_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kLegacy,
                                       /*session_id=*/3,
                                       /*resource_id=*/3,
                                       {},
                                       max_parameters_,
                                       kNeverTimeout,
                                       3,
                                       10);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(removed_handler.prepare_read_called);

  // Exactly one response: the error chunk for session 3.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  auto error_chunk = DecodeChunk(ctx_.response());
  EXPECT_EQ(error_chunk.session_id(), 3u);
  auto reported_status = error_chunk.status();
  ASSERT_TRUE(reported_status.has_value());
  EXPECT_EQ(reported_status.value(), Status::NotFound());

  transfer_thread_.RemoveTransferHandler(removed_handler);
}
159
// Starting a transmit transfer with a parameters chunk that asks for a 16-byte
// window in 8-byte chunks must yield two data chunks: bytes [0, 8) followed by
// bytes [8, 16) of kData.
TEST_F(TransferThreadTest, ProcessChunk_SendsWindow) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);

  // Start the transfer from within WaitForPackets, which is asked for two
  // packets on the context's output.
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    transfer_thread_.StartServerTransfer(
        internal::TransferType::kTransmit,
        ProtocolVersion::kLegacy,
        /*session_id=*/3,
        /*resource_id=*/3,
        EncodeChunk(
            Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
                .set_session_id(3)
                .set_window_end_offset(16)
                .set_max_chunk_size_bytes(8)
                .set_offset(0)),
        max_parameters_,
        kNeverTimeout,
        3,
        10);
  });

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // First chunk: offset 0, first eight bytes of kData.
  auto first_chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(first_chunk.session_id(), 3u);
  EXPECT_EQ(first_chunk.offset(), 0u);
  auto first_payload = first_chunk.payload();
  EXPECT_EQ(first_payload.size(), 8u);
  EXPECT_EQ(
      std::memcmp(first_payload.data(), kData.data(), first_payload.size()),
      0);

  // Second chunk: offset 8, next eight bytes of kData.
  auto second_chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(second_chunk.session_id(), 3u);
  EXPECT_EQ(second_chunk.offset(), 8u);
  auto second_payload = second_chunk.payload();
  EXPECT_EQ(second_payload.size(), 8u);
  EXPECT_EQ(std::memcmp(second_payload.data(),
                        kData.data() + 8,
                        second_payload.size()),
            0);

  transfer_thread_.RemoveTransferHandler(read_handler);
}
205
// With one server transfer already active, a second simultaneous server
// transfer must be rejected with RESOURCE_EXHAUSTED and must never reach its
// handler.
TEST_F(TransferThreadTest, StartTransferExhausted_Server) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  SimpleReadTransfer handler3(3, kData);
  SimpleReadTransfer handler4(4, kData);
  transfer_thread_.AddTransferHandler(handler3);
  transfer_thread_.AddTransferHandler(handler4);

  // Starts a legacy transmit transfer whose session and resource ID are both
  // `id`, then waits until the thread has handled the start event.
  auto start_and_wait = [this](uint32_t id) {
    transfer_thread_.StartServerTransfer(
        internal::TransferType::kTransmit,
        ProtocolVersion::kLegacy,
        /*session_id=*/id,
        /*resource_id=*/id,
        EncodeChunk(
            Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
                .set_session_id(id)
                // Ensure only one chunk is sent as end offset equals max size.
                .set_window_end_offset(16)
                .set_max_chunk_size_bytes(16)
                .set_offset(0)),
        max_parameters_,
        kNeverTimeout,
        3,
        10);
    transfer_thread_.WaitUntilEventIsProcessed();
  };

  start_and_wait(3);

  // First transfer starts correctly.
  EXPECT_TRUE(handler3.prepare_read_called);
  EXPECT_FALSE(handler4.prepare_read_called);
  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Try to start a simultaneous transfer to resource 4, for which the thread
  // does not have an available context.
  start_and_wait(4);

  EXPECT_FALSE(handler4.prepare_read_called);

  // The newest response is the rejection for session 4.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  auto rejection = DecodeChunk(ctx_.response());
  EXPECT_EQ(rejection.session_id(), 4u);
  auto rejection_status = rejection.status();
  ASSERT_TRUE(rejection_status.has_value());
  EXPECT_EQ(rejection_status.value(), Status::ResourceExhausted());

  transfer_thread_.RemoveTransferHandler(handler3);
  transfer_thread_.RemoveTransferHandler(handler4);
}
269
// With one client transfer already active, a second simultaneous client
// transfer must complete immediately with RESOURCE_EXHAUSTED while the first
// one is left untouched.
TEST_F(TransferThreadTest, StartTransferExhausted_Client) {
  rpc::RawClientReaderWriter client_stream = pw_rpc::raw::Transfer::Read(
      rpc_client_context_.client(), rpc_client_context_.channel().id());
  transfer_thread_.SetClientReadStream(client_stream, [](ConstByteSpan) {});

  // Completion statuses; each stays Unknown until its callback is invoked.
  Status status3 = Status::Unknown();
  Status status4 = Status::Unknown();

  stream::MemoryWriterBuffer<16> buffer3;
  stream::MemoryWriterBuffer<16> buffer4;

  transfer_thread_.StartClientTransfer(
      internal::TransferType::kReceive,
      ProtocolVersion::kLegacy,
      /*resource_id=*/3,
      /*handle_id=*/27,
      &buffer3,
      max_parameters_,
      [&status3](Status completion) { status3 = completion; },
      kNeverTimeout,
      kNeverTimeout,
      3,
      10);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Neither completion callback has fired yet.
  EXPECT_EQ(status3, Status::Unknown());
  EXPECT_EQ(status4, Status::Unknown());

  // Try to start a simultaneous transfer to resource 4, for which the thread
  // does not have an available context.
  // NOTE(review): this reuses handle_id 27 from the first transfer; confirm
  // that sharing the handle is intentional.
  transfer_thread_.StartClientTransfer(
      internal::TransferType::kReceive,
      ProtocolVersion::kLegacy,
      /*resource_id=*/4,
      /*handle_id=*/27,
      &buffer4,
      max_parameters_,
      [&status4](Status completion) { status4 = completion; },
      kNeverTimeout,
      kNeverTimeout,
      3,
      10);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the second transfer has completed, and it failed for lack of a slot.
  EXPECT_EQ(status3, Status::Unknown());
  EXPECT_EQ(status4, Status::ResourceExhausted());

  transfer_thread_.EndClientTransfer(3, Status::Cancelled());
  transfer_thread_.EndClientTransfer(4, Status::Cancelled());
}
320
// A version-two transfer for a resource without a registered handler must be
// answered with one NOT_FOUND chunk that is identified by the session ID and
// carries no resource ID.
TEST_F(TransferThreadTest, VersionTwo_NoHandler) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  // The only handler ever registered is for ID 3, and it is removed again
  // before the transfer (for resource 7) is started.
  SimpleReadTransfer unused_handler(3, kData);
  transfer_thread_.AddTransferHandler(unused_handler);
  transfer_thread_.RemoveTransferHandler(unused_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kVersionTwo,
                                       /*session_id=*/421,
                                       /*resource_id=*/7,
                                       {},
                                       max_parameters_,
                                       kNeverTimeout,
                                       3,
                                       10);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(unused_handler.prepare_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // The identifier extracted from the raw response is the session ID.
  Result<Chunk::Identifier> identifier =
      Chunk::ExtractIdentifier(ctx_.response());
  ASSERT_TRUE(identifier.ok());
  EXPECT_EQ(identifier->value(), 421u);

  // The decoded chunk agrees and reports the missing handler.
  auto error_chunk = DecodeChunk(ctx_.response());
  EXPECT_EQ(error_chunk.session_id(), 421u);
  EXPECT_FALSE(error_chunk.resource_id().has_value());
  auto reported_status = error_chunk.status();
  ASSERT_TRUE(reported_status.has_value());
  EXPECT_EQ(reported_status.value(), Status::NotFound());

  transfer_thread_.RemoveTransferHandler(unused_handler);
}
355
// Replacing the server read stream while a transfer is in progress must
// finalize that transfer's handler with ABORTED.
TEST_F(TransferThreadTest, SetStream_TerminatesActiveTransfers) {
  auto original_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(original_stream, [](ConstByteSpan) {});

  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);

  // Start a transfer with an 8-byte window sent as a single 8-byte chunk.
  transfer_thread_.StartServerTransfer(
      internal::TransferType::kTransmit,
      ProtocolVersion::kLegacy,
      /*session_id=*/3,
      /*resource_id=*/3,
      EncodeChunk(
          Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
              .set_session_id(3)
              .set_window_end_offset(8)
              .set_max_chunk_size_bytes(8)
              .set_offset(0)),
      max_parameters_,
      kNeverTimeout,
      3,
      10);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The transfer is under way: one data chunk sent, handler not yet finalized.
  EXPECT_FALSE(read_handler.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  auto data_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(data_chunk.session_id(), 3u);
  EXPECT_EQ(data_chunk.offset(), 0u);
  auto data_payload = data_chunk.payload();
  EXPECT_EQ(data_payload.size(), 8u);
  EXPECT_EQ(
      std::memcmp(data_payload.data(), kData.data(), data_payload.size()), 0);

  // Install a replacement stream and wait for the thread to handle the change.
  auto replacement_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(replacement_stream,
                                       [](ConstByteSpan) {});
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(read_handler.finalize_read_called);
  EXPECT_EQ(read_handler.finalize_read_status, Status::Aborted());

  transfer_thread_.RemoveTransferHandler(read_handler);
}
400
401 } // namespace
402 } // namespace pw::transfer::test
403