xref: /aosp_15_r20/external/pigweed/pw_transfer/transfer_thread_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14*61c4878aSAndroid Build Coastguard Worker 
15*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/transfer_thread.h"
16*61c4878aSAndroid Build Coastguard Worker 
17*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
18*61c4878aSAndroid Build Coastguard Worker #include "pw_bytes/array.h"
19*61c4878aSAndroid Build Coastguard Worker #include "pw_bytes/span.h"
20*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/raw/client_testing.h"
21*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/raw/test_method_context.h"
22*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/test_helpers.h"
23*61c4878aSAndroid Build Coastguard Worker #include "pw_status/status.h"
24*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/thread.h"
25*61c4878aSAndroid Build Coastguard Worker #include "pw_thread_stl/options.h"
26*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/handler.h"
27*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/transfer.h"
28*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/transfer.raw_rpc.pb.h"
29*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer_private/chunk_testing.h"
30*61c4878aSAndroid Build Coastguard Worker #include "pw_unit_test/framework.h"
31*61c4878aSAndroid Build Coastguard Worker 
32*61c4878aSAndroid Build Coastguard Worker namespace pw::transfer::test {
33*61c4878aSAndroid Build Coastguard Worker namespace {
34*61c4878aSAndroid Build Coastguard Worker 
35*61c4878aSAndroid Build Coastguard Worker using internal::Chunk;
36*61c4878aSAndroid Build Coastguard Worker 
37*61c4878aSAndroid Build Coastguard Worker // Effectively unlimited timeout as these tests should never hit it.
38*61c4878aSAndroid Build Coastguard Worker constexpr chrono::SystemClock::duration kNeverTimeout =
39*61c4878aSAndroid Build Coastguard Worker     std::chrono::seconds(60);
40*61c4878aSAndroid Build Coastguard Worker 
41*61c4878aSAndroid Build Coastguard Worker // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()42*61c4878aSAndroid Build Coastguard Worker thread::Options& TransferThreadOptions() {
43*61c4878aSAndroid Build Coastguard Worker   static thread::stl::Options options;
44*61c4878aSAndroid Build Coastguard Worker   return options;
45*61c4878aSAndroid Build Coastguard Worker }
46*61c4878aSAndroid Build Coastguard Worker 
47*61c4878aSAndroid Build Coastguard Worker class TransferThreadTest : public ::testing::Test {
48*61c4878aSAndroid Build Coastguard Worker  public:
TransferThreadTest()49*61c4878aSAndroid Build Coastguard Worker   TransferThreadTest()
50*61c4878aSAndroid Build Coastguard Worker       : max_parameters_(chunk_buffer_.size(),
51*61c4878aSAndroid Build Coastguard Worker                         chunk_buffer_.size(),
52*61c4878aSAndroid Build Coastguard Worker                         cfg::kDefaultExtendWindowDivisor),
53*61c4878aSAndroid Build Coastguard Worker         transfer_thread_(chunk_buffer_, encode_buffer_),
54*61c4878aSAndroid Build Coastguard Worker         system_thread_(TransferThreadOptions(), transfer_thread_),
55*61c4878aSAndroid Build Coastguard Worker         ctx_(transfer_thread_, 512) {}
56*61c4878aSAndroid Build Coastguard Worker 
~TransferThreadTest()57*61c4878aSAndroid Build Coastguard Worker   ~TransferThreadTest() override {
58*61c4878aSAndroid Build Coastguard Worker     transfer_thread_.Terminate();
59*61c4878aSAndroid Build Coastguard Worker     system_thread_.join();
60*61c4878aSAndroid Build Coastguard Worker   }
61*61c4878aSAndroid Build Coastguard Worker 
62*61c4878aSAndroid Build Coastguard Worker  protected:
63*61c4878aSAndroid Build Coastguard Worker   std::array<std::byte, 64> chunk_buffer_;
64*61c4878aSAndroid Build Coastguard Worker   std::array<std::byte, 64> encode_buffer_;
65*61c4878aSAndroid Build Coastguard Worker 
66*61c4878aSAndroid Build Coastguard Worker   rpc::RawClientTestContext<> rpc_client_context_;
67*61c4878aSAndroid Build Coastguard Worker   internal::TransferParameters max_parameters_;
68*61c4878aSAndroid Build Coastguard Worker 
69*61c4878aSAndroid Build Coastguard Worker   transfer::Thread<1, 1> transfer_thread_;
70*61c4878aSAndroid Build Coastguard Worker   pw::Thread system_thread_;
71*61c4878aSAndroid Build Coastguard Worker   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
72*61c4878aSAndroid Build Coastguard Worker };
73*61c4878aSAndroid Build Coastguard Worker 
74*61c4878aSAndroid Build Coastguard Worker class SimpleReadTransfer final : public ReadOnlyHandler {
75*61c4878aSAndroid Build Coastguard Worker  public:
SimpleReadTransfer(uint32_t session_id,ConstByteSpan data)76*61c4878aSAndroid Build Coastguard Worker   SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
77*61c4878aSAndroid Build Coastguard Worker       : ReadOnlyHandler(session_id),
78*61c4878aSAndroid Build Coastguard Worker         prepare_read_called(false),
79*61c4878aSAndroid Build Coastguard Worker         finalize_read_called(false),
80*61c4878aSAndroid Build Coastguard Worker         finalize_read_status(Status::Unknown()),
81*61c4878aSAndroid Build Coastguard Worker         reader_(data) {}
82*61c4878aSAndroid Build Coastguard Worker 
PrepareRead()83*61c4878aSAndroid Build Coastguard Worker   Status PrepareRead() final {
84*61c4878aSAndroid Build Coastguard Worker     PW_CHECK_OK(reader_.Seek(0));
85*61c4878aSAndroid Build Coastguard Worker     set_reader(reader_);
86*61c4878aSAndroid Build Coastguard Worker     prepare_read_called = true;
87*61c4878aSAndroid Build Coastguard Worker     return OkStatus();
88*61c4878aSAndroid Build Coastguard Worker   }
89*61c4878aSAndroid Build Coastguard Worker 
FinalizeRead(Status status)90*61c4878aSAndroid Build Coastguard Worker   void FinalizeRead(Status status) final {
91*61c4878aSAndroid Build Coastguard Worker     finalize_read_called = true;
92*61c4878aSAndroid Build Coastguard Worker     finalize_read_status = status;
93*61c4878aSAndroid Build Coastguard Worker   }
94*61c4878aSAndroid Build Coastguard Worker 
95*61c4878aSAndroid Build Coastguard Worker   bool prepare_read_called;
96*61c4878aSAndroid Build Coastguard Worker   bool finalize_read_called;
97*61c4878aSAndroid Build Coastguard Worker   Status finalize_read_status;
98*61c4878aSAndroid Build Coastguard Worker 
99*61c4878aSAndroid Build Coastguard Worker  private:
100*61c4878aSAndroid Build Coastguard Worker   stream::MemoryReader reader_;
101*61c4878aSAndroid Build Coastguard Worker };
102*61c4878aSAndroid Build Coastguard Worker 
__anon2b14ee840202(size_t i) 103*61c4878aSAndroid Build Coastguard Worker constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
104*61c4878aSAndroid Build Coastguard Worker 
// A transfer started for a resource with a registered handler must reach that
// handler: PrepareRead() has been called once the start event is processed.
TEST_F(TransferThreadTest, AddTransferHandler) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(
      internal::TransferType::kTransmit,
      ProtocolVersion::kLegacy,
      /*session_id=*/3,
      /*resource_id=*/3,
      {},  // No initial chunk data.
      max_parameters_,
      kNeverTimeout,
      3,
      10);

  // Block until the transfer thread has handled the start event.
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(read_handler.prepare_read_called);

  transfer_thread_.RemoveTransferHandler(read_handler);
}
128*61c4878aSAndroid Build Coastguard Worker 
// Once a handler has been removed, starting a transfer for its resource must
// not call into it; the thread responds with a NOT_FOUND status chunk instead.
TEST_F(TransferThreadTest, RemoveTransferHandler) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  // Register the handler and immediately unregister it again.
  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);
  transfer_thread_.RemoveTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(
      internal::TransferType::kTransmit,
      ProtocolVersion::kLegacy,
      /*session_id=*/3,
      /*resource_id=*/3,
      {},  // No initial chunk data.
      max_parameters_,
      kNeverTimeout,
      3,
      10);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(read_handler.prepare_read_called);

  // Exactly one response is expected: the error chunk for session 3.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  auto status_chunk = DecodeChunk(ctx_.response());
  EXPECT_EQ(status_chunk.session_id(), 3u);
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), Status::NotFound());

  // NOTE(review): the handler was already removed above, so this is a second
  // removal of the same handler -- presumably a harmless no-op; confirm.
  transfer_thread_.RemoveTransferHandler(read_handler);
}
159*61c4878aSAndroid Build Coastguard Worker 
TEST_F(TransferThreadTest,ProcessChunk_SendsWindow)160*61c4878aSAndroid Build Coastguard Worker TEST_F(TransferThreadTest, ProcessChunk_SendsWindow) {
161*61c4878aSAndroid Build Coastguard Worker   auto reader_writer = ctx_.reader_writer();
162*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.SetServerReadStream(reader_writer, [](ConstByteSpan) {});
163*61c4878aSAndroid Build Coastguard Worker 
164*61c4878aSAndroid Build Coastguard Worker   SimpleReadTransfer handler(3, kData);
165*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.AddTransferHandler(handler);
166*61c4878aSAndroid Build Coastguard Worker 
167*61c4878aSAndroid Build Coastguard Worker   rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
168*61c4878aSAndroid Build Coastguard Worker     transfer_thread_.StartServerTransfer(
169*61c4878aSAndroid Build Coastguard Worker         internal::TransferType::kTransmit,
170*61c4878aSAndroid Build Coastguard Worker         ProtocolVersion::kLegacy,
171*61c4878aSAndroid Build Coastguard Worker         3,
172*61c4878aSAndroid Build Coastguard Worker         3,
173*61c4878aSAndroid Build Coastguard Worker         EncodeChunk(
174*61c4878aSAndroid Build Coastguard Worker             Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
175*61c4878aSAndroid Build Coastguard Worker                 .set_session_id(3)
176*61c4878aSAndroid Build Coastguard Worker                 .set_window_end_offset(16)
177*61c4878aSAndroid Build Coastguard Worker                 .set_max_chunk_size_bytes(8)
178*61c4878aSAndroid Build Coastguard Worker                 .set_offset(0)),
179*61c4878aSAndroid Build Coastguard Worker         max_parameters_,
180*61c4878aSAndroid Build Coastguard Worker         kNeverTimeout,
181*61c4878aSAndroid Build Coastguard Worker         3,
182*61c4878aSAndroid Build Coastguard Worker         10);
183*61c4878aSAndroid Build Coastguard Worker   });
184*61c4878aSAndroid Build Coastguard Worker 
185*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(ctx_.total_responses(), 2u);
186*61c4878aSAndroid Build Coastguard Worker   auto chunk = DecodeChunk(ctx_.responses()[0]);
187*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.session_id(), 3u);
188*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.offset(), 0u);
189*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.payload().size(), 8u);
190*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(
191*61c4878aSAndroid Build Coastguard Worker       std::memcmp(chunk.payload().data(), kData.data(), chunk.payload().size()),
192*61c4878aSAndroid Build Coastguard Worker       0);
193*61c4878aSAndroid Build Coastguard Worker 
194*61c4878aSAndroid Build Coastguard Worker   chunk = DecodeChunk(ctx_.responses()[1]);
195*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.session_id(), 3u);
196*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.offset(), 8u);
197*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.payload().size(), 8u);
198*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(
199*61c4878aSAndroid Build Coastguard Worker       std::memcmp(
200*61c4878aSAndroid Build Coastguard Worker           chunk.payload().data(), kData.data() + 8, chunk.payload().size()),
201*61c4878aSAndroid Build Coastguard Worker       0);
202*61c4878aSAndroid Build Coastguard Worker 
203*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.RemoveTransferHandler(handler);
204*61c4878aSAndroid Build Coastguard Worker }
205*61c4878aSAndroid Build Coastguard Worker 
TEST_F(TransferThreadTest,StartTransferExhausted_Server)206*61c4878aSAndroid Build Coastguard Worker TEST_F(TransferThreadTest, StartTransferExhausted_Server) {
207*61c4878aSAndroid Build Coastguard Worker   auto reader_writer = ctx_.reader_writer();
208*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.SetServerReadStream(reader_writer, [](ConstByteSpan) {});
209*61c4878aSAndroid Build Coastguard Worker 
210*61c4878aSAndroid Build Coastguard Worker   SimpleReadTransfer handler3(3, kData);
211*61c4878aSAndroid Build Coastguard Worker   SimpleReadTransfer handler4(4, kData);
212*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.AddTransferHandler(handler3);
213*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.AddTransferHandler(handler4);
214*61c4878aSAndroid Build Coastguard Worker 
215*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.StartServerTransfer(
216*61c4878aSAndroid Build Coastguard Worker       internal::TransferType::kTransmit,
217*61c4878aSAndroid Build Coastguard Worker       ProtocolVersion::kLegacy,
218*61c4878aSAndroid Build Coastguard Worker       3,
219*61c4878aSAndroid Build Coastguard Worker       3,
220*61c4878aSAndroid Build Coastguard Worker       EncodeChunk(
221*61c4878aSAndroid Build Coastguard Worker           Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
222*61c4878aSAndroid Build Coastguard Worker               .set_session_id(3)
223*61c4878aSAndroid Build Coastguard Worker               // Ensure only one chunk is sent as end offset equals max size.
224*61c4878aSAndroid Build Coastguard Worker               .set_window_end_offset(16)
225*61c4878aSAndroid Build Coastguard Worker               .set_max_chunk_size_bytes(16)
226*61c4878aSAndroid Build Coastguard Worker               .set_offset(0)),
227*61c4878aSAndroid Build Coastguard Worker       max_parameters_,
228*61c4878aSAndroid Build Coastguard Worker       kNeverTimeout,
229*61c4878aSAndroid Build Coastguard Worker       3,
230*61c4878aSAndroid Build Coastguard Worker       10);
231*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.WaitUntilEventIsProcessed();
232*61c4878aSAndroid Build Coastguard Worker 
233*61c4878aSAndroid Build Coastguard Worker   // First transfer starts correctly.
234*61c4878aSAndroid Build Coastguard Worker   EXPECT_TRUE(handler3.prepare_read_called);
235*61c4878aSAndroid Build Coastguard Worker   EXPECT_FALSE(handler4.prepare_read_called);
236*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(ctx_.total_responses(), 1u);
237*61c4878aSAndroid Build Coastguard Worker 
238*61c4878aSAndroid Build Coastguard Worker   // Try to start a simultaneous transfer to resource 4, for which the thread
239*61c4878aSAndroid Build Coastguard Worker   // does not have an available context.
240*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.StartServerTransfer(
241*61c4878aSAndroid Build Coastguard Worker       internal::TransferType::kTransmit,
242*61c4878aSAndroid Build Coastguard Worker       ProtocolVersion::kLegacy,
243*61c4878aSAndroid Build Coastguard Worker       4,
244*61c4878aSAndroid Build Coastguard Worker       4,
245*61c4878aSAndroid Build Coastguard Worker       EncodeChunk(
246*61c4878aSAndroid Build Coastguard Worker           Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
247*61c4878aSAndroid Build Coastguard Worker               .set_session_id(4)
248*61c4878aSAndroid Build Coastguard Worker               // Ensure only one chunk is sent as end offset equals max size.
249*61c4878aSAndroid Build Coastguard Worker               .set_window_end_offset(16)
250*61c4878aSAndroid Build Coastguard Worker               .set_max_chunk_size_bytes(16)
251*61c4878aSAndroid Build Coastguard Worker               .set_offset(0)),
252*61c4878aSAndroid Build Coastguard Worker       max_parameters_,
253*61c4878aSAndroid Build Coastguard Worker       kNeverTimeout,
254*61c4878aSAndroid Build Coastguard Worker       3,
255*61c4878aSAndroid Build Coastguard Worker       10);
256*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.WaitUntilEventIsProcessed();
257*61c4878aSAndroid Build Coastguard Worker 
258*61c4878aSAndroid Build Coastguard Worker   EXPECT_FALSE(handler4.prepare_read_called);
259*61c4878aSAndroid Build Coastguard Worker 
260*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(ctx_.total_responses(), 2u);
261*61c4878aSAndroid Build Coastguard Worker   auto chunk = DecodeChunk(ctx_.response());
262*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.session_id(), 4u);
263*61c4878aSAndroid Build Coastguard Worker   ASSERT_TRUE(chunk.status().has_value());
264*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.status().value(), Status::ResourceExhausted());
265*61c4878aSAndroid Build Coastguard Worker 
266*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.RemoveTransferHandler(handler3);
267*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.RemoveTransferHandler(handler4);
268*61c4878aSAndroid Build Coastguard Worker }
269*61c4878aSAndroid Build Coastguard Worker 
TEST_F(TransferThreadTest,StartTransferExhausted_Client)270*61c4878aSAndroid Build Coastguard Worker TEST_F(TransferThreadTest, StartTransferExhausted_Client) {
271*61c4878aSAndroid Build Coastguard Worker   rpc::RawClientReaderWriter read_stream = pw_rpc::raw::Transfer::Read(
272*61c4878aSAndroid Build Coastguard Worker       rpc_client_context_.client(), rpc_client_context_.channel().id());
273*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.SetClientReadStream(read_stream, [](ConstByteSpan) {});
274*61c4878aSAndroid Build Coastguard Worker 
275*61c4878aSAndroid Build Coastguard Worker   Status status3 = Status::Unknown();
276*61c4878aSAndroid Build Coastguard Worker   Status status4 = Status::Unknown();
277*61c4878aSAndroid Build Coastguard Worker 
278*61c4878aSAndroid Build Coastguard Worker   stream::MemoryWriterBuffer<16> buffer3;
279*61c4878aSAndroid Build Coastguard Worker   stream::MemoryWriterBuffer<16> buffer4;
280*61c4878aSAndroid Build Coastguard Worker 
281*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.StartClientTransfer(
282*61c4878aSAndroid Build Coastguard Worker       internal::TransferType::kReceive,
283*61c4878aSAndroid Build Coastguard Worker       ProtocolVersion::kLegacy,
284*61c4878aSAndroid Build Coastguard Worker       /*resource_id=*/3,
285*61c4878aSAndroid Build Coastguard Worker       /*handle_id=*/27,
286*61c4878aSAndroid Build Coastguard Worker       &buffer3,
287*61c4878aSAndroid Build Coastguard Worker       max_parameters_,
288*61c4878aSAndroid Build Coastguard Worker       [&status3](Status status) { status3 = status; },
289*61c4878aSAndroid Build Coastguard Worker       kNeverTimeout,
290*61c4878aSAndroid Build Coastguard Worker       kNeverTimeout,
291*61c4878aSAndroid Build Coastguard Worker       3,
292*61c4878aSAndroid Build Coastguard Worker       10);
293*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.WaitUntilEventIsProcessed();
294*61c4878aSAndroid Build Coastguard Worker 
295*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(status3, Status::Unknown());
296*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(status4, Status::Unknown());
297*61c4878aSAndroid Build Coastguard Worker 
298*61c4878aSAndroid Build Coastguard Worker   // Try to start a simultaneous transfer to resource 4, for which the thread
299*61c4878aSAndroid Build Coastguard Worker   // does not have an available context.
300*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.StartClientTransfer(
301*61c4878aSAndroid Build Coastguard Worker       internal::TransferType::kReceive,
302*61c4878aSAndroid Build Coastguard Worker       ProtocolVersion::kLegacy,
303*61c4878aSAndroid Build Coastguard Worker       /*resource_id=*/4,
304*61c4878aSAndroid Build Coastguard Worker       /*handle_id=*/27,
305*61c4878aSAndroid Build Coastguard Worker       &buffer4,
306*61c4878aSAndroid Build Coastguard Worker       max_parameters_,
307*61c4878aSAndroid Build Coastguard Worker       [&status4](Status status) { status4 = status; },
308*61c4878aSAndroid Build Coastguard Worker       kNeverTimeout,
309*61c4878aSAndroid Build Coastguard Worker       kNeverTimeout,
310*61c4878aSAndroid Build Coastguard Worker       3,
311*61c4878aSAndroid Build Coastguard Worker       10);
312*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.WaitUntilEventIsProcessed();
313*61c4878aSAndroid Build Coastguard Worker 
314*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(status3, Status::Unknown());
315*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(status4, Status::ResourceExhausted());
316*61c4878aSAndroid Build Coastguard Worker 
317*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.EndClientTransfer(3, Status::Cancelled());
318*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.EndClientTransfer(4, Status::Cancelled());
319*61c4878aSAndroid Build Coastguard Worker }
320*61c4878aSAndroid Build Coastguard Worker 
// Starting a version-two transfer for a resource with no registered handler
// yields a single NOT_FOUND status chunk that is identified by the session ID
// and carries no resource ID.
TEST_F(TransferThreadTest, VersionTwo_NoHandler) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream, [](ConstByteSpan) {});

  // Register and unregister a handler so that none is present afterwards.
  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);
  transfer_thread_.RemoveTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(
      internal::TransferType::kTransmit,
      ProtocolVersion::kVersionTwo,
      /*session_id=*/421,
      /*resource_id=*/7,
      {},  // No initial chunk data.
      max_parameters_,
      kNeverTimeout,
      3,
      10);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(read_handler.prepare_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // The identifier extracted from the raw response is the session ID.
  Result<Chunk::Identifier> identifier =
      Chunk::ExtractIdentifier(ctx_.response());
  ASSERT_TRUE(identifier.ok());
  EXPECT_EQ(identifier->value(), 421u);

  // The decoded chunk agrees and reports the missing handler.
  auto status_chunk = DecodeChunk(ctx_.response());
  EXPECT_EQ(status_chunk.session_id(), 421u);
  EXPECT_FALSE(status_chunk.resource_id().has_value());
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), Status::NotFound());

  // NOTE(review): the handler was already removed above, so this is a second
  // removal of the same handler -- presumably a harmless no-op; confirm.
  transfer_thread_.RemoveTransferHandler(read_handler);
}
355*61c4878aSAndroid Build Coastguard Worker 
TEST_F(TransferThreadTest,SetStream_TerminatesActiveTransfers)356*61c4878aSAndroid Build Coastguard Worker TEST_F(TransferThreadTest, SetStream_TerminatesActiveTransfers) {
357*61c4878aSAndroid Build Coastguard Worker   auto reader_writer = ctx_.reader_writer();
358*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.SetServerReadStream(reader_writer, [](ConstByteSpan) {});
359*61c4878aSAndroid Build Coastguard Worker 
360*61c4878aSAndroid Build Coastguard Worker   SimpleReadTransfer handler(3, kData);
361*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.AddTransferHandler(handler);
362*61c4878aSAndroid Build Coastguard Worker 
363*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.StartServerTransfer(
364*61c4878aSAndroid Build Coastguard Worker       internal::TransferType::kTransmit,
365*61c4878aSAndroid Build Coastguard Worker       ProtocolVersion::kLegacy,
366*61c4878aSAndroid Build Coastguard Worker       3,
367*61c4878aSAndroid Build Coastguard Worker       3,
368*61c4878aSAndroid Build Coastguard Worker       EncodeChunk(
369*61c4878aSAndroid Build Coastguard Worker           Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
370*61c4878aSAndroid Build Coastguard Worker               .set_session_id(3)
371*61c4878aSAndroid Build Coastguard Worker               .set_window_end_offset(8)
372*61c4878aSAndroid Build Coastguard Worker               .set_max_chunk_size_bytes(8)
373*61c4878aSAndroid Build Coastguard Worker               .set_offset(0)),
374*61c4878aSAndroid Build Coastguard Worker       max_parameters_,
375*61c4878aSAndroid Build Coastguard Worker       kNeverTimeout,
376*61c4878aSAndroid Build Coastguard Worker       3,
377*61c4878aSAndroid Build Coastguard Worker       10);
378*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.WaitUntilEventIsProcessed();
379*61c4878aSAndroid Build Coastguard Worker 
380*61c4878aSAndroid Build Coastguard Worker   EXPECT_FALSE(handler.finalize_read_called);
381*61c4878aSAndroid Build Coastguard Worker 
382*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(ctx_.total_responses(), 1u);
383*61c4878aSAndroid Build Coastguard Worker   auto chunk = DecodeChunk(ctx_.responses().back());
384*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.session_id(), 3u);
385*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.offset(), 0u);
386*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(chunk.payload().size(), 8u);
387*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(
388*61c4878aSAndroid Build Coastguard Worker       std::memcmp(chunk.payload().data(), kData.data(), chunk.payload().size()),
389*61c4878aSAndroid Build Coastguard Worker       0);
390*61c4878aSAndroid Build Coastguard Worker 
391*61c4878aSAndroid Build Coastguard Worker   auto new_reader_writer = ctx_.reader_writer();
392*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.SetServerReadStream(new_reader_writer, [](ConstByteSpan) {});
393*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.WaitUntilEventIsProcessed();
394*61c4878aSAndroid Build Coastguard Worker 
395*61c4878aSAndroid Build Coastguard Worker   EXPECT_TRUE(handler.finalize_read_called);
396*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(handler.finalize_read_status, Status::Aborted());
397*61c4878aSAndroid Build Coastguard Worker 
398*61c4878aSAndroid Build Coastguard Worker   transfer_thread_.RemoveTransferHandler(handler);
399*61c4878aSAndroid Build Coastguard Worker }
400*61c4878aSAndroid Build Coastguard Worker 
401*61c4878aSAndroid Build Coastguard Worker }  // namespace
402*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::transfer::test
403