xref: /aosp_15_r20/external/pigweed/pw_transfer/transfer_thread.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker // Copyright 2024 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker //     https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker 
15*61c4878aSAndroid Build Coastguard Worker #define PW_LOG_MODULE_NAME "TRN"
16*61c4878aSAndroid Build Coastguard Worker #define PW_LOG_LEVEL PW_TRANSFER_CONFIG_LOG_LEVEL
17*61c4878aSAndroid Build Coastguard Worker 
18*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/transfer_thread.h"
19*61c4878aSAndroid Build Coastguard Worker 
20*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
21*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
22*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/internal/chunk.h"
23*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/internal/client_context.h"
24*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/internal/config.h"
25*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/internal/event.h"
26*61c4878aSAndroid Build Coastguard Worker 
27*61c4878aSAndroid Build Coastguard Worker PW_MODIFY_DIAGNOSTICS_PUSH();
28*61c4878aSAndroid Build Coastguard Worker PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
29*61c4878aSAndroid Build Coastguard Worker 
30*61c4878aSAndroid Build Coastguard Worker namespace pw::transfer::internal {
31*61c4878aSAndroid Build Coastguard Worker 
Terminate()32*61c4878aSAndroid Build Coastguard Worker void TransferThread::Terminate() {
33*61c4878aSAndroid Build Coastguard Worker   next_event_ownership_.acquire();
34*61c4878aSAndroid Build Coastguard Worker   next_event_.type = EventType::kTerminate;
35*61c4878aSAndroid Build Coastguard Worker   event_notification_.release();
36*61c4878aSAndroid Build Coastguard Worker }
37*61c4878aSAndroid Build Coastguard Worker 
SimulateTimeout(EventType type,uint32_t session_id)38*61c4878aSAndroid Build Coastguard Worker void TransferThread::SimulateTimeout(EventType type, uint32_t session_id) {
39*61c4878aSAndroid Build Coastguard Worker   next_event_ownership_.acquire();
40*61c4878aSAndroid Build Coastguard Worker 
41*61c4878aSAndroid Build Coastguard Worker   next_event_.type = type;
42*61c4878aSAndroid Build Coastguard Worker   next_event_.chunk = {};
43*61c4878aSAndroid Build Coastguard Worker   next_event_.chunk.context_identifier = session_id;
44*61c4878aSAndroid Build Coastguard Worker 
45*61c4878aSAndroid Build Coastguard Worker   event_notification_.release();
46*61c4878aSAndroid Build Coastguard Worker 
47*61c4878aSAndroid Build Coastguard Worker   WaitUntilEventIsProcessed();
48*61c4878aSAndroid Build Coastguard Worker }
49*61c4878aSAndroid Build Coastguard Worker 
Run()50*61c4878aSAndroid Build Coastguard Worker void TransferThread::Run() {
51*61c4878aSAndroid Build Coastguard Worker   // Next event starts freed.
52*61c4878aSAndroid Build Coastguard Worker   next_event_ownership_.release();
53*61c4878aSAndroid Build Coastguard Worker 
54*61c4878aSAndroid Build Coastguard Worker   while (true) {
55*61c4878aSAndroid Build Coastguard Worker     if (event_notification_.try_acquire_until(GetNextTransferTimeout())) {
56*61c4878aSAndroid Build Coastguard Worker       HandleEvent(next_event_);
57*61c4878aSAndroid Build Coastguard Worker 
58*61c4878aSAndroid Build Coastguard Worker       // Sample event type before we release ownership of next_event_.
59*61c4878aSAndroid Build Coastguard Worker       bool is_terminating = next_event_.type == EventType::kTerminate;
60*61c4878aSAndroid Build Coastguard Worker 
61*61c4878aSAndroid Build Coastguard Worker       // Finished processing the event. Allow the next_event struct to be
62*61c4878aSAndroid Build Coastguard Worker       // overwritten.
63*61c4878aSAndroid Build Coastguard Worker       next_event_ownership_.release();
64*61c4878aSAndroid Build Coastguard Worker 
65*61c4878aSAndroid Build Coastguard Worker       if (is_terminating) {
66*61c4878aSAndroid Build Coastguard Worker         return;
67*61c4878aSAndroid Build Coastguard Worker       }
68*61c4878aSAndroid Build Coastguard Worker     }
69*61c4878aSAndroid Build Coastguard Worker 
70*61c4878aSAndroid Build Coastguard Worker     // Regardless of whether an event was received or not, check for any
71*61c4878aSAndroid Build Coastguard Worker     // transfers which have timed out and process them if so.
72*61c4878aSAndroid Build Coastguard Worker     for (Context& context : client_transfers_) {
73*61c4878aSAndroid Build Coastguard Worker       if (context.timed_out()) {
74*61c4878aSAndroid Build Coastguard Worker         context.HandleEvent({.type = EventType::kClientTimeout});
75*61c4878aSAndroid Build Coastguard Worker       }
76*61c4878aSAndroid Build Coastguard Worker     }
77*61c4878aSAndroid Build Coastguard Worker     for (Context& context : server_transfers_) {
78*61c4878aSAndroid Build Coastguard Worker       if (context.timed_out()) {
79*61c4878aSAndroid Build Coastguard Worker         context.HandleEvent({.type = EventType::kServerTimeout});
80*61c4878aSAndroid Build Coastguard Worker       }
81*61c4878aSAndroid Build Coastguard Worker     }
82*61c4878aSAndroid Build Coastguard Worker   }
83*61c4878aSAndroid Build Coastguard Worker }
84*61c4878aSAndroid Build Coastguard Worker 
GetNextTransferTimeout() const85*61c4878aSAndroid Build Coastguard Worker chrono::SystemClock::time_point TransferThread::GetNextTransferTimeout() const {
86*61c4878aSAndroid Build Coastguard Worker   chrono::SystemClock::time_point timeout =
87*61c4878aSAndroid Build Coastguard Worker       chrono::SystemClock::TimePointAfterAtLeast(kMaxTimeout);
88*61c4878aSAndroid Build Coastguard Worker 
89*61c4878aSAndroid Build Coastguard Worker   for (Context& context : client_transfers_) {
90*61c4878aSAndroid Build Coastguard Worker     auto ctx_timeout = context.timeout();
91*61c4878aSAndroid Build Coastguard Worker     if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
92*61c4878aSAndroid Build Coastguard Worker       timeout = ctx_timeout.value();
93*61c4878aSAndroid Build Coastguard Worker     }
94*61c4878aSAndroid Build Coastguard Worker   }
95*61c4878aSAndroid Build Coastguard Worker   for (Context& context : server_transfers_) {
96*61c4878aSAndroid Build Coastguard Worker     auto ctx_timeout = context.timeout();
97*61c4878aSAndroid Build Coastguard Worker     if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
98*61c4878aSAndroid Build Coastguard Worker       timeout = ctx_timeout.value();
99*61c4878aSAndroid Build Coastguard Worker     }
100*61c4878aSAndroid Build Coastguard Worker   }
101*61c4878aSAndroid Build Coastguard Worker 
102*61c4878aSAndroid Build Coastguard Worker   return timeout;
103*61c4878aSAndroid Build Coastguard Worker }
104*61c4878aSAndroid Build Coastguard Worker 
StartTransfer(TransferType type,ProtocolVersion version,uint32_t session_id,uint32_t resource_id,uint32_t handle_id,ConstByteSpan raw_chunk,stream::Stream * stream,const TransferParameters & max_parameters,Function<void (Status)> && on_completion,chrono::SystemClock::duration timeout,chrono::SystemClock::duration initial_timeout,uint8_t max_retries,uint32_t max_lifetime_retries,uint32_t initial_offset)105*61c4878aSAndroid Build Coastguard Worker void TransferThread::StartTransfer(
106*61c4878aSAndroid Build Coastguard Worker     TransferType type,
107*61c4878aSAndroid Build Coastguard Worker     ProtocolVersion version,
108*61c4878aSAndroid Build Coastguard Worker     uint32_t session_id,
109*61c4878aSAndroid Build Coastguard Worker     uint32_t resource_id,
110*61c4878aSAndroid Build Coastguard Worker     uint32_t handle_id,
111*61c4878aSAndroid Build Coastguard Worker     ConstByteSpan raw_chunk,
112*61c4878aSAndroid Build Coastguard Worker     stream::Stream* stream,
113*61c4878aSAndroid Build Coastguard Worker     const TransferParameters& max_parameters,
114*61c4878aSAndroid Build Coastguard Worker     Function<void(Status)>&& on_completion,
115*61c4878aSAndroid Build Coastguard Worker     chrono::SystemClock::duration timeout,
116*61c4878aSAndroid Build Coastguard Worker     chrono::SystemClock::duration initial_timeout,
117*61c4878aSAndroid Build Coastguard Worker     uint8_t max_retries,
118*61c4878aSAndroid Build Coastguard Worker     uint32_t max_lifetime_retries,
119*61c4878aSAndroid Build Coastguard Worker     uint32_t initial_offset) {
120*61c4878aSAndroid Build Coastguard Worker   if (!TryWaitForEventToProcess()) {
121*61c4878aSAndroid Build Coastguard Worker     return;
122*61c4878aSAndroid Build Coastguard Worker   }
123*61c4878aSAndroid Build Coastguard Worker 
124*61c4878aSAndroid Build Coastguard Worker   bool is_client_transfer = stream != nullptr;
125*61c4878aSAndroid Build Coastguard Worker 
126*61c4878aSAndroid Build Coastguard Worker   if (is_client_transfer) {
127*61c4878aSAndroid Build Coastguard Worker     if (version == ProtocolVersion::kLegacy) {
128*61c4878aSAndroid Build Coastguard Worker       session_id = resource_id;
129*61c4878aSAndroid Build Coastguard Worker     } else if (session_id == Context::kUnassignedSessionId) {
130*61c4878aSAndroid Build Coastguard Worker       session_id = AssignSessionId();
131*61c4878aSAndroid Build Coastguard Worker     }
132*61c4878aSAndroid Build Coastguard Worker   }
133*61c4878aSAndroid Build Coastguard Worker 
134*61c4878aSAndroid Build Coastguard Worker   next_event_.type = is_client_transfer ? EventType::kNewClientTransfer
135*61c4878aSAndroid Build Coastguard Worker                                         : EventType::kNewServerTransfer;
136*61c4878aSAndroid Build Coastguard Worker 
137*61c4878aSAndroid Build Coastguard Worker   if (!raw_chunk.empty()) {
138*61c4878aSAndroid Build Coastguard Worker     std::memcpy(chunk_buffer_.data(), raw_chunk.data(), raw_chunk.size());
139*61c4878aSAndroid Build Coastguard Worker   }
140*61c4878aSAndroid Build Coastguard Worker 
141*61c4878aSAndroid Build Coastguard Worker   next_event_.new_transfer = {
142*61c4878aSAndroid Build Coastguard Worker       .type = type,
143*61c4878aSAndroid Build Coastguard Worker       .protocol_version = version,
144*61c4878aSAndroid Build Coastguard Worker       .session_id = session_id,
145*61c4878aSAndroid Build Coastguard Worker       .resource_id = resource_id,
146*61c4878aSAndroid Build Coastguard Worker       .handle_id = handle_id,
147*61c4878aSAndroid Build Coastguard Worker       .max_parameters = &max_parameters,
148*61c4878aSAndroid Build Coastguard Worker       .timeout = timeout,
149*61c4878aSAndroid Build Coastguard Worker       .initial_timeout = initial_timeout,
150*61c4878aSAndroid Build Coastguard Worker       .max_retries = max_retries,
151*61c4878aSAndroid Build Coastguard Worker       .max_lifetime_retries = max_lifetime_retries,
152*61c4878aSAndroid Build Coastguard Worker       .transfer_thread = this,
153*61c4878aSAndroid Build Coastguard Worker       .raw_chunk_data = chunk_buffer_.data(),
154*61c4878aSAndroid Build Coastguard Worker       .raw_chunk_size = raw_chunk.size(),
155*61c4878aSAndroid Build Coastguard Worker       .initial_offset = initial_offset,
156*61c4878aSAndroid Build Coastguard Worker   };
157*61c4878aSAndroid Build Coastguard Worker 
158*61c4878aSAndroid Build Coastguard Worker   staged_on_completion_ = std::move(on_completion);
159*61c4878aSAndroid Build Coastguard Worker 
160*61c4878aSAndroid Build Coastguard Worker   // The transfer is initialized with either a stream (client-side) or a handler
161*61c4878aSAndroid Build Coastguard Worker   // (server-side). If no stream is provided, try to find a registered handler
162*61c4878aSAndroid Build Coastguard Worker   // with the specified ID.
163*61c4878aSAndroid Build Coastguard Worker   if (is_client_transfer) {
164*61c4878aSAndroid Build Coastguard Worker     next_event_.new_transfer.stream = stream;
165*61c4878aSAndroid Build Coastguard Worker     next_event_.new_transfer.rpc_writer =
166*61c4878aSAndroid Build Coastguard Worker         &(type == TransferType::kTransmit ? client_write_stream_
167*61c4878aSAndroid Build Coastguard Worker                                           : client_read_stream_)
168*61c4878aSAndroid Build Coastguard Worker              .as_writer();
169*61c4878aSAndroid Build Coastguard Worker   } else {
170*61c4878aSAndroid Build Coastguard Worker     auto handler = std::find_if(handlers_.begin(),
171*61c4878aSAndroid Build Coastguard Worker                                 handlers_.end(),
172*61c4878aSAndroid Build Coastguard Worker                                 [&](auto& h) { return h.id() == resource_id; });
173*61c4878aSAndroid Build Coastguard Worker     if (handler != handlers_.end()) {
174*61c4878aSAndroid Build Coastguard Worker       next_event_.new_transfer.handler = &*handler;
175*61c4878aSAndroid Build Coastguard Worker       next_event_.new_transfer.rpc_writer =
176*61c4878aSAndroid Build Coastguard Worker           &(type == TransferType::kTransmit ? server_read_stream_
177*61c4878aSAndroid Build Coastguard Worker                                             : server_write_stream_)
178*61c4878aSAndroid Build Coastguard Worker                .as_writer();
179*61c4878aSAndroid Build Coastguard Worker     } else {
180*61c4878aSAndroid Build Coastguard Worker       // No handler exists for the transfer: return a NOT_FOUND.
181*61c4878aSAndroid Build Coastguard Worker       next_event_.type = EventType::kSendStatusChunk;
182*61c4878aSAndroid Build Coastguard Worker       next_event_.send_status_chunk = {
183*61c4878aSAndroid Build Coastguard Worker           .session_id = session_id,
184*61c4878aSAndroid Build Coastguard Worker           .protocol_version = version,
185*61c4878aSAndroid Build Coastguard Worker           .status = Status::NotFound().code(),
186*61c4878aSAndroid Build Coastguard Worker           .stream = type == TransferType::kTransmit
187*61c4878aSAndroid Build Coastguard Worker                         ? TransferStream::kServerRead
188*61c4878aSAndroid Build Coastguard Worker                         : TransferStream::kServerWrite,
189*61c4878aSAndroid Build Coastguard Worker       };
190*61c4878aSAndroid Build Coastguard Worker     }
191*61c4878aSAndroid Build Coastguard Worker   }
192*61c4878aSAndroid Build Coastguard Worker 
193*61c4878aSAndroid Build Coastguard Worker   event_notification_.release();
194*61c4878aSAndroid Build Coastguard Worker }
195*61c4878aSAndroid Build Coastguard Worker 
ProcessChunk(EventType type,ConstByteSpan chunk)196*61c4878aSAndroid Build Coastguard Worker void TransferThread::ProcessChunk(EventType type, ConstByteSpan chunk) {
197*61c4878aSAndroid Build Coastguard Worker   // If this assert is hit, there is a bug in the transfer implementation.
198*61c4878aSAndroid Build Coastguard Worker   // Contexts' max_chunk_size_bytes fields should be set based on the size of
199*61c4878aSAndroid Build Coastguard Worker   // chunk_buffer_.
200*61c4878aSAndroid Build Coastguard Worker   PW_CHECK(chunk.size() <= chunk_buffer_.size(),
201*61c4878aSAndroid Build Coastguard Worker            "Transfer received a larger chunk than it can handle.");
202*61c4878aSAndroid Build Coastguard Worker 
203*61c4878aSAndroid Build Coastguard Worker   Result<Chunk::Identifier> identifier = Chunk::ExtractIdentifier(chunk);
204*61c4878aSAndroid Build Coastguard Worker   if (!identifier.ok()) {
205*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Received a malformed chunk without a context identifier");
206*61c4878aSAndroid Build Coastguard Worker     return;
207*61c4878aSAndroid Build Coastguard Worker   }
208*61c4878aSAndroid Build Coastguard Worker 
209*61c4878aSAndroid Build Coastguard Worker   if (!TryWaitForEventToProcess()) {
210*61c4878aSAndroid Build Coastguard Worker     return;
211*61c4878aSAndroid Build Coastguard Worker   }
212*61c4878aSAndroid Build Coastguard Worker 
213*61c4878aSAndroid Build Coastguard Worker   std::memcpy(chunk_buffer_.data(), chunk.data(), chunk.size());
214*61c4878aSAndroid Build Coastguard Worker 
215*61c4878aSAndroid Build Coastguard Worker   next_event_.type = type;
216*61c4878aSAndroid Build Coastguard Worker   next_event_.chunk = {
217*61c4878aSAndroid Build Coastguard Worker       .context_identifier = identifier->value(),
218*61c4878aSAndroid Build Coastguard Worker       .match_resource_id = identifier->is_legacy(),
219*61c4878aSAndroid Build Coastguard Worker       .data = chunk_buffer_.data(),
220*61c4878aSAndroid Build Coastguard Worker       .size = chunk.size(),
221*61c4878aSAndroid Build Coastguard Worker   };
222*61c4878aSAndroid Build Coastguard Worker 
223*61c4878aSAndroid Build Coastguard Worker   event_notification_.release();
224*61c4878aSAndroid Build Coastguard Worker }
225*61c4878aSAndroid Build Coastguard Worker 
SendStatus(TransferStream stream,uint32_t session_id,ProtocolVersion version,Status status)226*61c4878aSAndroid Build Coastguard Worker void TransferThread::SendStatus(TransferStream stream,
227*61c4878aSAndroid Build Coastguard Worker                                 uint32_t session_id,
228*61c4878aSAndroid Build Coastguard Worker                                 ProtocolVersion version,
229*61c4878aSAndroid Build Coastguard Worker                                 Status status) {
230*61c4878aSAndroid Build Coastguard Worker   if (!TryWaitForEventToProcess()) {
231*61c4878aSAndroid Build Coastguard Worker     return;
232*61c4878aSAndroid Build Coastguard Worker   }
233*61c4878aSAndroid Build Coastguard Worker 
234*61c4878aSAndroid Build Coastguard Worker   next_event_.type = EventType::kSendStatusChunk;
235*61c4878aSAndroid Build Coastguard Worker   next_event_.send_status_chunk = {
236*61c4878aSAndroid Build Coastguard Worker       .session_id = session_id,
237*61c4878aSAndroid Build Coastguard Worker       .protocol_version = version,
238*61c4878aSAndroid Build Coastguard Worker       .status = status.code(),
239*61c4878aSAndroid Build Coastguard Worker       .stream = stream,
240*61c4878aSAndroid Build Coastguard Worker   };
241*61c4878aSAndroid Build Coastguard Worker 
242*61c4878aSAndroid Build Coastguard Worker   event_notification_.release();
243*61c4878aSAndroid Build Coastguard Worker }
244*61c4878aSAndroid Build Coastguard Worker 
EndTransfer(EventType type,IdentifierType id_type,uint32_t id,Status status,bool send_status_chunk)245*61c4878aSAndroid Build Coastguard Worker void TransferThread::EndTransfer(EventType type,
246*61c4878aSAndroid Build Coastguard Worker                                  IdentifierType id_type,
247*61c4878aSAndroid Build Coastguard Worker                                  uint32_t id,
248*61c4878aSAndroid Build Coastguard Worker                                  Status status,
249*61c4878aSAndroid Build Coastguard Worker                                  bool send_status_chunk) {
250*61c4878aSAndroid Build Coastguard Worker   if (!TryWaitForEventToProcess()) {
251*61c4878aSAndroid Build Coastguard Worker     return;
252*61c4878aSAndroid Build Coastguard Worker   }
253*61c4878aSAndroid Build Coastguard Worker 
254*61c4878aSAndroid Build Coastguard Worker   next_event_.type = type;
255*61c4878aSAndroid Build Coastguard Worker   next_event_.end_transfer = {
256*61c4878aSAndroid Build Coastguard Worker       .id_type = id_type,
257*61c4878aSAndroid Build Coastguard Worker       .id = id,
258*61c4878aSAndroid Build Coastguard Worker       .status = status.code(),
259*61c4878aSAndroid Build Coastguard Worker       .send_status_chunk = send_status_chunk,
260*61c4878aSAndroid Build Coastguard Worker   };
261*61c4878aSAndroid Build Coastguard Worker 
262*61c4878aSAndroid Build Coastguard Worker   event_notification_.release();
263*61c4878aSAndroid Build Coastguard Worker }
264*61c4878aSAndroid Build Coastguard Worker 
SetStream(TransferStream stream)265*61c4878aSAndroid Build Coastguard Worker void TransferThread::SetStream(TransferStream stream) {
266*61c4878aSAndroid Build Coastguard Worker   if (!TryWaitForEventToProcess()) {
267*61c4878aSAndroid Build Coastguard Worker     return;
268*61c4878aSAndroid Build Coastguard Worker   }
269*61c4878aSAndroid Build Coastguard Worker 
270*61c4878aSAndroid Build Coastguard Worker   next_event_.type = EventType::kSetStream;
271*61c4878aSAndroid Build Coastguard Worker   next_event_.set_stream = {
272*61c4878aSAndroid Build Coastguard Worker       .stream = stream,
273*61c4878aSAndroid Build Coastguard Worker   };
274*61c4878aSAndroid Build Coastguard Worker 
275*61c4878aSAndroid Build Coastguard Worker   event_notification_.release();
276*61c4878aSAndroid Build Coastguard Worker }
277*61c4878aSAndroid Build Coastguard Worker 
UpdateClientTransfer(uint32_t handle_id,size_t transfer_size_bytes)278*61c4878aSAndroid Build Coastguard Worker void TransferThread::UpdateClientTransfer(uint32_t handle_id,
279*61c4878aSAndroid Build Coastguard Worker                                           size_t transfer_size_bytes) {
280*61c4878aSAndroid Build Coastguard Worker   if (!TryWaitForEventToProcess()) {
281*61c4878aSAndroid Build Coastguard Worker     return;
282*61c4878aSAndroid Build Coastguard Worker   }
283*61c4878aSAndroid Build Coastguard Worker 
284*61c4878aSAndroid Build Coastguard Worker   next_event_.type = EventType::kUpdateClientTransfer;
285*61c4878aSAndroid Build Coastguard Worker   next_event_.update_transfer.handle_id = handle_id;
286*61c4878aSAndroid Build Coastguard Worker   next_event_.update_transfer.transfer_size_bytes = transfer_size_bytes;
287*61c4878aSAndroid Build Coastguard Worker 
288*61c4878aSAndroid Build Coastguard Worker   event_notification_.release();
289*61c4878aSAndroid Build Coastguard Worker }
290*61c4878aSAndroid Build Coastguard Worker 
TransferHandlerEvent(EventType type,Handler & handler)291*61c4878aSAndroid Build Coastguard Worker bool TransferThread::TransferHandlerEvent(EventType type, Handler& handler) {
292*61c4878aSAndroid Build Coastguard Worker   if (!TryWaitForEventToProcess()) {
293*61c4878aSAndroid Build Coastguard Worker     return false;
294*61c4878aSAndroid Build Coastguard Worker   }
295*61c4878aSAndroid Build Coastguard Worker 
296*61c4878aSAndroid Build Coastguard Worker   next_event_.type = type;
297*61c4878aSAndroid Build Coastguard Worker   if (type == EventType::kAddTransferHandler) {
298*61c4878aSAndroid Build Coastguard Worker     next_event_.add_transfer_handler = &handler;
299*61c4878aSAndroid Build Coastguard Worker   } else {
300*61c4878aSAndroid Build Coastguard Worker     next_event_.remove_transfer_handler = &handler;
301*61c4878aSAndroid Build Coastguard Worker   }
302*61c4878aSAndroid Build Coastguard Worker 
303*61c4878aSAndroid Build Coastguard Worker   event_notification_.release();
304*61c4878aSAndroid Build Coastguard Worker   return true;
305*61c4878aSAndroid Build Coastguard Worker }
306*61c4878aSAndroid Build Coastguard Worker 
HandleEvent(const internal::Event & event)307*61c4878aSAndroid Build Coastguard Worker void TransferThread::HandleEvent(const internal::Event& event) {
308*61c4878aSAndroid Build Coastguard Worker   switch (event.type) {
309*61c4878aSAndroid Build Coastguard Worker     case EventType::kTerminate:
310*61c4878aSAndroid Build Coastguard Worker       // Terminate server contexts.
311*61c4878aSAndroid Build Coastguard Worker       for (ServerContext& server_context : server_transfers_) {
312*61c4878aSAndroid Build Coastguard Worker         server_context.HandleEvent(Event{
313*61c4878aSAndroid Build Coastguard Worker             .type = EventType::kServerEndTransfer,
314*61c4878aSAndroid Build Coastguard Worker             .end_transfer =
315*61c4878aSAndroid Build Coastguard Worker                 EndTransferEvent{
316*61c4878aSAndroid Build Coastguard Worker                     .id_type = IdentifierType::Session,
317*61c4878aSAndroid Build Coastguard Worker                     .id = server_context.session_id(),
318*61c4878aSAndroid Build Coastguard Worker                     .status = Status::Aborted().code(),
319*61c4878aSAndroid Build Coastguard Worker                     .send_status_chunk = false,
320*61c4878aSAndroid Build Coastguard Worker                 },
321*61c4878aSAndroid Build Coastguard Worker         });
322*61c4878aSAndroid Build Coastguard Worker       }
323*61c4878aSAndroid Build Coastguard Worker 
324*61c4878aSAndroid Build Coastguard Worker       // Terminate client contexts.
325*61c4878aSAndroid Build Coastguard Worker       for (ClientContext& client_context : client_transfers_) {
326*61c4878aSAndroid Build Coastguard Worker         client_context.HandleEvent(Event{
327*61c4878aSAndroid Build Coastguard Worker             .type = EventType::kClientEndTransfer,
328*61c4878aSAndroid Build Coastguard Worker             .end_transfer =
329*61c4878aSAndroid Build Coastguard Worker                 EndTransferEvent{
330*61c4878aSAndroid Build Coastguard Worker                     .id_type = IdentifierType::Session,
331*61c4878aSAndroid Build Coastguard Worker                     .id = client_context.session_id(),
332*61c4878aSAndroid Build Coastguard Worker                     .status = Status::Aborted().code(),
333*61c4878aSAndroid Build Coastguard Worker                     .send_status_chunk = false,
334*61c4878aSAndroid Build Coastguard Worker                 },
335*61c4878aSAndroid Build Coastguard Worker         });
336*61c4878aSAndroid Build Coastguard Worker       }
337*61c4878aSAndroid Build Coastguard Worker 
338*61c4878aSAndroid Build Coastguard Worker       // Cancel/Finish streams.
339*61c4878aSAndroid Build Coastguard Worker       client_read_stream_.Cancel().IgnoreError();
340*61c4878aSAndroid Build Coastguard Worker       client_write_stream_.Cancel().IgnoreError();
341*61c4878aSAndroid Build Coastguard Worker       server_read_stream_.Finish(Status::Aborted()).IgnoreError();
342*61c4878aSAndroid Build Coastguard Worker       server_write_stream_.Finish(Status::Aborted()).IgnoreError();
343*61c4878aSAndroid Build Coastguard Worker       return;
344*61c4878aSAndroid Build Coastguard Worker 
345*61c4878aSAndroid Build Coastguard Worker     case EventType::kSendStatusChunk:
346*61c4878aSAndroid Build Coastguard Worker       SendStatusChunk(event.send_status_chunk);
347*61c4878aSAndroid Build Coastguard Worker       break;
348*61c4878aSAndroid Build Coastguard Worker 
349*61c4878aSAndroid Build Coastguard Worker     case EventType::kAddTransferHandler:
350*61c4878aSAndroid Build Coastguard Worker       handlers_.push_front(*event.add_transfer_handler);
351*61c4878aSAndroid Build Coastguard Worker       return;
352*61c4878aSAndroid Build Coastguard Worker 
353*61c4878aSAndroid Build Coastguard Worker     case EventType::kRemoveTransferHandler:
354*61c4878aSAndroid Build Coastguard Worker       for (ServerContext& server_context : server_transfers_) {
355*61c4878aSAndroid Build Coastguard Worker         if (server_context.handler() == event.remove_transfer_handler) {
356*61c4878aSAndroid Build Coastguard Worker           server_context.HandleEvent(Event{
357*61c4878aSAndroid Build Coastguard Worker               .type = EventType::kServerEndTransfer,
358*61c4878aSAndroid Build Coastguard Worker               .end_transfer =
359*61c4878aSAndroid Build Coastguard Worker                   EndTransferEvent{
360*61c4878aSAndroid Build Coastguard Worker                       .id_type = IdentifierType::Session,
361*61c4878aSAndroid Build Coastguard Worker                       .id = server_context.session_id(),
362*61c4878aSAndroid Build Coastguard Worker                       .status = Status::Aborted().code(),
363*61c4878aSAndroid Build Coastguard Worker                       .send_status_chunk = false,
364*61c4878aSAndroid Build Coastguard Worker                   },
365*61c4878aSAndroid Build Coastguard Worker           });
366*61c4878aSAndroid Build Coastguard Worker         }
367*61c4878aSAndroid Build Coastguard Worker       }
368*61c4878aSAndroid Build Coastguard Worker       handlers_.remove(*event.remove_transfer_handler);
369*61c4878aSAndroid Build Coastguard Worker       return;
370*61c4878aSAndroid Build Coastguard Worker 
371*61c4878aSAndroid Build Coastguard Worker     case EventType::kSetStream:
372*61c4878aSAndroid Build Coastguard Worker       HandleSetStreamEvent(event.set_stream.stream);
373*61c4878aSAndroid Build Coastguard Worker       return;
374*61c4878aSAndroid Build Coastguard Worker 
375*61c4878aSAndroid Build Coastguard Worker     case EventType::kGetResourceStatus:
376*61c4878aSAndroid Build Coastguard Worker       GetResourceState(event.resource_status.resource_id);
377*61c4878aSAndroid Build Coastguard Worker       return;
378*61c4878aSAndroid Build Coastguard Worker 
379*61c4878aSAndroid Build Coastguard Worker     case EventType::kNewClientTransfer:
380*61c4878aSAndroid Build Coastguard Worker     case EventType::kNewServerTransfer:
381*61c4878aSAndroid Build Coastguard Worker     case EventType::kClientChunk:
382*61c4878aSAndroid Build Coastguard Worker     case EventType::kServerChunk:
383*61c4878aSAndroid Build Coastguard Worker     case EventType::kClientTimeout:
384*61c4878aSAndroid Build Coastguard Worker     case EventType::kServerTimeout:
385*61c4878aSAndroid Build Coastguard Worker     case EventType::kClientEndTransfer:
386*61c4878aSAndroid Build Coastguard Worker     case EventType::kServerEndTransfer:
387*61c4878aSAndroid Build Coastguard Worker     case EventType::kUpdateClientTransfer:
388*61c4878aSAndroid Build Coastguard Worker     default:
389*61c4878aSAndroid Build Coastguard Worker       // Other events are handled by individual transfer contexts.
390*61c4878aSAndroid Build Coastguard Worker       break;
391*61c4878aSAndroid Build Coastguard Worker   }
392*61c4878aSAndroid Build Coastguard Worker 
393*61c4878aSAndroid Build Coastguard Worker   Context* ctx = FindContextForEvent(event);
394*61c4878aSAndroid Build Coastguard Worker   if (ctx == nullptr) {
395*61c4878aSAndroid Build Coastguard Worker     // No context was found. For new transfer events, report a
396*61c4878aSAndroid Build Coastguard Worker     // RESOURCE_EXHAUSTED error with starting the transfer.
397*61c4878aSAndroid Build Coastguard Worker     if (event.type == EventType::kNewClientTransfer) {
398*61c4878aSAndroid Build Coastguard Worker       // On the client, invoke the completion callback directly.
399*61c4878aSAndroid Build Coastguard Worker       staged_on_completion_(Status::ResourceExhausted());
400*61c4878aSAndroid Build Coastguard Worker     } else if (event.type == EventType::kNewServerTransfer) {
401*61c4878aSAndroid Build Coastguard Worker       // On the server, send a status chunk back to the client.
402*61c4878aSAndroid Build Coastguard Worker       SendStatusChunk(
403*61c4878aSAndroid Build Coastguard Worker           {.session_id = event.new_transfer.session_id,
404*61c4878aSAndroid Build Coastguard Worker            .protocol_version = event.new_transfer.protocol_version,
405*61c4878aSAndroid Build Coastguard Worker            .status = Status::ResourceExhausted().code(),
406*61c4878aSAndroid Build Coastguard Worker            .stream = event.new_transfer.type == TransferType::kTransmit
407*61c4878aSAndroid Build Coastguard Worker                          ? TransferStream::kServerRead
408*61c4878aSAndroid Build Coastguard Worker                          : TransferStream::kServerWrite});
409*61c4878aSAndroid Build Coastguard Worker     }
410*61c4878aSAndroid Build Coastguard Worker     return;
411*61c4878aSAndroid Build Coastguard Worker   }
412*61c4878aSAndroid Build Coastguard Worker 
413*61c4878aSAndroid Build Coastguard Worker   if (event.type == EventType::kNewClientTransfer) {
414*61c4878aSAndroid Build Coastguard Worker     // TODO(frolv): This is terrible.
415*61c4878aSAndroid Build Coastguard Worker     ClientContext* cctx = static_cast<ClientContext*>(ctx);
416*61c4878aSAndroid Build Coastguard Worker     cctx->set_on_completion(std::move(staged_on_completion_));
417*61c4878aSAndroid Build Coastguard Worker     cctx->set_handle_id(event.new_transfer.handle_id);
418*61c4878aSAndroid Build Coastguard Worker   }
419*61c4878aSAndroid Build Coastguard Worker 
420*61c4878aSAndroid Build Coastguard Worker   if (event.type == EventType::kUpdateClientTransfer) {
421*61c4878aSAndroid Build Coastguard Worker     static_cast<ClientContext&>(*ctx).set_transfer_size_bytes(
422*61c4878aSAndroid Build Coastguard Worker         event.update_transfer.transfer_size_bytes);
423*61c4878aSAndroid Build Coastguard Worker     return;
424*61c4878aSAndroid Build Coastguard Worker   }
425*61c4878aSAndroid Build Coastguard Worker 
426*61c4878aSAndroid Build Coastguard Worker   ctx->HandleEvent(event);
427*61c4878aSAndroid Build Coastguard Worker }
428*61c4878aSAndroid Build Coastguard Worker 
FindContextForEvent(const internal::Event & event) const429*61c4878aSAndroid Build Coastguard Worker Context* TransferThread::FindContextForEvent(
430*61c4878aSAndroid Build Coastguard Worker     const internal::Event& event) const {
431*61c4878aSAndroid Build Coastguard Worker   switch (event.type) {
432*61c4878aSAndroid Build Coastguard Worker     case EventType::kNewClientTransfer:
433*61c4878aSAndroid Build Coastguard Worker       return FindNewTransfer(client_transfers_, event.new_transfer.session_id);
434*61c4878aSAndroid Build Coastguard Worker     case EventType::kNewServerTransfer:
435*61c4878aSAndroid Build Coastguard Worker       return FindNewTransfer(server_transfers_, event.new_transfer.session_id);
436*61c4878aSAndroid Build Coastguard Worker 
437*61c4878aSAndroid Build Coastguard Worker     case EventType::kClientChunk:
438*61c4878aSAndroid Build Coastguard Worker       if (event.chunk.match_resource_id) {
439*61c4878aSAndroid Build Coastguard Worker         return FindActiveTransferByResourceId(client_transfers_,
440*61c4878aSAndroid Build Coastguard Worker                                               event.chunk.context_identifier);
441*61c4878aSAndroid Build Coastguard Worker       }
442*61c4878aSAndroid Build Coastguard Worker       return FindActiveTransferByLegacyId(client_transfers_,
443*61c4878aSAndroid Build Coastguard Worker                                           event.chunk.context_identifier);
444*61c4878aSAndroid Build Coastguard Worker 
445*61c4878aSAndroid Build Coastguard Worker     case EventType::kServerChunk:
446*61c4878aSAndroid Build Coastguard Worker       if (event.chunk.match_resource_id) {
447*61c4878aSAndroid Build Coastguard Worker         return FindActiveTransferByResourceId(server_transfers_,
448*61c4878aSAndroid Build Coastguard Worker                                               event.chunk.context_identifier);
449*61c4878aSAndroid Build Coastguard Worker       }
450*61c4878aSAndroid Build Coastguard Worker       return FindActiveTransferByLegacyId(server_transfers_,
451*61c4878aSAndroid Build Coastguard Worker                                           event.chunk.context_identifier);
452*61c4878aSAndroid Build Coastguard Worker 
453*61c4878aSAndroid Build Coastguard Worker     case EventType::kClientTimeout:  // Manually triggered client timeout
454*61c4878aSAndroid Build Coastguard Worker       return FindActiveTransferByLegacyId(client_transfers_,
455*61c4878aSAndroid Build Coastguard Worker                                           event.chunk.context_identifier);
456*61c4878aSAndroid Build Coastguard Worker     case EventType::kServerTimeout:  // Manually triggered server timeout
457*61c4878aSAndroid Build Coastguard Worker       return FindActiveTransferByLegacyId(server_transfers_,
458*61c4878aSAndroid Build Coastguard Worker                                           event.chunk.context_identifier);
459*61c4878aSAndroid Build Coastguard Worker 
460*61c4878aSAndroid Build Coastguard Worker     case EventType::kClientEndTransfer:
461*61c4878aSAndroid Build Coastguard Worker       if (event.end_transfer.id_type == IdentifierType::Handle) {
462*61c4878aSAndroid Build Coastguard Worker         return FindClientTransferByHandleId(event.end_transfer.id);
463*61c4878aSAndroid Build Coastguard Worker       }
464*61c4878aSAndroid Build Coastguard Worker       return FindActiveTransferByLegacyId(client_transfers_,
465*61c4878aSAndroid Build Coastguard Worker                                           event.end_transfer.id);
466*61c4878aSAndroid Build Coastguard Worker     case EventType::kServerEndTransfer:
467*61c4878aSAndroid Build Coastguard Worker       PW_DCHECK(event.end_transfer.id_type != IdentifierType::Handle);
468*61c4878aSAndroid Build Coastguard Worker       return FindActiveTransferByLegacyId(server_transfers_,
469*61c4878aSAndroid Build Coastguard Worker                                           event.end_transfer.id);
470*61c4878aSAndroid Build Coastguard Worker 
471*61c4878aSAndroid Build Coastguard Worker     case EventType::kUpdateClientTransfer:
472*61c4878aSAndroid Build Coastguard Worker       return FindClientTransferByHandleId(event.update_transfer.handle_id);
473*61c4878aSAndroid Build Coastguard Worker 
474*61c4878aSAndroid Build Coastguard Worker     case EventType::kSendStatusChunk:
475*61c4878aSAndroid Build Coastguard Worker     case EventType::kAddTransferHandler:
476*61c4878aSAndroid Build Coastguard Worker     case EventType::kRemoveTransferHandler:
477*61c4878aSAndroid Build Coastguard Worker     case EventType::kSetStream:
478*61c4878aSAndroid Build Coastguard Worker     case EventType::kTerminate:
479*61c4878aSAndroid Build Coastguard Worker     case EventType::kGetResourceStatus:
480*61c4878aSAndroid Build Coastguard Worker     default:
481*61c4878aSAndroid Build Coastguard Worker       return nullptr;
482*61c4878aSAndroid Build Coastguard Worker   }
483*61c4878aSAndroid Build Coastguard Worker }
484*61c4878aSAndroid Build Coastguard Worker 
SendStatusChunk(const internal::SendStatusChunkEvent & event)485*61c4878aSAndroid Build Coastguard Worker void TransferThread::SendStatusChunk(
486*61c4878aSAndroid Build Coastguard Worker     const internal::SendStatusChunkEvent& event) {
487*61c4878aSAndroid Build Coastguard Worker   rpc::Writer& destination = stream_for(event.stream);
488*61c4878aSAndroid Build Coastguard Worker 
489*61c4878aSAndroid Build Coastguard Worker   Chunk chunk =
490*61c4878aSAndroid Build Coastguard Worker       Chunk::Final(event.protocol_version, event.session_id, event.status);
491*61c4878aSAndroid Build Coastguard Worker 
492*61c4878aSAndroid Build Coastguard Worker   Result<ConstByteSpan> result = chunk.Encode(chunk_buffer_);
493*61c4878aSAndroid Build Coastguard Worker   if (!result.ok()) {
494*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Failed to encode final chunk for transfer %u",
495*61c4878aSAndroid Build Coastguard Worker                  static_cast<unsigned>(event.session_id));
496*61c4878aSAndroid Build Coastguard Worker     return;
497*61c4878aSAndroid Build Coastguard Worker   }
498*61c4878aSAndroid Build Coastguard Worker 
499*61c4878aSAndroid Build Coastguard Worker   if (!destination.Write(result.value()).ok()) {
500*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Failed to send final chunk for transfer %u",
501*61c4878aSAndroid Build Coastguard Worker                  static_cast<unsigned>(event.session_id));
502*61c4878aSAndroid Build Coastguard Worker     return;
503*61c4878aSAndroid Build Coastguard Worker   }
504*61c4878aSAndroid Build Coastguard Worker }
505*61c4878aSAndroid Build Coastguard Worker 
506*61c4878aSAndroid Build Coastguard Worker // Should only be called with the `next_event_ownership_` lock held.
AssignSessionId()507*61c4878aSAndroid Build Coastguard Worker uint32_t TransferThread::AssignSessionId() {
508*61c4878aSAndroid Build Coastguard Worker   uint32_t session_id = next_session_id_++;
509*61c4878aSAndroid Build Coastguard Worker   if (session_id == 0) {
510*61c4878aSAndroid Build Coastguard Worker     session_id = next_session_id_++;
511*61c4878aSAndroid Build Coastguard Worker   }
512*61c4878aSAndroid Build Coastguard Worker   return session_id;
513*61c4878aSAndroid Build Coastguard Worker }
514*61c4878aSAndroid Build Coastguard Worker 
515*61c4878aSAndroid Build Coastguard Worker template <typename T>
TerminateTransfers(span<T> contexts,TransferType type,EventType event_type,Status status)516*61c4878aSAndroid Build Coastguard Worker void TerminateTransfers(span<T> contexts,
517*61c4878aSAndroid Build Coastguard Worker                         TransferType type,
518*61c4878aSAndroid Build Coastguard Worker                         EventType event_type,
519*61c4878aSAndroid Build Coastguard Worker                         Status status) {
520*61c4878aSAndroid Build Coastguard Worker   for (Context& context : contexts) {
521*61c4878aSAndroid Build Coastguard Worker     if (context.active() && context.type() == type) {
522*61c4878aSAndroid Build Coastguard Worker       context.HandleEvent(Event{
523*61c4878aSAndroid Build Coastguard Worker           .type = event_type,
524*61c4878aSAndroid Build Coastguard Worker           .end_transfer =
525*61c4878aSAndroid Build Coastguard Worker               EndTransferEvent{
526*61c4878aSAndroid Build Coastguard Worker                   .id_type = IdentifierType::Session,
527*61c4878aSAndroid Build Coastguard Worker                   .id = context.session_id(),
528*61c4878aSAndroid Build Coastguard Worker                   .status = status.code(),
529*61c4878aSAndroid Build Coastguard Worker                   .send_status_chunk = false,
530*61c4878aSAndroid Build Coastguard Worker               },
531*61c4878aSAndroid Build Coastguard Worker       });
532*61c4878aSAndroid Build Coastguard Worker     }
533*61c4878aSAndroid Build Coastguard Worker   }
534*61c4878aSAndroid Build Coastguard Worker }
535*61c4878aSAndroid Build Coastguard Worker 
HandleSetStreamEvent(TransferStream stream)536*61c4878aSAndroid Build Coastguard Worker void TransferThread::HandleSetStreamEvent(TransferStream stream) {
537*61c4878aSAndroid Build Coastguard Worker   switch (stream) {
538*61c4878aSAndroid Build Coastguard Worker     case TransferStream::kClientRead:
539*61c4878aSAndroid Build Coastguard Worker       TerminateTransfers(client_transfers_,
540*61c4878aSAndroid Build Coastguard Worker                          TransferType::kReceive,
541*61c4878aSAndroid Build Coastguard Worker                          EventType::kClientEndTransfer,
542*61c4878aSAndroid Build Coastguard Worker                          Status::Aborted());
543*61c4878aSAndroid Build Coastguard Worker       client_read_stream_ = std::move(staged_client_stream_);
544*61c4878aSAndroid Build Coastguard Worker       client_read_stream_.set_on_next(std::move(staged_client_on_next_));
545*61c4878aSAndroid Build Coastguard Worker       client_read_stream_.set_on_error([](Status status) {
546*61c4878aSAndroid Build Coastguard Worker         PW_LOG_WARN("Client read stream closed unexpectedly: %s", status.str());
547*61c4878aSAndroid Build Coastguard Worker       });
548*61c4878aSAndroid Build Coastguard Worker       break;
549*61c4878aSAndroid Build Coastguard Worker     case TransferStream::kClientWrite:
550*61c4878aSAndroid Build Coastguard Worker       TerminateTransfers(client_transfers_,
551*61c4878aSAndroid Build Coastguard Worker                          TransferType::kTransmit,
552*61c4878aSAndroid Build Coastguard Worker                          EventType::kClientEndTransfer,
553*61c4878aSAndroid Build Coastguard Worker                          Status::Aborted());
554*61c4878aSAndroid Build Coastguard Worker       client_write_stream_ = std::move(staged_client_stream_);
555*61c4878aSAndroid Build Coastguard Worker       client_write_stream_.set_on_next(std::move(staged_client_on_next_));
556*61c4878aSAndroid Build Coastguard Worker       client_write_stream_.set_on_error([](Status status) {
557*61c4878aSAndroid Build Coastguard Worker         PW_LOG_WARN("Client write stream closed unexpectedly: %s",
558*61c4878aSAndroid Build Coastguard Worker                     status.str());
559*61c4878aSAndroid Build Coastguard Worker       });
560*61c4878aSAndroid Build Coastguard Worker       break;
561*61c4878aSAndroid Build Coastguard Worker     case TransferStream::kServerRead:
562*61c4878aSAndroid Build Coastguard Worker       TerminateTransfers(server_transfers_,
563*61c4878aSAndroid Build Coastguard Worker                          TransferType::kTransmit,
564*61c4878aSAndroid Build Coastguard Worker                          EventType::kServerEndTransfer,
565*61c4878aSAndroid Build Coastguard Worker                          Status::Aborted());
566*61c4878aSAndroid Build Coastguard Worker       server_read_stream_ = std::move(staged_server_stream_);
567*61c4878aSAndroid Build Coastguard Worker       server_read_stream_.set_on_next(std::move(staged_server_on_next_));
568*61c4878aSAndroid Build Coastguard Worker       server_read_stream_.set_on_error([](Status status) {
569*61c4878aSAndroid Build Coastguard Worker         PW_LOG_WARN("Server read stream closed unexpectedly: %s", status.str());
570*61c4878aSAndroid Build Coastguard Worker       });
571*61c4878aSAndroid Build Coastguard Worker       break;
572*61c4878aSAndroid Build Coastguard Worker     case TransferStream::kServerWrite:
573*61c4878aSAndroid Build Coastguard Worker       TerminateTransfers(server_transfers_,
574*61c4878aSAndroid Build Coastguard Worker                          TransferType::kReceive,
575*61c4878aSAndroid Build Coastguard Worker                          EventType::kServerEndTransfer,
576*61c4878aSAndroid Build Coastguard Worker                          Status::Aborted());
577*61c4878aSAndroid Build Coastguard Worker       server_write_stream_ = std::move(staged_server_stream_);
578*61c4878aSAndroid Build Coastguard Worker       server_write_stream_.set_on_next(std::move(staged_server_on_next_));
579*61c4878aSAndroid Build Coastguard Worker       server_write_stream_.set_on_error([](Status status) {
580*61c4878aSAndroid Build Coastguard Worker         PW_LOG_WARN("Server write stream closed unexpectedly: %s",
581*61c4878aSAndroid Build Coastguard Worker                     status.str());
582*61c4878aSAndroid Build Coastguard Worker       });
583*61c4878aSAndroid Build Coastguard Worker       break;
584*61c4878aSAndroid Build Coastguard Worker   }
585*61c4878aSAndroid Build Coastguard Worker }
586*61c4878aSAndroid Build Coastguard Worker 
587*61c4878aSAndroid Build Coastguard Worker // Adds GetResourceStatusEvent to the queue. Will fail if there is already a
588*61c4878aSAndroid Build Coastguard Worker // GetResourceStatusEvent in process.
EnqueueResourceEvent(uint32_t resource_id,ResourceStatusCallback && callback)589*61c4878aSAndroid Build Coastguard Worker void TransferThread::EnqueueResourceEvent(uint32_t resource_id,
590*61c4878aSAndroid Build Coastguard Worker                                           ResourceStatusCallback&& callback) {
591*61c4878aSAndroid Build Coastguard Worker   if (!TryWaitForEventToProcess()) {
592*61c4878aSAndroid Build Coastguard Worker     return;
593*61c4878aSAndroid Build Coastguard Worker   }
594*61c4878aSAndroid Build Coastguard Worker 
595*61c4878aSAndroid Build Coastguard Worker   next_event_.type = EventType::kGetResourceStatus;
596*61c4878aSAndroid Build Coastguard Worker 
597*61c4878aSAndroid Build Coastguard Worker   resource_status_callback_ = std::move(callback);
598*61c4878aSAndroid Build Coastguard Worker 
599*61c4878aSAndroid Build Coastguard Worker   next_event_.resource_status.resource_id = resource_id;
600*61c4878aSAndroid Build Coastguard Worker 
601*61c4878aSAndroid Build Coastguard Worker   event_notification_.release();
602*61c4878aSAndroid Build Coastguard Worker }
603*61c4878aSAndroid Build Coastguard Worker 
604*61c4878aSAndroid Build Coastguard Worker // Should only be called when we got a valid callback and RPC responder from
605*61c4878aSAndroid Build Coastguard Worker // GetResourceStatus transfer RPC.
GetResourceState(uint32_t resource_id)606*61c4878aSAndroid Build Coastguard Worker void TransferThread::GetResourceState(uint32_t resource_id) {
607*61c4878aSAndroid Build Coastguard Worker   PW_ASSERT(resource_status_callback_ != nullptr);
608*61c4878aSAndroid Build Coastguard Worker 
609*61c4878aSAndroid Build Coastguard Worker   auto handler = std::find_if(handlers_.begin(), handlers_.end(), [&](auto& h) {
610*61c4878aSAndroid Build Coastguard Worker     return h.id() == resource_id;
611*61c4878aSAndroid Build Coastguard Worker   });
612*61c4878aSAndroid Build Coastguard Worker   internal::ResourceStatus stats;
613*61c4878aSAndroid Build Coastguard Worker   stats.resource_id = resource_id;
614*61c4878aSAndroid Build Coastguard Worker 
615*61c4878aSAndroid Build Coastguard Worker   if (handler != handlers_.end()) {
616*61c4878aSAndroid Build Coastguard Worker     Status status = handler->GetStatus(stats.readable_offset,
617*61c4878aSAndroid Build Coastguard Worker                                        stats.writeable_offset,
618*61c4878aSAndroid Build Coastguard Worker                                        stats.read_checksum,
619*61c4878aSAndroid Build Coastguard Worker                                        stats.write_checksum);
620*61c4878aSAndroid Build Coastguard Worker 
621*61c4878aSAndroid Build Coastguard Worker     resource_status_callback_(status, stats);
622*61c4878aSAndroid Build Coastguard Worker   } else {
623*61c4878aSAndroid Build Coastguard Worker     resource_status_callback_(Status::NotFound(), stats);
624*61c4878aSAndroid Build Coastguard Worker   }
625*61c4878aSAndroid Build Coastguard Worker }
626*61c4878aSAndroid Build Coastguard Worker 
627*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::transfer::internal
628*61c4878aSAndroid Build Coastguard Worker 
629*61c4878aSAndroid Build Coastguard Worker PW_MODIFY_DIAGNOSTICS_POP();
630