1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_transfer/transfer.h"
16
17 #include <limits>
18
19 #include "pw_assert/check.h"
20 #include "pw_bytes/array.h"
21 #include "pw_containers/algorithm.h"
22 #include "pw_rpc/raw/test_method_context.h"
23 #include "pw_rpc/test_helpers.h"
24 #include "pw_thread/thread.h"
25 #include "pw_thread_stl/options.h"
26 #include "pw_transfer/internal/config.h"
27 #include "pw_transfer/transfer.pwpb.h"
28 #include "pw_transfer_private/chunk_testing.h"
29 #include "pw_unit_test/framework.h"
30
31 namespace pw::transfer::test {
32 namespace {
33
34 using namespace std::chrono_literals;
35
36 // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()37 thread::Options& TransferThreadOptions() {
38 static thread::stl::Options options;
39 return options;
40 }
41
42 using internal::Chunk;
43
44 class TestMemoryReader : public stream::SeekableReader {
45 public:
TestMemoryReader(span<const std::byte> data)46 constexpr TestMemoryReader(span<const std::byte> data)
47 : memory_reader_(data) {}
48
DoSeek(ptrdiff_t offset,Whence origin)49 Status DoSeek(ptrdiff_t offset, Whence origin) override {
50 if (seek_status.ok()) {
51 return memory_reader_.Seek(offset, origin);
52 }
53 return seek_status;
54 }
55
DoRead(ByteSpan dest)56 StatusWithSize DoRead(ByteSpan dest) final {
57 if (!read_status.ok()) {
58 return StatusWithSize(read_status, 0);
59 }
60
61 auto result = memory_reader_.Read(dest);
62 return result.ok() ? StatusWithSize(result->size())
63 : StatusWithSize(result.status(), 0);
64 }
65
66 Status seek_status;
67 Status read_status;
68
69 private:
70 stream::MemoryReader memory_reader_;
71 };
72
73 class SimpleReadTransfer : public ReadOnlyHandler {
74 public:
SimpleReadTransfer(uint32_t session_id,ConstByteSpan data)75 SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
76 : ReadOnlyHandler(session_id),
77 prepare_read_called(false),
78 finalize_read_called(false),
79 finalize_read_status(Status::Unknown()),
80 resource_size_(std::numeric_limits<size_t>::max()),
81 reader_(data) {}
82
PrepareRead()83 Status PrepareRead() final {
84 prepare_read_called = true;
85
86 if (!prepare_read_return_status.ok()) {
87 return prepare_read_return_status;
88 }
89
90 EXPECT_EQ(reader_.seek_status, reader_.Seek(0));
91 set_reader(reader_);
92 return OkStatus();
93 }
94
FinalizeRead(Status status)95 void FinalizeRead(Status status) final {
96 finalize_read_called = true;
97 finalize_read_status = status;
98 }
99
ResourceSize() const100 size_t ResourceSize() const final { return resource_size_; }
101
set_resource_size(size_t resource_size)102 void set_resource_size(size_t resource_size) {
103 resource_size_ = resource_size;
104 }
set_seek_status(Status status)105 void set_seek_status(Status status) { reader_.seek_status = status; }
set_read_status(Status status)106 void set_read_status(Status status) { reader_.read_status = status; }
107
108 bool prepare_read_called;
109 bool finalize_read_called;
110 Status prepare_read_return_status;
111 Status finalize_read_status;
112 size_t resource_size_;
113
114 private:
115 TestMemoryReader reader_;
116 };
117
__anonbff93a0d0202(size_t i) 118 constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
119 constexpr uint32_t kArbitrarySessionId = 123;
120
121 class ReadTransfer : public ::testing::Test {
122 protected:
ReadTransfer(size_t max_chunk_size_bytes=64)123 ReadTransfer(size_t max_chunk_size_bytes = 64)
124 : handler_(3, kData),
125 transfer_thread_(span(data_buffer_).first(max_chunk_size_bytes),
126 encode_buffer_),
127 ctx_(transfer_thread_,
128 64,
129 // Use a long timeout to avoid accidentally triggering timeouts.
130 std::chrono::minutes(1)),
131 system_thread_(TransferThreadOptions(), transfer_thread_) {
132 ctx_.service().RegisterHandler(handler_);
133
134 PW_CHECK(!handler_.prepare_read_called);
135 PW_CHECK(!handler_.finalize_read_called);
136
137 ctx_.call(); // Open the read stream
138 transfer_thread_.WaitUntilEventIsProcessed();
139 }
140
~ReadTransfer()141 ~ReadTransfer() override {
142 transfer_thread_.Terminate();
143 system_thread_.join();
144 }
145
146 SimpleReadTransfer handler_;
147 Thread<1, 1> transfer_thread_;
148 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read, 8) ctx_;
149 pw::Thread system_thread_;
150 std::array<std::byte, 64> data_buffer_;
151 std::array<std::byte, 64> encode_buffer_;
152 };
153
TEST_F(ReadTransfer,SingleChunk)154 TEST_F(ReadTransfer, SingleChunk) {
155 rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
156 ctx_.SendClientStream(
157 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
158 .set_session_id(3)
159 .set_window_end_offset(64)
160 .set_offset(0)));
161
162 transfer_thread_.WaitUntilEventIsProcessed();
163 });
164
165 EXPECT_TRUE(handler_.prepare_read_called);
166 EXPECT_FALSE(handler_.finalize_read_called);
167
168 ASSERT_EQ(ctx_.total_responses(), 2u);
169 Chunk c0 = DecodeChunk(ctx_.responses()[0]);
170 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
171
172 // First chunk should have all the read data.
173 EXPECT_EQ(c0.session_id(), 3u);
174 EXPECT_EQ(c0.offset(), 0u);
175 ASSERT_EQ(c0.payload().size(), kData.size());
176 EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
177 0);
178
179 // Second chunk should be empty and set remaining_bytes = 0.
180 EXPECT_EQ(c1.session_id(), 3u);
181 EXPECT_FALSE(c1.has_payload());
182 ASSERT_TRUE(c1.remaining_bytes().has_value());
183 EXPECT_EQ(c1.remaining_bytes().value(), 0u);
184
185 ctx_.SendClientStream(
186 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
187 transfer_thread_.WaitUntilEventIsProcessed();
188
189 EXPECT_TRUE(handler_.finalize_read_called);
190 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
191 }
192
TEST_F(ReadTransfer,MultiChunk)193 TEST_F(ReadTransfer, MultiChunk) {
194 ctx_.SendClientStream(
195 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
196 .set_session_id(3)
197 .set_window_end_offset(16)
198 .set_offset(0)));
199
200 transfer_thread_.WaitUntilEventIsProcessed();
201
202 EXPECT_TRUE(handler_.prepare_read_called);
203 EXPECT_FALSE(handler_.finalize_read_called);
204
205 ASSERT_EQ(ctx_.total_responses(), 1u);
206 Chunk c0 = DecodeChunk(ctx_.responses().back());
207
208 EXPECT_EQ(c0.session_id(), 3u);
209 EXPECT_EQ(c0.offset(), 0u);
210 ASSERT_EQ(c0.payload().size(), 16u);
211 EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
212 0);
213
214 ctx_.SendClientStream(EncodeChunk(
215 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
216 .set_session_id(3)
217 .set_window_end_offset(32)
218 .set_offset(16)));
219 transfer_thread_.WaitUntilEventIsProcessed();
220
221 ASSERT_EQ(ctx_.total_responses(), 2u);
222 Chunk c1 = DecodeChunk(ctx_.responses().back());
223
224 EXPECT_EQ(c1.session_id(), 3u);
225 EXPECT_EQ(c1.offset(), 16u);
226 ASSERT_EQ(c1.payload().size(), 16u);
227 EXPECT_EQ(
228 std::memcmp(c1.payload().data(), kData.data() + 16, c1.payload().size()),
229 0);
230
231 ctx_.SendClientStream(EncodeChunk(
232 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
233 .set_session_id(3)
234 .set_window_end_offset(48)
235 .set_offset(32)));
236 transfer_thread_.WaitUntilEventIsProcessed();
237
238 ASSERT_EQ(ctx_.total_responses(), 3u);
239 Chunk c2 = DecodeChunk(ctx_.responses()[2]);
240
241 EXPECT_EQ(c2.session_id(), 3u);
242 EXPECT_FALSE(c2.has_payload());
243 ASSERT_TRUE(c2.remaining_bytes().has_value());
244 EXPECT_EQ(c2.remaining_bytes().value(), 0u);
245
246 ctx_.SendClientStream(
247 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
248 transfer_thread_.WaitUntilEventIsProcessed();
249
250 EXPECT_TRUE(handler_.finalize_read_called);
251 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
252 }
253
TEST_F(ReadTransfer,MultiChunk_RepeatedContinuePackets)254 TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
255 ctx_.SendClientStream(
256 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
257 .set_session_id(3)
258 .set_window_end_offset(16)
259 .set_offset(0)));
260
261 transfer_thread_.WaitUntilEventIsProcessed();
262
263 const auto continue_chunk = EncodeChunk(
264 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
265 .set_session_id(3)
266 .set_window_end_offset(24)
267 .set_offset(16));
268 ctx_.SendClientStream(continue_chunk);
269
270 transfer_thread_.WaitUntilEventIsProcessed();
271
272 // Resend the CONTINUE packets that don't actually advance the window.
273 for (int i = 0; i < 3; ++i) {
274 ctx_.SendClientStream(continue_chunk);
275 transfer_thread_.WaitUntilEventIsProcessed();
276 }
277
278 ASSERT_EQ(ctx_.total_responses(), 2u); // Only sent one packet
279 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
280
281 EXPECT_EQ(c1.session_id(), 3u);
282 EXPECT_EQ(c1.offset(), 16u);
283 ASSERT_EQ(c1.payload().size(), 8u);
284 EXPECT_EQ(
285 std::memcmp(c1.payload().data(), kData.data() + 16, c1.payload().size()),
286 0);
287 }
288
TEST_F(ReadTransfer,OutOfOrder_SeekingSupported)289 TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
290 rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
291 ctx_.SendClientStream(
292 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
293 .set_session_id(3)
294 .set_window_end_offset(16)
295 .set_offset(0)));
296
297 transfer_thread_.WaitUntilEventIsProcessed();
298
299 Chunk chunk = DecodeChunk(ctx_.responses().back());
300 EXPECT_TRUE(pw::containers::Equal(span(kData).first(16), chunk.payload()));
301
302 ctx_.SendClientStream(EncodeChunk(
303 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
304 .set_session_id(3)
305 .set_window_end_offset(10)
306 .set_offset(2)));
307
308 transfer_thread_.WaitUntilEventIsProcessed();
309
310 chunk = DecodeChunk(ctx_.responses().back());
311 EXPECT_TRUE(
312 pw::containers::Equal(span(kData).subspan(2, 8), chunk.payload()));
313
314 ctx_.SendClientStream(EncodeChunk(
315 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
316 .set_session_id(3)
317 .set_window_end_offset(64)
318 .set_offset(17)));
319 });
320
321 ASSERT_EQ(ctx_.total_responses(), 4u);
322 Chunk chunk = DecodeChunk(ctx_.responses()[2]);
323 EXPECT_TRUE(pw::containers::Equal(
324 span(&kData[17], kData.data() + kData.size()), chunk.payload()));
325 }
326
TEST_F(ReadTransfer,OutOfOrder_SeekingNotSupported_EndsWithUnimplemented)327 TEST_F(ReadTransfer, OutOfOrder_SeekingNotSupported_EndsWithUnimplemented) {
328 handler_.set_seek_status(Status::Unimplemented());
329
330 ctx_.SendClientStream(
331 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
332 .set_session_id(3)
333 .set_window_end_offset(16)
334 .set_offset(0)));
335 ctx_.SendClientStream(EncodeChunk(
336 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
337 .set_session_id(3)
338 .set_window_end_offset(10)
339 .set_offset(2)));
340
341 transfer_thread_.WaitUntilEventIsProcessed();
342
343 ASSERT_EQ(ctx_.total_responses(), 2u);
344 Chunk chunk = DecodeChunk(ctx_.responses().back());
345 ASSERT_TRUE(chunk.status().has_value());
346 EXPECT_EQ(chunk.status().value(), Status::Unimplemented());
347 }
348
TEST_F(ReadTransfer,MaxChunkSize_Client)349 TEST_F(ReadTransfer, MaxChunkSize_Client) {
350 rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
351 ctx_.SendClientStream(
352 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
353 .set_session_id(3)
354 .set_window_end_offset(64)
355 .set_max_chunk_size_bytes(8)
356 .set_offset(0)));
357 });
358
359 EXPECT_TRUE(handler_.prepare_read_called);
360 EXPECT_FALSE(handler_.finalize_read_called);
361
362 ASSERT_EQ(ctx_.total_responses(), 5u);
363 Chunk c0 = DecodeChunk(ctx_.responses()[0]);
364 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
365 Chunk c2 = DecodeChunk(ctx_.responses()[2]);
366 Chunk c3 = DecodeChunk(ctx_.responses()[3]);
367 Chunk c4 = DecodeChunk(ctx_.responses()[4]);
368
369 EXPECT_EQ(c0.session_id(), 3u);
370 EXPECT_EQ(c0.offset(), 0u);
371 ASSERT_EQ(c0.payload().size(), 8u);
372 EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
373 0);
374
375 EXPECT_EQ(c1.session_id(), 3u);
376 EXPECT_EQ(c1.offset(), 8u);
377 ASSERT_EQ(c1.payload().size(), 8u);
378 EXPECT_EQ(
379 std::memcmp(c1.payload().data(), kData.data() + 8, c1.payload().size()),
380 0);
381
382 EXPECT_EQ(c2.session_id(), 3u);
383 EXPECT_EQ(c2.offset(), 16u);
384 ASSERT_EQ(c2.payload().size(), 8u);
385 EXPECT_EQ(
386 std::memcmp(c2.payload().data(), kData.data() + 16, c2.payload().size()),
387 0);
388
389 EXPECT_EQ(c3.session_id(), 3u);
390 EXPECT_EQ(c3.offset(), 24u);
391 ASSERT_EQ(c3.payload().size(), 8u);
392 EXPECT_EQ(
393 std::memcmp(c3.payload().data(), kData.data() + 24, c3.payload().size()),
394 0);
395
396 EXPECT_EQ(c4.session_id(), 3u);
397 EXPECT_EQ(c4.payload().size(), 0u);
398 ASSERT_TRUE(c4.remaining_bytes().has_value());
399 EXPECT_EQ(c4.remaining_bytes().value(), 0u);
400
401 ctx_.SendClientStream(
402 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
403 transfer_thread_.WaitUntilEventIsProcessed();
404
405 EXPECT_TRUE(handler_.finalize_read_called);
406 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
407 }
408
TEST_F(ReadTransfer,HandlerIsClearedAfterTransfer)409 TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
410 // Request an end offset smaller than the data size to prevent the server
411 // from sending a final chunk.
412 ctx_.SendClientStream(
413 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
414 .set_session_id(3)
415 .set_window_end_offset(16)
416 .set_offset(0)));
417 ctx_.SendClientStream(
418 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
419 transfer_thread_.WaitUntilEventIsProcessed();
420
421 ASSERT_EQ(ctx_.total_responses(), 1u);
422 ASSERT_TRUE(handler_.prepare_read_called);
423 ASSERT_TRUE(handler_.finalize_read_called);
424 ASSERT_EQ(OkStatus(), handler_.finalize_read_status);
425
426 // Now, clear state and start a second transfer
427 handler_.prepare_read_return_status = Status::FailedPrecondition();
428 handler_.prepare_read_called = false;
429 handler_.finalize_read_called = false;
430
431 // Request an end offset smaller than the data size to prevent the server
432 // from sending a final chunk.
433 ctx_.SendClientStream(
434 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
435 .set_session_id(3)
436 .set_window_end_offset(16)
437 .set_offset(0)));
438 transfer_thread_.WaitUntilEventIsProcessed();
439
440 // Prepare failed, so the handler should not have been stored in the context,
441 // and finalize should not have been called.
442 ASSERT_TRUE(handler_.prepare_read_called);
443 ASSERT_FALSE(handler_.finalize_read_called);
444 }
445
446 class ReadTransferMaxChunkSize8 : public ReadTransfer {
447 protected:
ReadTransferMaxChunkSize8()448 ReadTransferMaxChunkSize8() : ReadTransfer(/*max_chunk_size_bytes=*/8) {}
449 };
450
TEST_F(ReadTransferMaxChunkSize8,MaxChunkSize_Server)451 TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
452 // Client asks for max 16-byte chunks, but service places a limit of 8 bytes.
453 // TODO(frolv): Fix
454 rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
455 ctx_.SendClientStream(
456 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
457 .set_session_id(3)
458 .set_window_end_offset(64)
459 // .set_max_chunk_size_bytes(16)
460 .set_offset(0)));
461 });
462
463 EXPECT_TRUE(handler_.prepare_read_called);
464 EXPECT_FALSE(handler_.finalize_read_called);
465
466 ASSERT_EQ(ctx_.total_responses(), 5u);
467 Chunk c0 = DecodeChunk(ctx_.responses()[0]);
468 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
469 Chunk c2 = DecodeChunk(ctx_.responses()[2]);
470 Chunk c3 = DecodeChunk(ctx_.responses()[3]);
471 Chunk c4 = DecodeChunk(ctx_.responses()[4]);
472
473 EXPECT_EQ(c0.session_id(), 3u);
474 EXPECT_EQ(c0.offset(), 0u);
475 ASSERT_EQ(c0.payload().size(), 8u);
476 EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
477 0);
478
479 EXPECT_EQ(c1.session_id(), 3u);
480 EXPECT_EQ(c1.offset(), 8u);
481 ASSERT_EQ(c1.payload().size(), 8u);
482 EXPECT_EQ(
483 std::memcmp(c1.payload().data(), kData.data() + 8, c1.payload().size()),
484 0);
485
486 EXPECT_EQ(c2.session_id(), 3u);
487 EXPECT_EQ(c2.offset(), 16u);
488 ASSERT_EQ(c2.payload().size(), 8u);
489 EXPECT_EQ(
490 std::memcmp(c2.payload().data(), kData.data() + 16, c2.payload().size()),
491 0);
492
493 EXPECT_EQ(c3.session_id(), 3u);
494 EXPECT_EQ(c3.offset(), 24u);
495 ASSERT_EQ(c3.payload().size(), 8u);
496 EXPECT_EQ(
497 std::memcmp(c3.payload().data(), kData.data() + 24, c3.payload().size()),
498 0);
499
500 EXPECT_EQ(c4.session_id(), 3u);
501 EXPECT_EQ(c4.payload().size(), 0u);
502 ASSERT_TRUE(c4.remaining_bytes().has_value());
503 EXPECT_EQ(c4.remaining_bytes().value(), 0u);
504
505 ctx_.SendClientStream(
506 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
507 transfer_thread_.WaitUntilEventIsProcessed();
508
509 EXPECT_TRUE(handler_.finalize_read_called);
510 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
511 }
512
TEST_F(ReadTransfer,ClientError)513 TEST_F(ReadTransfer, ClientError) {
514 ctx_.SendClientStream(
515 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
516 .set_session_id(3)
517 .set_window_end_offset(16)
518 .set_offset(0)));
519
520 transfer_thread_.WaitUntilEventIsProcessed();
521
522 EXPECT_TRUE(handler_.prepare_read_called);
523 EXPECT_FALSE(handler_.finalize_read_called);
524 ASSERT_EQ(ctx_.total_responses(), 1u);
525
526 // Send client error.
527 ctx_.SendClientStream(EncodeChunk(
528 Chunk::Final(ProtocolVersion::kLegacy, 3, Status::OutOfRange())));
529 transfer_thread_.WaitUntilEventIsProcessed();
530
531 ASSERT_EQ(ctx_.total_responses(), 1u);
532 EXPECT_TRUE(handler_.finalize_read_called);
533 EXPECT_EQ(handler_.finalize_read_status, Status::OutOfRange());
534 }
535
TEST_F(ReadTransfer,UnregisteredHandler)536 TEST_F(ReadTransfer, UnregisteredHandler) {
537 ctx_.SendClientStream(
538 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
539 .set_session_id(11)
540 .set_window_end_offset(32)
541 .set_offset(0)));
542 transfer_thread_.WaitUntilEventIsProcessed();
543
544 ASSERT_EQ(ctx_.total_responses(), 1u);
545 Chunk chunk = DecodeChunk(ctx_.responses().back());
546 EXPECT_EQ(chunk.session_id(), 11u);
547 ASSERT_TRUE(chunk.status().has_value());
548 EXPECT_EQ(chunk.status().value(), Status::NotFound());
549 }
550
TEST_F(ReadTransfer,IgnoresNonPendingTransfers)551 TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
552 ctx_.SendClientStream(EncodeChunk(
553 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
554 .set_session_id(3)
555 .set_window_end_offset(32)
556 .set_offset(3)));
557 ctx_.SendClientStream(
558 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
559 .set_session_id(3)
560 .set_payload(span(kData).first(10))
561 .set_offset(3)));
562 ctx_.SendClientStream(
563 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
564 transfer_thread_.WaitUntilEventIsProcessed();
565
566 // Only start transfer for an initial packet.
567 EXPECT_FALSE(handler_.prepare_read_called);
568 EXPECT_FALSE(handler_.finalize_read_called);
569 }
570
TEST_F(ReadTransfer,AbortAndRestartIfInitialPacketIsReceived)571 TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
572 ctx_.SendClientStream(
573 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
574 .set_session_id(3)
575 .set_window_end_offset(16)
576 .set_offset(0)));
577 transfer_thread_.WaitUntilEventIsProcessed();
578
579 ASSERT_EQ(ctx_.total_responses(), 1u);
580
581 EXPECT_TRUE(handler_.prepare_read_called);
582 EXPECT_FALSE(handler_.finalize_read_called);
583 handler_.prepare_read_called = false; // Reset so can check if called again.
584
585 ctx_.SendClientStream( // Resend starting chunk
586 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
587 .set_session_id(3)
588 .set_window_end_offset(16)
589 .set_offset(0)));
590 transfer_thread_.WaitUntilEventIsProcessed();
591
592 ASSERT_EQ(ctx_.total_responses(), 2u);
593
594 EXPECT_TRUE(handler_.prepare_read_called);
595 EXPECT_TRUE(handler_.finalize_read_called);
596 EXPECT_EQ(handler_.finalize_read_status, Status::Aborted());
597 handler_.finalize_read_called = false; // Reset so can check later
598
599 ctx_.SendClientStream(EncodeChunk(
600 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
601 .set_session_id(3)
602 .set_window_end_offset(32)
603 .set_offset(16)));
604 ctx_.SendClientStream(
605 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
606 transfer_thread_.WaitUntilEventIsProcessed();
607
608 ASSERT_EQ(ctx_.total_responses(), 3u);
609 EXPECT_TRUE(handler_.finalize_read_called);
610 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
611 }
612
TEST_F(ReadTransfer,ZeroPendingBytesWithRemainingData_Aborts)613 TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
614 ctx_.SendClientStream(
615 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
616 .set_session_id(3)
617 .set_window_end_offset(0)
618 .set_offset(0)));
619 transfer_thread_.WaitUntilEventIsProcessed();
620
621 ASSERT_EQ(ctx_.total_responses(), 1u);
622 ASSERT_TRUE(handler_.finalize_read_called);
623 EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
624
625 Chunk chunk = DecodeChunk(ctx_.responses().back());
626 EXPECT_EQ(chunk.status(), Status::ResourceExhausted());
627 }
628
TEST_F(ReadTransfer,ZeroPendingBytesNoRemainingData_Completes)629 TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
630 // Make the next read appear to be the end of the stream.
631 handler_.set_read_status(Status::OutOfRange());
632
633 ctx_.SendClientStream(
634 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
635 .set_session_id(3)
636 .set_window_end_offset(0)
637 .set_offset(0)));
638 transfer_thread_.WaitUntilEventIsProcessed();
639
640 Chunk chunk = DecodeChunk(ctx_.responses().back());
641 EXPECT_EQ(chunk.session_id(), 3u);
642 EXPECT_EQ(chunk.remaining_bytes(), 0u);
643
644 ctx_.SendClientStream(
645 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
646 transfer_thread_.WaitUntilEventIsProcessed();
647
648 ASSERT_EQ(ctx_.total_responses(), 1u);
649 ASSERT_TRUE(handler_.finalize_read_called);
650 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
651 }
652
TEST_F(ReadTransfer,SendsErrorIfChunkIsReceivedInCompletedState)653 TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
654 rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
655 ctx_.SendClientStream(
656 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
657 .set_session_id(3)
658 .set_window_end_offset(64)
659 .set_offset(0)));
660 });
661
662 EXPECT_TRUE(handler_.prepare_read_called);
663 EXPECT_FALSE(handler_.finalize_read_called);
664
665 ASSERT_EQ(ctx_.total_responses(), 2u);
666 Chunk c0 = DecodeChunk(ctx_.responses()[0]);
667 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
668
669 // First chunk should have all the read data.
670 EXPECT_EQ(c0.session_id(), 3u);
671 EXPECT_EQ(c0.offset(), 0u);
672 ASSERT_EQ(c0.payload().size(), kData.size());
673 EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
674 0);
675
676 // Second chunk should be empty and set remaining_bytes = 0.
677 EXPECT_EQ(c1.session_id(), 3u);
678 EXPECT_FALSE(c1.has_payload());
679 ASSERT_TRUE(c1.remaining_bytes().has_value());
680 EXPECT_EQ(c1.remaining_bytes().value(), 0u);
681
682 ctx_.SendClientStream(
683 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
684 transfer_thread_.WaitUntilEventIsProcessed();
685
686 EXPECT_TRUE(handler_.finalize_read_called);
687 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
688
689 // At this point the transfer should be in a completed state. Send a
690 // non-initial chunk as a continuation of the transfer.
691 handler_.finalize_read_called = false;
692
693 ctx_.SendClientStream(EncodeChunk(
694 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
695 .set_session_id(3)
696 .set_window_end_offset(64)
697 .set_offset(16)));
698 transfer_thread_.WaitUntilEventIsProcessed();
699
700 ASSERT_EQ(ctx_.total_responses(), 3u);
701
702 Chunk c2 = DecodeChunk(ctx_.responses()[2]);
703 ASSERT_TRUE(c2.status().has_value());
704 EXPECT_EQ(c2.status().value(), Status::FailedPrecondition());
705
706 // FinalizeRead should not be called again.
707 EXPECT_FALSE(handler_.finalize_read_called);
708 }
709
710 class SimpleWriteTransfer final : public WriteOnlyHandler {
711 public:
SimpleWriteTransfer(uint32_t session_id,ByteSpan data)712 SimpleWriteTransfer(uint32_t session_id, ByteSpan data)
713 : WriteOnlyHandler(session_id),
714 prepare_write_called(false),
715 finalize_write_called(false),
716 finalize_write_status(Status::Unknown()),
717 writer_(data) {}
718
PrepareWrite()719 Status PrepareWrite() final {
720 EXPECT_EQ(OkStatus(), writer_.Seek(0));
721 set_writer(writer_);
722 prepare_write_called = true;
723 return OkStatus();
724 }
725
FinalizeWrite(Status status)726 Status FinalizeWrite(Status status) final {
727 finalize_write_called = true;
728 finalize_write_status = status;
729 return finalize_write_return_status_;
730 }
731
set_finalize_write_return(Status status)732 void set_finalize_write_return(Status status) {
733 finalize_write_return_status_ = status;
734 }
735
736 bool prepare_write_called;
737 bool finalize_write_called;
738 Status finalize_write_status;
739
740 private:
741 Status finalize_write_return_status_;
742 stream::MemoryWriter writer_;
743 };
744
745 class WriteTransfer : public ::testing::Test {
746 protected:
WriteTransfer(size_t max_bytes_to_receive=64)747 WriteTransfer(size_t max_bytes_to_receive = 64)
748 : buffer{},
749 handler_(7, buffer),
750 transfer_thread_(data_buffer_, encode_buffer_),
751 system_thread_(TransferThreadOptions(), transfer_thread_),
752 ctx_(transfer_thread_,
753 max_bytes_to_receive,
754 // Use a long timeout to avoid accidentally triggering timeouts.
755 std::chrono::minutes(1),
756 /*max_retries=*/3) {
757 ctx_.service().RegisterHandler(handler_);
758
759 PW_CHECK(!handler_.prepare_write_called);
760 PW_CHECK(!handler_.finalize_write_called);
761
762 ctx_.call(); // Open the write stream
763 transfer_thread_.WaitUntilEventIsProcessed();
764 }
765
~WriteTransfer()766 ~WriteTransfer() override {
767 transfer_thread_.Terminate();
768 system_thread_.join();
769 }
770
771 std::array<std::byte, kData.size()> buffer;
772 SimpleWriteTransfer handler_;
773
774 Thread<1, 1> transfer_thread_;
775 pw::Thread system_thread_;
776 std::array<std::byte, 64> data_buffer_;
777 std::array<std::byte, 64> encode_buffer_;
778 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx_;
779 };
780
TEST_F(WriteTransfer,SingleChunk)781 TEST_F(WriteTransfer, SingleChunk) {
782 ctx_.SendClientStream(EncodeChunk(
783 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
784 transfer_thread_.WaitUntilEventIsProcessed();
785
786 EXPECT_TRUE(handler_.prepare_write_called);
787 EXPECT_FALSE(handler_.finalize_write_called);
788
789 ASSERT_EQ(ctx_.total_responses(), 1u);
790 Chunk chunk = DecodeChunk(ctx_.responses().back());
791 EXPECT_EQ(chunk.session_id(), 7u);
792 EXPECT_EQ(chunk.window_end_offset(), 32u);
793 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
794 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
795
796 ctx_.SendClientStream<64>(
797 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
798 .set_session_id(7)
799 .set_offset(0)
800 .set_payload(kData)
801 .set_remaining_bytes(0)));
802 transfer_thread_.WaitUntilEventIsProcessed();
803
804 ASSERT_EQ(ctx_.total_responses(), 2u);
805 chunk = DecodeChunk(ctx_.responses().back());
806 EXPECT_EQ(chunk.session_id(), 7u);
807 ASSERT_TRUE(chunk.status().has_value());
808 EXPECT_EQ(chunk.status().value(), OkStatus());
809
810 EXPECT_TRUE(handler_.finalize_write_called);
811 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
812 EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
813 }
814
TEST_F(WriteTransfer,FinalizeFails)815 TEST_F(WriteTransfer, FinalizeFails) {
816 // Return an error when FinalizeWrite is called.
817 handler_.set_finalize_write_return(Status::FailedPrecondition());
818
819 ctx_.SendClientStream(EncodeChunk(
820 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
821 ctx_.SendClientStream<64>(
822 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
823 .set_session_id(7)
824 .set_offset(0)
825 .set_payload(kData)
826 .set_remaining_bytes(0)));
827 transfer_thread_.WaitUntilEventIsProcessed();
828
829 ASSERT_EQ(ctx_.total_responses(), 2u);
830 Chunk chunk = DecodeChunk(ctx_.responses()[1]);
831 EXPECT_EQ(chunk.session_id(), 7u);
832 ASSERT_TRUE(chunk.status().has_value());
833 EXPECT_EQ(chunk.status().value(), Status::DataLoss());
834
835 EXPECT_TRUE(handler_.finalize_write_called);
836 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
837 }
838
TEST_F(WriteTransfer,SendingFinalPacketFails)839 TEST_F(WriteTransfer, SendingFinalPacketFails) {
840 ctx_.SendClientStream(EncodeChunk(
841 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
842 transfer_thread_.WaitUntilEventIsProcessed();
843
844 ctx_.output().set_send_status(Status::Unknown());
845
846 ctx_.SendClientStream<64>(
847 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
848 .set_session_id(7)
849 .set_offset(0)
850 .set_payload(kData)
851 .set_remaining_bytes(0)));
852 transfer_thread_.WaitUntilEventIsProcessed();
853
854 // Should only have sent the transfer parameters.
855 ASSERT_EQ(ctx_.total_responses(), 1u);
856 Chunk chunk = DecodeChunk(ctx_.responses()[0]);
857 EXPECT_EQ(chunk.session_id(), 7u);
858 EXPECT_EQ(chunk.window_end_offset(), 32u);
859 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
860 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
861
862 // When FinalizeWrite() was called, the transfer was considered successful.
863 EXPECT_TRUE(handler_.finalize_write_called);
864 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
865 }
866
TEST_F(WriteTransfer,MultiChunk)867 TEST_F(WriteTransfer, MultiChunk) {
868 ctx_.SendClientStream(EncodeChunk(
869 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
870 transfer_thread_.WaitUntilEventIsProcessed();
871
872 EXPECT_TRUE(handler_.prepare_write_called);
873 EXPECT_FALSE(handler_.finalize_write_called);
874
875 ASSERT_EQ(ctx_.total_responses(), 1u);
876 Chunk chunk = DecodeChunk(ctx_.responses()[0]);
877 EXPECT_EQ(chunk.session_id(), 7u);
878 EXPECT_EQ(chunk.window_end_offset(), 32u);
879
880 ctx_.SendClientStream<64>(
881 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
882 .set_session_id(7)
883 .set_offset(0)
884 .set_payload(span(kData).first(8))));
885 transfer_thread_.WaitUntilEventIsProcessed();
886
887 ASSERT_EQ(ctx_.total_responses(), 1u);
888
889 ctx_.SendClientStream<64>(
890 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
891 .set_session_id(7)
892 .set_offset(8)
893 .set_payload(span(kData).subspan(8))
894 .set_remaining_bytes(0)));
895 transfer_thread_.WaitUntilEventIsProcessed();
896
897 ASSERT_EQ(ctx_.total_responses(), 2u);
898 chunk = DecodeChunk(ctx_.responses().back());
899 EXPECT_EQ(chunk.session_id(), 7u);
900 ASSERT_TRUE(chunk.status().has_value());
901 EXPECT_EQ(chunk.status().value(), OkStatus());
902
903 EXPECT_TRUE(handler_.finalize_write_called);
904 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
905 EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
906 }
907
TEST_F(WriteTransfer,WriteFailsOnRetry)908 TEST_F(WriteTransfer, WriteFailsOnRetry) {
909 // Skip one packet to fail on a retry.
910 ctx_.output().set_send_status(Status::FailedPrecondition(), 1);
911
912 // Wait for 3 packets: initial params, retry attempt, final error
913 rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
914 // Send only one client packet so the service times out.
915 ctx_.SendClientStream(
916 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
917 .set_session_id(7)));
918 transfer_thread_.SimulateServerTimeout(7); // Time out to trigger retry
919 });
920
921 // Attempted to send 3 packets, but the 2nd packet was dropped.
922 // Check that the last packet is an INTERNAL error from the RPC write failure.
923 ASSERT_EQ(ctx_.total_responses(), 2u);
924 Chunk chunk = DecodeChunk(ctx_.responses()[1]);
925 EXPECT_EQ(chunk.session_id(), 7u);
926 ASSERT_TRUE(chunk.status().has_value());
927 EXPECT_EQ(chunk.status().value(), Status::Internal());
928 }
929
TEST_F(WriteTransfer,TimeoutInRecoveryState)930 TEST_F(WriteTransfer, TimeoutInRecoveryState) {
931 ctx_.SendClientStream(EncodeChunk(
932 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
933 transfer_thread_.WaitUntilEventIsProcessed();
934
935 ASSERT_EQ(ctx_.total_responses(), 1u);
936 Chunk chunk = DecodeChunk(ctx_.responses().back());
937 EXPECT_EQ(chunk.session_id(), 7u);
938 EXPECT_EQ(chunk.offset(), 0u);
939 EXPECT_EQ(chunk.window_end_offset(), 32u);
940
941 constexpr span data(kData);
942
943 ctx_.SendClientStream(
944 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
945 .set_session_id(7)
946 .set_offset(0)
947 .set_payload(data.first(8))));
948
949 // Skip offset 8 to enter a recovery state.
950 ctx_.SendClientStream(
951 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
952 .set_session_id(7)
953 .set_offset(12)
954 .set_payload(data.subspan(12, 4))));
955 transfer_thread_.WaitUntilEventIsProcessed();
956
957 // Recovery parameters should be sent for offset 8.
958 ASSERT_EQ(ctx_.total_responses(), 2u);
959 chunk = DecodeChunk(ctx_.responses().back());
960 EXPECT_EQ(chunk.session_id(), 7u);
961 EXPECT_EQ(chunk.offset(), 8u);
962 EXPECT_EQ(chunk.window_end_offset(), 32u);
963
964 // Timeout while in the recovery state.
965 transfer_thread_.SimulateServerTimeout(7);
966 transfer_thread_.WaitUntilEventIsProcessed();
967
968 // Same recovery parameters should be re-sent.
969 ASSERT_EQ(ctx_.total_responses(), 3u);
970 chunk = DecodeChunk(ctx_.responses().back());
971 EXPECT_EQ(chunk.session_id(), 7u);
972 EXPECT_EQ(chunk.offset(), 8u);
973 EXPECT_EQ(chunk.window_end_offset(), 32u);
974 }
975
TEST_F(WriteTransfer,ExtendWindow)976 TEST_F(WriteTransfer, ExtendWindow) {
977 ctx_.SendClientStream(EncodeChunk(
978 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
979 transfer_thread_.WaitUntilEventIsProcessed();
980
981 EXPECT_TRUE(handler_.prepare_write_called);
982 EXPECT_FALSE(handler_.finalize_write_called);
983
984 ASSERT_EQ(ctx_.total_responses(), 1u);
985 Chunk chunk = DecodeChunk(ctx_.responses().back());
986 EXPECT_EQ(chunk.session_id(), 7u);
987 EXPECT_EQ(chunk.window_end_offset(), 32u);
988
989 // Window starts at 32 bytes and should extend when half of that is sent.
990 ctx_.SendClientStream(
991 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
992 .set_session_id(7)
993 .set_offset(0)
994 .set_payload(span(kData).first(4))));
995 transfer_thread_.WaitUntilEventIsProcessed();
996 ASSERT_EQ(ctx_.total_responses(), 1u);
997
998 ctx_.SendClientStream(
999 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1000 .set_session_id(7)
1001 .set_offset(4)
1002 .set_payload(span(kData).subspan(4, 4))));
1003 transfer_thread_.WaitUntilEventIsProcessed();
1004 ASSERT_EQ(ctx_.total_responses(), 1u);
1005
1006 ctx_.SendClientStream(
1007 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1008 .set_session_id(7)
1009 .set_offset(8)
1010 .set_payload(span(kData).subspan(8, 4))));
1011 transfer_thread_.WaitUntilEventIsProcessed();
1012 ASSERT_EQ(ctx_.total_responses(), 1u);
1013
1014 ctx_.SendClientStream(
1015 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1016 .set_session_id(7)
1017 .set_offset(12)
1018 .set_payload(span(kData).subspan(12, 4))));
1019 transfer_thread_.WaitUntilEventIsProcessed();
1020 ASSERT_EQ(ctx_.total_responses(), 2u);
1021
1022 // Extend parameters chunk.
1023 chunk = DecodeChunk(ctx_.responses().back());
1024 EXPECT_EQ(chunk.session_id(), 7u);
1025 EXPECT_EQ(chunk.window_end_offset(), 32u);
1026 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
1027
1028 ctx_.SendClientStream<64>(
1029 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1030 .set_session_id(7)
1031 .set_offset(16)
1032 .set_payload(span(kData).subspan(16))
1033 .set_remaining_bytes(0)));
1034 transfer_thread_.WaitUntilEventIsProcessed();
1035
1036 ASSERT_EQ(ctx_.total_responses(), 3u);
1037 chunk = DecodeChunk(ctx_.responses()[2]);
1038 EXPECT_EQ(chunk.session_id(), 7u);
1039 ASSERT_TRUE(chunk.status().has_value());
1040 EXPECT_EQ(chunk.status().value(), OkStatus());
1041
1042 EXPECT_TRUE(handler_.finalize_write_called);
1043 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1044 EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1045 }
1046
1047 class WriteTransferMaxBytes16 : public WriteTransfer {
1048 protected:
WriteTransferMaxBytes16()1049 WriteTransferMaxBytes16() : WriteTransfer(/*max_bytes_to_receive=*/16) {}
1050 };
1051
TEST_F(WriteTransfer,TransmitterReducesWindow)1052 TEST_F(WriteTransfer, TransmitterReducesWindow) {
1053 ctx_.SendClientStream(EncodeChunk(
1054 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1055 transfer_thread_.WaitUntilEventIsProcessed();
1056
1057 EXPECT_TRUE(handler_.prepare_write_called);
1058 EXPECT_FALSE(handler_.finalize_write_called);
1059
1060 ASSERT_EQ(ctx_.total_responses(), 1u);
1061 Chunk chunk = DecodeChunk(ctx_.responses().back());
1062 EXPECT_EQ(chunk.session_id(), 7u);
1063 EXPECT_EQ(chunk.window_end_offset(), 32u);
1064
1065 // Send only 12 bytes and set that as the new end offset.
1066 ctx_.SendClientStream<64>(
1067 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1068 .set_session_id(7)
1069 .set_offset(0)
1070 .set_window_end_offset(12)
1071 .set_payload(span(kData).first(12))));
1072 transfer_thread_.WaitUntilEventIsProcessed();
1073 ASSERT_EQ(ctx_.total_responses(), 2u);
1074
1075 // Receiver should respond immediately with a continue chunk as the end of
1076 // the window has been reached.
1077 chunk = DecodeChunk(ctx_.responses().back());
1078 EXPECT_EQ(chunk.session_id(), 7u);
1079 EXPECT_EQ(chunk.offset(), 12u);
1080 EXPECT_EQ(chunk.window_end_offset(), 32u);
1081 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
1082 }
1083
TEST_F(WriteTransfer,TransmitterExtendsWindow_TerminatesWithInvalid)1084 TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
1085 ctx_.SendClientStream(EncodeChunk(
1086 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1087 transfer_thread_.WaitUntilEventIsProcessed();
1088
1089 EXPECT_TRUE(handler_.prepare_write_called);
1090 EXPECT_FALSE(handler_.finalize_write_called);
1091
1092 ASSERT_EQ(ctx_.total_responses(), 1u);
1093 Chunk chunk = DecodeChunk(ctx_.responses().back());
1094 EXPECT_EQ(chunk.session_id(), 7u);
1095 EXPECT_EQ(chunk.window_end_offset(), 32u);
1096
1097 ctx_.SendClientStream<64>(
1098 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1099 .set_session_id(7)
1100 .set_offset(0)
1101 // Larger window end offset than the receiver's.
1102 .set_window_end_offset(48)
1103 .set_payload(span(kData).first(16))));
1104 transfer_thread_.WaitUntilEventIsProcessed();
1105 ASSERT_EQ(ctx_.total_responses(), 2u);
1106
1107 chunk = DecodeChunk(ctx_.responses().back());
1108 EXPECT_EQ(chunk.session_id(), 7u);
1109 ASSERT_TRUE(chunk.status().has_value());
1110 EXPECT_EQ(chunk.status().value(), Status::Internal());
1111 }
1112
TEST_F(WriteTransferMaxBytes16,MultipleParameters)1113 TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
1114 ctx_.SendClientStream(EncodeChunk(
1115 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1116 transfer_thread_.WaitUntilEventIsProcessed();
1117
1118 EXPECT_TRUE(handler_.prepare_write_called);
1119 EXPECT_FALSE(handler_.finalize_write_called);
1120
1121 ASSERT_EQ(ctx_.total_responses(), 1u);
1122 Chunk chunk = DecodeChunk(ctx_.responses().back());
1123 EXPECT_EQ(chunk.session_id(), 7u);
1124 EXPECT_EQ(chunk.window_end_offset(), 16u);
1125
1126 ctx_.SendClientStream<64>(
1127 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1128 .set_session_id(7)
1129 .set_offset(0)
1130 .set_payload(span(kData).first(8))));
1131 transfer_thread_.WaitUntilEventIsProcessed();
1132
1133 ASSERT_EQ(ctx_.total_responses(), 2u);
1134 chunk = DecodeChunk(ctx_.responses().back());
1135 EXPECT_EQ(chunk.session_id(), 7u);
1136 EXPECT_EQ(chunk.offset(), 8u);
1137 EXPECT_EQ(chunk.window_end_offset(), 24u);
1138
1139 ctx_.SendClientStream<64>(
1140 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1141 .set_session_id(7)
1142 .set_offset(8)
1143 .set_payload(span(kData).subspan(8, 8))));
1144 transfer_thread_.WaitUntilEventIsProcessed();
1145
1146 ASSERT_EQ(ctx_.total_responses(), 3u);
1147 chunk = DecodeChunk(ctx_.responses().back());
1148 EXPECT_EQ(chunk.session_id(), 7u);
1149 EXPECT_EQ(chunk.offset(), 16u);
1150 EXPECT_EQ(chunk.window_end_offset(), 32u);
1151
1152 ctx_.SendClientStream<64>(
1153 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1154 .set_session_id(7)
1155 .set_offset(16)
1156 .set_payload(span(kData).subspan(16, 8))));
1157 transfer_thread_.WaitUntilEventIsProcessed();
1158
1159 ASSERT_EQ(ctx_.total_responses(), 4u);
1160 chunk = DecodeChunk(ctx_.responses().back());
1161 EXPECT_EQ(chunk.session_id(), 7u);
1162 EXPECT_EQ(chunk.offset(), 24u);
1163 EXPECT_EQ(chunk.window_end_offset(), 32u);
1164
1165 ctx_.SendClientStream<64>(
1166 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1167 .set_session_id(7)
1168 .set_offset(24)
1169 .set_payload(span(kData).subspan(24))
1170 .set_remaining_bytes(0)));
1171 transfer_thread_.WaitUntilEventIsProcessed();
1172
1173 ASSERT_EQ(ctx_.total_responses(), 5u);
1174 chunk = DecodeChunk(ctx_.responses().back());
1175 EXPECT_EQ(chunk.session_id(), 7u);
1176 ASSERT_TRUE(chunk.status().has_value());
1177 EXPECT_EQ(chunk.status().value(), OkStatus());
1178
1179 EXPECT_TRUE(handler_.finalize_write_called);
1180 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1181 EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1182 }
1183
TEST_F(WriteTransferMaxBytes16,SetsDefaultWindowEndOffset)1184 TEST_F(WriteTransferMaxBytes16, SetsDefaultWindowEndOffset) {
1185 // Default max bytes is smaller than buffer.
1186 ctx_.SendClientStream(EncodeChunk(
1187 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1188 transfer_thread_.WaitUntilEventIsProcessed();
1189
1190 ASSERT_EQ(ctx_.total_responses(), 1u);
1191 Chunk chunk = DecodeChunk(ctx_.responses().back());
1192 EXPECT_EQ(chunk.session_id(), 7u);
1193 EXPECT_EQ(chunk.window_end_offset(), 16u);
1194 }
1195
TEST_F(WriteTransfer,SetsWriterWindowEndOffset)1196 TEST_F(WriteTransfer, SetsWriterWindowEndOffset) {
1197 // Buffer is smaller than constructor's default max bytes.
1198 std::array<std::byte, 8> small_buffer = {};
1199
1200 SimpleWriteTransfer handler_(987, small_buffer);
1201 ctx_.service().RegisterHandler(handler_);
1202
1203 ctx_.SendClientStream(
1204 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
1205 .set_session_id(987)));
1206 transfer_thread_.WaitUntilEventIsProcessed();
1207
1208 ASSERT_EQ(ctx_.total_responses(), 1u);
1209 Chunk chunk = DecodeChunk(ctx_.responses().back());
1210 EXPECT_EQ(chunk.session_id(), 987u);
1211 EXPECT_EQ(chunk.window_end_offset(), 8u);
1212
1213 ctx_.service().UnregisterHandler(handler_);
1214 }
1215
TEST_F(WriteTransfer,UnexpectedOffset)1216 TEST_F(WriteTransfer, UnexpectedOffset) {
1217 ctx_.SendClientStream(EncodeChunk(
1218 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1219 transfer_thread_.WaitUntilEventIsProcessed();
1220
1221 EXPECT_TRUE(handler_.prepare_write_called);
1222 EXPECT_FALSE(handler_.finalize_write_called);
1223
1224 ASSERT_EQ(ctx_.total_responses(), 1u);
1225 Chunk chunk = DecodeChunk(ctx_.responses().back());
1226 EXPECT_EQ(chunk.session_id(), 7u);
1227 EXPECT_EQ(chunk.offset(), 0u);
1228 EXPECT_EQ(chunk.window_end_offset(), 32u);
1229
1230 ctx_.SendClientStream<64>(
1231 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1232 .set_session_id(7)
1233 .set_offset(0)
1234 .set_payload(span(kData).first(8))));
1235 transfer_thread_.WaitUntilEventIsProcessed();
1236
1237 ASSERT_EQ(ctx_.total_responses(), 1u);
1238
1239 ctx_.SendClientStream<64>(
1240 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1241 .set_session_id(7)
1242 .set_offset(4) // incorrect
1243 .set_payload(span(kData).subspan(8))
1244 .set_remaining_bytes(0)));
1245 transfer_thread_.WaitUntilEventIsProcessed();
1246
1247 ASSERT_EQ(ctx_.total_responses(), 2u);
1248 chunk = DecodeChunk(ctx_.responses().back());
1249 EXPECT_EQ(chunk.session_id(), 7u);
1250 EXPECT_EQ(chunk.offset(), 8u);
1251 EXPECT_EQ(chunk.window_end_offset(), 32u);
1252
1253 ctx_.SendClientStream<64>(
1254 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1255 .set_session_id(7)
1256 .set_offset(8) // correct
1257 .set_payload(span(kData).subspan(8))
1258 .set_remaining_bytes(0)));
1259 transfer_thread_.WaitUntilEventIsProcessed();
1260
1261 ASSERT_EQ(ctx_.total_responses(), 3u);
1262 chunk = DecodeChunk(ctx_.responses().back());
1263 EXPECT_EQ(chunk.session_id(), 7u);
1264 ASSERT_TRUE(chunk.status().has_value());
1265 EXPECT_EQ(chunk.status().value(), OkStatus());
1266
1267 EXPECT_TRUE(handler_.finalize_write_called);
1268 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1269 EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1270 }
1271
TEST_F(WriteTransferMaxBytes16,TooMuchData_EntersRecovery)1272 TEST_F(WriteTransferMaxBytes16, TooMuchData_EntersRecovery) {
1273 ctx_.SendClientStream(EncodeChunk(
1274 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1275 transfer_thread_.WaitUntilEventIsProcessed();
1276
1277 EXPECT_TRUE(handler_.prepare_write_called);
1278 EXPECT_FALSE(handler_.finalize_write_called);
1279
1280 ASSERT_EQ(ctx_.total_responses(), 1u);
1281 Chunk chunk = DecodeChunk(ctx_.responses().back());
1282 EXPECT_EQ(chunk.session_id(), 7u);
1283 EXPECT_EQ(chunk.window_end_offset(), 16u);
1284
1285 // window_end_offset = 16, but send 24 bytes of data.
1286 ctx_.SendClientStream<64>(
1287 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1288 .set_session_id(7)
1289 .set_offset(0)
1290 .set_payload(span(kData).first(24))));
1291 transfer_thread_.WaitUntilEventIsProcessed();
1292
1293 // Transfer should resend a parameters chunk.
1294 ASSERT_EQ(ctx_.total_responses(), 2u);
1295 chunk = DecodeChunk(ctx_.responses().back());
1296 EXPECT_EQ(chunk.session_id(), 7u);
1297 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
1298 EXPECT_EQ(chunk.offset(), 0u);
1299 EXPECT_EQ(chunk.window_end_offset(), 16u);
1300 }
1301
TEST_F(WriteTransfer,UnregisteredHandler)1302 TEST_F(WriteTransfer, UnregisteredHandler) {
1303 ctx_.SendClientStream(
1304 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
1305 .set_session_id(999)));
1306 transfer_thread_.WaitUntilEventIsProcessed();
1307
1308 ASSERT_EQ(ctx_.total_responses(), 1u);
1309 Chunk chunk = DecodeChunk(ctx_.responses().back());
1310 EXPECT_EQ(chunk.session_id(), 999u);
1311 ASSERT_TRUE(chunk.status().has_value());
1312 EXPECT_EQ(chunk.status().value(), Status::NotFound());
1313 }
1314
TEST_F(WriteTransfer,ClientError)1315 TEST_F(WriteTransfer, ClientError) {
1316 ctx_.SendClientStream(EncodeChunk(
1317 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1318 transfer_thread_.WaitUntilEventIsProcessed();
1319
1320 EXPECT_TRUE(handler_.prepare_write_called);
1321 EXPECT_FALSE(handler_.finalize_write_called);
1322
1323 ASSERT_EQ(ctx_.total_responses(), 1u);
1324 Chunk chunk = DecodeChunk(ctx_.responses().back());
1325 EXPECT_EQ(chunk.session_id(), 7u);
1326 EXPECT_EQ(chunk.window_end_offset(), 32u);
1327
1328 ctx_.SendClientStream<64>(EncodeChunk(
1329 Chunk::Final(ProtocolVersion::kLegacy, 7, Status::DataLoss())));
1330 transfer_thread_.WaitUntilEventIsProcessed();
1331
1332 EXPECT_EQ(ctx_.total_responses(), 1u);
1333
1334 EXPECT_TRUE(handler_.finalize_write_called);
1335 EXPECT_EQ(handler_.finalize_write_status, Status::DataLoss());
1336 }
1337
TEST_F(WriteTransfer,OnlySendParametersUpdateOnceAfterDrop)1338 TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
1339 ctx_.SendClientStream(EncodeChunk(
1340 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1341 transfer_thread_.WaitUntilEventIsProcessed();
1342
1343 ASSERT_EQ(ctx_.total_responses(), 1u);
1344
1345 constexpr span data(kData);
1346 ctx_.SendClientStream<64>(
1347 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1348 .set_session_id(7)
1349 .set_offset(0)
1350 .set_payload(data.first(1))));
1351
1352 // Drop offset 1, then send the rest of the data.
1353 for (uint32_t i = 2; i < kData.size(); ++i) {
1354 ctx_.SendClientStream<64>(
1355 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1356 .set_session_id(7)
1357 .set_offset(i)
1358 .set_payload(data.subspan(i, 1))));
1359 }
1360
1361 transfer_thread_.WaitUntilEventIsProcessed();
1362
1363 ASSERT_EQ(ctx_.total_responses(), 2u);
1364 Chunk chunk = DecodeChunk(ctx_.responses().back());
1365 EXPECT_EQ(chunk.session_id(), 7u);
1366 EXPECT_EQ(chunk.offset(), 1u);
1367
1368 // Send the remaining data.
1369 ctx_.SendClientStream<64>(
1370 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1371 .set_session_id(7)
1372 .set_offset(1)
1373 .set_payload(data.subspan(1, 31))
1374 .set_remaining_bytes(0)));
1375 transfer_thread_.WaitUntilEventIsProcessed();
1376
1377 EXPECT_TRUE(handler_.finalize_write_called);
1378 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1379 }
1380
TEST_F(WriteTransfer,ResendParametersIfSentRepeatedChunkDuringRecovery)1381 TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
1382 ctx_.SendClientStream(EncodeChunk(
1383 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1384 transfer_thread_.WaitUntilEventIsProcessed();
1385
1386 ASSERT_EQ(ctx_.total_responses(), 1u);
1387
1388 constexpr span data(kData);
1389
1390 // Skip offset 0, then send the rest of the data.
1391 for (uint32_t i = 1; i < kData.size(); ++i) {
1392 ctx_.SendClientStream<64>(
1393 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1394 .set_session_id(7)
1395 .set_offset(i)
1396 .set_payload(data.subspan(i, 1))));
1397 }
1398
1399 transfer_thread_.WaitUntilEventIsProcessed();
1400
1401 ASSERT_EQ(ctx_.total_responses(), 2u); // Resent transfer parameters once.
1402
1403 const auto last_chunk =
1404 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1405 .set_session_id(7)
1406 .set_offset(kData.size() - 1)
1407 .set_payload(data.last(1)));
1408 ctx_.SendClientStream<64>(last_chunk);
1409 transfer_thread_.WaitUntilEventIsProcessed();
1410
1411 // Resent transfer parameters since the packet is repeated
1412 ASSERT_EQ(ctx_.total_responses(), 3u);
1413
1414 ctx_.SendClientStream<64>(last_chunk);
1415 transfer_thread_.WaitUntilEventIsProcessed();
1416
1417 ASSERT_EQ(ctx_.total_responses(), 4u);
1418
1419 Chunk chunk = DecodeChunk(ctx_.responses().back());
1420 EXPECT_EQ(chunk.session_id(), 7u);
1421 EXPECT_EQ(chunk.offset(), 0u);
1422 EXPECT_EQ(chunk.window_end_offset(), 32u);
1423
1424 // Resumes normal operation when correct offset is sent.
1425 ctx_.SendClientStream<64>(
1426 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1427 .set_session_id(7)
1428 .set_offset(0)
1429 .set_payload(kData)
1430 .set_remaining_bytes(0)));
1431 transfer_thread_.WaitUntilEventIsProcessed();
1432
1433 EXPECT_TRUE(handler_.finalize_write_called);
1434 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1435 }
1436
TEST_F(WriteTransfer,ResendsStatusIfClientRetriesAfterStatusChunk)1437 TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
1438 ctx_.SendClientStream(EncodeChunk(
1439 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1440 transfer_thread_.WaitUntilEventIsProcessed();
1441
1442 ASSERT_EQ(ctx_.total_responses(), 1u);
1443
1444 ctx_.SendClientStream<64>(
1445 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1446 .set_session_id(7)
1447 .set_offset(0)
1448 .set_payload(kData)
1449 .set_remaining_bytes(0)));
1450 transfer_thread_.WaitUntilEventIsProcessed();
1451
1452 ASSERT_EQ(ctx_.total_responses(), 2u);
1453 Chunk chunk = DecodeChunk(ctx_.responses().back());
1454 ASSERT_TRUE(chunk.status().has_value());
1455 EXPECT_EQ(chunk.status().value(), OkStatus());
1456
1457 ctx_.SendClientStream<64>(
1458 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1459 .set_session_id(7)
1460 .set_offset(0)
1461 .set_payload(kData)
1462 .set_remaining_bytes(0)));
1463 transfer_thread_.WaitUntilEventIsProcessed();
1464
1465 ASSERT_EQ(ctx_.total_responses(), 3u);
1466 chunk = DecodeChunk(ctx_.responses().back());
1467 ASSERT_TRUE(chunk.status().has_value());
1468 EXPECT_EQ(chunk.status().value(), OkStatus());
1469 }
1470
TEST_F(WriteTransfer,IgnoresNonPendingTransfers)1471 TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
1472 ctx_.SendClientStream<64>(
1473 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1474 .set_session_id(7)
1475 .set_offset(3)));
1476 ctx_.SendClientStream<64>(
1477 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1478 .set_session_id(7)
1479 .set_offset(0)
1480 .set_payload(span(kData).first(10))
1481 .set_remaining_bytes(0)));
1482
1483 transfer_thread_.WaitUntilEventIsProcessed();
1484
1485 // Only start transfer for initial packet.
1486 EXPECT_FALSE(handler_.prepare_write_called);
1487 EXPECT_FALSE(handler_.finalize_write_called);
1488 }
1489
TEST_F(WriteTransfer,AbortAndRestartIfInitialPacketIsReceived)1490 TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
1491 ctx_.SendClientStream(EncodeChunk(
1492 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1493 transfer_thread_.WaitUntilEventIsProcessed();
1494
1495 ASSERT_EQ(ctx_.total_responses(), 1u);
1496
1497 ctx_.SendClientStream<64>(
1498 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1499 .set_session_id(7)
1500 .set_offset(0)
1501 .set_payload(span(kData).first(8))));
1502 transfer_thread_.WaitUntilEventIsProcessed();
1503
1504 ASSERT_EQ(ctx_.total_responses(), 1u);
1505
1506 ASSERT_TRUE(handler_.prepare_write_called);
1507 ASSERT_FALSE(handler_.finalize_write_called);
1508 handler_.prepare_write_called = false; // Reset to check it's called again.
1509
1510 // Simulate client disappearing then restarting the transfer.
1511 ctx_.SendClientStream(EncodeChunk(
1512 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1513 transfer_thread_.WaitUntilEventIsProcessed();
1514
1515 EXPECT_TRUE(handler_.prepare_write_called);
1516 EXPECT_TRUE(handler_.finalize_write_called);
1517 EXPECT_EQ(handler_.finalize_write_status, Status::Aborted());
1518
1519 handler_.finalize_write_called = false; // Reset to check it's called again.
1520
1521 ASSERT_EQ(ctx_.total_responses(), 2u);
1522
1523 ctx_.SendClientStream<64>(
1524 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1525 .set_session_id(7)
1526 .set_offset(0)
1527 .set_payload(kData)
1528 .set_remaining_bytes(0)));
1529 transfer_thread_.WaitUntilEventIsProcessed();
1530
1531 ASSERT_EQ(ctx_.total_responses(), 3u);
1532
1533 EXPECT_TRUE(handler_.finalize_write_called);
1534 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1535 EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1536 }
1537
1538 class SometimesUnavailableReadHandler final : public ReadOnlyHandler {
1539 public:
SometimesUnavailableReadHandler(uint32_t session_id,ConstByteSpan data)1540 SometimesUnavailableReadHandler(uint32_t session_id, ConstByteSpan data)
1541 : ReadOnlyHandler(session_id), reader_(data), call_count_(0) {}
1542
PrepareRead()1543 Status PrepareRead() final {
1544 if ((call_count_++ % 2) == 0) {
1545 return Status::Unavailable();
1546 }
1547
1548 set_reader(reader_);
1549 return OkStatus();
1550 }
1551
1552 private:
1553 stream::MemoryReader reader_;
1554 int call_count_;
1555 };
1556
TEST_F(ReadTransfer,PrepareError)1557 TEST_F(ReadTransfer, PrepareError) {
1558 SometimesUnavailableReadHandler unavailable_handler(88, kData);
1559 ctx_.service().RegisterHandler(unavailable_handler);
1560
1561 ctx_.SendClientStream(
1562 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
1563 .set_session_id(88)
1564 .set_window_end_offset(128)
1565 .set_offset(0)));
1566 transfer_thread_.WaitUntilEventIsProcessed();
1567
1568 ASSERT_EQ(ctx_.total_responses(), 1u);
1569 Chunk chunk = DecodeChunk(ctx_.responses().back());
1570 EXPECT_EQ(chunk.session_id(), 88u);
1571 ASSERT_TRUE(chunk.status().has_value());
1572 EXPECT_EQ(chunk.status().value(), Status::DataLoss());
1573
1574 // Try starting the transfer again. It should work this time.
1575 // TODO(frolv): This won't work until completion ACKs are supported.
1576 if (false) {
1577 ctx_.SendClientStream(
1578 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
1579 .set_session_id(88)
1580 .set_window_end_offset(128)
1581 .set_offset(0)));
1582 transfer_thread_.WaitUntilEventIsProcessed();
1583
1584 ASSERT_EQ(ctx_.total_responses(), 2u);
1585 chunk = DecodeChunk(ctx_.responses().back());
1586 EXPECT_EQ(chunk.session_id(), 88u);
1587 ASSERT_EQ(chunk.payload().size(), kData.size());
1588 EXPECT_EQ(std::memcmp(
1589 chunk.payload().data(), kData.data(), chunk.payload().size()),
1590 0);
1591 }
1592 }
1593
TEST_F(WriteTransferMaxBytes16,Service_SetMaxPendingBytes)1594 TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
1595 ctx_.SendClientStream(EncodeChunk(
1596 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1597 transfer_thread_.WaitUntilEventIsProcessed();
1598
1599 EXPECT_TRUE(handler_.prepare_write_called);
1600 EXPECT_FALSE(handler_.finalize_write_called);
1601
1602 // First parameters chunk has the default window end offset of 16.
1603 ASSERT_EQ(ctx_.total_responses(), 1u);
1604 Chunk chunk = DecodeChunk(ctx_.responses().back());
1605 EXPECT_EQ(chunk.session_id(), 7u);
1606 EXPECT_EQ(chunk.window_end_offset(), 16u);
1607
1608 // Update the max window size.
1609 ctx_.service().set_max_window_size_bytes(12);
1610
1611 ctx_.SendClientStream<64>(
1612 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1613 .set_session_id(7)
1614 .set_offset(0)
1615 .set_payload(span(kData).first(8))));
1616 transfer_thread_.WaitUntilEventIsProcessed();
1617
1618 // Second parameters chunk should use the new max pending bytes.
1619 ASSERT_EQ(ctx_.total_responses(), 2u);
1620 chunk = DecodeChunk(ctx_.responses().back());
1621 EXPECT_EQ(chunk.session_id(), 7u);
1622 EXPECT_EQ(chunk.offset(), 8u);
1623 EXPECT_EQ(chunk.window_end_offset(), 8u + 12u);
1624 }
1625
TEST_F(ReadTransfer,Version2_SimpleTransfer)1626 TEST_F(ReadTransfer, Version2_SimpleTransfer) {
1627 ctx_.SendClientStream(
1628 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1629 .set_desired_session_id(kArbitrarySessionId)
1630 .set_resource_id(3)));
1631
1632 transfer_thread_.WaitUntilEventIsProcessed();
1633
1634 EXPECT_TRUE(handler_.prepare_read_called);
1635 EXPECT_FALSE(handler_.finalize_read_called);
1636
1637 // First, the server responds with a START_ACK, accepting the session ID and
1638 // confirming the protocol version.
1639 ASSERT_EQ(ctx_.total_responses(), 1u);
1640 Chunk chunk = DecodeChunk(ctx_.responses().back());
1641 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1642 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1643 EXPECT_FALSE(chunk.desired_session_id().has_value());
1644 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1645 EXPECT_EQ(chunk.resource_id(), 3u);
1646
1647 // Complete the handshake by confirming the server's ACK and sending the first
1648 // read transfer parameters.
1649 rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
1650 ctx_.SendClientStream(EncodeChunk(
1651 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1652 .set_session_id(kArbitrarySessionId)
1653 .set_window_end_offset(64)
1654 .set_offset(0)));
1655
1656 transfer_thread_.WaitUntilEventIsProcessed();
1657 });
1658
1659 // Server should respond by starting the data transfer, sending its sole data
1660 // chunk and a remaining_bytes 0 chunk.
1661 ASSERT_EQ(ctx_.total_responses(), 3u);
1662
1663 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1664 EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1665 EXPECT_EQ(c1.type(), Chunk::Type::kData);
1666 EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
1667 EXPECT_EQ(c1.offset(), 0u);
1668 ASSERT_TRUE(c1.has_payload());
1669 ASSERT_EQ(c1.payload().size(), kData.size());
1670 EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1671 0);
1672
1673 Chunk c2 = DecodeChunk(ctx_.responses()[2]);
1674 EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
1675 EXPECT_EQ(c2.type(), Chunk::Type::kData);
1676 EXPECT_EQ(c2.session_id(), kArbitrarySessionId);
1677 EXPECT_FALSE(c2.has_payload());
1678 EXPECT_EQ(c2.remaining_bytes(), 0u);
1679
1680 ctx_.SendClientStream(EncodeChunk(Chunk::Final(
1681 ProtocolVersion::kVersionTwo, kArbitrarySessionId, OkStatus())));
1682 transfer_thread_.WaitUntilEventIsProcessed();
1683
1684 EXPECT_TRUE(handler_.finalize_read_called);
1685 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
1686 }
1687
TEST_F(ReadTransfer,Version2_MultiChunk)1688 TEST_F(ReadTransfer, Version2_MultiChunk) {
1689 ctx_.SendClientStream(
1690 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1691 .set_desired_session_id(kArbitrarySessionId)
1692 .set_resource_id(3)));
1693
1694 transfer_thread_.WaitUntilEventIsProcessed();
1695
1696 EXPECT_TRUE(handler_.prepare_read_called);
1697 EXPECT_FALSE(handler_.finalize_read_called);
1698
1699 // First, the server responds with a START_ACK, accepting the session ID and
1700 // confirming the protocol version.
1701 ASSERT_EQ(ctx_.total_responses(), 1u);
1702 Chunk chunk = DecodeChunk(ctx_.responses().back());
1703 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1704 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1705 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1706 EXPECT_EQ(chunk.resource_id(), 3u);
1707
1708 // Complete the handshake by confirming the server's ACK and sending the first
1709 // read transfer parameters.
1710 rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
1711 ctx_.SendClientStream(EncodeChunk(
1712 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1713 .set_session_id(kArbitrarySessionId)
1714 .set_window_end_offset(64)
1715 .set_max_chunk_size_bytes(16)
1716 .set_offset(0)));
1717
1718 transfer_thread_.WaitUntilEventIsProcessed();
1719 });
1720
1721 ASSERT_EQ(ctx_.total_responses(), 4u);
1722
1723 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1724 EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1725 EXPECT_EQ(c1.type(), Chunk::Type::kData);
1726 EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
1727 EXPECT_EQ(c1.offset(), 0u);
1728 ASSERT_TRUE(c1.has_payload());
1729 ASSERT_EQ(c1.payload().size(), 16u);
1730 EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1731 0);
1732
1733 Chunk c2 = DecodeChunk(ctx_.responses()[2]);
1734 EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
1735 EXPECT_EQ(c2.type(), Chunk::Type::kData);
1736 EXPECT_EQ(c2.session_id(), kArbitrarySessionId);
1737 EXPECT_EQ(c2.offset(), 16u);
1738 ASSERT_TRUE(c2.has_payload());
1739 ASSERT_EQ(c2.payload().size(), 16u);
1740 EXPECT_EQ(
1741 std::memcmp(
1742 c2.payload().data(), kData.data() + c2.offset(), c2.payload().size()),
1743 0);
1744
1745 Chunk c3 = DecodeChunk(ctx_.responses()[3]);
1746 EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
1747 EXPECT_EQ(c3.type(), Chunk::Type::kData);
1748 EXPECT_EQ(c3.session_id(), kArbitrarySessionId);
1749 EXPECT_FALSE(c3.has_payload());
1750 EXPECT_EQ(c3.remaining_bytes(), 0u);
1751
1752 ctx_.SendClientStream(EncodeChunk(Chunk::Final(
1753 ProtocolVersion::kVersionTwo, kArbitrarySessionId, OkStatus())));
1754 transfer_thread_.WaitUntilEventIsProcessed();
1755
1756 EXPECT_TRUE(handler_.finalize_read_called);
1757 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
1758 }
1759
TEST_F(ReadTransfer,Version2_MultiParameters)1760 TEST_F(ReadTransfer, Version2_MultiParameters) {
1761 ctx_.SendClientStream(
1762 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1763 .set_desired_session_id(kArbitrarySessionId)
1764 .set_resource_id(3)));
1765
1766 transfer_thread_.WaitUntilEventIsProcessed();
1767
1768 EXPECT_TRUE(handler_.prepare_read_called);
1769 EXPECT_FALSE(handler_.finalize_read_called);
1770
1771 // First, the server responds with a START_ACK, accepting the session ID and
1772 // confirming the protocol version.
1773 ASSERT_EQ(ctx_.total_responses(), 1u);
1774 Chunk chunk = DecodeChunk(ctx_.responses().back());
1775 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1776 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1777 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1778 EXPECT_EQ(chunk.resource_id(), 3u);
1779
1780 // Complete the handshake by confirming the server's ACK and sending the first
1781 // read transfer parameters.
1782 ctx_.SendClientStream(EncodeChunk(
1783 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1784 .set_session_id(kArbitrarySessionId)
1785 .set_window_end_offset(16)
1786 .set_offset(0)));
1787 transfer_thread_.WaitUntilEventIsProcessed();
1788
1789 ASSERT_EQ(ctx_.total_responses(), 2u);
1790
1791 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1792 EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1793 EXPECT_EQ(c1.type(), Chunk::Type::kData);
1794 EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
1795 EXPECT_EQ(c1.offset(), 0u);
1796 ASSERT_TRUE(c1.has_payload());
1797 ASSERT_EQ(c1.payload().size(), 16u);
1798 EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1799 0);
1800
1801 rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
1802 ctx_.SendClientStream(EncodeChunk(
1803 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersContinue)
1804 .set_session_id(kArbitrarySessionId)
1805 .set_window_end_offset(64)
1806 .set_offset(16)));
1807 transfer_thread_.WaitUntilEventIsProcessed();
1808 });
1809
1810 ASSERT_EQ(ctx_.total_responses(), 4u);
1811
1812 Chunk c2 = DecodeChunk(ctx_.responses()[2]);
1813 EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
1814 EXPECT_EQ(c2.type(), Chunk::Type::kData);
1815 EXPECT_EQ(c2.session_id(), kArbitrarySessionId);
1816 EXPECT_EQ(c2.offset(), 16u);
1817 ASSERT_TRUE(c2.has_payload());
1818 ASSERT_EQ(c2.payload().size(), 16u);
1819 EXPECT_EQ(
1820 std::memcmp(
1821 c2.payload().data(), kData.data() + c2.offset(), c2.payload().size()),
1822 0);
1823
1824 Chunk c3 = DecodeChunk(ctx_.responses()[3]);
1825 EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
1826 EXPECT_EQ(c3.type(), Chunk::Type::kData);
1827 EXPECT_EQ(c3.session_id(), kArbitrarySessionId);
1828 EXPECT_FALSE(c3.has_payload());
1829 EXPECT_EQ(c3.remaining_bytes(), 0u);
1830
1831 ctx_.SendClientStream(EncodeChunk(Chunk::Final(
1832 ProtocolVersion::kVersionTwo, kArbitrarySessionId, OkStatus())));
1833 transfer_thread_.WaitUntilEventIsProcessed();
1834
1835 EXPECT_TRUE(handler_.finalize_read_called);
1836 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
1837 }
1838
TEST_F(ReadTransfer,Version2_ClientTerminatesDuringHandshake)1839 TEST_F(ReadTransfer, Version2_ClientTerminatesDuringHandshake) {
1840 ctx_.SendClientStream(
1841 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1842 .set_desired_session_id(kArbitrarySessionId)
1843 .set_resource_id(3)));
1844
1845 transfer_thread_.WaitUntilEventIsProcessed();
1846
1847 EXPECT_TRUE(handler_.prepare_read_called);
1848 EXPECT_FALSE(handler_.finalize_read_called);
1849
1850 // First, the server responds with a START_ACK, accepting the session ID and
1851 // confirming the protocol version.
1852 ASSERT_EQ(ctx_.total_responses(), 1u);
1853 Chunk chunk = DecodeChunk(ctx_.responses().back());
1854 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1855 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1856 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1857 EXPECT_EQ(chunk.resource_id(), 3u);
1858
1859 // Send a terminating chunk instead of the third part of the handshake.
1860 ctx_.SendClientStream(EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo,
1861 kArbitrarySessionId,
1862 Status::ResourceExhausted())));
1863 transfer_thread_.WaitUntilEventIsProcessed();
1864
1865 EXPECT_TRUE(handler_.finalize_read_called);
1866 EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
1867 }
1868
TEST_F(ReadTransfer,Version2_ClientSendsWrongProtocolVersion)1869 TEST_F(ReadTransfer, Version2_ClientSendsWrongProtocolVersion) {
1870 ctx_.SendClientStream(
1871 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1872 .set_desired_session_id(kArbitrarySessionId)
1873 .set_resource_id(3)));
1874
1875 transfer_thread_.WaitUntilEventIsProcessed();
1876
1877 EXPECT_TRUE(handler_.prepare_read_called);
1878 EXPECT_FALSE(handler_.finalize_read_called);
1879
1880 // First, the server responds with a START_ACK, accepting the session ID and
1881 // confirming the protocol version.
1882 ASSERT_EQ(ctx_.total_responses(), 1u);
1883 Chunk chunk = DecodeChunk(ctx_.responses().back());
1884 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1885 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1886 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1887 EXPECT_EQ(chunk.resource_id(), 3u);
1888
1889 // Complete the handshake by confirming the server's ACK and sending the first
1890 // read transfer parameters.
1891 ctx_.SendClientStream(EncodeChunk(
1892 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1893 .set_session_id(kArbitrarySessionId)
1894 .set_window_end_offset(16)
1895 .set_offset(0)));
1896 transfer_thread_.WaitUntilEventIsProcessed();
1897
1898 ASSERT_EQ(ctx_.total_responses(), 2u);
1899
1900 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1901 EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1902 EXPECT_EQ(c1.type(), Chunk::Type::kData);
1903 EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
1904 EXPECT_EQ(c1.offset(), 0u);
1905 ASSERT_TRUE(c1.has_payload());
1906 ASSERT_EQ(c1.payload().size(), 16u);
1907 EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1908 0);
1909
1910 // Send a parameters update, but with the incorrect protocol version. The
1911 // server should terminate the transfer.
1912 ctx_.SendClientStream(EncodeChunk(
1913 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
1914 .set_session_id(3)
1915 .set_window_end_offset(64)
1916 .set_offset(16)));
1917 transfer_thread_.WaitUntilEventIsProcessed();
1918
1919 ASSERT_EQ(ctx_.total_responses(), 3u);
1920
1921 chunk = DecodeChunk(ctx_.responses().back());
1922 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1923 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1924 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1925 ASSERT_TRUE(chunk.status().has_value());
1926 EXPECT_EQ(chunk.status().value(), Status::Internal());
1927
1928 EXPECT_TRUE(handler_.finalize_read_called);
1929 EXPECT_EQ(handler_.finalize_read_status, Status::Internal());
1930 }
1931
TEST_F(ReadTransfer,Version2_BadParametersInHandshake)1932 TEST_F(ReadTransfer, Version2_BadParametersInHandshake) {
1933 ctx_.SendClientStream(
1934 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1935 .set_desired_session_id(kArbitrarySessionId)
1936 .set_resource_id(3)));
1937
1938 transfer_thread_.WaitUntilEventIsProcessed();
1939
1940 EXPECT_TRUE(handler_.prepare_read_called);
1941 EXPECT_FALSE(handler_.finalize_read_called);
1942
1943 ASSERT_EQ(ctx_.total_responses(), 1u);
1944 Chunk chunk = DecodeChunk(ctx_.responses().back());
1945 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1946 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1947 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1948 EXPECT_EQ(chunk.resource_id(), 3u);
1949
1950 // Complete the handshake, but send an invalid parameters chunk. The server
1951 // should terminate the transfer.
1952 ctx_.SendClientStream(EncodeChunk(
1953 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1954 .set_session_id(kArbitrarySessionId)
1955 .set_window_end_offset(0)
1956 .set_offset(0)));
1957
1958 transfer_thread_.WaitUntilEventIsProcessed();
1959
1960 ASSERT_EQ(ctx_.total_responses(), 2u);
1961
1962 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1963 EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1964 EXPECT_EQ(c1.type(), Chunk::Type::kCompletion);
1965 EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
1966 ASSERT_TRUE(c1.status().has_value());
1967 EXPECT_EQ(c1.status().value(), Status::ResourceExhausted());
1968 }
1969
TEST_F(ReadTransfer,Version2_InvalidResourceId)1970 TEST_F(ReadTransfer, Version2_InvalidResourceId) {
1971 ctx_.SendClientStream(
1972 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1973 .set_desired_session_id(kArbitrarySessionId)
1974 .set_resource_id(99)));
1975
1976 transfer_thread_.WaitUntilEventIsProcessed();
1977
1978 ASSERT_EQ(ctx_.total_responses(), 1u);
1979
1980 Chunk chunk = DecodeChunk(ctx_.responses().back());
1981 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1982 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1983 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1984 EXPECT_EQ(chunk.status().value(), Status::NotFound());
1985 }
1986
TEST_F(ReadTransfer,Version2_PrepareError)1987 TEST_F(ReadTransfer, Version2_PrepareError) {
1988 SometimesUnavailableReadHandler unavailable_handler(99, kData);
1989 ctx_.service().RegisterHandler(unavailable_handler);
1990
1991 ctx_.SendClientStream(
1992 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1993 .set_desired_session_id(kArbitrarySessionId)
1994 .set_resource_id(99)));
1995 transfer_thread_.WaitUntilEventIsProcessed();
1996
1997 ASSERT_EQ(ctx_.total_responses(), 1u);
1998 Chunk chunk = DecodeChunk(ctx_.responses().back());
1999 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2000 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2001 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2002 EXPECT_EQ(chunk.resource_id(), 99u);
2003 EXPECT_EQ(chunk.status().value(), Status::DataLoss());
2004 }
2005
TEST_F(ReadTransfer,Version2_HandlerSetsTransferSize)2006 TEST_F(ReadTransfer, Version2_HandlerSetsTransferSize) {
2007 handler_.set_resource_size(kData.size());
2008
2009 ctx_.SendClientStream(
2010 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2011 .set_desired_session_id(kArbitrarySessionId)
2012 .set_resource_id(3)));
2013
2014 transfer_thread_.WaitUntilEventIsProcessed();
2015
2016 EXPECT_TRUE(handler_.prepare_read_called);
2017 EXPECT_FALSE(handler_.finalize_read_called);
2018
2019 // First, the server responds with a START_ACK, accepting the session ID and
2020 // confirming the protocol version.
2021 ASSERT_EQ(ctx_.total_responses(), 1u);
2022 Chunk chunk = DecodeChunk(ctx_.responses().back());
2023 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2024 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2025 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2026 EXPECT_EQ(chunk.resource_id(), 3u);
2027
2028 // Complete the handshake by confirming the server's ACK and sending the first
2029 // read transfer parameters.
2030 rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
2031 ctx_.SendClientStream(EncodeChunk(
2032 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2033 .set_session_id(kArbitrarySessionId)
2034 .set_window_end_offset(64)
2035 .set_max_chunk_size_bytes(8)
2036 .set_offset(0)));
2037
2038 transfer_thread_.WaitUntilEventIsProcessed();
2039 });
2040
2041 ASSERT_EQ(ctx_.total_responses(), 5u);
2042
2043 // Each of the sent chunks should have a remaining_bytes value set.
2044 Chunk c1 = DecodeChunk(ctx_.responses()[1]);
2045 EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
2046 EXPECT_EQ(c1.type(), Chunk::Type::kData);
2047 EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
2048 EXPECT_EQ(c1.offset(), 0u);
2049 ASSERT_TRUE(c1.remaining_bytes().has_value());
2050 EXPECT_EQ(c1.remaining_bytes().value(), 24u);
2051
2052 Chunk c2 = DecodeChunk(ctx_.responses()[2]);
2053 EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
2054 EXPECT_EQ(c2.type(), Chunk::Type::kData);
2055 EXPECT_EQ(c2.session_id(), kArbitrarySessionId);
2056 EXPECT_EQ(c2.offset(), 8u);
2057 ASSERT_TRUE(c2.remaining_bytes().has_value());
2058 EXPECT_EQ(c2.remaining_bytes().value(), 16u);
2059
2060 Chunk c3 = DecodeChunk(ctx_.responses()[3]);
2061 EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
2062 EXPECT_EQ(c3.type(), Chunk::Type::kData);
2063 EXPECT_EQ(c3.session_id(), kArbitrarySessionId);
2064 EXPECT_EQ(c3.offset(), 16u);
2065 ASSERT_TRUE(c3.remaining_bytes().has_value());
2066 EXPECT_EQ(c3.remaining_bytes().value(), 8u);
2067
2068 Chunk c4 = DecodeChunk(ctx_.responses()[4]);
2069 EXPECT_EQ(c4.protocol_version(), ProtocolVersion::kVersionTwo);
2070 EXPECT_EQ(c4.type(), Chunk::Type::kData);
2071 EXPECT_EQ(c4.session_id(), kArbitrarySessionId);
2072 EXPECT_EQ(c4.offset(), 24u);
2073 ASSERT_TRUE(c4.remaining_bytes().has_value());
2074 EXPECT_EQ(c4.remaining_bytes().value(), 0u);
2075
2076 ctx_.SendClientStream(EncodeChunk(Chunk::Final(
2077 ProtocolVersion::kVersionTwo, kArbitrarySessionId, OkStatus())));
2078 transfer_thread_.WaitUntilEventIsProcessed();
2079
2080 EXPECT_TRUE(handler_.finalize_read_called);
2081 EXPECT_EQ(handler_.finalize_read_status, OkStatus());
2082 }
2083
TEST_F(WriteTransfer,Version2_SimpleTransfer)2084 TEST_F(WriteTransfer, Version2_SimpleTransfer) {
2085 ctx_.SendClientStream(
2086 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2087 .set_desired_session_id(kArbitrarySessionId)
2088 .set_resource_id(7)));
2089
2090 transfer_thread_.WaitUntilEventIsProcessed();
2091
2092 EXPECT_TRUE(handler_.prepare_write_called);
2093 EXPECT_FALSE(handler_.finalize_write_called);
2094
2095 // First, the server responds with a START_ACK, accepting the session ID and
2096 // confirming the protocol version.
2097 ASSERT_EQ(ctx_.total_responses(), 1u);
2098 Chunk chunk = DecodeChunk(ctx_.responses().back());
2099 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2100 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2101 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2102 EXPECT_EQ(chunk.resource_id(), 7u);
2103
2104 // Complete the handshake by confirming the server's ACK.
2105 ctx_.SendClientStream(EncodeChunk(
2106 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2107 .set_session_id(kArbitrarySessionId)));
2108 transfer_thread_.WaitUntilEventIsProcessed();
2109
2110 // Server should respond by sending its initial transfer parameters.
2111 ASSERT_EQ(ctx_.total_responses(), 2u);
2112
2113 chunk = DecodeChunk(ctx_.responses()[1]);
2114 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2115 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2116 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2117 EXPECT_EQ(chunk.offset(), 0u);
2118 EXPECT_EQ(chunk.window_end_offset(), 32u);
2119 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2120 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2121
2122 // Send all of our data.
2123 ctx_.SendClientStream<64>(
2124 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2125 .set_session_id(kArbitrarySessionId)
2126 .set_offset(0)
2127 .set_payload(kData)
2128 .set_remaining_bytes(0)));
2129 transfer_thread_.WaitUntilEventIsProcessed();
2130
2131 ASSERT_EQ(ctx_.total_responses(), 3u);
2132
2133 chunk = DecodeChunk(ctx_.responses().back());
2134 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2135 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2136 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2137 ASSERT_TRUE(chunk.status().has_value());
2138 EXPECT_EQ(chunk.status().value(), OkStatus());
2139
2140 // Send the completion acknowledgement.
2141 ctx_.SendClientStream(EncodeChunk(
2142 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2143 .set_session_id(kArbitrarySessionId)));
2144 transfer_thread_.WaitUntilEventIsProcessed();
2145
2146 ASSERT_EQ(ctx_.total_responses(), 3u);
2147
2148 EXPECT_TRUE(handler_.finalize_write_called);
2149 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2150 EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
2151 }
2152
TEST_F(WriteTransfer,Version2_Multichunk)2153 TEST_F(WriteTransfer, Version2_Multichunk) {
2154 ctx_.SendClientStream(
2155 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2156 .set_desired_session_id(kArbitrarySessionId)
2157 .set_resource_id(7)));
2158
2159 transfer_thread_.WaitUntilEventIsProcessed();
2160
2161 EXPECT_TRUE(handler_.prepare_write_called);
2162 EXPECT_FALSE(handler_.finalize_write_called);
2163
2164 // First, the server responds with a START_ACK, accepting the session ID and
2165 // confirming the protocol version.
2166 ASSERT_EQ(ctx_.total_responses(), 1u);
2167 Chunk chunk = DecodeChunk(ctx_.responses().back());
2168 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2169 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2170 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2171 EXPECT_EQ(chunk.resource_id(), 7u);
2172
2173 // Complete the handshake by confirming the server's ACK.
2174 ctx_.SendClientStream(EncodeChunk(
2175 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2176 .set_session_id(kArbitrarySessionId)));
2177 transfer_thread_.WaitUntilEventIsProcessed();
2178
2179 // Server should respond by sending its initial transfer parameters.
2180 ASSERT_EQ(ctx_.total_responses(), 2u);
2181
2182 chunk = DecodeChunk(ctx_.responses()[1]);
2183 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2184 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2185 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2186 EXPECT_EQ(chunk.offset(), 0u);
2187 EXPECT_EQ(chunk.window_end_offset(), 32u);
2188 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2189 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2190
2191 // Send all of our data across two chunks.
2192 ctx_.SendClientStream<64>(
2193 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2194 .set_session_id(kArbitrarySessionId)
2195 .set_offset(0)
2196 .set_payload(span(kData).first(8))));
2197 ctx_.SendClientStream<64>(
2198 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2199 .set_session_id(kArbitrarySessionId)
2200 .set_offset(8)
2201 .set_payload(span(kData).subspan(8))
2202 .set_remaining_bytes(0)));
2203 transfer_thread_.WaitUntilEventIsProcessed();
2204
2205 ASSERT_EQ(ctx_.total_responses(), 3u);
2206
2207 chunk = DecodeChunk(ctx_.responses().back());
2208 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2209 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2210 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2211 ASSERT_TRUE(chunk.status().has_value());
2212 EXPECT_EQ(chunk.status().value(), OkStatus());
2213
2214 // Send the completion acknowledgement.
2215 ctx_.SendClientStream(EncodeChunk(
2216 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2217 .set_session_id(kArbitrarySessionId)));
2218 transfer_thread_.WaitUntilEventIsProcessed();
2219
2220 ASSERT_EQ(ctx_.total_responses(), 3u);
2221
2222 EXPECT_TRUE(handler_.finalize_write_called);
2223 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2224 EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
2225 }
2226
TEST_F(WriteTransfer,Version2_ContinueParameters)2227 TEST_F(WriteTransfer, Version2_ContinueParameters) {
2228 ctx_.SendClientStream(
2229 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2230 .set_desired_session_id(kArbitrarySessionId)
2231 .set_resource_id(7)));
2232
2233 transfer_thread_.WaitUntilEventIsProcessed();
2234
2235 EXPECT_TRUE(handler_.prepare_write_called);
2236 EXPECT_FALSE(handler_.finalize_write_called);
2237
2238 // First, the server responds with a START_ACK, accepting the session ID and
2239 // confirming the protocol version.
2240 ASSERT_EQ(ctx_.total_responses(), 1u);
2241 Chunk chunk = DecodeChunk(ctx_.responses().back());
2242 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2243 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2244 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2245 EXPECT_EQ(chunk.resource_id(), 7u);
2246
2247 // Complete the handshake by confirming the server's ACK.
2248 ctx_.SendClientStream(EncodeChunk(
2249 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2250 .set_session_id(kArbitrarySessionId)));
2251 transfer_thread_.WaitUntilEventIsProcessed();
2252
2253 // Server should respond by sending its initial transfer parameters.
2254 ASSERT_EQ(ctx_.total_responses(), 2u);
2255
2256 chunk = DecodeChunk(ctx_.responses()[1]);
2257 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2258 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2259 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2260 EXPECT_EQ(chunk.offset(), 0u);
2261 EXPECT_EQ(chunk.window_end_offset(), 32u);
2262 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2263 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2264
2265 // Send all of our data across several chunks.
2266 ctx_.SendClientStream<64>(
2267 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2268 .set_session_id(kArbitrarySessionId)
2269 .set_offset(0)
2270 .set_payload(span(kData).first(8))));
2271
2272 transfer_thread_.WaitUntilEventIsProcessed();
2273 ASSERT_EQ(ctx_.total_responses(), 2u);
2274
2275 ctx_.SendClientStream<64>(
2276 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2277 .set_session_id(kArbitrarySessionId)
2278 .set_offset(8)
2279 .set_payload(span(kData).subspan(8, 8))));
2280
2281 transfer_thread_.WaitUntilEventIsProcessed();
2282 ASSERT_EQ(ctx_.total_responses(), 3u);
2283
2284 chunk = DecodeChunk(ctx_.responses().back());
2285 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2286 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2287 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2288 EXPECT_EQ(chunk.offset(), 16u);
2289 EXPECT_EQ(chunk.window_end_offset(), 32u);
2290
2291 ctx_.SendClientStream<64>(
2292 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2293 .set_session_id(kArbitrarySessionId)
2294 .set_offset(16)
2295 .set_payload(span(kData).subspan(16, 8))));
2296
2297 transfer_thread_.WaitUntilEventIsProcessed();
2298 ASSERT_EQ(ctx_.total_responses(), 4u);
2299
2300 chunk = DecodeChunk(ctx_.responses().back());
2301 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2302 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2303 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2304 EXPECT_EQ(chunk.offset(), 24u);
2305 EXPECT_EQ(chunk.window_end_offset(), 32u);
2306
2307 ctx_.SendClientStream<64>(
2308 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2309 .set_session_id(kArbitrarySessionId)
2310 .set_offset(24)
2311 .set_payload(span(kData).subspan(24))
2312 .set_remaining_bytes(0)));
2313 transfer_thread_.WaitUntilEventIsProcessed();
2314
2315 ASSERT_EQ(ctx_.total_responses(), 5u);
2316
2317 chunk = DecodeChunk(ctx_.responses().back());
2318 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2319 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2320 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2321 ASSERT_TRUE(chunk.status().has_value());
2322 EXPECT_EQ(chunk.status().value(), OkStatus());
2323
2324 // Send the completion acknowledgement.
2325 ctx_.SendClientStream(EncodeChunk(
2326 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2327 .set_session_id(kArbitrarySessionId)));
2328 transfer_thread_.WaitUntilEventIsProcessed();
2329
2330 ASSERT_EQ(ctx_.total_responses(), 5u);
2331
2332 EXPECT_TRUE(handler_.finalize_write_called);
2333 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2334 EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
2335 }
2336
TEST_F(WriteTransfer,Version2_ClientTerminatesDuringHandshake)2337 TEST_F(WriteTransfer, Version2_ClientTerminatesDuringHandshake) {
2338 ctx_.SendClientStream(
2339 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2340 .set_desired_session_id(kArbitrarySessionId)
2341 .set_resource_id(7)));
2342
2343 transfer_thread_.WaitUntilEventIsProcessed();
2344
2345 EXPECT_TRUE(handler_.prepare_write_called);
2346 EXPECT_FALSE(handler_.finalize_write_called);
2347
2348 // First, the server responds with a START_ACK, accepting the session ID and
2349 // confirming the protocol version.
2350 ASSERT_EQ(ctx_.total_responses(), 1u);
2351 Chunk chunk = DecodeChunk(ctx_.responses().back());
2352 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2353 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2354 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2355 EXPECT_EQ(chunk.resource_id(), 7u);
2356
2357 // Send an error chunk instead of completing the handshake.
2358 ctx_.SendClientStream(
2359 EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo,
2360 kArbitrarySessionId,
2361 Status::FailedPrecondition())));
2362 transfer_thread_.WaitUntilEventIsProcessed();
2363
2364 EXPECT_TRUE(handler_.finalize_write_called);
2365 EXPECT_EQ(handler_.finalize_write_status, Status::FailedPrecondition());
2366 }
2367
TEST_F(WriteTransfer,Version2_ClientSendsWrongProtocolVersion)2368 TEST_F(WriteTransfer, Version2_ClientSendsWrongProtocolVersion) {
2369 ctx_.SendClientStream(
2370 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2371 .set_desired_session_id(kArbitrarySessionId)
2372 .set_resource_id(7)));
2373
2374 transfer_thread_.WaitUntilEventIsProcessed();
2375
2376 EXPECT_TRUE(handler_.prepare_write_called);
2377 EXPECT_FALSE(handler_.finalize_write_called);
2378
2379 // First, the server responds with a START_ACK, accepting the session ID and
2380 // confirming the protocol version.
2381 ASSERT_EQ(ctx_.total_responses(), 1u);
2382 Chunk chunk = DecodeChunk(ctx_.responses().back());
2383 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2384 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2385 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2386 EXPECT_EQ(chunk.resource_id(), 7u);
2387
2388 // Complete the handshake by confirming the server's ACK.
2389 ctx_.SendClientStream(EncodeChunk(
2390 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2391 .set_session_id(kArbitrarySessionId)));
2392 transfer_thread_.WaitUntilEventIsProcessed();
2393
2394 // Server should respond by sending its initial transfer parameters.
2395 ASSERT_EQ(ctx_.total_responses(), 2u);
2396
2397 chunk = DecodeChunk(ctx_.responses()[1]);
2398 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2399 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2400 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2401 EXPECT_EQ(chunk.offset(), 0u);
2402 EXPECT_EQ(chunk.window_end_offset(), 32u);
2403 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2404 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2405
2406 // The transfer was configured to use protocol version 2. Send a legacy chunk
2407 // instead.
2408 ctx_.SendClientStream<64>(
2409 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
2410 .set_session_id(7)
2411 .set_offset(0)
2412 .set_payload(kData)
2413 .set_remaining_bytes(0)));
2414 transfer_thread_.WaitUntilEventIsProcessed();
2415
2416 // Server should terminate the transfer.
2417 ASSERT_EQ(ctx_.total_responses(), 3u);
2418
2419 chunk = DecodeChunk(ctx_.responses()[2]);
2420 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2421 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2422 EXPECT_EQ(chunk.status().value(), Status::Internal());
2423
2424 // Send the completion acknowledgement.
2425 ctx_.SendClientStream(EncodeChunk(
2426 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2427 .set_session_id(kArbitrarySessionId)));
2428 transfer_thread_.WaitUntilEventIsProcessed();
2429
2430 ASSERT_EQ(ctx_.total_responses(), 3u);
2431 }
2432
TEST_F(WriteTransfer,Version2_InvalidResourceId)2433 TEST_F(WriteTransfer, Version2_InvalidResourceId) {
2434 ctx_.SendClientStream(
2435 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2436 .set_desired_session_id(kArbitrarySessionId)
2437 .set_resource_id(99)));
2438
2439 transfer_thread_.WaitUntilEventIsProcessed();
2440
2441 ASSERT_EQ(ctx_.total_responses(), 1u);
2442
2443 Chunk chunk = DecodeChunk(ctx_.responses().back());
2444 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2445 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2446 EXPECT_FALSE(chunk.resource_id().has_value());
2447 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2448 EXPECT_EQ(chunk.status().value(), Status::NotFound());
2449 }
2450
2451 class ReadTransferLowMaxRetries : public ::testing::Test {
2452 protected:
2453 static constexpr uint32_t kMaxRetries = 3;
2454 static constexpr uint32_t kMaxLifetimeRetries = 4;
2455
ReadTransferLowMaxRetries()2456 ReadTransferLowMaxRetries()
2457 : handler_(9, kData),
2458 transfer_thread_(data_buffer_, encode_buffer_),
2459 ctx_(transfer_thread_,
2460 64,
2461 // Use a long timeout to avoid accidentally triggering timeouts.
2462 std::chrono::minutes(1),
2463 kMaxRetries,
2464 cfg::kDefaultExtendWindowDivisor,
2465 kMaxLifetimeRetries),
2466 system_thread_(TransferThreadOptions(), transfer_thread_) {
2467 ctx_.service().RegisterHandler(handler_);
2468
2469 PW_CHECK(!handler_.prepare_read_called);
2470 PW_CHECK(!handler_.finalize_read_called);
2471
2472 ctx_.call(); // Open the read stream
2473 transfer_thread_.WaitUntilEventIsProcessed();
2474 }
2475
~ReadTransferLowMaxRetries()2476 ~ReadTransferLowMaxRetries() override {
2477 transfer_thread_.Terminate();
2478 system_thread_.join();
2479 }
2480
2481 SimpleReadTransfer handler_;
2482 Thread<1, 1> transfer_thread_;
2483 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read, 10) ctx_;
2484 pw::Thread system_thread_;
2485 std::array<std::byte, 64> data_buffer_;
2486 std::array<std::byte, 64> encode_buffer_;
2487 };
2488
TEST_F(ReadTransferLowMaxRetries,FailsAfterLifetimeRetryCount)2489 TEST_F(ReadTransferLowMaxRetries, FailsAfterLifetimeRetryCount) {
2490 ctx_.SendClientStream(
2491 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
2492 .set_session_id(9)
2493 .set_window_end_offset(16)
2494 .set_offset(0)));
2495
2496 transfer_thread_.WaitUntilEventIsProcessed();
2497
2498 EXPECT_TRUE(handler_.prepare_read_called);
2499 EXPECT_FALSE(handler_.finalize_read_called);
2500
2501 ASSERT_EQ(ctx_.total_responses(), 1u);
2502 Chunk chunk = DecodeChunk(ctx_.responses().back());
2503
2504 EXPECT_EQ(chunk.session_id(), 9u);
2505 EXPECT_EQ(chunk.offset(), 0u);
2506 ASSERT_EQ(chunk.payload().size(), 16u);
2507 EXPECT_EQ(
2508 std::memcmp(chunk.payload().data(), kData.data(), chunk.payload().size()),
2509 0);
2510
2511 // Time out twice. Server should retry both times.
2512 transfer_thread_.SimulateServerTimeout(9);
2513 transfer_thread_.SimulateServerTimeout(9);
2514 ASSERT_EQ(ctx_.total_responses(), 3u);
2515 chunk = DecodeChunk(ctx_.responses().back());
2516 EXPECT_EQ(chunk.session_id(), 9u);
2517 EXPECT_EQ(chunk.offset(), 0u);
2518 ASSERT_EQ(chunk.payload().size(), 16u);
2519 EXPECT_EQ(
2520 std::memcmp(chunk.payload().data(), kData.data(), chunk.payload().size()),
2521 0);
2522
2523 ctx_.SendClientStream(EncodeChunk(
2524 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
2525 .set_session_id(9)
2526 .set_window_end_offset(32)
2527 .set_offset(16)));
2528 transfer_thread_.WaitUntilEventIsProcessed();
2529
2530 ASSERT_EQ(ctx_.total_responses(), 4u);
2531 chunk = DecodeChunk(ctx_.responses().back());
2532 EXPECT_EQ(chunk.session_id(), 9u);
2533 EXPECT_EQ(chunk.offset(), 16u);
2534 ASSERT_EQ(chunk.payload().size(), 16u);
2535 EXPECT_EQ(
2536 std::memcmp(
2537 chunk.payload().data(), kData.data() + 16, chunk.payload().size()),
2538 0);
2539
2540 // Time out three more times. The transfer should terminate.
2541 transfer_thread_.SimulateServerTimeout(9);
2542 ASSERT_EQ(ctx_.total_responses(), 5u);
2543 chunk = DecodeChunk(ctx_.responses().back());
2544 EXPECT_EQ(chunk.session_id(), 9u);
2545 EXPECT_EQ(chunk.offset(), 16u);
2546 ASSERT_EQ(chunk.payload().size(), 16u);
2547 EXPECT_EQ(
2548 std::memcmp(
2549 chunk.payload().data(), kData.data() + 16, chunk.payload().size()),
2550 0);
2551
2552 transfer_thread_.SimulateServerTimeout(9);
2553 ASSERT_EQ(ctx_.total_responses(), 6u);
2554 chunk = DecodeChunk(ctx_.responses().back());
2555 EXPECT_EQ(chunk.session_id(), 9u);
2556 EXPECT_EQ(chunk.offset(), 16u);
2557 ASSERT_EQ(chunk.payload().size(), 16u);
2558 EXPECT_EQ(
2559 std::memcmp(
2560 chunk.payload().data(), kData.data() + 16, chunk.payload().size()),
2561 0);
2562
2563 transfer_thread_.SimulateServerTimeout(9);
2564 ASSERT_EQ(ctx_.total_responses(), 7u);
2565 chunk = DecodeChunk(ctx_.responses().back());
2566 EXPECT_EQ(chunk.status(), Status::DeadlineExceeded());
2567 }
2568
TEST_F(ReadTransferLowMaxRetries,Version2_FailsAfterLifetimeRetryCount)2569 TEST_F(ReadTransferLowMaxRetries, Version2_FailsAfterLifetimeRetryCount) {
2570 ctx_.SendClientStream(
2571 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2572 .set_desired_session_id(kArbitrarySessionId)
2573 .set_resource_id(9)));
2574
2575 transfer_thread_.WaitUntilEventIsProcessed();
2576
2577 EXPECT_TRUE(handler_.prepare_read_called);
2578 EXPECT_FALSE(handler_.finalize_read_called);
2579
2580 // First, the server responds with a START_ACK, accepting the session ID and
2581 // confirming the protocol version.
2582 ASSERT_EQ(ctx_.total_responses(), 1u);
2583 Chunk chunk = DecodeChunk(ctx_.responses().back());
2584 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2585 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2586 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2587 EXPECT_EQ(chunk.resource_id(), 9u);
2588
2589 // Time out twice. Server should retry both times.
2590 transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2591 transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2592 ASSERT_EQ(ctx_.total_responses(), 3u);
2593 chunk = DecodeChunk(ctx_.responses().back());
2594 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2595
2596 // Complete the handshake, allowing the transfer to continue.
2597 ctx_.SendClientStream(EncodeChunk(
2598 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2599 .set_session_id(kArbitrarySessionId)
2600 .set_window_end_offset(16)
2601 .set_offset(0)));
2602 transfer_thread_.WaitUntilEventIsProcessed();
2603
2604 ASSERT_EQ(ctx_.total_responses(), 4u);
2605 chunk = DecodeChunk(ctx_.responses().back());
2606 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2607
2608 // Time out three more times. The transfer should terminate.
2609 transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2610 ASSERT_EQ(ctx_.total_responses(), 5u);
2611 chunk = DecodeChunk(ctx_.responses().back());
2612 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2613
2614 transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2615 ASSERT_EQ(ctx_.total_responses(), 6u);
2616 chunk = DecodeChunk(ctx_.responses().back());
2617 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2618
2619 transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2620 ASSERT_EQ(ctx_.total_responses(), 7u);
2621 chunk = DecodeChunk(ctx_.responses().back());
2622 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2623 EXPECT_EQ(chunk.status(), Status::DeadlineExceeded());
2624 }
2625
TEST_F(WriteTransfer,Version2_ClientRetriesOpeningChunk)2626 TEST_F(WriteTransfer, Version2_ClientRetriesOpeningChunk) {
2627 ctx_.SendClientStream(
2628 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2629 .set_desired_session_id(kArbitrarySessionId)
2630 .set_resource_id(7)));
2631
2632 transfer_thread_.WaitUntilEventIsProcessed();
2633
2634 EXPECT_TRUE(handler_.prepare_write_called);
2635 EXPECT_FALSE(handler_.finalize_write_called);
2636
2637 // First, the server responds with a START_ACK, accepting the session ID and
2638 // confirming the protocol version.
2639 ASSERT_EQ(ctx_.total_responses(), 1u);
2640 Chunk chunk = DecodeChunk(ctx_.responses().back());
2641 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2642 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2643 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2644 EXPECT_EQ(chunk.resource_id(), 7u);
2645
2646 // Reset prepare_write_called to ensure it isn't called again.
2647 handler_.prepare_write_called = false;
2648
2649 // Client re-sends the same chunk instead of finishing the handshake.
2650 ctx_.SendClientStream(
2651 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2652 .set_desired_session_id(kArbitrarySessionId)
2653 .set_resource_id(7)));
2654
2655 transfer_thread_.WaitUntilEventIsProcessed();
2656
2657 // The server should re-send the same START_ACK without reinitializing the
2658 // handler.
2659 ASSERT_EQ(ctx_.total_responses(), 2u);
2660 chunk = DecodeChunk(ctx_.responses().back());
2661 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2662 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2663 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2664 EXPECT_EQ(chunk.resource_id(), 7u);
2665
2666 EXPECT_FALSE(handler_.prepare_write_called);
2667 EXPECT_FALSE(handler_.finalize_write_called);
2668 }
2669
TEST_F(WriteTransfer,Version2_RegularSessionIdInStartChunk)2670 TEST_F(WriteTransfer, Version2_RegularSessionIdInStartChunk) {
2671 // Client incorrectly sets session_id instead of desired_session_id in its
2672 // START chunk. Server should immediately respond with a protocol error.
2673 ctx_.SendClientStream(
2674 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2675 .set_session_id(kArbitrarySessionId)
2676 .set_resource_id(99)));
2677
2678 transfer_thread_.WaitUntilEventIsProcessed();
2679
2680 EXPECT_FALSE(handler_.prepare_write_called);
2681 EXPECT_FALSE(handler_.finalize_write_called);
2682
2683 ASSERT_EQ(ctx_.total_responses(), 1u);
2684
2685 Chunk chunk = DecodeChunk(ctx_.responses().back());
2686 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2687 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2688 EXPECT_FALSE(chunk.resource_id().has_value());
2689 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2690 EXPECT_EQ(chunk.status().value(), Status::DataLoss());
2691 }
2692
__anonbff93a0d0d02(size_t i) 2693 constexpr auto kData128 = bytes::Initialized<128>([](size_t i) { return i; });
2694
2695 class WriteTransferLargeData : public ::testing::Test {
2696 protected:
WriteTransferLargeData()2697 WriteTransferLargeData()
2698 : buffer{},
2699 handler_(7, buffer),
2700 transfer_thread_(chunk_buffer_, encode_buffer_),
2701 system_thread_(TransferThreadOptions(), transfer_thread_),
2702 ctx_(transfer_thread_,
2703 kData128.size(),
2704 // Use a long timeout to avoid accidentally triggering timeouts.
2705 std::chrono::minutes(1),
2706 /*max_retries=*/3) {
2707 ctx_.service().RegisterHandler(handler_);
2708
2709 PW_CHECK(!handler_.prepare_write_called);
2710 PW_CHECK(!handler_.finalize_write_called);
2711
2712 ctx_.call(); // Open the write stream
2713 transfer_thread_.WaitUntilEventIsProcessed();
2714 }
2715
~WriteTransferLargeData()2716 ~WriteTransferLargeData() override {
2717 transfer_thread_.Terminate();
2718 system_thread_.join();
2719 }
2720
2721 std::array<std::byte, kData128.size()> buffer;
2722 SimpleWriteTransfer handler_;
2723
2724 Thread<1, 1> transfer_thread_;
2725 pw::Thread system_thread_;
2726 std::array<std::byte, 48> chunk_buffer_;
2727 std::array<std::byte, 64> encode_buffer_;
2728 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write, 10) ctx_;
2729 };
2730
TEST_F(WriteTransferLargeData,Version2_AdaptiveWindow_SlowStart)2731 TEST_F(WriteTransferLargeData, Version2_AdaptiveWindow_SlowStart) {
2732 ctx_.SendClientStream(
2733 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2734 .set_desired_session_id(kArbitrarySessionId)
2735 .set_resource_id(7)));
2736
2737 transfer_thread_.WaitUntilEventIsProcessed();
2738
2739 EXPECT_TRUE(handler_.prepare_write_called);
2740 EXPECT_FALSE(handler_.finalize_write_called);
2741
2742 // First, the server responds with a START_ACK, accepting the session ID and
2743 // confirming the protocol version.
2744 ASSERT_EQ(ctx_.total_responses(), 1u);
2745 Chunk chunk = DecodeChunk(ctx_.responses().back());
2746 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2747 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2748 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2749 EXPECT_EQ(chunk.resource_id(), 7u);
2750
2751 // Complete the handshake by confirming the server's ACK.
2752 ctx_.SendClientStream(EncodeChunk(
2753 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2754 .set_session_id(kArbitrarySessionId)));
2755 transfer_thread_.WaitUntilEventIsProcessed();
2756
2757 // Server should respond by sending its initial transfer parameters.
2758 ASSERT_EQ(ctx_.total_responses(), 2u);
2759
2760 constexpr size_t kExpectedMaxChunkSize = 21;
2761
2762 chunk = DecodeChunk(ctx_.responses()[1]);
2763 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2764 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2765 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2766 EXPECT_EQ(chunk.offset(), 0u);
2767 EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
2768 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2769 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSize);
2770
2771 ctx_.SendClientStream<64>(EncodeChunk(
2772 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2773 .set_session_id(kArbitrarySessionId)
2774 .set_offset(0)
2775 .set_payload(span(kData128).first(kExpectedMaxChunkSize))));
2776
2777 transfer_thread_.WaitUntilEventIsProcessed();
2778 ASSERT_EQ(ctx_.total_responses(), 3u);
2779
2780 // Window doubles in response to successful receive.
2781 chunk = DecodeChunk(ctx_.responses().back());
2782 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2783 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2784 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2785 EXPECT_EQ(chunk.offset(), kExpectedMaxChunkSize);
2786 EXPECT_EQ(chunk.window_end_offset(),
2787 chunk.offset() + 2 * kExpectedMaxChunkSize);
2788
2789 ctx_.SendClientStream<64>(
2790 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2791 .set_session_id(kArbitrarySessionId)
2792 .set_offset(chunk.offset())
2793 .set_payload(span(kData128).subspan(
2794 chunk.offset(), kExpectedMaxChunkSize))));
2795
2796 transfer_thread_.WaitUntilEventIsProcessed();
2797 ASSERT_EQ(ctx_.total_responses(), 4u);
2798
2799 // Window doubles in response to successful receive.
2800 chunk = DecodeChunk(ctx_.responses().back());
2801 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2802 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2803 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2804 EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
2805 EXPECT_EQ(chunk.window_end_offset(),
2806 chunk.offset() + 4 * kExpectedMaxChunkSize);
2807
2808 ctx_.SendClientStream<64>(
2809 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2810 .set_session_id(kArbitrarySessionId)
2811 .set_offset(2 * kExpectedMaxChunkSize)
2812 .set_payload(span(kData128).subspan(
2813 chunk.offset(), kExpectedMaxChunkSize))
2814 .set_remaining_bytes(0)));
2815 transfer_thread_.WaitUntilEventIsProcessed();
2816
2817 ASSERT_EQ(ctx_.total_responses(), 5u);
2818
2819 chunk = DecodeChunk(ctx_.responses().back());
2820 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2821 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2822 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2823 ASSERT_TRUE(chunk.status().has_value());
2824 EXPECT_EQ(chunk.status().value(), OkStatus());
2825
2826 // Send the completion acknowledgement.
2827 ctx_.SendClientStream(EncodeChunk(
2828 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2829 .set_session_id(kArbitrarySessionId)));
2830 transfer_thread_.WaitUntilEventIsProcessed();
2831
2832 ASSERT_EQ(ctx_.total_responses(), 5u);
2833
2834 EXPECT_TRUE(handler_.finalize_write_called);
2835 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2836 }
2837
TEST_F(WriteTransferLargeData,Version2_AdaptiveWindow_CongestionAvoidance)2838 TEST_F(WriteTransferLargeData, Version2_AdaptiveWindow_CongestionAvoidance) {
2839 ctx_.SendClientStream(
2840 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2841 .set_desired_session_id(kArbitrarySessionId)
2842 .set_resource_id(7)));
2843
2844 transfer_thread_.WaitUntilEventIsProcessed();
2845
2846 EXPECT_TRUE(handler_.prepare_write_called);
2847 EXPECT_FALSE(handler_.finalize_write_called);
2848
2849 // First, the server responds with a START_ACK, accepting the session ID and
2850 // confirming the protocol version.
2851 ASSERT_EQ(ctx_.total_responses(), 1u);
2852 Chunk chunk = DecodeChunk(ctx_.responses().back());
2853 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2854 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2855 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2856 EXPECT_EQ(chunk.resource_id(), 7u);
2857
2858 // Complete the handshake by confirming the server's ACK.
2859 ctx_.SendClientStream(EncodeChunk(
2860 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2861 .set_session_id(kArbitrarySessionId)));
2862 transfer_thread_.WaitUntilEventIsProcessed();
2863
2864 // Server should respond by sending its initial transfer parameters.
2865 ASSERT_EQ(ctx_.total_responses(), 2u);
2866
2867 constexpr size_t kExpectedMaxChunkSize = 21;
2868
2869 chunk = DecodeChunk(ctx_.responses()[1]);
2870 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2871 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2872 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2873 EXPECT_EQ(chunk.offset(), 0u);
2874 EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
2875 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2876 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSize);
2877
2878 ctx_.SendClientStream<64>(EncodeChunk(
2879 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2880 .set_session_id(kArbitrarySessionId)
2881 .set_offset(0)
2882 .set_payload(span(kData128).first(kExpectedMaxChunkSize))));
2883
2884 transfer_thread_.WaitUntilEventIsProcessed();
2885 ASSERT_EQ(ctx_.total_responses(), 3u);
2886
2887 // Window doubles in response to successful receive.
2888 chunk = DecodeChunk(ctx_.responses().back());
2889 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2890 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2891 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2892 EXPECT_EQ(chunk.offset(), kExpectedMaxChunkSize);
2893 EXPECT_EQ(chunk.window_end_offset(),
2894 chunk.offset() + 2 * kExpectedMaxChunkSize);
2895
2896 ctx_.SendClientStream<64>(
2897 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2898 .set_session_id(kArbitrarySessionId)
2899 .set_offset(chunk.offset())
2900 .set_payload(span(kData128).subspan(
2901 chunk.offset(), kExpectedMaxChunkSize))));
2902
2903 transfer_thread_.WaitUntilEventIsProcessed();
2904 ASSERT_EQ(ctx_.total_responses(), 4u);
2905
2906 // Window doubles in response to successful receive.
2907 chunk = DecodeChunk(ctx_.responses().back());
2908 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2909 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2910 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2911 EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
2912 EXPECT_EQ(chunk.window_end_offset(),
2913 chunk.offset() + 4 * kExpectedMaxChunkSize);
2914
2915 // Don't send any data in response.
2916 transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2917 ASSERT_EQ(ctx_.total_responses(), 5u);
2918
2919 // Window size should now half from the previous one.
2920 chunk = DecodeChunk(ctx_.responses().back());
2921 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2922 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2923 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2924 EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
2925 EXPECT_EQ(chunk.window_end_offset(),
2926 chunk.offset() + 2 * kExpectedMaxChunkSize);
2927
2928 // Send the appropriate chunk.
2929 ctx_.SendClientStream<64>(
2930 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2931 .set_session_id(kArbitrarySessionId)
2932 .set_offset(2 * kExpectedMaxChunkSize)
2933 .set_payload(span(kData128).subspan(
2934 chunk.offset(), kExpectedMaxChunkSize))));
2935 transfer_thread_.WaitUntilEventIsProcessed();
2936
2937 ASSERT_EQ(ctx_.total_responses(), 6u);
2938
2939 // Window size should now only increase by one chunk instead of doubling.
2940 chunk = DecodeChunk(ctx_.responses().back());
2941 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2942 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2943 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2944 EXPECT_EQ(chunk.offset(), 3 * kExpectedMaxChunkSize);
2945 EXPECT_EQ(chunk.window_end_offset(),
2946 chunk.offset() + 3 * kExpectedMaxChunkSize);
2947
2948 // Complete the transfer.
2949 ctx_.SendClientStream<64>(
2950 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2951 .set_session_id(kArbitrarySessionId)
2952 .set_offset(3 * kExpectedMaxChunkSize)
2953 .set_payload(span(kData128).subspan(
2954 chunk.offset(), kExpectedMaxChunkSize))
2955 .set_remaining_bytes(0)));
2956 transfer_thread_.WaitUntilEventIsProcessed();
2957
2958 ASSERT_EQ(ctx_.total_responses(), 7u);
2959
2960 chunk = DecodeChunk(ctx_.responses().back());
2961 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2962 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2963 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2964 ASSERT_TRUE(chunk.status().has_value());
2965 EXPECT_EQ(chunk.status().value(), OkStatus());
2966
2967 // Send the completion acknowledgement.
2968 ctx_.SendClientStream(EncodeChunk(
2969 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2970 .set_session_id(kArbitrarySessionId)));
2971 transfer_thread_.WaitUntilEventIsProcessed();
2972
2973 ASSERT_EQ(ctx_.total_responses(), 7u);
2974
2975 EXPECT_TRUE(handler_.finalize_write_called);
2976 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2977 }
2978
TEST_F(WriteTransferLargeData,Version2_ResendPreviousData_ReceivesContinueParameters)2979 TEST_F(WriteTransferLargeData,
2980 Version2_ResendPreviousData_ReceivesContinueParameters) {
2981 ctx_.SendClientStream(
2982 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2983 .set_desired_session_id(kArbitrarySessionId)
2984 .set_resource_id(7)));
2985
2986 transfer_thread_.WaitUntilEventIsProcessed();
2987
2988 EXPECT_TRUE(handler_.prepare_write_called);
2989 EXPECT_FALSE(handler_.finalize_write_called);
2990
2991 // First, the server responds with a START_ACK, accepting the session ID and
2992 // confirming the protocol version.
2993 ASSERT_EQ(ctx_.total_responses(), 1u);
2994 Chunk chunk = DecodeChunk(ctx_.responses().back());
2995 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2996 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2997 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2998 EXPECT_EQ(chunk.resource_id(), 7u);
2999
3000 // Complete the handshake by confirming the server's ACK.
3001 ctx_.SendClientStream(EncodeChunk(
3002 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
3003 .set_session_id(kArbitrarySessionId)));
3004 transfer_thread_.WaitUntilEventIsProcessed();
3005
3006 // Server should respond by sending its initial transfer parameters.
3007 ASSERT_EQ(ctx_.total_responses(), 2u);
3008
3009 constexpr size_t kExpectedMaxChunkSizeBytes = 21u;
3010
3011 chunk = DecodeChunk(ctx_.responses()[1]);
3012 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3013 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
3014 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3015 EXPECT_EQ(chunk.offset(), 0u);
3016 EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSizeBytes);
3017 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
3018 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSizeBytes);
3019
3020 ctx_.SendClientStream<64>(
3021 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3022 .set_session_id(kArbitrarySessionId)
3023 .set_offset(0)
3024 .set_payload(span(kData).first(16))));
3025 transfer_thread_.WaitUntilEventIsProcessed();
3026
3027 ASSERT_EQ(ctx_.total_responses(), 3u);
3028
3029 // Window size grows to 2 chunks on successful receipt.
3030 chunk = DecodeChunk(ctx_.responses().back());
3031 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3032 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
3033 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3034 EXPECT_EQ(chunk.offset(), 16u);
3035 EXPECT_EQ(chunk.window_end_offset(),
3036 chunk.offset() + 2 * kExpectedMaxChunkSizeBytes);
3037
3038 // Resend data that has already been received.
3039 ctx_.SendClientStream<64>(
3040 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3041 .set_session_id(kArbitrarySessionId)
3042 .set_offset(8)
3043 .set_payload(span(kData).subspan(8, 8))
3044 .set_remaining_bytes(0)));
3045 transfer_thread_.WaitUntilEventIsProcessed();
3046
3047 // The server should respond with CONTINUE parameters rather than requesting
3048 // retransmission and starting a recovery cycle. However, the window size
3049 // should shrink in response to the retried data.
3050 ASSERT_EQ(ctx_.total_responses(), 4u);
3051 chunk = DecodeChunk(ctx_.responses().back());
3052 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
3053 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3054 EXPECT_EQ(chunk.offset(), 16u);
3055 EXPECT_EQ(chunk.window_end_offset(),
3056 chunk.offset() + kExpectedMaxChunkSizeBytes);
3057
3058 // Send the expected chunk.
3059 ctx_.SendClientStream<64>(
3060 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3061 .set_session_id(kArbitrarySessionId)
3062 .set_offset(16)
3063 .set_payload(span(kData).subspan(16))
3064 .set_remaining_bytes(0)));
3065 transfer_thread_.WaitUntilEventIsProcessed();
3066
3067 ASSERT_EQ(ctx_.total_responses(), 5u);
3068 chunk = DecodeChunk(ctx_.responses().back());
3069 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3070 ASSERT_TRUE(chunk.status().has_value());
3071 EXPECT_EQ(chunk.status().value(), OkStatus());
3072
3073 EXPECT_TRUE(handler_.finalize_write_called);
3074 EXPECT_EQ(handler_.finalize_write_status, OkStatus());
3075 EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
3076
3077 ctx_.SendClientStream<64>(EncodeChunk(
3078 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
3079 .set_session_id(kArbitrarySessionId)));
3080 transfer_thread_.WaitUntilEventIsProcessed();
3081 ASSERT_EQ(ctx_.total_responses(), 5u);
3082 }
3083
3084 class WriteTransferMultibyteVarintChunkSize : public ::testing::Test {
3085 protected:
WriteTransferMultibyteVarintChunkSize()3086 WriteTransferMultibyteVarintChunkSize()
3087 : buffer{},
3088 handler_(7, buffer),
3089 transfer_thread_(chunk_buffer_, encode_buffer_),
3090 system_thread_(TransferThreadOptions(), transfer_thread_),
3091 ctx_(transfer_thread_,
3092 kData256.size(),
3093 // Use a long timeout to avoid accidentally triggering timeouts.
3094 std::chrono::minutes(1),
3095 /*max_retries=*/3) {
3096 ctx_.service().RegisterHandler(handler_);
3097
3098 PW_CHECK(!handler_.prepare_write_called);
3099 PW_CHECK(!handler_.finalize_write_called);
3100
3101 ctx_.call(); // Open the write stream
3102 transfer_thread_.WaitUntilEventIsProcessed();
3103 }
3104
~WriteTransferMultibyteVarintChunkSize()3105 ~WriteTransferMultibyteVarintChunkSize() override {
3106 transfer_thread_.Terminate();
3107 system_thread_.join();
3108 }
3109
3110 static constexpr auto kData256 =
__anonbff93a0d0e02(size_t i) 3111 bytes::Initialized<256>([](size_t i) { return i; });
3112
3113 std::array<std::byte, kData256.size()> buffer;
3114 SimpleWriteTransfer handler_;
3115
3116 Thread<1, 1> transfer_thread_;
3117 pw::Thread system_thread_;
3118 std::array<std::byte, kData256.size()> chunk_buffer_;
3119 std::array<std::byte, kData256.size()> encode_buffer_;
3120 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write, 10) ctx_;
3121 };
3122
TEST_F(WriteTransferMultibyteVarintChunkSize,AssumesMinimumWindowSizeInFirstParameters)3123 TEST_F(WriteTransferMultibyteVarintChunkSize,
3124 AssumesMinimumWindowSizeInFirstParameters) {
3125 ctx_.SendClientStream(
3126 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
3127 .set_desired_session_id(kArbitrarySessionId)
3128 .set_resource_id(7)));
3129
3130 transfer_thread_.WaitUntilEventIsProcessed();
3131
3132 EXPECT_TRUE(handler_.prepare_write_called);
3133 EXPECT_FALSE(handler_.finalize_write_called);
3134
3135 // First, the server responds with a START_ACK, accepting the session ID and
3136 // confirming the protocol version.
3137 ASSERT_EQ(ctx_.total_responses(), 1u);
3138 Chunk chunk = DecodeChunk(ctx_.responses().back());
3139 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3140 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
3141 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3142 EXPECT_EQ(chunk.resource_id(), 7u);
3143
3144 // Complete the handshake by confirming the server's ACK.
3145 ctx_.SendClientStream(EncodeChunk(
3146 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
3147 .set_session_id(kArbitrarySessionId)));
3148 transfer_thread_.WaitUntilEventIsProcessed();
3149
3150 // Server should respond by sending its initial transfer parameters.
3151 ASSERT_EQ(ctx_.total_responses(), 2u);
3152
3153 constexpr uint32_t kExpectedMaxChunkSize = 226;
3154
3155 chunk = DecodeChunk(ctx_.responses()[1]);
3156 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3157 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
3158 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3159 EXPECT_EQ(chunk.offset(), 0u);
3160 EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
3161 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
3162 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSize);
3163
3164 // Simulate a timeout, prompting the server to recalculate its transfer
3165 // parameters. The max chunk size and window size should not change.
3166 transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
3167 ASSERT_EQ(ctx_.total_responses(), 3u);
3168
3169 chunk = DecodeChunk(ctx_.responses().back());
3170 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3171 EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
3172 EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3173 EXPECT_EQ(chunk.offset(), 0u);
3174 EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
3175 ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
3176 EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSize);
3177 }
3178
3179 class ReadTransferWithStats : public SimpleReadTransfer {
3180 public:
3181 using SimpleReadTransfer::SimpleReadTransfer;
3182
GetStatus(uint64_t & readable_offset,uint64_t & writeable_offset,uint64_t & read_checksum,uint64_t & write_checksum)3183 Status GetStatus(uint64_t& readable_offset,
3184 uint64_t& writeable_offset,
3185 uint64_t& read_checksum,
3186 uint64_t& write_checksum) override {
3187 readable_offset = 64;
3188 read_checksum = 0xffee;
3189 writeable_offset = 128;
3190 write_checksum = 0xeeff;
3191 return OkStatus();
3192 }
3193 };
3194
3195 class GetResourceStatus : public ::testing::Test {
3196 protected:
GetResourceStatus(size_t max_chunk_size_bytes=64)3197 GetResourceStatus(size_t max_chunk_size_bytes = 64)
3198 : handler_(3, kData),
3199 transfer_thread_(span(data_buffer_).first(max_chunk_size_bytes),
3200 encode_buffer_),
3201 ctx_(transfer_thread_,
3202 64,
3203 // Use a long timeout to avoid accidentally triggering timeouts.
3204 std::chrono::minutes(1)),
3205 system_thread_(TransferThreadOptions(), transfer_thread_) {
3206 ctx_.service().RegisterHandler(handler_);
3207
3208 PW_CHECK(!handler_.prepare_read_called);
3209 PW_CHECK(!handler_.finalize_read_called);
3210 }
3211
~GetResourceStatus()3212 ~GetResourceStatus() override {
3213 transfer_thread_.Terminate();
3214 system_thread_.join();
3215 }
3216
3217 ReadTransferWithStats handler_;
3218 Thread<1, 1> transfer_thread_;
3219 PW_RAW_TEST_METHOD_CONTEXT(TransferService, GetResourceStatus) ctx_;
3220 pw::Thread system_thread_;
3221 std::array<std::byte, 64> data_buffer_;
3222 std::array<std::byte, 64> encode_buffer_;
3223 };
3224
TEST_F(GetResourceStatus,ValidResourceId_ReturnsStatus)3225 TEST_F(GetResourceStatus, ValidResourceId_ReturnsStatus) {
3226 pwpb::ResourceStatusRequest::MemoryEncoder encoder(encode_buffer_);
3227 ASSERT_EQ(encoder.Write({.resource_id = 3}), OkStatus());
3228 ctx_.call(encoder);
3229
3230 transfer_thread_.WaitUntilEventIsProcessed();
3231
3232 ASSERT_EQ(ctx_.status(), OkStatus());
3233
3234 stream::MemoryReader reader(ctx_.response());
3235 pwpb::ResourceStatus::StreamDecoder decoder(reader);
3236 pwpb::ResourceStatus::Message response;
3237 ASSERT_EQ(decoder.Read(response), OkStatus());
3238
3239 EXPECT_EQ(response.resource_id, 3u);
3240 EXPECT_EQ(response.readable_offset, 64u);
3241 EXPECT_EQ(response.read_checksum, 0xffee);
3242 EXPECT_EQ(response.writeable_offset, 128u);
3243 EXPECT_EQ(response.write_checksum, 0xeeff);
3244 }
3245
TEST_F(GetResourceStatus,InvalidResourceId_ReturnsNotFound)3246 TEST_F(GetResourceStatus, InvalidResourceId_ReturnsNotFound) {
3247 pwpb::ResourceStatusRequest::MemoryEncoder encoder(encode_buffer_);
3248 ASSERT_EQ(encoder.Write({.resource_id = 27}), OkStatus());
3249 ctx_.call(encoder);
3250
3251 transfer_thread_.WaitUntilEventIsProcessed();
3252
3253 ASSERT_EQ(ctx_.status(), Status::NotFound());
3254
3255 stream::MemoryReader reader(ctx_.response());
3256 pwpb::ResourceStatus::StreamDecoder decoder(reader);
3257 pwpb::ResourceStatus::Message response;
3258 ASSERT_EQ(decoder.Read(response), OkStatus());
3259
3260 EXPECT_EQ(response.resource_id, 27u);
3261 }
3262
3263 } // namespace
3264 } // namespace pw::transfer::test
3265