xref: /aosp_15_r20/external/pigweed/pw_transfer/transfer_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_transfer/transfer.h"
16 
17 #include <limits>
18 
19 #include "pw_assert/check.h"
20 #include "pw_bytes/array.h"
21 #include "pw_containers/algorithm.h"
22 #include "pw_rpc/raw/test_method_context.h"
23 #include "pw_rpc/test_helpers.h"
24 #include "pw_thread/thread.h"
25 #include "pw_thread_stl/options.h"
26 #include "pw_transfer/internal/config.h"
27 #include "pw_transfer/transfer.pwpb.h"
28 #include "pw_transfer_private/chunk_testing.h"
29 #include "pw_unit_test/framework.h"
30 
31 namespace pw::transfer::test {
32 namespace {
33 
34 using namespace std::chrono_literals;
35 
36 // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()37 thread::Options& TransferThreadOptions() {
38   static thread::stl::Options options;
39   return options;
40 }
41 
42 using internal::Chunk;
43 
44 class TestMemoryReader : public stream::SeekableReader {
45  public:
TestMemoryReader(span<const std::byte> data)46   constexpr TestMemoryReader(span<const std::byte> data)
47       : memory_reader_(data) {}
48 
DoSeek(ptrdiff_t offset,Whence origin)49   Status DoSeek(ptrdiff_t offset, Whence origin) override {
50     if (seek_status.ok()) {
51       return memory_reader_.Seek(offset, origin);
52     }
53     return seek_status;
54   }
55 
DoRead(ByteSpan dest)56   StatusWithSize DoRead(ByteSpan dest) final {
57     if (!read_status.ok()) {
58       return StatusWithSize(read_status, 0);
59     }
60 
61     auto result = memory_reader_.Read(dest);
62     return result.ok() ? StatusWithSize(result->size())
63                        : StatusWithSize(result.status(), 0);
64   }
65 
66   Status seek_status;
67   Status read_status;
68 
69  private:
70   stream::MemoryReader memory_reader_;
71 };
72 
73 class SimpleReadTransfer : public ReadOnlyHandler {
74  public:
SimpleReadTransfer(uint32_t session_id,ConstByteSpan data)75   SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
76       : ReadOnlyHandler(session_id),
77         prepare_read_called(false),
78         finalize_read_called(false),
79         finalize_read_status(Status::Unknown()),
80         resource_size_(std::numeric_limits<size_t>::max()),
81         reader_(data) {}
82 
PrepareRead()83   Status PrepareRead() final {
84     prepare_read_called = true;
85 
86     if (!prepare_read_return_status.ok()) {
87       return prepare_read_return_status;
88     }
89 
90     EXPECT_EQ(reader_.seek_status, reader_.Seek(0));
91     set_reader(reader_);
92     return OkStatus();
93   }
94 
FinalizeRead(Status status)95   void FinalizeRead(Status status) final {
96     finalize_read_called = true;
97     finalize_read_status = status;
98   }
99 
ResourceSize() const100   size_t ResourceSize() const final { return resource_size_; }
101 
set_resource_size(size_t resource_size)102   void set_resource_size(size_t resource_size) {
103     resource_size_ = resource_size;
104   }
set_seek_status(Status status)105   void set_seek_status(Status status) { reader_.seek_status = status; }
set_read_status(Status status)106   void set_read_status(Status status) { reader_.read_status = status; }
107 
108   bool prepare_read_called;
109   bool finalize_read_called;
110   Status prepare_read_return_status;
111   Status finalize_read_status;
112   size_t resource_size_;
113 
114  private:
115   TestMemoryReader reader_;
116 };
117 
__anonbff93a0d0202(size_t i) 118 constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
119 constexpr uint32_t kArbitrarySessionId = 123;
120 
121 class ReadTransfer : public ::testing::Test {
122  protected:
ReadTransfer(size_t max_chunk_size_bytes=64)123   ReadTransfer(size_t max_chunk_size_bytes = 64)
124       : handler_(3, kData),
125         transfer_thread_(span(data_buffer_).first(max_chunk_size_bytes),
126                          encode_buffer_),
127         ctx_(transfer_thread_,
128              64,
129              // Use a long timeout to avoid accidentally triggering timeouts.
130              std::chrono::minutes(1)),
131         system_thread_(TransferThreadOptions(), transfer_thread_) {
132     ctx_.service().RegisterHandler(handler_);
133 
134     PW_CHECK(!handler_.prepare_read_called);
135     PW_CHECK(!handler_.finalize_read_called);
136 
137     ctx_.call();  // Open the read stream
138     transfer_thread_.WaitUntilEventIsProcessed();
139   }
140 
~ReadTransfer()141   ~ReadTransfer() override {
142     transfer_thread_.Terminate();
143     system_thread_.join();
144   }
145 
146   SimpleReadTransfer handler_;
147   Thread<1, 1> transfer_thread_;
148   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read, 8) ctx_;
149   pw::Thread system_thread_;
150   std::array<std::byte, 64> data_buffer_;
151   std::array<std::byte, 64> encode_buffer_;
152 };
153 
TEST_F(ReadTransfer,SingleChunk)154 TEST_F(ReadTransfer, SingleChunk) {
155   rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
156     ctx_.SendClientStream(
157         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
158                         .set_session_id(3)
159                         .set_window_end_offset(64)
160                         .set_offset(0)));
161 
162     transfer_thread_.WaitUntilEventIsProcessed();
163   });
164 
165   EXPECT_TRUE(handler_.prepare_read_called);
166   EXPECT_FALSE(handler_.finalize_read_called);
167 
168   ASSERT_EQ(ctx_.total_responses(), 2u);
169   Chunk c0 = DecodeChunk(ctx_.responses()[0]);
170   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
171 
172   // First chunk should have all the read data.
173   EXPECT_EQ(c0.session_id(), 3u);
174   EXPECT_EQ(c0.offset(), 0u);
175   ASSERT_EQ(c0.payload().size(), kData.size());
176   EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
177             0);
178 
179   // Second chunk should be empty and set remaining_bytes = 0.
180   EXPECT_EQ(c1.session_id(), 3u);
181   EXPECT_FALSE(c1.has_payload());
182   ASSERT_TRUE(c1.remaining_bytes().has_value());
183   EXPECT_EQ(c1.remaining_bytes().value(), 0u);
184 
185   ctx_.SendClientStream(
186       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
187   transfer_thread_.WaitUntilEventIsProcessed();
188 
189   EXPECT_TRUE(handler_.finalize_read_called);
190   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
191 }
192 
TEST_F(ReadTransfer,MultiChunk)193 TEST_F(ReadTransfer, MultiChunk) {
194   ctx_.SendClientStream(
195       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
196                       .set_session_id(3)
197                       .set_window_end_offset(16)
198                       .set_offset(0)));
199 
200   transfer_thread_.WaitUntilEventIsProcessed();
201 
202   EXPECT_TRUE(handler_.prepare_read_called);
203   EXPECT_FALSE(handler_.finalize_read_called);
204 
205   ASSERT_EQ(ctx_.total_responses(), 1u);
206   Chunk c0 = DecodeChunk(ctx_.responses().back());
207 
208   EXPECT_EQ(c0.session_id(), 3u);
209   EXPECT_EQ(c0.offset(), 0u);
210   ASSERT_EQ(c0.payload().size(), 16u);
211   EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
212             0);
213 
214   ctx_.SendClientStream(EncodeChunk(
215       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
216           .set_session_id(3)
217           .set_window_end_offset(32)
218           .set_offset(16)));
219   transfer_thread_.WaitUntilEventIsProcessed();
220 
221   ASSERT_EQ(ctx_.total_responses(), 2u);
222   Chunk c1 = DecodeChunk(ctx_.responses().back());
223 
224   EXPECT_EQ(c1.session_id(), 3u);
225   EXPECT_EQ(c1.offset(), 16u);
226   ASSERT_EQ(c1.payload().size(), 16u);
227   EXPECT_EQ(
228       std::memcmp(c1.payload().data(), kData.data() + 16, c1.payload().size()),
229       0);
230 
231   ctx_.SendClientStream(EncodeChunk(
232       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
233           .set_session_id(3)
234           .set_window_end_offset(48)
235           .set_offset(32)));
236   transfer_thread_.WaitUntilEventIsProcessed();
237 
238   ASSERT_EQ(ctx_.total_responses(), 3u);
239   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
240 
241   EXPECT_EQ(c2.session_id(), 3u);
242   EXPECT_FALSE(c2.has_payload());
243   ASSERT_TRUE(c2.remaining_bytes().has_value());
244   EXPECT_EQ(c2.remaining_bytes().value(), 0u);
245 
246   ctx_.SendClientStream(
247       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
248   transfer_thread_.WaitUntilEventIsProcessed();
249 
250   EXPECT_TRUE(handler_.finalize_read_called);
251   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
252 }
253 
TEST_F(ReadTransfer,MultiChunk_RepeatedContinuePackets)254 TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
255   ctx_.SendClientStream(
256       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
257                       .set_session_id(3)
258                       .set_window_end_offset(16)
259                       .set_offset(0)));
260 
261   transfer_thread_.WaitUntilEventIsProcessed();
262 
263   const auto continue_chunk = EncodeChunk(
264       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
265           .set_session_id(3)
266           .set_window_end_offset(24)
267           .set_offset(16));
268   ctx_.SendClientStream(continue_chunk);
269 
270   transfer_thread_.WaitUntilEventIsProcessed();
271 
272   // Resend the CONTINUE packets that don't actually advance the window.
273   for (int i = 0; i < 3; ++i) {
274     ctx_.SendClientStream(continue_chunk);
275     transfer_thread_.WaitUntilEventIsProcessed();
276   }
277 
278   ASSERT_EQ(ctx_.total_responses(), 2u);  // Only sent one packet
279   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
280 
281   EXPECT_EQ(c1.session_id(), 3u);
282   EXPECT_EQ(c1.offset(), 16u);
283   ASSERT_EQ(c1.payload().size(), 8u);
284   EXPECT_EQ(
285       std::memcmp(c1.payload().data(), kData.data() + 16, c1.payload().size()),
286       0);
287 }
288 
TEST_F(ReadTransfer,OutOfOrder_SeekingSupported)289 TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
290   rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
291     ctx_.SendClientStream(
292         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
293                         .set_session_id(3)
294                         .set_window_end_offset(16)
295                         .set_offset(0)));
296 
297     transfer_thread_.WaitUntilEventIsProcessed();
298 
299     Chunk chunk = DecodeChunk(ctx_.responses().back());
300     EXPECT_TRUE(pw::containers::Equal(span(kData).first(16), chunk.payload()));
301 
302     ctx_.SendClientStream(EncodeChunk(
303         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
304             .set_session_id(3)
305             .set_window_end_offset(10)
306             .set_offset(2)));
307 
308     transfer_thread_.WaitUntilEventIsProcessed();
309 
310     chunk = DecodeChunk(ctx_.responses().back());
311     EXPECT_TRUE(
312         pw::containers::Equal(span(kData).subspan(2, 8), chunk.payload()));
313 
314     ctx_.SendClientStream(EncodeChunk(
315         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
316             .set_session_id(3)
317             .set_window_end_offset(64)
318             .set_offset(17)));
319   });
320 
321   ASSERT_EQ(ctx_.total_responses(), 4u);
322   Chunk chunk = DecodeChunk(ctx_.responses()[2]);
323   EXPECT_TRUE(pw::containers::Equal(
324       span(&kData[17], kData.data() + kData.size()), chunk.payload()));
325 }
326 
TEST_F(ReadTransfer,OutOfOrder_SeekingNotSupported_EndsWithUnimplemented)327 TEST_F(ReadTransfer, OutOfOrder_SeekingNotSupported_EndsWithUnimplemented) {
328   handler_.set_seek_status(Status::Unimplemented());
329 
330   ctx_.SendClientStream(
331       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
332                       .set_session_id(3)
333                       .set_window_end_offset(16)
334                       .set_offset(0)));
335   ctx_.SendClientStream(EncodeChunk(
336       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
337           .set_session_id(3)
338           .set_window_end_offset(10)
339           .set_offset(2)));
340 
341   transfer_thread_.WaitUntilEventIsProcessed();
342 
343   ASSERT_EQ(ctx_.total_responses(), 2u);
344   Chunk chunk = DecodeChunk(ctx_.responses().back());
345   ASSERT_TRUE(chunk.status().has_value());
346   EXPECT_EQ(chunk.status().value(), Status::Unimplemented());
347 }
348 
TEST_F(ReadTransfer,MaxChunkSize_Client)349 TEST_F(ReadTransfer, MaxChunkSize_Client) {
350   rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
351     ctx_.SendClientStream(
352         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
353                         .set_session_id(3)
354                         .set_window_end_offset(64)
355                         .set_max_chunk_size_bytes(8)
356                         .set_offset(0)));
357   });
358 
359   EXPECT_TRUE(handler_.prepare_read_called);
360   EXPECT_FALSE(handler_.finalize_read_called);
361 
362   ASSERT_EQ(ctx_.total_responses(), 5u);
363   Chunk c0 = DecodeChunk(ctx_.responses()[0]);
364   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
365   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
366   Chunk c3 = DecodeChunk(ctx_.responses()[3]);
367   Chunk c4 = DecodeChunk(ctx_.responses()[4]);
368 
369   EXPECT_EQ(c0.session_id(), 3u);
370   EXPECT_EQ(c0.offset(), 0u);
371   ASSERT_EQ(c0.payload().size(), 8u);
372   EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
373             0);
374 
375   EXPECT_EQ(c1.session_id(), 3u);
376   EXPECT_EQ(c1.offset(), 8u);
377   ASSERT_EQ(c1.payload().size(), 8u);
378   EXPECT_EQ(
379       std::memcmp(c1.payload().data(), kData.data() + 8, c1.payload().size()),
380       0);
381 
382   EXPECT_EQ(c2.session_id(), 3u);
383   EXPECT_EQ(c2.offset(), 16u);
384   ASSERT_EQ(c2.payload().size(), 8u);
385   EXPECT_EQ(
386       std::memcmp(c2.payload().data(), kData.data() + 16, c2.payload().size()),
387       0);
388 
389   EXPECT_EQ(c3.session_id(), 3u);
390   EXPECT_EQ(c3.offset(), 24u);
391   ASSERT_EQ(c3.payload().size(), 8u);
392   EXPECT_EQ(
393       std::memcmp(c3.payload().data(), kData.data() + 24, c3.payload().size()),
394       0);
395 
396   EXPECT_EQ(c4.session_id(), 3u);
397   EXPECT_EQ(c4.payload().size(), 0u);
398   ASSERT_TRUE(c4.remaining_bytes().has_value());
399   EXPECT_EQ(c4.remaining_bytes().value(), 0u);
400 
401   ctx_.SendClientStream(
402       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
403   transfer_thread_.WaitUntilEventIsProcessed();
404 
405   EXPECT_TRUE(handler_.finalize_read_called);
406   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
407 }
408 
TEST_F(ReadTransfer,HandlerIsClearedAfterTransfer)409 TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
410   // Request an end offset smaller than the data size to prevent the server
411   // from sending a final chunk.
412   ctx_.SendClientStream(
413       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
414                       .set_session_id(3)
415                       .set_window_end_offset(16)
416                       .set_offset(0)));
417   ctx_.SendClientStream(
418       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
419   transfer_thread_.WaitUntilEventIsProcessed();
420 
421   ASSERT_EQ(ctx_.total_responses(), 1u);
422   ASSERT_TRUE(handler_.prepare_read_called);
423   ASSERT_TRUE(handler_.finalize_read_called);
424   ASSERT_EQ(OkStatus(), handler_.finalize_read_status);
425 
426   // Now, clear state and start a second transfer
427   handler_.prepare_read_return_status = Status::FailedPrecondition();
428   handler_.prepare_read_called = false;
429   handler_.finalize_read_called = false;
430 
431   // Request an end offset smaller than the data size to prevent the server
432   // from sending a final chunk.
433   ctx_.SendClientStream(
434       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
435                       .set_session_id(3)
436                       .set_window_end_offset(16)
437                       .set_offset(0)));
438   transfer_thread_.WaitUntilEventIsProcessed();
439 
440   // Prepare failed, so the handler should not have been stored in the context,
441   // and finalize should not have been called.
442   ASSERT_TRUE(handler_.prepare_read_called);
443   ASSERT_FALSE(handler_.finalize_read_called);
444 }
445 
446 class ReadTransferMaxChunkSize8 : public ReadTransfer {
447  protected:
ReadTransferMaxChunkSize8()448   ReadTransferMaxChunkSize8() : ReadTransfer(/*max_chunk_size_bytes=*/8) {}
449 };
450 
TEST_F(ReadTransferMaxChunkSize8,MaxChunkSize_Server)451 TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
452   // Client asks for max 16-byte chunks, but service places a limit of 8 bytes.
453   // TODO(frolv): Fix
454   rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
455     ctx_.SendClientStream(
456         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
457                         .set_session_id(3)
458                         .set_window_end_offset(64)
459                         // .set_max_chunk_size_bytes(16)
460                         .set_offset(0)));
461   });
462 
463   EXPECT_TRUE(handler_.prepare_read_called);
464   EXPECT_FALSE(handler_.finalize_read_called);
465 
466   ASSERT_EQ(ctx_.total_responses(), 5u);
467   Chunk c0 = DecodeChunk(ctx_.responses()[0]);
468   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
469   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
470   Chunk c3 = DecodeChunk(ctx_.responses()[3]);
471   Chunk c4 = DecodeChunk(ctx_.responses()[4]);
472 
473   EXPECT_EQ(c0.session_id(), 3u);
474   EXPECT_EQ(c0.offset(), 0u);
475   ASSERT_EQ(c0.payload().size(), 8u);
476   EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
477             0);
478 
479   EXPECT_EQ(c1.session_id(), 3u);
480   EXPECT_EQ(c1.offset(), 8u);
481   ASSERT_EQ(c1.payload().size(), 8u);
482   EXPECT_EQ(
483       std::memcmp(c1.payload().data(), kData.data() + 8, c1.payload().size()),
484       0);
485 
486   EXPECT_EQ(c2.session_id(), 3u);
487   EXPECT_EQ(c2.offset(), 16u);
488   ASSERT_EQ(c2.payload().size(), 8u);
489   EXPECT_EQ(
490       std::memcmp(c2.payload().data(), kData.data() + 16, c2.payload().size()),
491       0);
492 
493   EXPECT_EQ(c3.session_id(), 3u);
494   EXPECT_EQ(c3.offset(), 24u);
495   ASSERT_EQ(c3.payload().size(), 8u);
496   EXPECT_EQ(
497       std::memcmp(c3.payload().data(), kData.data() + 24, c3.payload().size()),
498       0);
499 
500   EXPECT_EQ(c4.session_id(), 3u);
501   EXPECT_EQ(c4.payload().size(), 0u);
502   ASSERT_TRUE(c4.remaining_bytes().has_value());
503   EXPECT_EQ(c4.remaining_bytes().value(), 0u);
504 
505   ctx_.SendClientStream(
506       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
507   transfer_thread_.WaitUntilEventIsProcessed();
508 
509   EXPECT_TRUE(handler_.finalize_read_called);
510   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
511 }
512 
TEST_F(ReadTransfer,ClientError)513 TEST_F(ReadTransfer, ClientError) {
514   ctx_.SendClientStream(
515       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
516                       .set_session_id(3)
517                       .set_window_end_offset(16)
518                       .set_offset(0)));
519 
520   transfer_thread_.WaitUntilEventIsProcessed();
521 
522   EXPECT_TRUE(handler_.prepare_read_called);
523   EXPECT_FALSE(handler_.finalize_read_called);
524   ASSERT_EQ(ctx_.total_responses(), 1u);
525 
526   // Send client error.
527   ctx_.SendClientStream(EncodeChunk(
528       Chunk::Final(ProtocolVersion::kLegacy, 3, Status::OutOfRange())));
529   transfer_thread_.WaitUntilEventIsProcessed();
530 
531   ASSERT_EQ(ctx_.total_responses(), 1u);
532   EXPECT_TRUE(handler_.finalize_read_called);
533   EXPECT_EQ(handler_.finalize_read_status, Status::OutOfRange());
534 }
535 
TEST_F(ReadTransfer,UnregisteredHandler)536 TEST_F(ReadTransfer, UnregisteredHandler) {
537   ctx_.SendClientStream(
538       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
539                       .set_session_id(11)
540                       .set_window_end_offset(32)
541                       .set_offset(0)));
542   transfer_thread_.WaitUntilEventIsProcessed();
543 
544   ASSERT_EQ(ctx_.total_responses(), 1u);
545   Chunk chunk = DecodeChunk(ctx_.responses().back());
546   EXPECT_EQ(chunk.session_id(), 11u);
547   ASSERT_TRUE(chunk.status().has_value());
548   EXPECT_EQ(chunk.status().value(), Status::NotFound());
549 }
550 
TEST_F(ReadTransfer,IgnoresNonPendingTransfers)551 TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
552   ctx_.SendClientStream(EncodeChunk(
553       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
554           .set_session_id(3)
555           .set_window_end_offset(32)
556           .set_offset(3)));
557   ctx_.SendClientStream(
558       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
559                       .set_session_id(3)
560                       .set_payload(span(kData).first(10))
561                       .set_offset(3)));
562   ctx_.SendClientStream(
563       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
564   transfer_thread_.WaitUntilEventIsProcessed();
565 
566   // Only start transfer for an initial packet.
567   EXPECT_FALSE(handler_.prepare_read_called);
568   EXPECT_FALSE(handler_.finalize_read_called);
569 }
570 
TEST_F(ReadTransfer,AbortAndRestartIfInitialPacketIsReceived)571 TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
572   ctx_.SendClientStream(
573       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
574                       .set_session_id(3)
575                       .set_window_end_offset(16)
576                       .set_offset(0)));
577   transfer_thread_.WaitUntilEventIsProcessed();
578 
579   ASSERT_EQ(ctx_.total_responses(), 1u);
580 
581   EXPECT_TRUE(handler_.prepare_read_called);
582   EXPECT_FALSE(handler_.finalize_read_called);
583   handler_.prepare_read_called = false;  // Reset so can check if called again.
584 
585   ctx_.SendClientStream(  // Resend starting chunk
586       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
587                       .set_session_id(3)
588                       .set_window_end_offset(16)
589                       .set_offset(0)));
590   transfer_thread_.WaitUntilEventIsProcessed();
591 
592   ASSERT_EQ(ctx_.total_responses(), 2u);
593 
594   EXPECT_TRUE(handler_.prepare_read_called);
595   EXPECT_TRUE(handler_.finalize_read_called);
596   EXPECT_EQ(handler_.finalize_read_status, Status::Aborted());
597   handler_.finalize_read_called = false;  // Reset so can check later
598 
599   ctx_.SendClientStream(EncodeChunk(
600       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
601           .set_session_id(3)
602           .set_window_end_offset(32)
603           .set_offset(16)));
604   ctx_.SendClientStream(
605       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
606   transfer_thread_.WaitUntilEventIsProcessed();
607 
608   ASSERT_EQ(ctx_.total_responses(), 3u);
609   EXPECT_TRUE(handler_.finalize_read_called);
610   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
611 }
612 
TEST_F(ReadTransfer,ZeroPendingBytesWithRemainingData_Aborts)613 TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
614   ctx_.SendClientStream(
615       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
616                       .set_session_id(3)
617                       .set_window_end_offset(0)
618                       .set_offset(0)));
619   transfer_thread_.WaitUntilEventIsProcessed();
620 
621   ASSERT_EQ(ctx_.total_responses(), 1u);
622   ASSERT_TRUE(handler_.finalize_read_called);
623   EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
624 
625   Chunk chunk = DecodeChunk(ctx_.responses().back());
626   EXPECT_EQ(chunk.status(), Status::ResourceExhausted());
627 }
628 
TEST_F(ReadTransfer,ZeroPendingBytesNoRemainingData_Completes)629 TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
630   // Make the next read appear to be the end of the stream.
631   handler_.set_read_status(Status::OutOfRange());
632 
633   ctx_.SendClientStream(
634       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
635                       .set_session_id(3)
636                       .set_window_end_offset(0)
637                       .set_offset(0)));
638   transfer_thread_.WaitUntilEventIsProcessed();
639 
640   Chunk chunk = DecodeChunk(ctx_.responses().back());
641   EXPECT_EQ(chunk.session_id(), 3u);
642   EXPECT_EQ(chunk.remaining_bytes(), 0u);
643 
644   ctx_.SendClientStream(
645       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
646   transfer_thread_.WaitUntilEventIsProcessed();
647 
648   ASSERT_EQ(ctx_.total_responses(), 1u);
649   ASSERT_TRUE(handler_.finalize_read_called);
650   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
651 }
652 
TEST_F(ReadTransfer,SendsErrorIfChunkIsReceivedInCompletedState)653 TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
654   rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
655     ctx_.SendClientStream(
656         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
657                         .set_session_id(3)
658                         .set_window_end_offset(64)
659                         .set_offset(0)));
660   });
661 
662   EXPECT_TRUE(handler_.prepare_read_called);
663   EXPECT_FALSE(handler_.finalize_read_called);
664 
665   ASSERT_EQ(ctx_.total_responses(), 2u);
666   Chunk c0 = DecodeChunk(ctx_.responses()[0]);
667   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
668 
669   // First chunk should have all the read data.
670   EXPECT_EQ(c0.session_id(), 3u);
671   EXPECT_EQ(c0.offset(), 0u);
672   ASSERT_EQ(c0.payload().size(), kData.size());
673   EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
674             0);
675 
676   // Second chunk should be empty and set remaining_bytes = 0.
677   EXPECT_EQ(c1.session_id(), 3u);
678   EXPECT_FALSE(c1.has_payload());
679   ASSERT_TRUE(c1.remaining_bytes().has_value());
680   EXPECT_EQ(c1.remaining_bytes().value(), 0u);
681 
682   ctx_.SendClientStream(
683       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
684   transfer_thread_.WaitUntilEventIsProcessed();
685 
686   EXPECT_TRUE(handler_.finalize_read_called);
687   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
688 
689   // At this point the transfer should be in a completed state. Send a
690   // non-initial chunk as a continuation of the transfer.
691   handler_.finalize_read_called = false;
692 
693   ctx_.SendClientStream(EncodeChunk(
694       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
695           .set_session_id(3)
696           .set_window_end_offset(64)
697           .set_offset(16)));
698   transfer_thread_.WaitUntilEventIsProcessed();
699 
700   ASSERT_EQ(ctx_.total_responses(), 3u);
701 
702   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
703   ASSERT_TRUE(c2.status().has_value());
704   EXPECT_EQ(c2.status().value(), Status::FailedPrecondition());
705 
706   // FinalizeRead should not be called again.
707   EXPECT_FALSE(handler_.finalize_read_called);
708 }
709 
710 class SimpleWriteTransfer final : public WriteOnlyHandler {
711  public:
SimpleWriteTransfer(uint32_t session_id,ByteSpan data)712   SimpleWriteTransfer(uint32_t session_id, ByteSpan data)
713       : WriteOnlyHandler(session_id),
714         prepare_write_called(false),
715         finalize_write_called(false),
716         finalize_write_status(Status::Unknown()),
717         writer_(data) {}
718 
PrepareWrite()719   Status PrepareWrite() final {
720     EXPECT_EQ(OkStatus(), writer_.Seek(0));
721     set_writer(writer_);
722     prepare_write_called = true;
723     return OkStatus();
724   }
725 
FinalizeWrite(Status status)726   Status FinalizeWrite(Status status) final {
727     finalize_write_called = true;
728     finalize_write_status = status;
729     return finalize_write_return_status_;
730   }
731 
set_finalize_write_return(Status status)732   void set_finalize_write_return(Status status) {
733     finalize_write_return_status_ = status;
734   }
735 
736   bool prepare_write_called;
737   bool finalize_write_called;
738   Status finalize_write_status;
739 
740  private:
741   Status finalize_write_return_status_;
742   stream::MemoryWriter writer_;
743 };
744 
745 class WriteTransfer : public ::testing::Test {
746  protected:
WriteTransfer(size_t max_bytes_to_receive=64)747   WriteTransfer(size_t max_bytes_to_receive = 64)
748       : buffer{},
749         handler_(7, buffer),
750         transfer_thread_(data_buffer_, encode_buffer_),
751         system_thread_(TransferThreadOptions(), transfer_thread_),
752         ctx_(transfer_thread_,
753              max_bytes_to_receive,
754              // Use a long timeout to avoid accidentally triggering timeouts.
755              std::chrono::minutes(1),
756              /*max_retries=*/3) {
757     ctx_.service().RegisterHandler(handler_);
758 
759     PW_CHECK(!handler_.prepare_write_called);
760     PW_CHECK(!handler_.finalize_write_called);
761 
762     ctx_.call();  // Open the write stream
763     transfer_thread_.WaitUntilEventIsProcessed();
764   }
765 
~WriteTransfer()766   ~WriteTransfer() override {
767     transfer_thread_.Terminate();
768     system_thread_.join();
769   }
770 
771   std::array<std::byte, kData.size()> buffer;
772   SimpleWriteTransfer handler_;
773 
774   Thread<1, 1> transfer_thread_;
775   pw::Thread system_thread_;
776   std::array<std::byte, 64> data_buffer_;
777   std::array<std::byte, 64> encode_buffer_;
778   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx_;
779 };
780 
TEST_F(WriteTransfer,SingleChunk)781 TEST_F(WriteTransfer, SingleChunk) {
782   ctx_.SendClientStream(EncodeChunk(
783       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
784   transfer_thread_.WaitUntilEventIsProcessed();
785 
786   EXPECT_TRUE(handler_.prepare_write_called);
787   EXPECT_FALSE(handler_.finalize_write_called);
788 
789   ASSERT_EQ(ctx_.total_responses(), 1u);
790   Chunk chunk = DecodeChunk(ctx_.responses().back());
791   EXPECT_EQ(chunk.session_id(), 7u);
792   EXPECT_EQ(chunk.window_end_offset(), 32u);
793   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
794   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
795 
796   ctx_.SendClientStream<64>(
797       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
798                       .set_session_id(7)
799                       .set_offset(0)
800                       .set_payload(kData)
801                       .set_remaining_bytes(0)));
802   transfer_thread_.WaitUntilEventIsProcessed();
803 
804   ASSERT_EQ(ctx_.total_responses(), 2u);
805   chunk = DecodeChunk(ctx_.responses().back());
806   EXPECT_EQ(chunk.session_id(), 7u);
807   ASSERT_TRUE(chunk.status().has_value());
808   EXPECT_EQ(chunk.status().value(), OkStatus());
809 
810   EXPECT_TRUE(handler_.finalize_write_called);
811   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
812   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
813 }
814 
TEST_F(WriteTransfer,FinalizeFails)815 TEST_F(WriteTransfer, FinalizeFails) {
816   // Return an error when FinalizeWrite is called.
817   handler_.set_finalize_write_return(Status::FailedPrecondition());
818 
819   ctx_.SendClientStream(EncodeChunk(
820       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
821   ctx_.SendClientStream<64>(
822       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
823                       .set_session_id(7)
824                       .set_offset(0)
825                       .set_payload(kData)
826                       .set_remaining_bytes(0)));
827   transfer_thread_.WaitUntilEventIsProcessed();
828 
829   ASSERT_EQ(ctx_.total_responses(), 2u);
830   Chunk chunk = DecodeChunk(ctx_.responses()[1]);
831   EXPECT_EQ(chunk.session_id(), 7u);
832   ASSERT_TRUE(chunk.status().has_value());
833   EXPECT_EQ(chunk.status().value(), Status::DataLoss());
834 
835   EXPECT_TRUE(handler_.finalize_write_called);
836   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
837 }
838 
TEST_F(WriteTransfer,SendingFinalPacketFails)839 TEST_F(WriteTransfer, SendingFinalPacketFails) {
840   ctx_.SendClientStream(EncodeChunk(
841       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
842   transfer_thread_.WaitUntilEventIsProcessed();
843 
844   ctx_.output().set_send_status(Status::Unknown());
845 
846   ctx_.SendClientStream<64>(
847       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
848                       .set_session_id(7)
849                       .set_offset(0)
850                       .set_payload(kData)
851                       .set_remaining_bytes(0)));
852   transfer_thread_.WaitUntilEventIsProcessed();
853 
854   // Should only have sent the transfer parameters.
855   ASSERT_EQ(ctx_.total_responses(), 1u);
856   Chunk chunk = DecodeChunk(ctx_.responses()[0]);
857   EXPECT_EQ(chunk.session_id(), 7u);
858   EXPECT_EQ(chunk.window_end_offset(), 32u);
859   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
860   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
861 
862   // When FinalizeWrite() was called, the transfer was considered successful.
863   EXPECT_TRUE(handler_.finalize_write_called);
864   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
865 }
866 
TEST_F(WriteTransfer,MultiChunk)867 TEST_F(WriteTransfer, MultiChunk) {
868   ctx_.SendClientStream(EncodeChunk(
869       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
870   transfer_thread_.WaitUntilEventIsProcessed();
871 
872   EXPECT_TRUE(handler_.prepare_write_called);
873   EXPECT_FALSE(handler_.finalize_write_called);
874 
875   ASSERT_EQ(ctx_.total_responses(), 1u);
876   Chunk chunk = DecodeChunk(ctx_.responses()[0]);
877   EXPECT_EQ(chunk.session_id(), 7u);
878   EXPECT_EQ(chunk.window_end_offset(), 32u);
879 
880   ctx_.SendClientStream<64>(
881       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
882                       .set_session_id(7)
883                       .set_offset(0)
884                       .set_payload(span(kData).first(8))));
885   transfer_thread_.WaitUntilEventIsProcessed();
886 
887   ASSERT_EQ(ctx_.total_responses(), 1u);
888 
889   ctx_.SendClientStream<64>(
890       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
891                       .set_session_id(7)
892                       .set_offset(8)
893                       .set_payload(span(kData).subspan(8))
894                       .set_remaining_bytes(0)));
895   transfer_thread_.WaitUntilEventIsProcessed();
896 
897   ASSERT_EQ(ctx_.total_responses(), 2u);
898   chunk = DecodeChunk(ctx_.responses().back());
899   EXPECT_EQ(chunk.session_id(), 7u);
900   ASSERT_TRUE(chunk.status().has_value());
901   EXPECT_EQ(chunk.status().value(), OkStatus());
902 
903   EXPECT_TRUE(handler_.finalize_write_called);
904   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
905   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
906 }
907 
TEST_F(WriteTransfer,WriteFailsOnRetry)908 TEST_F(WriteTransfer, WriteFailsOnRetry) {
909   // Skip one packet to fail on a retry.
910   ctx_.output().set_send_status(Status::FailedPrecondition(), 1);
911 
912   // Wait for 3 packets: initial params, retry attempt, final error
913   rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
914     // Send only one client packet so the service times out.
915     ctx_.SendClientStream(
916         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
917                         .set_session_id(7)));
918     transfer_thread_.SimulateServerTimeout(7);  // Time out to trigger retry
919   });
920 
921   // Attempted to send 3 packets, but the 2nd packet was dropped.
922   // Check that the last packet is an INTERNAL error from the RPC write failure.
923   ASSERT_EQ(ctx_.total_responses(), 2u);
924   Chunk chunk = DecodeChunk(ctx_.responses()[1]);
925   EXPECT_EQ(chunk.session_id(), 7u);
926   ASSERT_TRUE(chunk.status().has_value());
927   EXPECT_EQ(chunk.status().value(), Status::Internal());
928 }
929 
TEST_F(WriteTransfer,TimeoutInRecoveryState)930 TEST_F(WriteTransfer, TimeoutInRecoveryState) {
931   ctx_.SendClientStream(EncodeChunk(
932       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
933   transfer_thread_.WaitUntilEventIsProcessed();
934 
935   ASSERT_EQ(ctx_.total_responses(), 1u);
936   Chunk chunk = DecodeChunk(ctx_.responses().back());
937   EXPECT_EQ(chunk.session_id(), 7u);
938   EXPECT_EQ(chunk.offset(), 0u);
939   EXPECT_EQ(chunk.window_end_offset(), 32u);
940 
941   constexpr span data(kData);
942 
943   ctx_.SendClientStream(
944       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
945                       .set_session_id(7)
946                       .set_offset(0)
947                       .set_payload(data.first(8))));
948 
949   // Skip offset 8 to enter a recovery state.
950   ctx_.SendClientStream(
951       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
952                       .set_session_id(7)
953                       .set_offset(12)
954                       .set_payload(data.subspan(12, 4))));
955   transfer_thread_.WaitUntilEventIsProcessed();
956 
957   // Recovery parameters should be sent for offset 8.
958   ASSERT_EQ(ctx_.total_responses(), 2u);
959   chunk = DecodeChunk(ctx_.responses().back());
960   EXPECT_EQ(chunk.session_id(), 7u);
961   EXPECT_EQ(chunk.offset(), 8u);
962   EXPECT_EQ(chunk.window_end_offset(), 32u);
963 
964   // Timeout while in the recovery state.
965   transfer_thread_.SimulateServerTimeout(7);
966   transfer_thread_.WaitUntilEventIsProcessed();
967 
968   // Same recovery parameters should be re-sent.
969   ASSERT_EQ(ctx_.total_responses(), 3u);
970   chunk = DecodeChunk(ctx_.responses().back());
971   EXPECT_EQ(chunk.session_id(), 7u);
972   EXPECT_EQ(chunk.offset(), 8u);
973   EXPECT_EQ(chunk.window_end_offset(), 32u);
974 }
975 
TEST_F(WriteTransfer,ExtendWindow)976 TEST_F(WriteTransfer, ExtendWindow) {
977   ctx_.SendClientStream(EncodeChunk(
978       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
979   transfer_thread_.WaitUntilEventIsProcessed();
980 
981   EXPECT_TRUE(handler_.prepare_write_called);
982   EXPECT_FALSE(handler_.finalize_write_called);
983 
984   ASSERT_EQ(ctx_.total_responses(), 1u);
985   Chunk chunk = DecodeChunk(ctx_.responses().back());
986   EXPECT_EQ(chunk.session_id(), 7u);
987   EXPECT_EQ(chunk.window_end_offset(), 32u);
988 
989   // Window starts at 32 bytes and should extend when half of that is sent.
990   ctx_.SendClientStream(
991       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
992                       .set_session_id(7)
993                       .set_offset(0)
994                       .set_payload(span(kData).first(4))));
995   transfer_thread_.WaitUntilEventIsProcessed();
996   ASSERT_EQ(ctx_.total_responses(), 1u);
997 
998   ctx_.SendClientStream(
999       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1000                       .set_session_id(7)
1001                       .set_offset(4)
1002                       .set_payload(span(kData).subspan(4, 4))));
1003   transfer_thread_.WaitUntilEventIsProcessed();
1004   ASSERT_EQ(ctx_.total_responses(), 1u);
1005 
1006   ctx_.SendClientStream(
1007       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1008                       .set_session_id(7)
1009                       .set_offset(8)
1010                       .set_payload(span(kData).subspan(8, 4))));
1011   transfer_thread_.WaitUntilEventIsProcessed();
1012   ASSERT_EQ(ctx_.total_responses(), 1u);
1013 
1014   ctx_.SendClientStream(
1015       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1016                       .set_session_id(7)
1017                       .set_offset(12)
1018                       .set_payload(span(kData).subspan(12, 4))));
1019   transfer_thread_.WaitUntilEventIsProcessed();
1020   ASSERT_EQ(ctx_.total_responses(), 2u);
1021 
1022   // Extend parameters chunk.
1023   chunk = DecodeChunk(ctx_.responses().back());
1024   EXPECT_EQ(chunk.session_id(), 7u);
1025   EXPECT_EQ(chunk.window_end_offset(), 32u);
1026   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
1027 
1028   ctx_.SendClientStream<64>(
1029       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1030                       .set_session_id(7)
1031                       .set_offset(16)
1032                       .set_payload(span(kData).subspan(16))
1033                       .set_remaining_bytes(0)));
1034   transfer_thread_.WaitUntilEventIsProcessed();
1035 
1036   ASSERT_EQ(ctx_.total_responses(), 3u);
1037   chunk = DecodeChunk(ctx_.responses()[2]);
1038   EXPECT_EQ(chunk.session_id(), 7u);
1039   ASSERT_TRUE(chunk.status().has_value());
1040   EXPECT_EQ(chunk.status().value(), OkStatus());
1041 
1042   EXPECT_TRUE(handler_.finalize_write_called);
1043   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1044   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1045 }
1046 
1047 class WriteTransferMaxBytes16 : public WriteTransfer {
1048  protected:
WriteTransferMaxBytes16()1049   WriteTransferMaxBytes16() : WriteTransfer(/*max_bytes_to_receive=*/16) {}
1050 };
1051 
TEST_F(WriteTransfer,TransmitterReducesWindow)1052 TEST_F(WriteTransfer, TransmitterReducesWindow) {
1053   ctx_.SendClientStream(EncodeChunk(
1054       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1055   transfer_thread_.WaitUntilEventIsProcessed();
1056 
1057   EXPECT_TRUE(handler_.prepare_write_called);
1058   EXPECT_FALSE(handler_.finalize_write_called);
1059 
1060   ASSERT_EQ(ctx_.total_responses(), 1u);
1061   Chunk chunk = DecodeChunk(ctx_.responses().back());
1062   EXPECT_EQ(chunk.session_id(), 7u);
1063   EXPECT_EQ(chunk.window_end_offset(), 32u);
1064 
1065   // Send only 12 bytes and set that as the new end offset.
1066   ctx_.SendClientStream<64>(
1067       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1068                       .set_session_id(7)
1069                       .set_offset(0)
1070                       .set_window_end_offset(12)
1071                       .set_payload(span(kData).first(12))));
1072   transfer_thread_.WaitUntilEventIsProcessed();
1073   ASSERT_EQ(ctx_.total_responses(), 2u);
1074 
1075   // Receiver should respond immediately with a continue chunk as the end of
1076   // the window has been reached.
1077   chunk = DecodeChunk(ctx_.responses().back());
1078   EXPECT_EQ(chunk.session_id(), 7u);
1079   EXPECT_EQ(chunk.offset(), 12u);
1080   EXPECT_EQ(chunk.window_end_offset(), 32u);
1081   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
1082 }
1083 
TEST_F(WriteTransfer,TransmitterExtendsWindow_TerminatesWithInvalid)1084 TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
1085   ctx_.SendClientStream(EncodeChunk(
1086       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1087   transfer_thread_.WaitUntilEventIsProcessed();
1088 
1089   EXPECT_TRUE(handler_.prepare_write_called);
1090   EXPECT_FALSE(handler_.finalize_write_called);
1091 
1092   ASSERT_EQ(ctx_.total_responses(), 1u);
1093   Chunk chunk = DecodeChunk(ctx_.responses().back());
1094   EXPECT_EQ(chunk.session_id(), 7u);
1095   EXPECT_EQ(chunk.window_end_offset(), 32u);
1096 
1097   ctx_.SendClientStream<64>(
1098       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1099                       .set_session_id(7)
1100                       .set_offset(0)
1101                       // Larger window end offset than the receiver's.
1102                       .set_window_end_offset(48)
1103                       .set_payload(span(kData).first(16))));
1104   transfer_thread_.WaitUntilEventIsProcessed();
1105   ASSERT_EQ(ctx_.total_responses(), 2u);
1106 
1107   chunk = DecodeChunk(ctx_.responses().back());
1108   EXPECT_EQ(chunk.session_id(), 7u);
1109   ASSERT_TRUE(chunk.status().has_value());
1110   EXPECT_EQ(chunk.status().value(), Status::Internal());
1111 }
1112 
TEST_F(WriteTransferMaxBytes16,MultipleParameters)1113 TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
1114   ctx_.SendClientStream(EncodeChunk(
1115       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1116   transfer_thread_.WaitUntilEventIsProcessed();
1117 
1118   EXPECT_TRUE(handler_.prepare_write_called);
1119   EXPECT_FALSE(handler_.finalize_write_called);
1120 
1121   ASSERT_EQ(ctx_.total_responses(), 1u);
1122   Chunk chunk = DecodeChunk(ctx_.responses().back());
1123   EXPECT_EQ(chunk.session_id(), 7u);
1124   EXPECT_EQ(chunk.window_end_offset(), 16u);
1125 
1126   ctx_.SendClientStream<64>(
1127       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1128                       .set_session_id(7)
1129                       .set_offset(0)
1130                       .set_payload(span(kData).first(8))));
1131   transfer_thread_.WaitUntilEventIsProcessed();
1132 
1133   ASSERT_EQ(ctx_.total_responses(), 2u);
1134   chunk = DecodeChunk(ctx_.responses().back());
1135   EXPECT_EQ(chunk.session_id(), 7u);
1136   EXPECT_EQ(chunk.offset(), 8u);
1137   EXPECT_EQ(chunk.window_end_offset(), 24u);
1138 
1139   ctx_.SendClientStream<64>(
1140       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1141                       .set_session_id(7)
1142                       .set_offset(8)
1143                       .set_payload(span(kData).subspan(8, 8))));
1144   transfer_thread_.WaitUntilEventIsProcessed();
1145 
1146   ASSERT_EQ(ctx_.total_responses(), 3u);
1147   chunk = DecodeChunk(ctx_.responses().back());
1148   EXPECT_EQ(chunk.session_id(), 7u);
1149   EXPECT_EQ(chunk.offset(), 16u);
1150   EXPECT_EQ(chunk.window_end_offset(), 32u);
1151 
1152   ctx_.SendClientStream<64>(
1153       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1154                       .set_session_id(7)
1155                       .set_offset(16)
1156                       .set_payload(span(kData).subspan(16, 8))));
1157   transfer_thread_.WaitUntilEventIsProcessed();
1158 
1159   ASSERT_EQ(ctx_.total_responses(), 4u);
1160   chunk = DecodeChunk(ctx_.responses().back());
1161   EXPECT_EQ(chunk.session_id(), 7u);
1162   EXPECT_EQ(chunk.offset(), 24u);
1163   EXPECT_EQ(chunk.window_end_offset(), 32u);
1164 
1165   ctx_.SendClientStream<64>(
1166       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1167                       .set_session_id(7)
1168                       .set_offset(24)
1169                       .set_payload(span(kData).subspan(24))
1170                       .set_remaining_bytes(0)));
1171   transfer_thread_.WaitUntilEventIsProcessed();
1172 
1173   ASSERT_EQ(ctx_.total_responses(), 5u);
1174   chunk = DecodeChunk(ctx_.responses().back());
1175   EXPECT_EQ(chunk.session_id(), 7u);
1176   ASSERT_TRUE(chunk.status().has_value());
1177   EXPECT_EQ(chunk.status().value(), OkStatus());
1178 
1179   EXPECT_TRUE(handler_.finalize_write_called);
1180   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1181   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1182 }
1183 
TEST_F(WriteTransferMaxBytes16,SetsDefaultWindowEndOffset)1184 TEST_F(WriteTransferMaxBytes16, SetsDefaultWindowEndOffset) {
1185   // Default max bytes is smaller than buffer.
1186   ctx_.SendClientStream(EncodeChunk(
1187       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1188   transfer_thread_.WaitUntilEventIsProcessed();
1189 
1190   ASSERT_EQ(ctx_.total_responses(), 1u);
1191   Chunk chunk = DecodeChunk(ctx_.responses().back());
1192   EXPECT_EQ(chunk.session_id(), 7u);
1193   EXPECT_EQ(chunk.window_end_offset(), 16u);
1194 }
1195 
TEST_F(WriteTransfer,SetsWriterWindowEndOffset)1196 TEST_F(WriteTransfer, SetsWriterWindowEndOffset) {
1197   // Buffer is smaller than constructor's default max bytes.
1198   std::array<std::byte, 8> small_buffer = {};
1199 
1200   SimpleWriteTransfer handler_(987, small_buffer);
1201   ctx_.service().RegisterHandler(handler_);
1202 
1203   ctx_.SendClientStream(
1204       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
1205                       .set_session_id(987)));
1206   transfer_thread_.WaitUntilEventIsProcessed();
1207 
1208   ASSERT_EQ(ctx_.total_responses(), 1u);
1209   Chunk chunk = DecodeChunk(ctx_.responses().back());
1210   EXPECT_EQ(chunk.session_id(), 987u);
1211   EXPECT_EQ(chunk.window_end_offset(), 8u);
1212 
1213   ctx_.service().UnregisterHandler(handler_);
1214 }
1215 
TEST_F(WriteTransfer,UnexpectedOffset)1216 TEST_F(WriteTransfer, UnexpectedOffset) {
1217   ctx_.SendClientStream(EncodeChunk(
1218       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1219   transfer_thread_.WaitUntilEventIsProcessed();
1220 
1221   EXPECT_TRUE(handler_.prepare_write_called);
1222   EXPECT_FALSE(handler_.finalize_write_called);
1223 
1224   ASSERT_EQ(ctx_.total_responses(), 1u);
1225   Chunk chunk = DecodeChunk(ctx_.responses().back());
1226   EXPECT_EQ(chunk.session_id(), 7u);
1227   EXPECT_EQ(chunk.offset(), 0u);
1228   EXPECT_EQ(chunk.window_end_offset(), 32u);
1229 
1230   ctx_.SendClientStream<64>(
1231       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1232                       .set_session_id(7)
1233                       .set_offset(0)
1234                       .set_payload(span(kData).first(8))));
1235   transfer_thread_.WaitUntilEventIsProcessed();
1236 
1237   ASSERT_EQ(ctx_.total_responses(), 1u);
1238 
1239   ctx_.SendClientStream<64>(
1240       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1241                       .set_session_id(7)
1242                       .set_offset(4)  // incorrect
1243                       .set_payload(span(kData).subspan(8))
1244                       .set_remaining_bytes(0)));
1245   transfer_thread_.WaitUntilEventIsProcessed();
1246 
1247   ASSERT_EQ(ctx_.total_responses(), 2u);
1248   chunk = DecodeChunk(ctx_.responses().back());
1249   EXPECT_EQ(chunk.session_id(), 7u);
1250   EXPECT_EQ(chunk.offset(), 8u);
1251   EXPECT_EQ(chunk.window_end_offset(), 32u);
1252 
1253   ctx_.SendClientStream<64>(
1254       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1255                       .set_session_id(7)
1256                       .set_offset(8)  // correct
1257                       .set_payload(span(kData).subspan(8))
1258                       .set_remaining_bytes(0)));
1259   transfer_thread_.WaitUntilEventIsProcessed();
1260 
1261   ASSERT_EQ(ctx_.total_responses(), 3u);
1262   chunk = DecodeChunk(ctx_.responses().back());
1263   EXPECT_EQ(chunk.session_id(), 7u);
1264   ASSERT_TRUE(chunk.status().has_value());
1265   EXPECT_EQ(chunk.status().value(), OkStatus());
1266 
1267   EXPECT_TRUE(handler_.finalize_write_called);
1268   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1269   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1270 }
1271 
TEST_F(WriteTransferMaxBytes16,TooMuchData_EntersRecovery)1272 TEST_F(WriteTransferMaxBytes16, TooMuchData_EntersRecovery) {
1273   ctx_.SendClientStream(EncodeChunk(
1274       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1275   transfer_thread_.WaitUntilEventIsProcessed();
1276 
1277   EXPECT_TRUE(handler_.prepare_write_called);
1278   EXPECT_FALSE(handler_.finalize_write_called);
1279 
1280   ASSERT_EQ(ctx_.total_responses(), 1u);
1281   Chunk chunk = DecodeChunk(ctx_.responses().back());
1282   EXPECT_EQ(chunk.session_id(), 7u);
1283   EXPECT_EQ(chunk.window_end_offset(), 16u);
1284 
1285   // window_end_offset = 16, but send 24 bytes of data.
1286   ctx_.SendClientStream<64>(
1287       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1288                       .set_session_id(7)
1289                       .set_offset(0)
1290                       .set_payload(span(kData).first(24))));
1291   transfer_thread_.WaitUntilEventIsProcessed();
1292 
1293   // Transfer should resend a parameters chunk.
1294   ASSERT_EQ(ctx_.total_responses(), 2u);
1295   chunk = DecodeChunk(ctx_.responses().back());
1296   EXPECT_EQ(chunk.session_id(), 7u);
1297   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
1298   EXPECT_EQ(chunk.offset(), 0u);
1299   EXPECT_EQ(chunk.window_end_offset(), 16u);
1300 }
1301 
TEST_F(WriteTransfer,UnregisteredHandler)1302 TEST_F(WriteTransfer, UnregisteredHandler) {
1303   ctx_.SendClientStream(
1304       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
1305                       .set_session_id(999)));
1306   transfer_thread_.WaitUntilEventIsProcessed();
1307 
1308   ASSERT_EQ(ctx_.total_responses(), 1u);
1309   Chunk chunk = DecodeChunk(ctx_.responses().back());
1310   EXPECT_EQ(chunk.session_id(), 999u);
1311   ASSERT_TRUE(chunk.status().has_value());
1312   EXPECT_EQ(chunk.status().value(), Status::NotFound());
1313 }
1314 
TEST_F(WriteTransfer,ClientError)1315 TEST_F(WriteTransfer, ClientError) {
1316   ctx_.SendClientStream(EncodeChunk(
1317       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1318   transfer_thread_.WaitUntilEventIsProcessed();
1319 
1320   EXPECT_TRUE(handler_.prepare_write_called);
1321   EXPECT_FALSE(handler_.finalize_write_called);
1322 
1323   ASSERT_EQ(ctx_.total_responses(), 1u);
1324   Chunk chunk = DecodeChunk(ctx_.responses().back());
1325   EXPECT_EQ(chunk.session_id(), 7u);
1326   EXPECT_EQ(chunk.window_end_offset(), 32u);
1327 
1328   ctx_.SendClientStream<64>(EncodeChunk(
1329       Chunk::Final(ProtocolVersion::kLegacy, 7, Status::DataLoss())));
1330   transfer_thread_.WaitUntilEventIsProcessed();
1331 
1332   EXPECT_EQ(ctx_.total_responses(), 1u);
1333 
1334   EXPECT_TRUE(handler_.finalize_write_called);
1335   EXPECT_EQ(handler_.finalize_write_status, Status::DataLoss());
1336 }
1337 
TEST_F(WriteTransfer,OnlySendParametersUpdateOnceAfterDrop)1338 TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
1339   ctx_.SendClientStream(EncodeChunk(
1340       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1341   transfer_thread_.WaitUntilEventIsProcessed();
1342 
1343   ASSERT_EQ(ctx_.total_responses(), 1u);
1344 
1345   constexpr span data(kData);
1346   ctx_.SendClientStream<64>(
1347       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1348                       .set_session_id(7)
1349                       .set_offset(0)
1350                       .set_payload(data.first(1))));
1351 
1352   // Drop offset 1, then send the rest of the data.
1353   for (uint32_t i = 2; i < kData.size(); ++i) {
1354     ctx_.SendClientStream<64>(
1355         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1356                         .set_session_id(7)
1357                         .set_offset(i)
1358                         .set_payload(data.subspan(i, 1))));
1359   }
1360 
1361   transfer_thread_.WaitUntilEventIsProcessed();
1362 
1363   ASSERT_EQ(ctx_.total_responses(), 2u);
1364   Chunk chunk = DecodeChunk(ctx_.responses().back());
1365   EXPECT_EQ(chunk.session_id(), 7u);
1366   EXPECT_EQ(chunk.offset(), 1u);
1367 
1368   // Send the remaining data.
1369   ctx_.SendClientStream<64>(
1370       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1371                       .set_session_id(7)
1372                       .set_offset(1)
1373                       .set_payload(data.subspan(1, 31))
1374                       .set_remaining_bytes(0)));
1375   transfer_thread_.WaitUntilEventIsProcessed();
1376 
1377   EXPECT_TRUE(handler_.finalize_write_called);
1378   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1379 }
1380 
TEST_F(WriteTransfer,ResendParametersIfSentRepeatedChunkDuringRecovery)1381 TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
1382   ctx_.SendClientStream(EncodeChunk(
1383       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1384   transfer_thread_.WaitUntilEventIsProcessed();
1385 
1386   ASSERT_EQ(ctx_.total_responses(), 1u);
1387 
1388   constexpr span data(kData);
1389 
1390   // Skip offset 0, then send the rest of the data.
1391   for (uint32_t i = 1; i < kData.size(); ++i) {
1392     ctx_.SendClientStream<64>(
1393         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1394                         .set_session_id(7)
1395                         .set_offset(i)
1396                         .set_payload(data.subspan(i, 1))));
1397   }
1398 
1399   transfer_thread_.WaitUntilEventIsProcessed();
1400 
1401   ASSERT_EQ(ctx_.total_responses(), 2u);  // Resent transfer parameters once.
1402 
1403   const auto last_chunk =
1404       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1405                       .set_session_id(7)
1406                       .set_offset(kData.size() - 1)
1407                       .set_payload(data.last(1)));
1408   ctx_.SendClientStream<64>(last_chunk);
1409   transfer_thread_.WaitUntilEventIsProcessed();
1410 
1411   // Resent transfer parameters since the packet is repeated
1412   ASSERT_EQ(ctx_.total_responses(), 3u);
1413 
1414   ctx_.SendClientStream<64>(last_chunk);
1415   transfer_thread_.WaitUntilEventIsProcessed();
1416 
1417   ASSERT_EQ(ctx_.total_responses(), 4u);
1418 
1419   Chunk chunk = DecodeChunk(ctx_.responses().back());
1420   EXPECT_EQ(chunk.session_id(), 7u);
1421   EXPECT_EQ(chunk.offset(), 0u);
1422   EXPECT_EQ(chunk.window_end_offset(), 32u);
1423 
1424   // Resumes normal operation when correct offset is sent.
1425   ctx_.SendClientStream<64>(
1426       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1427                       .set_session_id(7)
1428                       .set_offset(0)
1429                       .set_payload(kData)
1430                       .set_remaining_bytes(0)));
1431   transfer_thread_.WaitUntilEventIsProcessed();
1432 
1433   EXPECT_TRUE(handler_.finalize_write_called);
1434   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1435 }
1436 
TEST_F(WriteTransfer,ResendsStatusIfClientRetriesAfterStatusChunk)1437 TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
1438   ctx_.SendClientStream(EncodeChunk(
1439       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1440   transfer_thread_.WaitUntilEventIsProcessed();
1441 
1442   ASSERT_EQ(ctx_.total_responses(), 1u);
1443 
1444   ctx_.SendClientStream<64>(
1445       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1446                       .set_session_id(7)
1447                       .set_offset(0)
1448                       .set_payload(kData)
1449                       .set_remaining_bytes(0)));
1450   transfer_thread_.WaitUntilEventIsProcessed();
1451 
1452   ASSERT_EQ(ctx_.total_responses(), 2u);
1453   Chunk chunk = DecodeChunk(ctx_.responses().back());
1454   ASSERT_TRUE(chunk.status().has_value());
1455   EXPECT_EQ(chunk.status().value(), OkStatus());
1456 
1457   ctx_.SendClientStream<64>(
1458       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1459                       .set_session_id(7)
1460                       .set_offset(0)
1461                       .set_payload(kData)
1462                       .set_remaining_bytes(0)));
1463   transfer_thread_.WaitUntilEventIsProcessed();
1464 
1465   ASSERT_EQ(ctx_.total_responses(), 3u);
1466   chunk = DecodeChunk(ctx_.responses().back());
1467   ASSERT_TRUE(chunk.status().has_value());
1468   EXPECT_EQ(chunk.status().value(), OkStatus());
1469 }
1470 
TEST_F(WriteTransfer,IgnoresNonPendingTransfers)1471 TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
1472   ctx_.SendClientStream<64>(
1473       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1474                       .set_session_id(7)
1475                       .set_offset(3)));
1476   ctx_.SendClientStream<64>(
1477       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1478                       .set_session_id(7)
1479                       .set_offset(0)
1480                       .set_payload(span(kData).first(10))
1481                       .set_remaining_bytes(0)));
1482 
1483   transfer_thread_.WaitUntilEventIsProcessed();
1484 
1485   // Only start transfer for initial packet.
1486   EXPECT_FALSE(handler_.prepare_write_called);
1487   EXPECT_FALSE(handler_.finalize_write_called);
1488 }
1489 
TEST_F(WriteTransfer,AbortAndRestartIfInitialPacketIsReceived)1490 TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
1491   ctx_.SendClientStream(EncodeChunk(
1492       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1493   transfer_thread_.WaitUntilEventIsProcessed();
1494 
1495   ASSERT_EQ(ctx_.total_responses(), 1u);
1496 
1497   ctx_.SendClientStream<64>(
1498       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1499                       .set_session_id(7)
1500                       .set_offset(0)
1501                       .set_payload(span(kData).first(8))));
1502   transfer_thread_.WaitUntilEventIsProcessed();
1503 
1504   ASSERT_EQ(ctx_.total_responses(), 1u);
1505 
1506   ASSERT_TRUE(handler_.prepare_write_called);
1507   ASSERT_FALSE(handler_.finalize_write_called);
1508   handler_.prepare_write_called = false;  // Reset to check it's called again.
1509 
1510   // Simulate client disappearing then restarting the transfer.
1511   ctx_.SendClientStream(EncodeChunk(
1512       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1513   transfer_thread_.WaitUntilEventIsProcessed();
1514 
1515   EXPECT_TRUE(handler_.prepare_write_called);
1516   EXPECT_TRUE(handler_.finalize_write_called);
1517   EXPECT_EQ(handler_.finalize_write_status, Status::Aborted());
1518 
1519   handler_.finalize_write_called = false;  // Reset to check it's called again.
1520 
1521   ASSERT_EQ(ctx_.total_responses(), 2u);
1522 
1523   ctx_.SendClientStream<64>(
1524       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1525                       .set_session_id(7)
1526                       .set_offset(0)
1527                       .set_payload(kData)
1528                       .set_remaining_bytes(0)));
1529   transfer_thread_.WaitUntilEventIsProcessed();
1530 
1531   ASSERT_EQ(ctx_.total_responses(), 3u);
1532 
1533   EXPECT_TRUE(handler_.finalize_write_called);
1534   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1535   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1536 }
1537 
1538 class SometimesUnavailableReadHandler final : public ReadOnlyHandler {
1539  public:
SometimesUnavailableReadHandler(uint32_t session_id,ConstByteSpan data)1540   SometimesUnavailableReadHandler(uint32_t session_id, ConstByteSpan data)
1541       : ReadOnlyHandler(session_id), reader_(data), call_count_(0) {}
1542 
PrepareRead()1543   Status PrepareRead() final {
1544     if ((call_count_++ % 2) == 0) {
1545       return Status::Unavailable();
1546     }
1547 
1548     set_reader(reader_);
1549     return OkStatus();
1550   }
1551 
1552  private:
1553   stream::MemoryReader reader_;
1554   int call_count_;
1555 };
1556 
TEST_F(ReadTransfer,PrepareError)1557 TEST_F(ReadTransfer, PrepareError) {
1558   SometimesUnavailableReadHandler unavailable_handler(88, kData);
1559   ctx_.service().RegisterHandler(unavailable_handler);
1560 
1561   ctx_.SendClientStream(
1562       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
1563                       .set_session_id(88)
1564                       .set_window_end_offset(128)
1565                       .set_offset(0)));
1566   transfer_thread_.WaitUntilEventIsProcessed();
1567 
1568   ASSERT_EQ(ctx_.total_responses(), 1u);
1569   Chunk chunk = DecodeChunk(ctx_.responses().back());
1570   EXPECT_EQ(chunk.session_id(), 88u);
1571   ASSERT_TRUE(chunk.status().has_value());
1572   EXPECT_EQ(chunk.status().value(), Status::DataLoss());
1573 
1574   // Try starting the transfer again. It should work this time.
1575   // TODO(frolv): This won't work until completion ACKs are supported.
1576   if (false) {
1577     ctx_.SendClientStream(
1578         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
1579                         .set_session_id(88)
1580                         .set_window_end_offset(128)
1581                         .set_offset(0)));
1582     transfer_thread_.WaitUntilEventIsProcessed();
1583 
1584     ASSERT_EQ(ctx_.total_responses(), 2u);
1585     chunk = DecodeChunk(ctx_.responses().back());
1586     EXPECT_EQ(chunk.session_id(), 88u);
1587     ASSERT_EQ(chunk.payload().size(), kData.size());
1588     EXPECT_EQ(std::memcmp(
1589                   chunk.payload().data(), kData.data(), chunk.payload().size()),
1590               0);
1591   }
1592 }
1593 
TEST_F(WriteTransferMaxBytes16,Service_SetMaxPendingBytes)1594 TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
1595   ctx_.SendClientStream(EncodeChunk(
1596       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1597   transfer_thread_.WaitUntilEventIsProcessed();
1598 
1599   EXPECT_TRUE(handler_.prepare_write_called);
1600   EXPECT_FALSE(handler_.finalize_write_called);
1601 
1602   // First parameters chunk has the default window end offset of 16.
1603   ASSERT_EQ(ctx_.total_responses(), 1u);
1604   Chunk chunk = DecodeChunk(ctx_.responses().back());
1605   EXPECT_EQ(chunk.session_id(), 7u);
1606   EXPECT_EQ(chunk.window_end_offset(), 16u);
1607 
1608   // Update the max window size.
1609   ctx_.service().set_max_window_size_bytes(12);
1610 
1611   ctx_.SendClientStream<64>(
1612       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1613                       .set_session_id(7)
1614                       .set_offset(0)
1615                       .set_payload(span(kData).first(8))));
1616   transfer_thread_.WaitUntilEventIsProcessed();
1617 
1618   // Second parameters chunk should use the new max pending bytes.
1619   ASSERT_EQ(ctx_.total_responses(), 2u);
1620   chunk = DecodeChunk(ctx_.responses().back());
1621   EXPECT_EQ(chunk.session_id(), 7u);
1622   EXPECT_EQ(chunk.offset(), 8u);
1623   EXPECT_EQ(chunk.window_end_offset(), 8u + 12u);
1624 }
1625 
TEST_F(ReadTransfer,Version2_SimpleTransfer)1626 TEST_F(ReadTransfer, Version2_SimpleTransfer) {
1627   ctx_.SendClientStream(
1628       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1629                       .set_desired_session_id(kArbitrarySessionId)
1630                       .set_resource_id(3)));
1631 
1632   transfer_thread_.WaitUntilEventIsProcessed();
1633 
1634   EXPECT_TRUE(handler_.prepare_read_called);
1635   EXPECT_FALSE(handler_.finalize_read_called);
1636 
1637   // First, the server responds with a START_ACK, accepting the session ID and
1638   // confirming the protocol version.
1639   ASSERT_EQ(ctx_.total_responses(), 1u);
1640   Chunk chunk = DecodeChunk(ctx_.responses().back());
1641   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1642   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1643   EXPECT_FALSE(chunk.desired_session_id().has_value());
1644   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1645   EXPECT_EQ(chunk.resource_id(), 3u);
1646 
1647   // Complete the handshake by confirming the server's ACK and sending the first
1648   // read transfer parameters.
1649   rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
1650     ctx_.SendClientStream(EncodeChunk(
1651         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1652             .set_session_id(kArbitrarySessionId)
1653             .set_window_end_offset(64)
1654             .set_offset(0)));
1655 
1656     transfer_thread_.WaitUntilEventIsProcessed();
1657   });
1658 
1659   // Server should respond by starting the data transfer, sending its sole data
1660   // chunk and a remaining_bytes 0 chunk.
1661   ASSERT_EQ(ctx_.total_responses(), 3u);
1662 
1663   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1664   EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1665   EXPECT_EQ(c1.type(), Chunk::Type::kData);
1666   EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
1667   EXPECT_EQ(c1.offset(), 0u);
1668   ASSERT_TRUE(c1.has_payload());
1669   ASSERT_EQ(c1.payload().size(), kData.size());
1670   EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1671             0);
1672 
1673   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
1674   EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
1675   EXPECT_EQ(c2.type(), Chunk::Type::kData);
1676   EXPECT_EQ(c2.session_id(), kArbitrarySessionId);
1677   EXPECT_FALSE(c2.has_payload());
1678   EXPECT_EQ(c2.remaining_bytes(), 0u);
1679 
1680   ctx_.SendClientStream(EncodeChunk(Chunk::Final(
1681       ProtocolVersion::kVersionTwo, kArbitrarySessionId, OkStatus())));
1682   transfer_thread_.WaitUntilEventIsProcessed();
1683 
1684   EXPECT_TRUE(handler_.finalize_read_called);
1685   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
1686 }
1687 
TEST_F(ReadTransfer,Version2_MultiChunk)1688 TEST_F(ReadTransfer, Version2_MultiChunk) {
1689   ctx_.SendClientStream(
1690       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1691                       .set_desired_session_id(kArbitrarySessionId)
1692                       .set_resource_id(3)));
1693 
1694   transfer_thread_.WaitUntilEventIsProcessed();
1695 
1696   EXPECT_TRUE(handler_.prepare_read_called);
1697   EXPECT_FALSE(handler_.finalize_read_called);
1698 
1699   // First, the server responds with a START_ACK, accepting the session ID and
1700   // confirming the protocol version.
1701   ASSERT_EQ(ctx_.total_responses(), 1u);
1702   Chunk chunk = DecodeChunk(ctx_.responses().back());
1703   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1704   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1705   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1706   EXPECT_EQ(chunk.resource_id(), 3u);
1707 
1708   // Complete the handshake by confirming the server's ACK and sending the first
1709   // read transfer parameters.
1710   rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
1711     ctx_.SendClientStream(EncodeChunk(
1712         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1713             .set_session_id(kArbitrarySessionId)
1714             .set_window_end_offset(64)
1715             .set_max_chunk_size_bytes(16)
1716             .set_offset(0)));
1717 
1718     transfer_thread_.WaitUntilEventIsProcessed();
1719   });
1720 
1721   ASSERT_EQ(ctx_.total_responses(), 4u);
1722 
1723   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1724   EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1725   EXPECT_EQ(c1.type(), Chunk::Type::kData);
1726   EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
1727   EXPECT_EQ(c1.offset(), 0u);
1728   ASSERT_TRUE(c1.has_payload());
1729   ASSERT_EQ(c1.payload().size(), 16u);
1730   EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1731             0);
1732 
1733   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
1734   EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
1735   EXPECT_EQ(c2.type(), Chunk::Type::kData);
1736   EXPECT_EQ(c2.session_id(), kArbitrarySessionId);
1737   EXPECT_EQ(c2.offset(), 16u);
1738   ASSERT_TRUE(c2.has_payload());
1739   ASSERT_EQ(c2.payload().size(), 16u);
1740   EXPECT_EQ(
1741       std::memcmp(
1742           c2.payload().data(), kData.data() + c2.offset(), c2.payload().size()),
1743       0);
1744 
1745   Chunk c3 = DecodeChunk(ctx_.responses()[3]);
1746   EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
1747   EXPECT_EQ(c3.type(), Chunk::Type::kData);
1748   EXPECT_EQ(c3.session_id(), kArbitrarySessionId);
1749   EXPECT_FALSE(c3.has_payload());
1750   EXPECT_EQ(c3.remaining_bytes(), 0u);
1751 
1752   ctx_.SendClientStream(EncodeChunk(Chunk::Final(
1753       ProtocolVersion::kVersionTwo, kArbitrarySessionId, OkStatus())));
1754   transfer_thread_.WaitUntilEventIsProcessed();
1755 
1756   EXPECT_TRUE(handler_.finalize_read_called);
1757   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
1758 }
1759 
TEST_F(ReadTransfer,Version2_MultiParameters)1760 TEST_F(ReadTransfer, Version2_MultiParameters) {
1761   ctx_.SendClientStream(
1762       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1763                       .set_desired_session_id(kArbitrarySessionId)
1764                       .set_resource_id(3)));
1765 
1766   transfer_thread_.WaitUntilEventIsProcessed();
1767 
1768   EXPECT_TRUE(handler_.prepare_read_called);
1769   EXPECT_FALSE(handler_.finalize_read_called);
1770 
1771   // First, the server responds with a START_ACK, accepting the session ID and
1772   // confirming the protocol version.
1773   ASSERT_EQ(ctx_.total_responses(), 1u);
1774   Chunk chunk = DecodeChunk(ctx_.responses().back());
1775   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1776   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1777   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1778   EXPECT_EQ(chunk.resource_id(), 3u);
1779 
1780   // Complete the handshake by confirming the server's ACK and sending the first
1781   // read transfer parameters.
1782   ctx_.SendClientStream(EncodeChunk(
1783       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1784           .set_session_id(kArbitrarySessionId)
1785           .set_window_end_offset(16)
1786           .set_offset(0)));
1787   transfer_thread_.WaitUntilEventIsProcessed();
1788 
1789   ASSERT_EQ(ctx_.total_responses(), 2u);
1790 
1791   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1792   EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1793   EXPECT_EQ(c1.type(), Chunk::Type::kData);
1794   EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
1795   EXPECT_EQ(c1.offset(), 0u);
1796   ASSERT_TRUE(c1.has_payload());
1797   ASSERT_EQ(c1.payload().size(), 16u);
1798   EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1799             0);
1800 
1801   rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
1802     ctx_.SendClientStream(EncodeChunk(
1803         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersContinue)
1804             .set_session_id(kArbitrarySessionId)
1805             .set_window_end_offset(64)
1806             .set_offset(16)));
1807     transfer_thread_.WaitUntilEventIsProcessed();
1808   });
1809 
1810   ASSERT_EQ(ctx_.total_responses(), 4u);
1811 
1812   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
1813   EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
1814   EXPECT_EQ(c2.type(), Chunk::Type::kData);
1815   EXPECT_EQ(c2.session_id(), kArbitrarySessionId);
1816   EXPECT_EQ(c2.offset(), 16u);
1817   ASSERT_TRUE(c2.has_payload());
1818   ASSERT_EQ(c2.payload().size(), 16u);
1819   EXPECT_EQ(
1820       std::memcmp(
1821           c2.payload().data(), kData.data() + c2.offset(), c2.payload().size()),
1822       0);
1823 
1824   Chunk c3 = DecodeChunk(ctx_.responses()[3]);
1825   EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
1826   EXPECT_EQ(c3.type(), Chunk::Type::kData);
1827   EXPECT_EQ(c3.session_id(), kArbitrarySessionId);
1828   EXPECT_FALSE(c3.has_payload());
1829   EXPECT_EQ(c3.remaining_bytes(), 0u);
1830 
1831   ctx_.SendClientStream(EncodeChunk(Chunk::Final(
1832       ProtocolVersion::kVersionTwo, kArbitrarySessionId, OkStatus())));
1833   transfer_thread_.WaitUntilEventIsProcessed();
1834 
1835   EXPECT_TRUE(handler_.finalize_read_called);
1836   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
1837 }
1838 
TEST_F(ReadTransfer,Version2_ClientTerminatesDuringHandshake)1839 TEST_F(ReadTransfer, Version2_ClientTerminatesDuringHandshake) {
1840   ctx_.SendClientStream(
1841       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1842                       .set_desired_session_id(kArbitrarySessionId)
1843                       .set_resource_id(3)));
1844 
1845   transfer_thread_.WaitUntilEventIsProcessed();
1846 
1847   EXPECT_TRUE(handler_.prepare_read_called);
1848   EXPECT_FALSE(handler_.finalize_read_called);
1849 
1850   // First, the server responds with a START_ACK, accepting the session ID and
1851   // confirming the protocol version.
1852   ASSERT_EQ(ctx_.total_responses(), 1u);
1853   Chunk chunk = DecodeChunk(ctx_.responses().back());
1854   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1855   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1856   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1857   EXPECT_EQ(chunk.resource_id(), 3u);
1858 
1859   // Send a terminating chunk instead of the third part of the handshake.
1860   ctx_.SendClientStream(EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo,
1861                                                  kArbitrarySessionId,
1862                                                  Status::ResourceExhausted())));
1863   transfer_thread_.WaitUntilEventIsProcessed();
1864 
1865   EXPECT_TRUE(handler_.finalize_read_called);
1866   EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
1867 }
1868 
TEST_F(ReadTransfer,Version2_ClientSendsWrongProtocolVersion)1869 TEST_F(ReadTransfer, Version2_ClientSendsWrongProtocolVersion) {
1870   ctx_.SendClientStream(
1871       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1872                       .set_desired_session_id(kArbitrarySessionId)
1873                       .set_resource_id(3)));
1874 
1875   transfer_thread_.WaitUntilEventIsProcessed();
1876 
1877   EXPECT_TRUE(handler_.prepare_read_called);
1878   EXPECT_FALSE(handler_.finalize_read_called);
1879 
1880   // First, the server responds with a START_ACK, accepting the session ID and
1881   // confirming the protocol version.
1882   ASSERT_EQ(ctx_.total_responses(), 1u);
1883   Chunk chunk = DecodeChunk(ctx_.responses().back());
1884   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1885   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1886   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1887   EXPECT_EQ(chunk.resource_id(), 3u);
1888 
1889   // Complete the handshake by confirming the server's ACK and sending the first
1890   // read transfer parameters.
1891   ctx_.SendClientStream(EncodeChunk(
1892       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1893           .set_session_id(kArbitrarySessionId)
1894           .set_window_end_offset(16)
1895           .set_offset(0)));
1896   transfer_thread_.WaitUntilEventIsProcessed();
1897 
1898   ASSERT_EQ(ctx_.total_responses(), 2u);
1899 
1900   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1901   EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1902   EXPECT_EQ(c1.type(), Chunk::Type::kData);
1903   EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
1904   EXPECT_EQ(c1.offset(), 0u);
1905   ASSERT_TRUE(c1.has_payload());
1906   ASSERT_EQ(c1.payload().size(), 16u);
1907   EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1908             0);
1909 
1910   // Send a parameters update, but with the incorrect protocol version. The
1911   // server should terminate the transfer.
1912   ctx_.SendClientStream(EncodeChunk(
1913       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
1914           .set_session_id(3)
1915           .set_window_end_offset(64)
1916           .set_offset(16)));
1917   transfer_thread_.WaitUntilEventIsProcessed();
1918 
1919   ASSERT_EQ(ctx_.total_responses(), 3u);
1920 
1921   chunk = DecodeChunk(ctx_.responses().back());
1922   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1923   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1924   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1925   ASSERT_TRUE(chunk.status().has_value());
1926   EXPECT_EQ(chunk.status().value(), Status::Internal());
1927 
1928   EXPECT_TRUE(handler_.finalize_read_called);
1929   EXPECT_EQ(handler_.finalize_read_status, Status::Internal());
1930 }
1931 
TEST_F(ReadTransfer,Version2_BadParametersInHandshake)1932 TEST_F(ReadTransfer, Version2_BadParametersInHandshake) {
1933   ctx_.SendClientStream(
1934       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1935                       .set_desired_session_id(kArbitrarySessionId)
1936                       .set_resource_id(3)));
1937 
1938   transfer_thread_.WaitUntilEventIsProcessed();
1939 
1940   EXPECT_TRUE(handler_.prepare_read_called);
1941   EXPECT_FALSE(handler_.finalize_read_called);
1942 
1943   ASSERT_EQ(ctx_.total_responses(), 1u);
1944   Chunk chunk = DecodeChunk(ctx_.responses().back());
1945   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1946   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1947   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1948   EXPECT_EQ(chunk.resource_id(), 3u);
1949 
1950   // Complete the handshake, but send an invalid parameters chunk. The server
1951   // should terminate the transfer.
1952   ctx_.SendClientStream(EncodeChunk(
1953       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1954           .set_session_id(kArbitrarySessionId)
1955           .set_window_end_offset(0)
1956           .set_offset(0)));
1957 
1958   transfer_thread_.WaitUntilEventIsProcessed();
1959 
1960   ASSERT_EQ(ctx_.total_responses(), 2u);
1961 
1962   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1963   EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1964   EXPECT_EQ(c1.type(), Chunk::Type::kCompletion);
1965   EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
1966   ASSERT_TRUE(c1.status().has_value());
1967   EXPECT_EQ(c1.status().value(), Status::ResourceExhausted());
1968 }
1969 
TEST_F(ReadTransfer,Version2_InvalidResourceId)1970 TEST_F(ReadTransfer, Version2_InvalidResourceId) {
1971   ctx_.SendClientStream(
1972       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1973                       .set_desired_session_id(kArbitrarySessionId)
1974                       .set_resource_id(99)));
1975 
1976   transfer_thread_.WaitUntilEventIsProcessed();
1977 
1978   ASSERT_EQ(ctx_.total_responses(), 1u);
1979 
1980   Chunk chunk = DecodeChunk(ctx_.responses().back());
1981   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1982   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
1983   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1984   EXPECT_EQ(chunk.status().value(), Status::NotFound());
1985 }
1986 
TEST_F(ReadTransfer,Version2_PrepareError)1987 TEST_F(ReadTransfer, Version2_PrepareError) {
1988   SometimesUnavailableReadHandler unavailable_handler(99, kData);
1989   ctx_.service().RegisterHandler(unavailable_handler);
1990 
1991   ctx_.SendClientStream(
1992       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1993                       .set_desired_session_id(kArbitrarySessionId)
1994                       .set_resource_id(99)));
1995   transfer_thread_.WaitUntilEventIsProcessed();
1996 
1997   ASSERT_EQ(ctx_.total_responses(), 1u);
1998   Chunk chunk = DecodeChunk(ctx_.responses().back());
1999   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2000   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2001   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2002   EXPECT_EQ(chunk.resource_id(), 99u);
2003   EXPECT_EQ(chunk.status().value(), Status::DataLoss());
2004 }
2005 
TEST_F(ReadTransfer,Version2_HandlerSetsTransferSize)2006 TEST_F(ReadTransfer, Version2_HandlerSetsTransferSize) {
2007   handler_.set_resource_size(kData.size());
2008 
2009   ctx_.SendClientStream(
2010       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2011                       .set_desired_session_id(kArbitrarySessionId)
2012                       .set_resource_id(3)));
2013 
2014   transfer_thread_.WaitUntilEventIsProcessed();
2015 
2016   EXPECT_TRUE(handler_.prepare_read_called);
2017   EXPECT_FALSE(handler_.finalize_read_called);
2018 
2019   // First, the server responds with a START_ACK, accepting the session ID and
2020   // confirming the protocol version.
2021   ASSERT_EQ(ctx_.total_responses(), 1u);
2022   Chunk chunk = DecodeChunk(ctx_.responses().back());
2023   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2024   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2025   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2026   EXPECT_EQ(chunk.resource_id(), 3u);
2027 
2028   // Complete the handshake by confirming the server's ACK and sending the first
2029   // read transfer parameters.
2030   rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
2031     ctx_.SendClientStream(EncodeChunk(
2032         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2033             .set_session_id(kArbitrarySessionId)
2034             .set_window_end_offset(64)
2035             .set_max_chunk_size_bytes(8)
2036             .set_offset(0)));
2037 
2038     transfer_thread_.WaitUntilEventIsProcessed();
2039   });
2040 
2041   ASSERT_EQ(ctx_.total_responses(), 5u);
2042 
2043   // Each of the sent chunks should have a remaining_bytes value set.
2044   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
2045   EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
2046   EXPECT_EQ(c1.type(), Chunk::Type::kData);
2047   EXPECT_EQ(c1.session_id(), kArbitrarySessionId);
2048   EXPECT_EQ(c1.offset(), 0u);
2049   ASSERT_TRUE(c1.remaining_bytes().has_value());
2050   EXPECT_EQ(c1.remaining_bytes().value(), 24u);
2051 
2052   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
2053   EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
2054   EXPECT_EQ(c2.type(), Chunk::Type::kData);
2055   EXPECT_EQ(c2.session_id(), kArbitrarySessionId);
2056   EXPECT_EQ(c2.offset(), 8u);
2057   ASSERT_TRUE(c2.remaining_bytes().has_value());
2058   EXPECT_EQ(c2.remaining_bytes().value(), 16u);
2059 
2060   Chunk c3 = DecodeChunk(ctx_.responses()[3]);
2061   EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
2062   EXPECT_EQ(c3.type(), Chunk::Type::kData);
2063   EXPECT_EQ(c3.session_id(), kArbitrarySessionId);
2064   EXPECT_EQ(c3.offset(), 16u);
2065   ASSERT_TRUE(c3.remaining_bytes().has_value());
2066   EXPECT_EQ(c3.remaining_bytes().value(), 8u);
2067 
2068   Chunk c4 = DecodeChunk(ctx_.responses()[4]);
2069   EXPECT_EQ(c4.protocol_version(), ProtocolVersion::kVersionTwo);
2070   EXPECT_EQ(c4.type(), Chunk::Type::kData);
2071   EXPECT_EQ(c4.session_id(), kArbitrarySessionId);
2072   EXPECT_EQ(c4.offset(), 24u);
2073   ASSERT_TRUE(c4.remaining_bytes().has_value());
2074   EXPECT_EQ(c4.remaining_bytes().value(), 0u);
2075 
2076   ctx_.SendClientStream(EncodeChunk(Chunk::Final(
2077       ProtocolVersion::kVersionTwo, kArbitrarySessionId, OkStatus())));
2078   transfer_thread_.WaitUntilEventIsProcessed();
2079 
2080   EXPECT_TRUE(handler_.finalize_read_called);
2081   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
2082 }
2083 
TEST_F(WriteTransfer,Version2_SimpleTransfer)2084 TEST_F(WriteTransfer, Version2_SimpleTransfer) {
2085   ctx_.SendClientStream(
2086       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2087                       .set_desired_session_id(kArbitrarySessionId)
2088                       .set_resource_id(7)));
2089 
2090   transfer_thread_.WaitUntilEventIsProcessed();
2091 
2092   EXPECT_TRUE(handler_.prepare_write_called);
2093   EXPECT_FALSE(handler_.finalize_write_called);
2094 
2095   // First, the server responds with a START_ACK, accepting the session ID and
2096   // confirming the protocol version.
2097   ASSERT_EQ(ctx_.total_responses(), 1u);
2098   Chunk chunk = DecodeChunk(ctx_.responses().back());
2099   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2100   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2101   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2102   EXPECT_EQ(chunk.resource_id(), 7u);
2103 
2104   // Complete the handshake by confirming the server's ACK.
2105   ctx_.SendClientStream(EncodeChunk(
2106       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2107           .set_session_id(kArbitrarySessionId)));
2108   transfer_thread_.WaitUntilEventIsProcessed();
2109 
2110   // Server should respond by sending its initial transfer parameters.
2111   ASSERT_EQ(ctx_.total_responses(), 2u);
2112 
2113   chunk = DecodeChunk(ctx_.responses()[1]);
2114   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2115   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2116   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2117   EXPECT_EQ(chunk.offset(), 0u);
2118   EXPECT_EQ(chunk.window_end_offset(), 32u);
2119   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2120   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2121 
2122   // Send all of our data.
2123   ctx_.SendClientStream<64>(
2124       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2125                       .set_session_id(kArbitrarySessionId)
2126                       .set_offset(0)
2127                       .set_payload(kData)
2128                       .set_remaining_bytes(0)));
2129   transfer_thread_.WaitUntilEventIsProcessed();
2130 
2131   ASSERT_EQ(ctx_.total_responses(), 3u);
2132 
2133   chunk = DecodeChunk(ctx_.responses().back());
2134   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2135   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2136   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2137   ASSERT_TRUE(chunk.status().has_value());
2138   EXPECT_EQ(chunk.status().value(), OkStatus());
2139 
2140   // Send the completion acknowledgement.
2141   ctx_.SendClientStream(EncodeChunk(
2142       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2143           .set_session_id(kArbitrarySessionId)));
2144   transfer_thread_.WaitUntilEventIsProcessed();
2145 
2146   ASSERT_EQ(ctx_.total_responses(), 3u);
2147 
2148   EXPECT_TRUE(handler_.finalize_write_called);
2149   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2150   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
2151 }
2152 
TEST_F(WriteTransfer,Version2_Multichunk)2153 TEST_F(WriteTransfer, Version2_Multichunk) {
2154   ctx_.SendClientStream(
2155       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2156                       .set_desired_session_id(kArbitrarySessionId)
2157                       .set_resource_id(7)));
2158 
2159   transfer_thread_.WaitUntilEventIsProcessed();
2160 
2161   EXPECT_TRUE(handler_.prepare_write_called);
2162   EXPECT_FALSE(handler_.finalize_write_called);
2163 
2164   // First, the server responds with a START_ACK, accepting the session ID and
2165   // confirming the protocol version.
2166   ASSERT_EQ(ctx_.total_responses(), 1u);
2167   Chunk chunk = DecodeChunk(ctx_.responses().back());
2168   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2169   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2170   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2171   EXPECT_EQ(chunk.resource_id(), 7u);
2172 
2173   // Complete the handshake by confirming the server's ACK.
2174   ctx_.SendClientStream(EncodeChunk(
2175       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2176           .set_session_id(kArbitrarySessionId)));
2177   transfer_thread_.WaitUntilEventIsProcessed();
2178 
2179   // Server should respond by sending its initial transfer parameters.
2180   ASSERT_EQ(ctx_.total_responses(), 2u);
2181 
2182   chunk = DecodeChunk(ctx_.responses()[1]);
2183   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2184   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2185   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2186   EXPECT_EQ(chunk.offset(), 0u);
2187   EXPECT_EQ(chunk.window_end_offset(), 32u);
2188   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2189   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2190 
2191   // Send all of our data across two chunks.
2192   ctx_.SendClientStream<64>(
2193       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2194                       .set_session_id(kArbitrarySessionId)
2195                       .set_offset(0)
2196                       .set_payload(span(kData).first(8))));
2197   ctx_.SendClientStream<64>(
2198       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2199                       .set_session_id(kArbitrarySessionId)
2200                       .set_offset(8)
2201                       .set_payload(span(kData).subspan(8))
2202                       .set_remaining_bytes(0)));
2203   transfer_thread_.WaitUntilEventIsProcessed();
2204 
2205   ASSERT_EQ(ctx_.total_responses(), 3u);
2206 
2207   chunk = DecodeChunk(ctx_.responses().back());
2208   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2209   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2210   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2211   ASSERT_TRUE(chunk.status().has_value());
2212   EXPECT_EQ(chunk.status().value(), OkStatus());
2213 
2214   // Send the completion acknowledgement.
2215   ctx_.SendClientStream(EncodeChunk(
2216       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2217           .set_session_id(kArbitrarySessionId)));
2218   transfer_thread_.WaitUntilEventIsProcessed();
2219 
2220   ASSERT_EQ(ctx_.total_responses(), 3u);
2221 
2222   EXPECT_TRUE(handler_.finalize_write_called);
2223   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2224   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
2225 }
2226 
TEST_F(WriteTransfer,Version2_ContinueParameters)2227 TEST_F(WriteTransfer, Version2_ContinueParameters) {
2228   ctx_.SendClientStream(
2229       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2230                       .set_desired_session_id(kArbitrarySessionId)
2231                       .set_resource_id(7)));
2232 
2233   transfer_thread_.WaitUntilEventIsProcessed();
2234 
2235   EXPECT_TRUE(handler_.prepare_write_called);
2236   EXPECT_FALSE(handler_.finalize_write_called);
2237 
2238   // First, the server responds with a START_ACK, accepting the session ID and
2239   // confirming the protocol version.
2240   ASSERT_EQ(ctx_.total_responses(), 1u);
2241   Chunk chunk = DecodeChunk(ctx_.responses().back());
2242   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2243   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2244   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2245   EXPECT_EQ(chunk.resource_id(), 7u);
2246 
2247   // Complete the handshake by confirming the server's ACK.
2248   ctx_.SendClientStream(EncodeChunk(
2249       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2250           .set_session_id(kArbitrarySessionId)));
2251   transfer_thread_.WaitUntilEventIsProcessed();
2252 
2253   // Server should respond by sending its initial transfer parameters.
2254   ASSERT_EQ(ctx_.total_responses(), 2u);
2255 
2256   chunk = DecodeChunk(ctx_.responses()[1]);
2257   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2258   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2259   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2260   EXPECT_EQ(chunk.offset(), 0u);
2261   EXPECT_EQ(chunk.window_end_offset(), 32u);
2262   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2263   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2264 
2265   // Send all of our data across several chunks.
2266   ctx_.SendClientStream<64>(
2267       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2268                       .set_session_id(kArbitrarySessionId)
2269                       .set_offset(0)
2270                       .set_payload(span(kData).first(8))));
2271 
2272   transfer_thread_.WaitUntilEventIsProcessed();
2273   ASSERT_EQ(ctx_.total_responses(), 2u);
2274 
2275   ctx_.SendClientStream<64>(
2276       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2277                       .set_session_id(kArbitrarySessionId)
2278                       .set_offset(8)
2279                       .set_payload(span(kData).subspan(8, 8))));
2280 
2281   transfer_thread_.WaitUntilEventIsProcessed();
2282   ASSERT_EQ(ctx_.total_responses(), 3u);
2283 
2284   chunk = DecodeChunk(ctx_.responses().back());
2285   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2286   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2287   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2288   EXPECT_EQ(chunk.offset(), 16u);
2289   EXPECT_EQ(chunk.window_end_offset(), 32u);
2290 
2291   ctx_.SendClientStream<64>(
2292       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2293                       .set_session_id(kArbitrarySessionId)
2294                       .set_offset(16)
2295                       .set_payload(span(kData).subspan(16, 8))));
2296 
2297   transfer_thread_.WaitUntilEventIsProcessed();
2298   ASSERT_EQ(ctx_.total_responses(), 4u);
2299 
2300   chunk = DecodeChunk(ctx_.responses().back());
2301   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2302   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2303   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2304   EXPECT_EQ(chunk.offset(), 24u);
2305   EXPECT_EQ(chunk.window_end_offset(), 32u);
2306 
2307   ctx_.SendClientStream<64>(
2308       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2309                       .set_session_id(kArbitrarySessionId)
2310                       .set_offset(24)
2311                       .set_payload(span(kData).subspan(24))
2312                       .set_remaining_bytes(0)));
2313   transfer_thread_.WaitUntilEventIsProcessed();
2314 
2315   ASSERT_EQ(ctx_.total_responses(), 5u);
2316 
2317   chunk = DecodeChunk(ctx_.responses().back());
2318   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2319   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2320   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2321   ASSERT_TRUE(chunk.status().has_value());
2322   EXPECT_EQ(chunk.status().value(), OkStatus());
2323 
2324   // Send the completion acknowledgement.
2325   ctx_.SendClientStream(EncodeChunk(
2326       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2327           .set_session_id(kArbitrarySessionId)));
2328   transfer_thread_.WaitUntilEventIsProcessed();
2329 
2330   ASSERT_EQ(ctx_.total_responses(), 5u);
2331 
2332   EXPECT_TRUE(handler_.finalize_write_called);
2333   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2334   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
2335 }
2336 
TEST_F(WriteTransfer,Version2_ClientTerminatesDuringHandshake)2337 TEST_F(WriteTransfer, Version2_ClientTerminatesDuringHandshake) {
2338   ctx_.SendClientStream(
2339       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2340                       .set_desired_session_id(kArbitrarySessionId)
2341                       .set_resource_id(7)));
2342 
2343   transfer_thread_.WaitUntilEventIsProcessed();
2344 
2345   EXPECT_TRUE(handler_.prepare_write_called);
2346   EXPECT_FALSE(handler_.finalize_write_called);
2347 
2348   // First, the server responds with a START_ACK, accepting the session ID and
2349   // confirming the protocol version.
2350   ASSERT_EQ(ctx_.total_responses(), 1u);
2351   Chunk chunk = DecodeChunk(ctx_.responses().back());
2352   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2353   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2354   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2355   EXPECT_EQ(chunk.resource_id(), 7u);
2356 
2357   // Send an error chunk instead of completing the handshake.
2358   ctx_.SendClientStream(
2359       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo,
2360                                kArbitrarySessionId,
2361                                Status::FailedPrecondition())));
2362   transfer_thread_.WaitUntilEventIsProcessed();
2363 
2364   EXPECT_TRUE(handler_.finalize_write_called);
2365   EXPECT_EQ(handler_.finalize_write_status, Status::FailedPrecondition());
2366 }
2367 
TEST_F(WriteTransfer,Version2_ClientSendsWrongProtocolVersion)2368 TEST_F(WriteTransfer, Version2_ClientSendsWrongProtocolVersion) {
2369   ctx_.SendClientStream(
2370       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2371                       .set_desired_session_id(kArbitrarySessionId)
2372                       .set_resource_id(7)));
2373 
2374   transfer_thread_.WaitUntilEventIsProcessed();
2375 
2376   EXPECT_TRUE(handler_.prepare_write_called);
2377   EXPECT_FALSE(handler_.finalize_write_called);
2378 
2379   // First, the server responds with a START_ACK, accepting the session ID and
2380   // confirming the protocol version.
2381   ASSERT_EQ(ctx_.total_responses(), 1u);
2382   Chunk chunk = DecodeChunk(ctx_.responses().back());
2383   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2384   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2385   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2386   EXPECT_EQ(chunk.resource_id(), 7u);
2387 
2388   // Complete the handshake by confirming the server's ACK.
2389   ctx_.SendClientStream(EncodeChunk(
2390       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2391           .set_session_id(kArbitrarySessionId)));
2392   transfer_thread_.WaitUntilEventIsProcessed();
2393 
2394   // Server should respond by sending its initial transfer parameters.
2395   ASSERT_EQ(ctx_.total_responses(), 2u);
2396 
2397   chunk = DecodeChunk(ctx_.responses()[1]);
2398   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2399   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2400   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2401   EXPECT_EQ(chunk.offset(), 0u);
2402   EXPECT_EQ(chunk.window_end_offset(), 32u);
2403   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2404   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2405 
2406   // The transfer was configured to use protocol version 2. Send a legacy chunk
2407   // instead.
2408   ctx_.SendClientStream<64>(
2409       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
2410                       .set_session_id(7)
2411                       .set_offset(0)
2412                       .set_payload(kData)
2413                       .set_remaining_bytes(0)));
2414   transfer_thread_.WaitUntilEventIsProcessed();
2415 
2416   // Server should terminate the transfer.
2417   ASSERT_EQ(ctx_.total_responses(), 3u);
2418 
2419   chunk = DecodeChunk(ctx_.responses()[2]);
2420   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2421   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2422   EXPECT_EQ(chunk.status().value(), Status::Internal());
2423 
2424   // Send the completion acknowledgement.
2425   ctx_.SendClientStream(EncodeChunk(
2426       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2427           .set_session_id(kArbitrarySessionId)));
2428   transfer_thread_.WaitUntilEventIsProcessed();
2429 
2430   ASSERT_EQ(ctx_.total_responses(), 3u);
2431 }
2432 
TEST_F(WriteTransfer,Version2_InvalidResourceId)2433 TEST_F(WriteTransfer, Version2_InvalidResourceId) {
2434   ctx_.SendClientStream(
2435       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2436                       .set_desired_session_id(kArbitrarySessionId)
2437                       .set_resource_id(99)));
2438 
2439   transfer_thread_.WaitUntilEventIsProcessed();
2440 
2441   ASSERT_EQ(ctx_.total_responses(), 1u);
2442 
2443   Chunk chunk = DecodeChunk(ctx_.responses().back());
2444   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2445   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2446   EXPECT_FALSE(chunk.resource_id().has_value());
2447   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2448   EXPECT_EQ(chunk.status().value(), Status::NotFound());
2449 }
2450 
2451 class ReadTransferLowMaxRetries : public ::testing::Test {
2452  protected:
2453   static constexpr uint32_t kMaxRetries = 3;
2454   static constexpr uint32_t kMaxLifetimeRetries = 4;
2455 
ReadTransferLowMaxRetries()2456   ReadTransferLowMaxRetries()
2457       : handler_(9, kData),
2458         transfer_thread_(data_buffer_, encode_buffer_),
2459         ctx_(transfer_thread_,
2460              64,
2461              // Use a long timeout to avoid accidentally triggering timeouts.
2462              std::chrono::minutes(1),
2463              kMaxRetries,
2464              cfg::kDefaultExtendWindowDivisor,
2465              kMaxLifetimeRetries),
2466         system_thread_(TransferThreadOptions(), transfer_thread_) {
2467     ctx_.service().RegisterHandler(handler_);
2468 
2469     PW_CHECK(!handler_.prepare_read_called);
2470     PW_CHECK(!handler_.finalize_read_called);
2471 
2472     ctx_.call();  // Open the read stream
2473     transfer_thread_.WaitUntilEventIsProcessed();
2474   }
2475 
~ReadTransferLowMaxRetries()2476   ~ReadTransferLowMaxRetries() override {
2477     transfer_thread_.Terminate();
2478     system_thread_.join();
2479   }
2480 
2481   SimpleReadTransfer handler_;
2482   Thread<1, 1> transfer_thread_;
2483   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read, 10) ctx_;
2484   pw::Thread system_thread_;
2485   std::array<std::byte, 64> data_buffer_;
2486   std::array<std::byte, 64> encode_buffer_;
2487 };
2488 
TEST_F(ReadTransferLowMaxRetries,FailsAfterLifetimeRetryCount)2489 TEST_F(ReadTransferLowMaxRetries, FailsAfterLifetimeRetryCount) {
2490   ctx_.SendClientStream(
2491       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
2492                       .set_session_id(9)
2493                       .set_window_end_offset(16)
2494                       .set_offset(0)));
2495 
2496   transfer_thread_.WaitUntilEventIsProcessed();
2497 
2498   EXPECT_TRUE(handler_.prepare_read_called);
2499   EXPECT_FALSE(handler_.finalize_read_called);
2500 
2501   ASSERT_EQ(ctx_.total_responses(), 1u);
2502   Chunk chunk = DecodeChunk(ctx_.responses().back());
2503 
2504   EXPECT_EQ(chunk.session_id(), 9u);
2505   EXPECT_EQ(chunk.offset(), 0u);
2506   ASSERT_EQ(chunk.payload().size(), 16u);
2507   EXPECT_EQ(
2508       std::memcmp(chunk.payload().data(), kData.data(), chunk.payload().size()),
2509       0);
2510 
2511   // Time out twice. Server should retry both times.
2512   transfer_thread_.SimulateServerTimeout(9);
2513   transfer_thread_.SimulateServerTimeout(9);
2514   ASSERT_EQ(ctx_.total_responses(), 3u);
2515   chunk = DecodeChunk(ctx_.responses().back());
2516   EXPECT_EQ(chunk.session_id(), 9u);
2517   EXPECT_EQ(chunk.offset(), 0u);
2518   ASSERT_EQ(chunk.payload().size(), 16u);
2519   EXPECT_EQ(
2520       std::memcmp(chunk.payload().data(), kData.data(), chunk.payload().size()),
2521       0);
2522 
2523   ctx_.SendClientStream(EncodeChunk(
2524       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
2525           .set_session_id(9)
2526           .set_window_end_offset(32)
2527           .set_offset(16)));
2528   transfer_thread_.WaitUntilEventIsProcessed();
2529 
2530   ASSERT_EQ(ctx_.total_responses(), 4u);
2531   chunk = DecodeChunk(ctx_.responses().back());
2532   EXPECT_EQ(chunk.session_id(), 9u);
2533   EXPECT_EQ(chunk.offset(), 16u);
2534   ASSERT_EQ(chunk.payload().size(), 16u);
2535   EXPECT_EQ(
2536       std::memcmp(
2537           chunk.payload().data(), kData.data() + 16, chunk.payload().size()),
2538       0);
2539 
2540   // Time out three more times. The transfer should terminate.
2541   transfer_thread_.SimulateServerTimeout(9);
2542   ASSERT_EQ(ctx_.total_responses(), 5u);
2543   chunk = DecodeChunk(ctx_.responses().back());
2544   EXPECT_EQ(chunk.session_id(), 9u);
2545   EXPECT_EQ(chunk.offset(), 16u);
2546   ASSERT_EQ(chunk.payload().size(), 16u);
2547   EXPECT_EQ(
2548       std::memcmp(
2549           chunk.payload().data(), kData.data() + 16, chunk.payload().size()),
2550       0);
2551 
2552   transfer_thread_.SimulateServerTimeout(9);
2553   ASSERT_EQ(ctx_.total_responses(), 6u);
2554   chunk = DecodeChunk(ctx_.responses().back());
2555   EXPECT_EQ(chunk.session_id(), 9u);
2556   EXPECT_EQ(chunk.offset(), 16u);
2557   ASSERT_EQ(chunk.payload().size(), 16u);
2558   EXPECT_EQ(
2559       std::memcmp(
2560           chunk.payload().data(), kData.data() + 16, chunk.payload().size()),
2561       0);
2562 
2563   transfer_thread_.SimulateServerTimeout(9);
2564   ASSERT_EQ(ctx_.total_responses(), 7u);
2565   chunk = DecodeChunk(ctx_.responses().back());
2566   EXPECT_EQ(chunk.status(), Status::DeadlineExceeded());
2567 }
2568 
TEST_F(ReadTransferLowMaxRetries,Version2_FailsAfterLifetimeRetryCount)2569 TEST_F(ReadTransferLowMaxRetries, Version2_FailsAfterLifetimeRetryCount) {
2570   ctx_.SendClientStream(
2571       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2572                       .set_desired_session_id(kArbitrarySessionId)
2573                       .set_resource_id(9)));
2574 
2575   transfer_thread_.WaitUntilEventIsProcessed();
2576 
2577   EXPECT_TRUE(handler_.prepare_read_called);
2578   EXPECT_FALSE(handler_.finalize_read_called);
2579 
2580   // First, the server responds with a START_ACK, accepting the session ID and
2581   // confirming the protocol version.
2582   ASSERT_EQ(ctx_.total_responses(), 1u);
2583   Chunk chunk = DecodeChunk(ctx_.responses().back());
2584   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2585   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2586   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2587   EXPECT_EQ(chunk.resource_id(), 9u);
2588 
2589   // Time out twice. Server should retry both times.
2590   transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2591   transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2592   ASSERT_EQ(ctx_.total_responses(), 3u);
2593   chunk = DecodeChunk(ctx_.responses().back());
2594   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2595 
2596   // Complete the handshake, allowing the transfer to continue.
2597   ctx_.SendClientStream(EncodeChunk(
2598       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2599           .set_session_id(kArbitrarySessionId)
2600           .set_window_end_offset(16)
2601           .set_offset(0)));
2602   transfer_thread_.WaitUntilEventIsProcessed();
2603 
2604   ASSERT_EQ(ctx_.total_responses(), 4u);
2605   chunk = DecodeChunk(ctx_.responses().back());
2606   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2607 
2608   // Time out three more times. The transfer should terminate.
2609   transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2610   ASSERT_EQ(ctx_.total_responses(), 5u);
2611   chunk = DecodeChunk(ctx_.responses().back());
2612   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2613 
2614   transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2615   ASSERT_EQ(ctx_.total_responses(), 6u);
2616   chunk = DecodeChunk(ctx_.responses().back());
2617   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2618 
2619   transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2620   ASSERT_EQ(ctx_.total_responses(), 7u);
2621   chunk = DecodeChunk(ctx_.responses().back());
2622   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2623   EXPECT_EQ(chunk.status(), Status::DeadlineExceeded());
2624 }
2625 
TEST_F(WriteTransfer,Version2_ClientRetriesOpeningChunk)2626 TEST_F(WriteTransfer, Version2_ClientRetriesOpeningChunk) {
2627   ctx_.SendClientStream(
2628       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2629                       .set_desired_session_id(kArbitrarySessionId)
2630                       .set_resource_id(7)));
2631 
2632   transfer_thread_.WaitUntilEventIsProcessed();
2633 
2634   EXPECT_TRUE(handler_.prepare_write_called);
2635   EXPECT_FALSE(handler_.finalize_write_called);
2636 
2637   // First, the server responds with a START_ACK, accepting the session ID and
2638   // confirming the protocol version.
2639   ASSERT_EQ(ctx_.total_responses(), 1u);
2640   Chunk chunk = DecodeChunk(ctx_.responses().back());
2641   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2642   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2643   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2644   EXPECT_EQ(chunk.resource_id(), 7u);
2645 
2646   // Reset prepare_write_called to ensure it isn't called again.
2647   handler_.prepare_write_called = false;
2648 
2649   // Client re-sends the same chunk instead of finishing the handshake.
2650   ctx_.SendClientStream(
2651       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2652                       .set_desired_session_id(kArbitrarySessionId)
2653                       .set_resource_id(7)));
2654 
2655   transfer_thread_.WaitUntilEventIsProcessed();
2656 
2657   // The server should re-send the same START_ACK without reinitializing the
2658   // handler.
2659   ASSERT_EQ(ctx_.total_responses(), 2u);
2660   chunk = DecodeChunk(ctx_.responses().back());
2661   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2662   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2663   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2664   EXPECT_EQ(chunk.resource_id(), 7u);
2665 
2666   EXPECT_FALSE(handler_.prepare_write_called);
2667   EXPECT_FALSE(handler_.finalize_write_called);
2668 }
2669 
TEST_F(WriteTransfer,Version2_RegularSessionIdInStartChunk)2670 TEST_F(WriteTransfer, Version2_RegularSessionIdInStartChunk) {
2671   // Client incorrectly sets session_id instead of desired_session_id in its
2672   // START chunk. Server should immediately respond with a protocol error.
2673   ctx_.SendClientStream(
2674       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2675                       .set_session_id(kArbitrarySessionId)
2676                       .set_resource_id(99)));
2677 
2678   transfer_thread_.WaitUntilEventIsProcessed();
2679 
2680   EXPECT_FALSE(handler_.prepare_write_called);
2681   EXPECT_FALSE(handler_.finalize_write_called);
2682 
2683   ASSERT_EQ(ctx_.total_responses(), 1u);
2684 
2685   Chunk chunk = DecodeChunk(ctx_.responses().back());
2686   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2687   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2688   EXPECT_FALSE(chunk.resource_id().has_value());
2689   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2690   EXPECT_EQ(chunk.status().value(), Status::DataLoss());
2691 }
2692 
__anonbff93a0d0d02(size_t i) 2693 constexpr auto kData128 = bytes::Initialized<128>([](size_t i) { return i; });
2694 
2695 class WriteTransferLargeData : public ::testing::Test {
2696  protected:
WriteTransferLargeData()2697   WriteTransferLargeData()
2698       : buffer{},
2699         handler_(7, buffer),
2700         transfer_thread_(chunk_buffer_, encode_buffer_),
2701         system_thread_(TransferThreadOptions(), transfer_thread_),
2702         ctx_(transfer_thread_,
2703              kData128.size(),
2704              // Use a long timeout to avoid accidentally triggering timeouts.
2705              std::chrono::minutes(1),
2706              /*max_retries=*/3) {
2707     ctx_.service().RegisterHandler(handler_);
2708 
2709     PW_CHECK(!handler_.prepare_write_called);
2710     PW_CHECK(!handler_.finalize_write_called);
2711 
2712     ctx_.call();  // Open the write stream
2713     transfer_thread_.WaitUntilEventIsProcessed();
2714   }
2715 
~WriteTransferLargeData()2716   ~WriteTransferLargeData() override {
2717     transfer_thread_.Terminate();
2718     system_thread_.join();
2719   }
2720 
2721   std::array<std::byte, kData128.size()> buffer;
2722   SimpleWriteTransfer handler_;
2723 
2724   Thread<1, 1> transfer_thread_;
2725   pw::Thread system_thread_;
2726   std::array<std::byte, 48> chunk_buffer_;
2727   std::array<std::byte, 64> encode_buffer_;
2728   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write, 10) ctx_;
2729 };
2730 
TEST_F(WriteTransferLargeData,Version2_AdaptiveWindow_SlowStart)2731 TEST_F(WriteTransferLargeData, Version2_AdaptiveWindow_SlowStart) {
2732   ctx_.SendClientStream(
2733       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2734                       .set_desired_session_id(kArbitrarySessionId)
2735                       .set_resource_id(7)));
2736 
2737   transfer_thread_.WaitUntilEventIsProcessed();
2738 
2739   EXPECT_TRUE(handler_.prepare_write_called);
2740   EXPECT_FALSE(handler_.finalize_write_called);
2741 
2742   // First, the server responds with a START_ACK, accepting the session ID and
2743   // confirming the protocol version.
2744   ASSERT_EQ(ctx_.total_responses(), 1u);
2745   Chunk chunk = DecodeChunk(ctx_.responses().back());
2746   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2747   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2748   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2749   EXPECT_EQ(chunk.resource_id(), 7u);
2750 
2751   // Complete the handshake by confirming the server's ACK.
2752   ctx_.SendClientStream(EncodeChunk(
2753       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2754           .set_session_id(kArbitrarySessionId)));
2755   transfer_thread_.WaitUntilEventIsProcessed();
2756 
2757   // Server should respond by sending its initial transfer parameters.
2758   ASSERT_EQ(ctx_.total_responses(), 2u);
2759 
2760   constexpr size_t kExpectedMaxChunkSize = 21;
2761 
2762   chunk = DecodeChunk(ctx_.responses()[1]);
2763   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2764   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2765   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2766   EXPECT_EQ(chunk.offset(), 0u);
2767   EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
2768   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2769   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSize);
2770 
2771   ctx_.SendClientStream<64>(EncodeChunk(
2772       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2773           .set_session_id(kArbitrarySessionId)
2774           .set_offset(0)
2775           .set_payload(span(kData128).first(kExpectedMaxChunkSize))));
2776 
2777   transfer_thread_.WaitUntilEventIsProcessed();
2778   ASSERT_EQ(ctx_.total_responses(), 3u);
2779 
2780   // Window doubles in response to successful receive.
2781   chunk = DecodeChunk(ctx_.responses().back());
2782   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2783   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2784   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2785   EXPECT_EQ(chunk.offset(), kExpectedMaxChunkSize);
2786   EXPECT_EQ(chunk.window_end_offset(),
2787             chunk.offset() + 2 * kExpectedMaxChunkSize);
2788 
2789   ctx_.SendClientStream<64>(
2790       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2791                       .set_session_id(kArbitrarySessionId)
2792                       .set_offset(chunk.offset())
2793                       .set_payload(span(kData128).subspan(
2794                           chunk.offset(), kExpectedMaxChunkSize))));
2795 
2796   transfer_thread_.WaitUntilEventIsProcessed();
2797   ASSERT_EQ(ctx_.total_responses(), 4u);
2798 
2799   // Window doubles in response to successful receive.
2800   chunk = DecodeChunk(ctx_.responses().back());
2801   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2802   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2803   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2804   EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
2805   EXPECT_EQ(chunk.window_end_offset(),
2806             chunk.offset() + 4 * kExpectedMaxChunkSize);
2807 
2808   ctx_.SendClientStream<64>(
2809       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2810                       .set_session_id(kArbitrarySessionId)
2811                       .set_offset(2 * kExpectedMaxChunkSize)
2812                       .set_payload(span(kData128).subspan(
2813                           chunk.offset(), kExpectedMaxChunkSize))
2814                       .set_remaining_bytes(0)));
2815   transfer_thread_.WaitUntilEventIsProcessed();
2816 
2817   ASSERT_EQ(ctx_.total_responses(), 5u);
2818 
2819   chunk = DecodeChunk(ctx_.responses().back());
2820   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2821   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2822   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2823   ASSERT_TRUE(chunk.status().has_value());
2824   EXPECT_EQ(chunk.status().value(), OkStatus());
2825 
2826   // Send the completion acknowledgement.
2827   ctx_.SendClientStream(EncodeChunk(
2828       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2829           .set_session_id(kArbitrarySessionId)));
2830   transfer_thread_.WaitUntilEventIsProcessed();
2831 
2832   ASSERT_EQ(ctx_.total_responses(), 5u);
2833 
2834   EXPECT_TRUE(handler_.finalize_write_called);
2835   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2836 }
2837 
TEST_F(WriteTransferLargeData,Version2_AdaptiveWindow_CongestionAvoidance)2838 TEST_F(WriteTransferLargeData, Version2_AdaptiveWindow_CongestionAvoidance) {
2839   ctx_.SendClientStream(
2840       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2841                       .set_desired_session_id(kArbitrarySessionId)
2842                       .set_resource_id(7)));
2843 
2844   transfer_thread_.WaitUntilEventIsProcessed();
2845 
2846   EXPECT_TRUE(handler_.prepare_write_called);
2847   EXPECT_FALSE(handler_.finalize_write_called);
2848 
2849   // First, the server responds with a START_ACK, accepting the session ID and
2850   // confirming the protocol version.
2851   ASSERT_EQ(ctx_.total_responses(), 1u);
2852   Chunk chunk = DecodeChunk(ctx_.responses().back());
2853   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2854   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2855   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2856   EXPECT_EQ(chunk.resource_id(), 7u);
2857 
2858   // Complete the handshake by confirming the server's ACK.
2859   ctx_.SendClientStream(EncodeChunk(
2860       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2861           .set_session_id(kArbitrarySessionId)));
2862   transfer_thread_.WaitUntilEventIsProcessed();
2863 
2864   // Server should respond by sending its initial transfer parameters.
2865   ASSERT_EQ(ctx_.total_responses(), 2u);
2866 
2867   constexpr size_t kExpectedMaxChunkSize = 21;
2868 
2869   chunk = DecodeChunk(ctx_.responses()[1]);
2870   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2871   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2872   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2873   EXPECT_EQ(chunk.offset(), 0u);
2874   EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
2875   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2876   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSize);
2877 
2878   ctx_.SendClientStream<64>(EncodeChunk(
2879       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2880           .set_session_id(kArbitrarySessionId)
2881           .set_offset(0)
2882           .set_payload(span(kData128).first(kExpectedMaxChunkSize))));
2883 
2884   transfer_thread_.WaitUntilEventIsProcessed();
2885   ASSERT_EQ(ctx_.total_responses(), 3u);
2886 
2887   // Window doubles in response to successful receive.
2888   chunk = DecodeChunk(ctx_.responses().back());
2889   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2890   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2891   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2892   EXPECT_EQ(chunk.offset(), kExpectedMaxChunkSize);
2893   EXPECT_EQ(chunk.window_end_offset(),
2894             chunk.offset() + 2 * kExpectedMaxChunkSize);
2895 
2896   ctx_.SendClientStream<64>(
2897       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2898                       .set_session_id(kArbitrarySessionId)
2899                       .set_offset(chunk.offset())
2900                       .set_payload(span(kData128).subspan(
2901                           chunk.offset(), kExpectedMaxChunkSize))));
2902 
2903   transfer_thread_.WaitUntilEventIsProcessed();
2904   ASSERT_EQ(ctx_.total_responses(), 4u);
2905 
2906   // Window doubles in response to successful receive.
2907   chunk = DecodeChunk(ctx_.responses().back());
2908   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2909   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2910   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2911   EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
2912   EXPECT_EQ(chunk.window_end_offset(),
2913             chunk.offset() + 4 * kExpectedMaxChunkSize);
2914 
2915   // Don't send any data in response.
2916   transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
2917   ASSERT_EQ(ctx_.total_responses(), 5u);
2918 
2919   // Window size should now half from the previous one.
2920   chunk = DecodeChunk(ctx_.responses().back());
2921   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2922   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2923   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2924   EXPECT_EQ(chunk.offset(), 2 * kExpectedMaxChunkSize);
2925   EXPECT_EQ(chunk.window_end_offset(),
2926             chunk.offset() + 2 * kExpectedMaxChunkSize);
2927 
2928   // Send the appropriate chunk.
2929   ctx_.SendClientStream<64>(
2930       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2931                       .set_session_id(kArbitrarySessionId)
2932                       .set_offset(2 * kExpectedMaxChunkSize)
2933                       .set_payload(span(kData128).subspan(
2934                           chunk.offset(), kExpectedMaxChunkSize))));
2935   transfer_thread_.WaitUntilEventIsProcessed();
2936 
2937   ASSERT_EQ(ctx_.total_responses(), 6u);
2938 
2939   // Window size should now only increase by one chunk instead of doubling.
2940   chunk = DecodeChunk(ctx_.responses().back());
2941   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2942   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2943   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2944   EXPECT_EQ(chunk.offset(), 3 * kExpectedMaxChunkSize);
2945   EXPECT_EQ(chunk.window_end_offset(),
2946             chunk.offset() + 3 * kExpectedMaxChunkSize);
2947 
2948   // Complete the transfer.
2949   ctx_.SendClientStream<64>(
2950       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2951                       .set_session_id(kArbitrarySessionId)
2952                       .set_offset(3 * kExpectedMaxChunkSize)
2953                       .set_payload(span(kData128).subspan(
2954                           chunk.offset(), kExpectedMaxChunkSize))
2955                       .set_remaining_bytes(0)));
2956   transfer_thread_.WaitUntilEventIsProcessed();
2957 
2958   ASSERT_EQ(ctx_.total_responses(), 7u);
2959 
2960   chunk = DecodeChunk(ctx_.responses().back());
2961   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2962   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2963   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2964   ASSERT_TRUE(chunk.status().has_value());
2965   EXPECT_EQ(chunk.status().value(), OkStatus());
2966 
2967   // Send the completion acknowledgement.
2968   ctx_.SendClientStream(EncodeChunk(
2969       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2970           .set_session_id(kArbitrarySessionId)));
2971   transfer_thread_.WaitUntilEventIsProcessed();
2972 
2973   ASSERT_EQ(ctx_.total_responses(), 7u);
2974 
2975   EXPECT_TRUE(handler_.finalize_write_called);
2976   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2977 }
2978 
TEST_F(WriteTransferLargeData,Version2_ResendPreviousData_ReceivesContinueParameters)2979 TEST_F(WriteTransferLargeData,
2980        Version2_ResendPreviousData_ReceivesContinueParameters) {
2981   ctx_.SendClientStream(
2982       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2983                       .set_desired_session_id(kArbitrarySessionId)
2984                       .set_resource_id(7)));
2985 
2986   transfer_thread_.WaitUntilEventIsProcessed();
2987 
2988   EXPECT_TRUE(handler_.prepare_write_called);
2989   EXPECT_FALSE(handler_.finalize_write_called);
2990 
2991   // First, the server responds with a START_ACK, accepting the session ID and
2992   // confirming the protocol version.
2993   ASSERT_EQ(ctx_.total_responses(), 1u);
2994   Chunk chunk = DecodeChunk(ctx_.responses().back());
2995   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2996   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2997   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
2998   EXPECT_EQ(chunk.resource_id(), 7u);
2999 
3000   // Complete the handshake by confirming the server's ACK.
3001   ctx_.SendClientStream(EncodeChunk(
3002       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
3003           .set_session_id(kArbitrarySessionId)));
3004   transfer_thread_.WaitUntilEventIsProcessed();
3005 
3006   // Server should respond by sending its initial transfer parameters.
3007   ASSERT_EQ(ctx_.total_responses(), 2u);
3008 
3009   constexpr size_t kExpectedMaxChunkSizeBytes = 21u;
3010 
3011   chunk = DecodeChunk(ctx_.responses()[1]);
3012   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3013   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
3014   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3015   EXPECT_EQ(chunk.offset(), 0u);
3016   EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSizeBytes);
3017   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
3018   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSizeBytes);
3019 
3020   ctx_.SendClientStream<64>(
3021       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3022                       .set_session_id(kArbitrarySessionId)
3023                       .set_offset(0)
3024                       .set_payload(span(kData).first(16))));
3025   transfer_thread_.WaitUntilEventIsProcessed();
3026 
3027   ASSERT_EQ(ctx_.total_responses(), 3u);
3028 
3029   // Window size grows to 2 chunks on successful receipt.
3030   chunk = DecodeChunk(ctx_.responses().back());
3031   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3032   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
3033   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3034   EXPECT_EQ(chunk.offset(), 16u);
3035   EXPECT_EQ(chunk.window_end_offset(),
3036             chunk.offset() + 2 * kExpectedMaxChunkSizeBytes);
3037 
3038   // Resend data that has already been received.
3039   ctx_.SendClientStream<64>(
3040       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3041                       .set_session_id(kArbitrarySessionId)
3042                       .set_offset(8)
3043                       .set_payload(span(kData).subspan(8, 8))
3044                       .set_remaining_bytes(0)));
3045   transfer_thread_.WaitUntilEventIsProcessed();
3046 
3047   // The server should respond with CONTINUE parameters rather than requesting
3048   // retransmission and starting a recovery cycle. However, the window size
3049   // should shrink in response to the retried data.
3050   ASSERT_EQ(ctx_.total_responses(), 4u);
3051   chunk = DecodeChunk(ctx_.responses().back());
3052   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
3053   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3054   EXPECT_EQ(chunk.offset(), 16u);
3055   EXPECT_EQ(chunk.window_end_offset(),
3056             chunk.offset() + kExpectedMaxChunkSizeBytes);
3057 
3058   // Send the expected chunk.
3059   ctx_.SendClientStream<64>(
3060       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
3061                       .set_session_id(kArbitrarySessionId)
3062                       .set_offset(16)
3063                       .set_payload(span(kData).subspan(16))
3064                       .set_remaining_bytes(0)));
3065   transfer_thread_.WaitUntilEventIsProcessed();
3066 
3067   ASSERT_EQ(ctx_.total_responses(), 5u);
3068   chunk = DecodeChunk(ctx_.responses().back());
3069   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3070   ASSERT_TRUE(chunk.status().has_value());
3071   EXPECT_EQ(chunk.status().value(), OkStatus());
3072 
3073   EXPECT_TRUE(handler_.finalize_write_called);
3074   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
3075   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
3076 
3077   ctx_.SendClientStream<64>(EncodeChunk(
3078       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
3079           .set_session_id(kArbitrarySessionId)));
3080   transfer_thread_.WaitUntilEventIsProcessed();
3081   ASSERT_EQ(ctx_.total_responses(), 5u);
3082 }
3083 
3084 class WriteTransferMultibyteVarintChunkSize : public ::testing::Test {
3085  protected:
WriteTransferMultibyteVarintChunkSize()3086   WriteTransferMultibyteVarintChunkSize()
3087       : buffer{},
3088         handler_(7, buffer),
3089         transfer_thread_(chunk_buffer_, encode_buffer_),
3090         system_thread_(TransferThreadOptions(), transfer_thread_),
3091         ctx_(transfer_thread_,
3092              kData256.size(),
3093              // Use a long timeout to avoid accidentally triggering timeouts.
3094              std::chrono::minutes(1),
3095              /*max_retries=*/3) {
3096     ctx_.service().RegisterHandler(handler_);
3097 
3098     PW_CHECK(!handler_.prepare_write_called);
3099     PW_CHECK(!handler_.finalize_write_called);
3100 
3101     ctx_.call();  // Open the write stream
3102     transfer_thread_.WaitUntilEventIsProcessed();
3103   }
3104 
~WriteTransferMultibyteVarintChunkSize()3105   ~WriteTransferMultibyteVarintChunkSize() override {
3106     transfer_thread_.Terminate();
3107     system_thread_.join();
3108   }
3109 
3110   static constexpr auto kData256 =
__anonbff93a0d0e02(size_t i) 3111       bytes::Initialized<256>([](size_t i) { return i; });
3112 
3113   std::array<std::byte, kData256.size()> buffer;
3114   SimpleWriteTransfer handler_;
3115 
3116   Thread<1, 1> transfer_thread_;
3117   pw::Thread system_thread_;
3118   std::array<std::byte, kData256.size()> chunk_buffer_;
3119   std::array<std::byte, kData256.size()> encode_buffer_;
3120   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write, 10) ctx_;
3121 };
3122 
TEST_F(WriteTransferMultibyteVarintChunkSize,AssumesMinimumWindowSizeInFirstParameters)3123 TEST_F(WriteTransferMultibyteVarintChunkSize,
3124        AssumesMinimumWindowSizeInFirstParameters) {
3125   ctx_.SendClientStream(
3126       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
3127                       .set_desired_session_id(kArbitrarySessionId)
3128                       .set_resource_id(7)));
3129 
3130   transfer_thread_.WaitUntilEventIsProcessed();
3131 
3132   EXPECT_TRUE(handler_.prepare_write_called);
3133   EXPECT_FALSE(handler_.finalize_write_called);
3134 
3135   // First, the server responds with a START_ACK, accepting the session ID and
3136   // confirming the protocol version.
3137   ASSERT_EQ(ctx_.total_responses(), 1u);
3138   Chunk chunk = DecodeChunk(ctx_.responses().back());
3139   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3140   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
3141   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3142   EXPECT_EQ(chunk.resource_id(), 7u);
3143 
3144   // Complete the handshake by confirming the server's ACK.
3145   ctx_.SendClientStream(EncodeChunk(
3146       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
3147           .set_session_id(kArbitrarySessionId)));
3148   transfer_thread_.WaitUntilEventIsProcessed();
3149 
3150   // Server should respond by sending its initial transfer parameters.
3151   ASSERT_EQ(ctx_.total_responses(), 2u);
3152 
3153   constexpr uint32_t kExpectedMaxChunkSize = 226;
3154 
3155   chunk = DecodeChunk(ctx_.responses()[1]);
3156   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3157   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
3158   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3159   EXPECT_EQ(chunk.offset(), 0u);
3160   EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
3161   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
3162   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSize);
3163 
3164   // Simulate a timeout, prompting the server to recalculate its transfer
3165   // parameters. The max chunk size and window size should not change.
3166   transfer_thread_.SimulateServerTimeout(kArbitrarySessionId);
3167   ASSERT_EQ(ctx_.total_responses(), 3u);
3168 
3169   chunk = DecodeChunk(ctx_.responses().back());
3170   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
3171   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
3172   EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
3173   EXPECT_EQ(chunk.offset(), 0u);
3174   EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSize);
3175   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
3176   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSize);
3177 }
3178 
3179 class ReadTransferWithStats : public SimpleReadTransfer {
3180  public:
3181   using SimpleReadTransfer::SimpleReadTransfer;
3182 
GetStatus(uint64_t & readable_offset,uint64_t & writeable_offset,uint64_t & read_checksum,uint64_t & write_checksum)3183   Status GetStatus(uint64_t& readable_offset,
3184                    uint64_t& writeable_offset,
3185                    uint64_t& read_checksum,
3186                    uint64_t& write_checksum) override {
3187     readable_offset = 64;
3188     read_checksum = 0xffee;
3189     writeable_offset = 128;
3190     write_checksum = 0xeeff;
3191     return OkStatus();
3192   }
3193 };
3194 
3195 class GetResourceStatus : public ::testing::Test {
3196  protected:
GetResourceStatus(size_t max_chunk_size_bytes=64)3197   GetResourceStatus(size_t max_chunk_size_bytes = 64)
3198       : handler_(3, kData),
3199         transfer_thread_(span(data_buffer_).first(max_chunk_size_bytes),
3200                          encode_buffer_),
3201         ctx_(transfer_thread_,
3202              64,
3203              // Use a long timeout to avoid accidentally triggering timeouts.
3204              std::chrono::minutes(1)),
3205         system_thread_(TransferThreadOptions(), transfer_thread_) {
3206     ctx_.service().RegisterHandler(handler_);
3207 
3208     PW_CHECK(!handler_.prepare_read_called);
3209     PW_CHECK(!handler_.finalize_read_called);
3210   }
3211 
~GetResourceStatus()3212   ~GetResourceStatus() override {
3213     transfer_thread_.Terminate();
3214     system_thread_.join();
3215   }
3216 
3217   ReadTransferWithStats handler_;
3218   Thread<1, 1> transfer_thread_;
3219   PW_RAW_TEST_METHOD_CONTEXT(TransferService, GetResourceStatus) ctx_;
3220   pw::Thread system_thread_;
3221   std::array<std::byte, 64> data_buffer_;
3222   std::array<std::byte, 64> encode_buffer_;
3223 };
3224 
TEST_F(GetResourceStatus,ValidResourceId_ReturnsStatus)3225 TEST_F(GetResourceStatus, ValidResourceId_ReturnsStatus) {
3226   pwpb::ResourceStatusRequest::MemoryEncoder encoder(encode_buffer_);
3227   ASSERT_EQ(encoder.Write({.resource_id = 3}), OkStatus());
3228   ctx_.call(encoder);
3229 
3230   transfer_thread_.WaitUntilEventIsProcessed();
3231 
3232   ASSERT_EQ(ctx_.status(), OkStatus());
3233 
3234   stream::MemoryReader reader(ctx_.response());
3235   pwpb::ResourceStatus::StreamDecoder decoder(reader);
3236   pwpb::ResourceStatus::Message response;
3237   ASSERT_EQ(decoder.Read(response), OkStatus());
3238 
3239   EXPECT_EQ(response.resource_id, 3u);
3240   EXPECT_EQ(response.readable_offset, 64u);
3241   EXPECT_EQ(response.read_checksum, 0xffee);
3242   EXPECT_EQ(response.writeable_offset, 128u);
3243   EXPECT_EQ(response.write_checksum, 0xeeff);
3244 }
3245 
TEST_F(GetResourceStatus,InvalidResourceId_ReturnsNotFound)3246 TEST_F(GetResourceStatus, InvalidResourceId_ReturnsNotFound) {
3247   pwpb::ResourceStatusRequest::MemoryEncoder encoder(encode_buffer_);
3248   ASSERT_EQ(encoder.Write({.resource_id = 27}), OkStatus());
3249   ctx_.call(encoder);
3250 
3251   transfer_thread_.WaitUntilEventIsProcessed();
3252 
3253   ASSERT_EQ(ctx_.status(), Status::NotFound());
3254 
3255   stream::MemoryReader reader(ctx_.response());
3256   pwpb::ResourceStatus::StreamDecoder decoder(reader);
3257   pwpb::ResourceStatus::Message response;
3258   ASSERT_EQ(decoder.Read(response), OkStatus());
3259 
3260   EXPECT_EQ(response.resource_id, 27u);
3261 }
3262 
3263 }  // namespace
3264 }  // namespace pw::transfer::test
3265