1*61c4878aSAndroid Build Coastguard Worker // Copyright 2024 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker // https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker
15*61c4878aSAndroid Build Coastguard Worker #define PW_LOG_MODULE_NAME "TRN"
16*61c4878aSAndroid Build Coastguard Worker #define PW_LOG_LEVEL PW_TRANSFER_CONFIG_LOG_LEVEL
17*61c4878aSAndroid Build Coastguard Worker
18*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/transfer_thread.h"
19*61c4878aSAndroid Build Coastguard Worker
20*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
21*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
22*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/internal/chunk.h"
23*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/internal/client_context.h"
24*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/internal/config.h"
25*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/internal/event.h"
26*61c4878aSAndroid Build Coastguard Worker
27*61c4878aSAndroid Build Coastguard Worker PW_MODIFY_DIAGNOSTICS_PUSH();
28*61c4878aSAndroid Build Coastguard Worker PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
29*61c4878aSAndroid Build Coastguard Worker
30*61c4878aSAndroid Build Coastguard Worker namespace pw::transfer::internal {
31*61c4878aSAndroid Build Coastguard Worker
Terminate()32*61c4878aSAndroid Build Coastguard Worker void TransferThread::Terminate() {
33*61c4878aSAndroid Build Coastguard Worker next_event_ownership_.acquire();
34*61c4878aSAndroid Build Coastguard Worker next_event_.type = EventType::kTerminate;
35*61c4878aSAndroid Build Coastguard Worker event_notification_.release();
36*61c4878aSAndroid Build Coastguard Worker }
37*61c4878aSAndroid Build Coastguard Worker
SimulateTimeout(EventType type,uint32_t session_id)38*61c4878aSAndroid Build Coastguard Worker void TransferThread::SimulateTimeout(EventType type, uint32_t session_id) {
39*61c4878aSAndroid Build Coastguard Worker next_event_ownership_.acquire();
40*61c4878aSAndroid Build Coastguard Worker
41*61c4878aSAndroid Build Coastguard Worker next_event_.type = type;
42*61c4878aSAndroid Build Coastguard Worker next_event_.chunk = {};
43*61c4878aSAndroid Build Coastguard Worker next_event_.chunk.context_identifier = session_id;
44*61c4878aSAndroid Build Coastguard Worker
45*61c4878aSAndroid Build Coastguard Worker event_notification_.release();
46*61c4878aSAndroid Build Coastguard Worker
47*61c4878aSAndroid Build Coastguard Worker WaitUntilEventIsProcessed();
48*61c4878aSAndroid Build Coastguard Worker }
49*61c4878aSAndroid Build Coastguard Worker
Run()50*61c4878aSAndroid Build Coastguard Worker void TransferThread::Run() {
51*61c4878aSAndroid Build Coastguard Worker // Next event starts freed.
52*61c4878aSAndroid Build Coastguard Worker next_event_ownership_.release();
53*61c4878aSAndroid Build Coastguard Worker
54*61c4878aSAndroid Build Coastguard Worker while (true) {
55*61c4878aSAndroid Build Coastguard Worker if (event_notification_.try_acquire_until(GetNextTransferTimeout())) {
56*61c4878aSAndroid Build Coastguard Worker HandleEvent(next_event_);
57*61c4878aSAndroid Build Coastguard Worker
58*61c4878aSAndroid Build Coastguard Worker // Sample event type before we release ownership of next_event_.
59*61c4878aSAndroid Build Coastguard Worker bool is_terminating = next_event_.type == EventType::kTerminate;
60*61c4878aSAndroid Build Coastguard Worker
61*61c4878aSAndroid Build Coastguard Worker // Finished processing the event. Allow the next_event struct to be
62*61c4878aSAndroid Build Coastguard Worker // overwritten.
63*61c4878aSAndroid Build Coastguard Worker next_event_ownership_.release();
64*61c4878aSAndroid Build Coastguard Worker
65*61c4878aSAndroid Build Coastguard Worker if (is_terminating) {
66*61c4878aSAndroid Build Coastguard Worker return;
67*61c4878aSAndroid Build Coastguard Worker }
68*61c4878aSAndroid Build Coastguard Worker }
69*61c4878aSAndroid Build Coastguard Worker
70*61c4878aSAndroid Build Coastguard Worker // Regardless of whether an event was received or not, check for any
71*61c4878aSAndroid Build Coastguard Worker // transfers which have timed out and process them if so.
72*61c4878aSAndroid Build Coastguard Worker for (Context& context : client_transfers_) {
73*61c4878aSAndroid Build Coastguard Worker if (context.timed_out()) {
74*61c4878aSAndroid Build Coastguard Worker context.HandleEvent({.type = EventType::kClientTimeout});
75*61c4878aSAndroid Build Coastguard Worker }
76*61c4878aSAndroid Build Coastguard Worker }
77*61c4878aSAndroid Build Coastguard Worker for (Context& context : server_transfers_) {
78*61c4878aSAndroid Build Coastguard Worker if (context.timed_out()) {
79*61c4878aSAndroid Build Coastguard Worker context.HandleEvent({.type = EventType::kServerTimeout});
80*61c4878aSAndroid Build Coastguard Worker }
81*61c4878aSAndroid Build Coastguard Worker }
82*61c4878aSAndroid Build Coastguard Worker }
83*61c4878aSAndroid Build Coastguard Worker }
84*61c4878aSAndroid Build Coastguard Worker
GetNextTransferTimeout() const85*61c4878aSAndroid Build Coastguard Worker chrono::SystemClock::time_point TransferThread::GetNextTransferTimeout() const {
86*61c4878aSAndroid Build Coastguard Worker chrono::SystemClock::time_point timeout =
87*61c4878aSAndroid Build Coastguard Worker chrono::SystemClock::TimePointAfterAtLeast(kMaxTimeout);
88*61c4878aSAndroid Build Coastguard Worker
89*61c4878aSAndroid Build Coastguard Worker for (Context& context : client_transfers_) {
90*61c4878aSAndroid Build Coastguard Worker auto ctx_timeout = context.timeout();
91*61c4878aSAndroid Build Coastguard Worker if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
92*61c4878aSAndroid Build Coastguard Worker timeout = ctx_timeout.value();
93*61c4878aSAndroid Build Coastguard Worker }
94*61c4878aSAndroid Build Coastguard Worker }
95*61c4878aSAndroid Build Coastguard Worker for (Context& context : server_transfers_) {
96*61c4878aSAndroid Build Coastguard Worker auto ctx_timeout = context.timeout();
97*61c4878aSAndroid Build Coastguard Worker if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
98*61c4878aSAndroid Build Coastguard Worker timeout = ctx_timeout.value();
99*61c4878aSAndroid Build Coastguard Worker }
100*61c4878aSAndroid Build Coastguard Worker }
101*61c4878aSAndroid Build Coastguard Worker
102*61c4878aSAndroid Build Coastguard Worker return timeout;
103*61c4878aSAndroid Build Coastguard Worker }
104*61c4878aSAndroid Build Coastguard Worker
StartTransfer(TransferType type,ProtocolVersion version,uint32_t session_id,uint32_t resource_id,uint32_t handle_id,ConstByteSpan raw_chunk,stream::Stream * stream,const TransferParameters & max_parameters,Function<void (Status)> && on_completion,chrono::SystemClock::duration timeout,chrono::SystemClock::duration initial_timeout,uint8_t max_retries,uint32_t max_lifetime_retries,uint32_t initial_offset)105*61c4878aSAndroid Build Coastguard Worker void TransferThread::StartTransfer(
106*61c4878aSAndroid Build Coastguard Worker TransferType type,
107*61c4878aSAndroid Build Coastguard Worker ProtocolVersion version,
108*61c4878aSAndroid Build Coastguard Worker uint32_t session_id,
109*61c4878aSAndroid Build Coastguard Worker uint32_t resource_id,
110*61c4878aSAndroid Build Coastguard Worker uint32_t handle_id,
111*61c4878aSAndroid Build Coastguard Worker ConstByteSpan raw_chunk,
112*61c4878aSAndroid Build Coastguard Worker stream::Stream* stream,
113*61c4878aSAndroid Build Coastguard Worker const TransferParameters& max_parameters,
114*61c4878aSAndroid Build Coastguard Worker Function<void(Status)>&& on_completion,
115*61c4878aSAndroid Build Coastguard Worker chrono::SystemClock::duration timeout,
116*61c4878aSAndroid Build Coastguard Worker chrono::SystemClock::duration initial_timeout,
117*61c4878aSAndroid Build Coastguard Worker uint8_t max_retries,
118*61c4878aSAndroid Build Coastguard Worker uint32_t max_lifetime_retries,
119*61c4878aSAndroid Build Coastguard Worker uint32_t initial_offset) {
120*61c4878aSAndroid Build Coastguard Worker if (!TryWaitForEventToProcess()) {
121*61c4878aSAndroid Build Coastguard Worker return;
122*61c4878aSAndroid Build Coastguard Worker }
123*61c4878aSAndroid Build Coastguard Worker
124*61c4878aSAndroid Build Coastguard Worker bool is_client_transfer = stream != nullptr;
125*61c4878aSAndroid Build Coastguard Worker
126*61c4878aSAndroid Build Coastguard Worker if (is_client_transfer) {
127*61c4878aSAndroid Build Coastguard Worker if (version == ProtocolVersion::kLegacy) {
128*61c4878aSAndroid Build Coastguard Worker session_id = resource_id;
129*61c4878aSAndroid Build Coastguard Worker } else if (session_id == Context::kUnassignedSessionId) {
130*61c4878aSAndroid Build Coastguard Worker session_id = AssignSessionId();
131*61c4878aSAndroid Build Coastguard Worker }
132*61c4878aSAndroid Build Coastguard Worker }
133*61c4878aSAndroid Build Coastguard Worker
134*61c4878aSAndroid Build Coastguard Worker next_event_.type = is_client_transfer ? EventType::kNewClientTransfer
135*61c4878aSAndroid Build Coastguard Worker : EventType::kNewServerTransfer;
136*61c4878aSAndroid Build Coastguard Worker
137*61c4878aSAndroid Build Coastguard Worker if (!raw_chunk.empty()) {
138*61c4878aSAndroid Build Coastguard Worker std::memcpy(chunk_buffer_.data(), raw_chunk.data(), raw_chunk.size());
139*61c4878aSAndroid Build Coastguard Worker }
140*61c4878aSAndroid Build Coastguard Worker
141*61c4878aSAndroid Build Coastguard Worker next_event_.new_transfer = {
142*61c4878aSAndroid Build Coastguard Worker .type = type,
143*61c4878aSAndroid Build Coastguard Worker .protocol_version = version,
144*61c4878aSAndroid Build Coastguard Worker .session_id = session_id,
145*61c4878aSAndroid Build Coastguard Worker .resource_id = resource_id,
146*61c4878aSAndroid Build Coastguard Worker .handle_id = handle_id,
147*61c4878aSAndroid Build Coastguard Worker .max_parameters = &max_parameters,
148*61c4878aSAndroid Build Coastguard Worker .timeout = timeout,
149*61c4878aSAndroid Build Coastguard Worker .initial_timeout = initial_timeout,
150*61c4878aSAndroid Build Coastguard Worker .max_retries = max_retries,
151*61c4878aSAndroid Build Coastguard Worker .max_lifetime_retries = max_lifetime_retries,
152*61c4878aSAndroid Build Coastguard Worker .transfer_thread = this,
153*61c4878aSAndroid Build Coastguard Worker .raw_chunk_data = chunk_buffer_.data(),
154*61c4878aSAndroid Build Coastguard Worker .raw_chunk_size = raw_chunk.size(),
155*61c4878aSAndroid Build Coastguard Worker .initial_offset = initial_offset,
156*61c4878aSAndroid Build Coastguard Worker };
157*61c4878aSAndroid Build Coastguard Worker
158*61c4878aSAndroid Build Coastguard Worker staged_on_completion_ = std::move(on_completion);
159*61c4878aSAndroid Build Coastguard Worker
160*61c4878aSAndroid Build Coastguard Worker // The transfer is initialized with either a stream (client-side) or a handler
161*61c4878aSAndroid Build Coastguard Worker // (server-side). If no stream is provided, try to find a registered handler
162*61c4878aSAndroid Build Coastguard Worker // with the specified ID.
163*61c4878aSAndroid Build Coastguard Worker if (is_client_transfer) {
164*61c4878aSAndroid Build Coastguard Worker next_event_.new_transfer.stream = stream;
165*61c4878aSAndroid Build Coastguard Worker next_event_.new_transfer.rpc_writer =
166*61c4878aSAndroid Build Coastguard Worker &(type == TransferType::kTransmit ? client_write_stream_
167*61c4878aSAndroid Build Coastguard Worker : client_read_stream_)
168*61c4878aSAndroid Build Coastguard Worker .as_writer();
169*61c4878aSAndroid Build Coastguard Worker } else {
170*61c4878aSAndroid Build Coastguard Worker auto handler = std::find_if(handlers_.begin(),
171*61c4878aSAndroid Build Coastguard Worker handlers_.end(),
172*61c4878aSAndroid Build Coastguard Worker [&](auto& h) { return h.id() == resource_id; });
173*61c4878aSAndroid Build Coastguard Worker if (handler != handlers_.end()) {
174*61c4878aSAndroid Build Coastguard Worker next_event_.new_transfer.handler = &*handler;
175*61c4878aSAndroid Build Coastguard Worker next_event_.new_transfer.rpc_writer =
176*61c4878aSAndroid Build Coastguard Worker &(type == TransferType::kTransmit ? server_read_stream_
177*61c4878aSAndroid Build Coastguard Worker : server_write_stream_)
178*61c4878aSAndroid Build Coastguard Worker .as_writer();
179*61c4878aSAndroid Build Coastguard Worker } else {
180*61c4878aSAndroid Build Coastguard Worker // No handler exists for the transfer: return a NOT_FOUND.
181*61c4878aSAndroid Build Coastguard Worker next_event_.type = EventType::kSendStatusChunk;
182*61c4878aSAndroid Build Coastguard Worker next_event_.send_status_chunk = {
183*61c4878aSAndroid Build Coastguard Worker .session_id = session_id,
184*61c4878aSAndroid Build Coastguard Worker .protocol_version = version,
185*61c4878aSAndroid Build Coastguard Worker .status = Status::NotFound().code(),
186*61c4878aSAndroid Build Coastguard Worker .stream = type == TransferType::kTransmit
187*61c4878aSAndroid Build Coastguard Worker ? TransferStream::kServerRead
188*61c4878aSAndroid Build Coastguard Worker : TransferStream::kServerWrite,
189*61c4878aSAndroid Build Coastguard Worker };
190*61c4878aSAndroid Build Coastguard Worker }
191*61c4878aSAndroid Build Coastguard Worker }
192*61c4878aSAndroid Build Coastguard Worker
193*61c4878aSAndroid Build Coastguard Worker event_notification_.release();
194*61c4878aSAndroid Build Coastguard Worker }
195*61c4878aSAndroid Build Coastguard Worker
ProcessChunk(EventType type,ConstByteSpan chunk)196*61c4878aSAndroid Build Coastguard Worker void TransferThread::ProcessChunk(EventType type, ConstByteSpan chunk) {
197*61c4878aSAndroid Build Coastguard Worker // If this assert is hit, there is a bug in the transfer implementation.
198*61c4878aSAndroid Build Coastguard Worker // Contexts' max_chunk_size_bytes fields should be set based on the size of
199*61c4878aSAndroid Build Coastguard Worker // chunk_buffer_.
200*61c4878aSAndroid Build Coastguard Worker PW_CHECK(chunk.size() <= chunk_buffer_.size(),
201*61c4878aSAndroid Build Coastguard Worker "Transfer received a larger chunk than it can handle.");
202*61c4878aSAndroid Build Coastguard Worker
203*61c4878aSAndroid Build Coastguard Worker Result<Chunk::Identifier> identifier = Chunk::ExtractIdentifier(chunk);
204*61c4878aSAndroid Build Coastguard Worker if (!identifier.ok()) {
205*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Received a malformed chunk without a context identifier");
206*61c4878aSAndroid Build Coastguard Worker return;
207*61c4878aSAndroid Build Coastguard Worker }
208*61c4878aSAndroid Build Coastguard Worker
209*61c4878aSAndroid Build Coastguard Worker if (!TryWaitForEventToProcess()) {
210*61c4878aSAndroid Build Coastguard Worker return;
211*61c4878aSAndroid Build Coastguard Worker }
212*61c4878aSAndroid Build Coastguard Worker
213*61c4878aSAndroid Build Coastguard Worker std::memcpy(chunk_buffer_.data(), chunk.data(), chunk.size());
214*61c4878aSAndroid Build Coastguard Worker
215*61c4878aSAndroid Build Coastguard Worker next_event_.type = type;
216*61c4878aSAndroid Build Coastguard Worker next_event_.chunk = {
217*61c4878aSAndroid Build Coastguard Worker .context_identifier = identifier->value(),
218*61c4878aSAndroid Build Coastguard Worker .match_resource_id = identifier->is_legacy(),
219*61c4878aSAndroid Build Coastguard Worker .data = chunk_buffer_.data(),
220*61c4878aSAndroid Build Coastguard Worker .size = chunk.size(),
221*61c4878aSAndroid Build Coastguard Worker };
222*61c4878aSAndroid Build Coastguard Worker
223*61c4878aSAndroid Build Coastguard Worker event_notification_.release();
224*61c4878aSAndroid Build Coastguard Worker }
225*61c4878aSAndroid Build Coastguard Worker
SendStatus(TransferStream stream,uint32_t session_id,ProtocolVersion version,Status status)226*61c4878aSAndroid Build Coastguard Worker void TransferThread::SendStatus(TransferStream stream,
227*61c4878aSAndroid Build Coastguard Worker uint32_t session_id,
228*61c4878aSAndroid Build Coastguard Worker ProtocolVersion version,
229*61c4878aSAndroid Build Coastguard Worker Status status) {
230*61c4878aSAndroid Build Coastguard Worker if (!TryWaitForEventToProcess()) {
231*61c4878aSAndroid Build Coastguard Worker return;
232*61c4878aSAndroid Build Coastguard Worker }
233*61c4878aSAndroid Build Coastguard Worker
234*61c4878aSAndroid Build Coastguard Worker next_event_.type = EventType::kSendStatusChunk;
235*61c4878aSAndroid Build Coastguard Worker next_event_.send_status_chunk = {
236*61c4878aSAndroid Build Coastguard Worker .session_id = session_id,
237*61c4878aSAndroid Build Coastguard Worker .protocol_version = version,
238*61c4878aSAndroid Build Coastguard Worker .status = status.code(),
239*61c4878aSAndroid Build Coastguard Worker .stream = stream,
240*61c4878aSAndroid Build Coastguard Worker };
241*61c4878aSAndroid Build Coastguard Worker
242*61c4878aSAndroid Build Coastguard Worker event_notification_.release();
243*61c4878aSAndroid Build Coastguard Worker }
244*61c4878aSAndroid Build Coastguard Worker
EndTransfer(EventType type,IdentifierType id_type,uint32_t id,Status status,bool send_status_chunk)245*61c4878aSAndroid Build Coastguard Worker void TransferThread::EndTransfer(EventType type,
246*61c4878aSAndroid Build Coastguard Worker IdentifierType id_type,
247*61c4878aSAndroid Build Coastguard Worker uint32_t id,
248*61c4878aSAndroid Build Coastguard Worker Status status,
249*61c4878aSAndroid Build Coastguard Worker bool send_status_chunk) {
250*61c4878aSAndroid Build Coastguard Worker if (!TryWaitForEventToProcess()) {
251*61c4878aSAndroid Build Coastguard Worker return;
252*61c4878aSAndroid Build Coastguard Worker }
253*61c4878aSAndroid Build Coastguard Worker
254*61c4878aSAndroid Build Coastguard Worker next_event_.type = type;
255*61c4878aSAndroid Build Coastguard Worker next_event_.end_transfer = {
256*61c4878aSAndroid Build Coastguard Worker .id_type = id_type,
257*61c4878aSAndroid Build Coastguard Worker .id = id,
258*61c4878aSAndroid Build Coastguard Worker .status = status.code(),
259*61c4878aSAndroid Build Coastguard Worker .send_status_chunk = send_status_chunk,
260*61c4878aSAndroid Build Coastguard Worker };
261*61c4878aSAndroid Build Coastguard Worker
262*61c4878aSAndroid Build Coastguard Worker event_notification_.release();
263*61c4878aSAndroid Build Coastguard Worker }
264*61c4878aSAndroid Build Coastguard Worker
SetStream(TransferStream stream)265*61c4878aSAndroid Build Coastguard Worker void TransferThread::SetStream(TransferStream stream) {
266*61c4878aSAndroid Build Coastguard Worker if (!TryWaitForEventToProcess()) {
267*61c4878aSAndroid Build Coastguard Worker return;
268*61c4878aSAndroid Build Coastguard Worker }
269*61c4878aSAndroid Build Coastguard Worker
270*61c4878aSAndroid Build Coastguard Worker next_event_.type = EventType::kSetStream;
271*61c4878aSAndroid Build Coastguard Worker next_event_.set_stream = {
272*61c4878aSAndroid Build Coastguard Worker .stream = stream,
273*61c4878aSAndroid Build Coastguard Worker };
274*61c4878aSAndroid Build Coastguard Worker
275*61c4878aSAndroid Build Coastguard Worker event_notification_.release();
276*61c4878aSAndroid Build Coastguard Worker }
277*61c4878aSAndroid Build Coastguard Worker
UpdateClientTransfer(uint32_t handle_id,size_t transfer_size_bytes)278*61c4878aSAndroid Build Coastguard Worker void TransferThread::UpdateClientTransfer(uint32_t handle_id,
279*61c4878aSAndroid Build Coastguard Worker size_t transfer_size_bytes) {
280*61c4878aSAndroid Build Coastguard Worker if (!TryWaitForEventToProcess()) {
281*61c4878aSAndroid Build Coastguard Worker return;
282*61c4878aSAndroid Build Coastguard Worker }
283*61c4878aSAndroid Build Coastguard Worker
284*61c4878aSAndroid Build Coastguard Worker next_event_.type = EventType::kUpdateClientTransfer;
285*61c4878aSAndroid Build Coastguard Worker next_event_.update_transfer.handle_id = handle_id;
286*61c4878aSAndroid Build Coastguard Worker next_event_.update_transfer.transfer_size_bytes = transfer_size_bytes;
287*61c4878aSAndroid Build Coastguard Worker
288*61c4878aSAndroid Build Coastguard Worker event_notification_.release();
289*61c4878aSAndroid Build Coastguard Worker }
290*61c4878aSAndroid Build Coastguard Worker
TransferHandlerEvent(EventType type,Handler & handler)291*61c4878aSAndroid Build Coastguard Worker bool TransferThread::TransferHandlerEvent(EventType type, Handler& handler) {
292*61c4878aSAndroid Build Coastguard Worker if (!TryWaitForEventToProcess()) {
293*61c4878aSAndroid Build Coastguard Worker return false;
294*61c4878aSAndroid Build Coastguard Worker }
295*61c4878aSAndroid Build Coastguard Worker
296*61c4878aSAndroid Build Coastguard Worker next_event_.type = type;
297*61c4878aSAndroid Build Coastguard Worker if (type == EventType::kAddTransferHandler) {
298*61c4878aSAndroid Build Coastguard Worker next_event_.add_transfer_handler = &handler;
299*61c4878aSAndroid Build Coastguard Worker } else {
300*61c4878aSAndroid Build Coastguard Worker next_event_.remove_transfer_handler = &handler;
301*61c4878aSAndroid Build Coastguard Worker }
302*61c4878aSAndroid Build Coastguard Worker
303*61c4878aSAndroid Build Coastguard Worker event_notification_.release();
304*61c4878aSAndroid Build Coastguard Worker return true;
305*61c4878aSAndroid Build Coastguard Worker }
306*61c4878aSAndroid Build Coastguard Worker
HandleEvent(const internal::Event & event)307*61c4878aSAndroid Build Coastguard Worker void TransferThread::HandleEvent(const internal::Event& event) {
308*61c4878aSAndroid Build Coastguard Worker switch (event.type) {
309*61c4878aSAndroid Build Coastguard Worker case EventType::kTerminate:
310*61c4878aSAndroid Build Coastguard Worker // Terminate server contexts.
311*61c4878aSAndroid Build Coastguard Worker for (ServerContext& server_context : server_transfers_) {
312*61c4878aSAndroid Build Coastguard Worker server_context.HandleEvent(Event{
313*61c4878aSAndroid Build Coastguard Worker .type = EventType::kServerEndTransfer,
314*61c4878aSAndroid Build Coastguard Worker .end_transfer =
315*61c4878aSAndroid Build Coastguard Worker EndTransferEvent{
316*61c4878aSAndroid Build Coastguard Worker .id_type = IdentifierType::Session,
317*61c4878aSAndroid Build Coastguard Worker .id = server_context.session_id(),
318*61c4878aSAndroid Build Coastguard Worker .status = Status::Aborted().code(),
319*61c4878aSAndroid Build Coastguard Worker .send_status_chunk = false,
320*61c4878aSAndroid Build Coastguard Worker },
321*61c4878aSAndroid Build Coastguard Worker });
322*61c4878aSAndroid Build Coastguard Worker }
323*61c4878aSAndroid Build Coastguard Worker
324*61c4878aSAndroid Build Coastguard Worker // Terminate client contexts.
325*61c4878aSAndroid Build Coastguard Worker for (ClientContext& client_context : client_transfers_) {
326*61c4878aSAndroid Build Coastguard Worker client_context.HandleEvent(Event{
327*61c4878aSAndroid Build Coastguard Worker .type = EventType::kClientEndTransfer,
328*61c4878aSAndroid Build Coastguard Worker .end_transfer =
329*61c4878aSAndroid Build Coastguard Worker EndTransferEvent{
330*61c4878aSAndroid Build Coastguard Worker .id_type = IdentifierType::Session,
331*61c4878aSAndroid Build Coastguard Worker .id = client_context.session_id(),
332*61c4878aSAndroid Build Coastguard Worker .status = Status::Aborted().code(),
333*61c4878aSAndroid Build Coastguard Worker .send_status_chunk = false,
334*61c4878aSAndroid Build Coastguard Worker },
335*61c4878aSAndroid Build Coastguard Worker });
336*61c4878aSAndroid Build Coastguard Worker }
337*61c4878aSAndroid Build Coastguard Worker
338*61c4878aSAndroid Build Coastguard Worker // Cancel/Finish streams.
339*61c4878aSAndroid Build Coastguard Worker client_read_stream_.Cancel().IgnoreError();
340*61c4878aSAndroid Build Coastguard Worker client_write_stream_.Cancel().IgnoreError();
341*61c4878aSAndroid Build Coastguard Worker server_read_stream_.Finish(Status::Aborted()).IgnoreError();
342*61c4878aSAndroid Build Coastguard Worker server_write_stream_.Finish(Status::Aborted()).IgnoreError();
343*61c4878aSAndroid Build Coastguard Worker return;
344*61c4878aSAndroid Build Coastguard Worker
345*61c4878aSAndroid Build Coastguard Worker case EventType::kSendStatusChunk:
346*61c4878aSAndroid Build Coastguard Worker SendStatusChunk(event.send_status_chunk);
347*61c4878aSAndroid Build Coastguard Worker break;
348*61c4878aSAndroid Build Coastguard Worker
349*61c4878aSAndroid Build Coastguard Worker case EventType::kAddTransferHandler:
350*61c4878aSAndroid Build Coastguard Worker handlers_.push_front(*event.add_transfer_handler);
351*61c4878aSAndroid Build Coastguard Worker return;
352*61c4878aSAndroid Build Coastguard Worker
353*61c4878aSAndroid Build Coastguard Worker case EventType::kRemoveTransferHandler:
354*61c4878aSAndroid Build Coastguard Worker for (ServerContext& server_context : server_transfers_) {
355*61c4878aSAndroid Build Coastguard Worker if (server_context.handler() == event.remove_transfer_handler) {
356*61c4878aSAndroid Build Coastguard Worker server_context.HandleEvent(Event{
357*61c4878aSAndroid Build Coastguard Worker .type = EventType::kServerEndTransfer,
358*61c4878aSAndroid Build Coastguard Worker .end_transfer =
359*61c4878aSAndroid Build Coastguard Worker EndTransferEvent{
360*61c4878aSAndroid Build Coastguard Worker .id_type = IdentifierType::Session,
361*61c4878aSAndroid Build Coastguard Worker .id = server_context.session_id(),
362*61c4878aSAndroid Build Coastguard Worker .status = Status::Aborted().code(),
363*61c4878aSAndroid Build Coastguard Worker .send_status_chunk = false,
364*61c4878aSAndroid Build Coastguard Worker },
365*61c4878aSAndroid Build Coastguard Worker });
366*61c4878aSAndroid Build Coastguard Worker }
367*61c4878aSAndroid Build Coastguard Worker }
368*61c4878aSAndroid Build Coastguard Worker handlers_.remove(*event.remove_transfer_handler);
369*61c4878aSAndroid Build Coastguard Worker return;
370*61c4878aSAndroid Build Coastguard Worker
371*61c4878aSAndroid Build Coastguard Worker case EventType::kSetStream:
372*61c4878aSAndroid Build Coastguard Worker HandleSetStreamEvent(event.set_stream.stream);
373*61c4878aSAndroid Build Coastguard Worker return;
374*61c4878aSAndroid Build Coastguard Worker
375*61c4878aSAndroid Build Coastguard Worker case EventType::kGetResourceStatus:
376*61c4878aSAndroid Build Coastguard Worker GetResourceState(event.resource_status.resource_id);
377*61c4878aSAndroid Build Coastguard Worker return;
378*61c4878aSAndroid Build Coastguard Worker
379*61c4878aSAndroid Build Coastguard Worker case EventType::kNewClientTransfer:
380*61c4878aSAndroid Build Coastguard Worker case EventType::kNewServerTransfer:
381*61c4878aSAndroid Build Coastguard Worker case EventType::kClientChunk:
382*61c4878aSAndroid Build Coastguard Worker case EventType::kServerChunk:
383*61c4878aSAndroid Build Coastguard Worker case EventType::kClientTimeout:
384*61c4878aSAndroid Build Coastguard Worker case EventType::kServerTimeout:
385*61c4878aSAndroid Build Coastguard Worker case EventType::kClientEndTransfer:
386*61c4878aSAndroid Build Coastguard Worker case EventType::kServerEndTransfer:
387*61c4878aSAndroid Build Coastguard Worker case EventType::kUpdateClientTransfer:
388*61c4878aSAndroid Build Coastguard Worker default:
389*61c4878aSAndroid Build Coastguard Worker // Other events are handled by individual transfer contexts.
390*61c4878aSAndroid Build Coastguard Worker break;
391*61c4878aSAndroid Build Coastguard Worker }
392*61c4878aSAndroid Build Coastguard Worker
393*61c4878aSAndroid Build Coastguard Worker Context* ctx = FindContextForEvent(event);
394*61c4878aSAndroid Build Coastguard Worker if (ctx == nullptr) {
395*61c4878aSAndroid Build Coastguard Worker // No context was found. For new transfer events, report a
396*61c4878aSAndroid Build Coastguard Worker // RESOURCE_EXHAUSTED error with starting the transfer.
397*61c4878aSAndroid Build Coastguard Worker if (event.type == EventType::kNewClientTransfer) {
398*61c4878aSAndroid Build Coastguard Worker // On the client, invoke the completion callback directly.
399*61c4878aSAndroid Build Coastguard Worker staged_on_completion_(Status::ResourceExhausted());
400*61c4878aSAndroid Build Coastguard Worker } else if (event.type == EventType::kNewServerTransfer) {
401*61c4878aSAndroid Build Coastguard Worker // On the server, send a status chunk back to the client.
402*61c4878aSAndroid Build Coastguard Worker SendStatusChunk(
403*61c4878aSAndroid Build Coastguard Worker {.session_id = event.new_transfer.session_id,
404*61c4878aSAndroid Build Coastguard Worker .protocol_version = event.new_transfer.protocol_version,
405*61c4878aSAndroid Build Coastguard Worker .status = Status::ResourceExhausted().code(),
406*61c4878aSAndroid Build Coastguard Worker .stream = event.new_transfer.type == TransferType::kTransmit
407*61c4878aSAndroid Build Coastguard Worker ? TransferStream::kServerRead
408*61c4878aSAndroid Build Coastguard Worker : TransferStream::kServerWrite});
409*61c4878aSAndroid Build Coastguard Worker }
410*61c4878aSAndroid Build Coastguard Worker return;
411*61c4878aSAndroid Build Coastguard Worker }
412*61c4878aSAndroid Build Coastguard Worker
413*61c4878aSAndroid Build Coastguard Worker if (event.type == EventType::kNewClientTransfer) {
414*61c4878aSAndroid Build Coastguard Worker // TODO(frolv): This is terrible.
415*61c4878aSAndroid Build Coastguard Worker ClientContext* cctx = static_cast<ClientContext*>(ctx);
416*61c4878aSAndroid Build Coastguard Worker cctx->set_on_completion(std::move(staged_on_completion_));
417*61c4878aSAndroid Build Coastguard Worker cctx->set_handle_id(event.new_transfer.handle_id);
418*61c4878aSAndroid Build Coastguard Worker }
419*61c4878aSAndroid Build Coastguard Worker
420*61c4878aSAndroid Build Coastguard Worker if (event.type == EventType::kUpdateClientTransfer) {
421*61c4878aSAndroid Build Coastguard Worker static_cast<ClientContext&>(*ctx).set_transfer_size_bytes(
422*61c4878aSAndroid Build Coastguard Worker event.update_transfer.transfer_size_bytes);
423*61c4878aSAndroid Build Coastguard Worker return;
424*61c4878aSAndroid Build Coastguard Worker }
425*61c4878aSAndroid Build Coastguard Worker
426*61c4878aSAndroid Build Coastguard Worker ctx->HandleEvent(event);
427*61c4878aSAndroid Build Coastguard Worker }
428*61c4878aSAndroid Build Coastguard Worker
FindContextForEvent(const internal::Event & event) const429*61c4878aSAndroid Build Coastguard Worker Context* TransferThread::FindContextForEvent(
430*61c4878aSAndroid Build Coastguard Worker const internal::Event& event) const {
431*61c4878aSAndroid Build Coastguard Worker switch (event.type) {
432*61c4878aSAndroid Build Coastguard Worker case EventType::kNewClientTransfer:
433*61c4878aSAndroid Build Coastguard Worker return FindNewTransfer(client_transfers_, event.new_transfer.session_id);
434*61c4878aSAndroid Build Coastguard Worker case EventType::kNewServerTransfer:
435*61c4878aSAndroid Build Coastguard Worker return FindNewTransfer(server_transfers_, event.new_transfer.session_id);
436*61c4878aSAndroid Build Coastguard Worker
437*61c4878aSAndroid Build Coastguard Worker case EventType::kClientChunk:
438*61c4878aSAndroid Build Coastguard Worker if (event.chunk.match_resource_id) {
439*61c4878aSAndroid Build Coastguard Worker return FindActiveTransferByResourceId(client_transfers_,
440*61c4878aSAndroid Build Coastguard Worker event.chunk.context_identifier);
441*61c4878aSAndroid Build Coastguard Worker }
442*61c4878aSAndroid Build Coastguard Worker return FindActiveTransferByLegacyId(client_transfers_,
443*61c4878aSAndroid Build Coastguard Worker event.chunk.context_identifier);
444*61c4878aSAndroid Build Coastguard Worker
445*61c4878aSAndroid Build Coastguard Worker case EventType::kServerChunk:
446*61c4878aSAndroid Build Coastguard Worker if (event.chunk.match_resource_id) {
447*61c4878aSAndroid Build Coastguard Worker return FindActiveTransferByResourceId(server_transfers_,
448*61c4878aSAndroid Build Coastguard Worker event.chunk.context_identifier);
449*61c4878aSAndroid Build Coastguard Worker }
450*61c4878aSAndroid Build Coastguard Worker return FindActiveTransferByLegacyId(server_transfers_,
451*61c4878aSAndroid Build Coastguard Worker event.chunk.context_identifier);
452*61c4878aSAndroid Build Coastguard Worker
453*61c4878aSAndroid Build Coastguard Worker case EventType::kClientTimeout: // Manually triggered client timeout
454*61c4878aSAndroid Build Coastguard Worker return FindActiveTransferByLegacyId(client_transfers_,
455*61c4878aSAndroid Build Coastguard Worker event.chunk.context_identifier);
456*61c4878aSAndroid Build Coastguard Worker case EventType::kServerTimeout: // Manually triggered server timeout
457*61c4878aSAndroid Build Coastguard Worker return FindActiveTransferByLegacyId(server_transfers_,
458*61c4878aSAndroid Build Coastguard Worker event.chunk.context_identifier);
459*61c4878aSAndroid Build Coastguard Worker
460*61c4878aSAndroid Build Coastguard Worker case EventType::kClientEndTransfer:
461*61c4878aSAndroid Build Coastguard Worker if (event.end_transfer.id_type == IdentifierType::Handle) {
462*61c4878aSAndroid Build Coastguard Worker return FindClientTransferByHandleId(event.end_transfer.id);
463*61c4878aSAndroid Build Coastguard Worker }
464*61c4878aSAndroid Build Coastguard Worker return FindActiveTransferByLegacyId(client_transfers_,
465*61c4878aSAndroid Build Coastguard Worker event.end_transfer.id);
466*61c4878aSAndroid Build Coastguard Worker case EventType::kServerEndTransfer:
467*61c4878aSAndroid Build Coastguard Worker PW_DCHECK(event.end_transfer.id_type != IdentifierType::Handle);
468*61c4878aSAndroid Build Coastguard Worker return FindActiveTransferByLegacyId(server_transfers_,
469*61c4878aSAndroid Build Coastguard Worker event.end_transfer.id);
470*61c4878aSAndroid Build Coastguard Worker
471*61c4878aSAndroid Build Coastguard Worker case EventType::kUpdateClientTransfer:
472*61c4878aSAndroid Build Coastguard Worker return FindClientTransferByHandleId(event.update_transfer.handle_id);
473*61c4878aSAndroid Build Coastguard Worker
474*61c4878aSAndroid Build Coastguard Worker case EventType::kSendStatusChunk:
475*61c4878aSAndroid Build Coastguard Worker case EventType::kAddTransferHandler:
476*61c4878aSAndroid Build Coastguard Worker case EventType::kRemoveTransferHandler:
477*61c4878aSAndroid Build Coastguard Worker case EventType::kSetStream:
478*61c4878aSAndroid Build Coastguard Worker case EventType::kTerminate:
479*61c4878aSAndroid Build Coastguard Worker case EventType::kGetResourceStatus:
480*61c4878aSAndroid Build Coastguard Worker default:
481*61c4878aSAndroid Build Coastguard Worker return nullptr;
482*61c4878aSAndroid Build Coastguard Worker }
483*61c4878aSAndroid Build Coastguard Worker }
484*61c4878aSAndroid Build Coastguard Worker
SendStatusChunk(const internal::SendStatusChunkEvent & event)485*61c4878aSAndroid Build Coastguard Worker void TransferThread::SendStatusChunk(
486*61c4878aSAndroid Build Coastguard Worker const internal::SendStatusChunkEvent& event) {
487*61c4878aSAndroid Build Coastguard Worker rpc::Writer& destination = stream_for(event.stream);
488*61c4878aSAndroid Build Coastguard Worker
489*61c4878aSAndroid Build Coastguard Worker Chunk chunk =
490*61c4878aSAndroid Build Coastguard Worker Chunk::Final(event.protocol_version, event.session_id, event.status);
491*61c4878aSAndroid Build Coastguard Worker
492*61c4878aSAndroid Build Coastguard Worker Result<ConstByteSpan> result = chunk.Encode(chunk_buffer_);
493*61c4878aSAndroid Build Coastguard Worker if (!result.ok()) {
494*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Failed to encode final chunk for transfer %u",
495*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(event.session_id));
496*61c4878aSAndroid Build Coastguard Worker return;
497*61c4878aSAndroid Build Coastguard Worker }
498*61c4878aSAndroid Build Coastguard Worker
499*61c4878aSAndroid Build Coastguard Worker if (!destination.Write(result.value()).ok()) {
500*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Failed to send final chunk for transfer %u",
501*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(event.session_id));
502*61c4878aSAndroid Build Coastguard Worker return;
503*61c4878aSAndroid Build Coastguard Worker }
504*61c4878aSAndroid Build Coastguard Worker }
505*61c4878aSAndroid Build Coastguard Worker
506*61c4878aSAndroid Build Coastguard Worker // Should only be called with the `next_event_ownership_` lock held.
AssignSessionId()507*61c4878aSAndroid Build Coastguard Worker uint32_t TransferThread::AssignSessionId() {
508*61c4878aSAndroid Build Coastguard Worker uint32_t session_id = next_session_id_++;
509*61c4878aSAndroid Build Coastguard Worker if (session_id == 0) {
510*61c4878aSAndroid Build Coastguard Worker session_id = next_session_id_++;
511*61c4878aSAndroid Build Coastguard Worker }
512*61c4878aSAndroid Build Coastguard Worker return session_id;
513*61c4878aSAndroid Build Coastguard Worker }
514*61c4878aSAndroid Build Coastguard Worker
515*61c4878aSAndroid Build Coastguard Worker template <typename T>
TerminateTransfers(span<T> contexts,TransferType type,EventType event_type,Status status)516*61c4878aSAndroid Build Coastguard Worker void TerminateTransfers(span<T> contexts,
517*61c4878aSAndroid Build Coastguard Worker TransferType type,
518*61c4878aSAndroid Build Coastguard Worker EventType event_type,
519*61c4878aSAndroid Build Coastguard Worker Status status) {
520*61c4878aSAndroid Build Coastguard Worker for (Context& context : contexts) {
521*61c4878aSAndroid Build Coastguard Worker if (context.active() && context.type() == type) {
522*61c4878aSAndroid Build Coastguard Worker context.HandleEvent(Event{
523*61c4878aSAndroid Build Coastguard Worker .type = event_type,
524*61c4878aSAndroid Build Coastguard Worker .end_transfer =
525*61c4878aSAndroid Build Coastguard Worker EndTransferEvent{
526*61c4878aSAndroid Build Coastguard Worker .id_type = IdentifierType::Session,
527*61c4878aSAndroid Build Coastguard Worker .id = context.session_id(),
528*61c4878aSAndroid Build Coastguard Worker .status = status.code(),
529*61c4878aSAndroid Build Coastguard Worker .send_status_chunk = false,
530*61c4878aSAndroid Build Coastguard Worker },
531*61c4878aSAndroid Build Coastguard Worker });
532*61c4878aSAndroid Build Coastguard Worker }
533*61c4878aSAndroid Build Coastguard Worker }
534*61c4878aSAndroid Build Coastguard Worker }
535*61c4878aSAndroid Build Coastguard Worker
HandleSetStreamEvent(TransferStream stream)536*61c4878aSAndroid Build Coastguard Worker void TransferThread::HandleSetStreamEvent(TransferStream stream) {
537*61c4878aSAndroid Build Coastguard Worker switch (stream) {
538*61c4878aSAndroid Build Coastguard Worker case TransferStream::kClientRead:
539*61c4878aSAndroid Build Coastguard Worker TerminateTransfers(client_transfers_,
540*61c4878aSAndroid Build Coastguard Worker TransferType::kReceive,
541*61c4878aSAndroid Build Coastguard Worker EventType::kClientEndTransfer,
542*61c4878aSAndroid Build Coastguard Worker Status::Aborted());
543*61c4878aSAndroid Build Coastguard Worker client_read_stream_ = std::move(staged_client_stream_);
544*61c4878aSAndroid Build Coastguard Worker client_read_stream_.set_on_next(std::move(staged_client_on_next_));
545*61c4878aSAndroid Build Coastguard Worker client_read_stream_.set_on_error([](Status status) {
546*61c4878aSAndroid Build Coastguard Worker PW_LOG_WARN("Client read stream closed unexpectedly: %s", status.str());
547*61c4878aSAndroid Build Coastguard Worker });
548*61c4878aSAndroid Build Coastguard Worker break;
549*61c4878aSAndroid Build Coastguard Worker case TransferStream::kClientWrite:
550*61c4878aSAndroid Build Coastguard Worker TerminateTransfers(client_transfers_,
551*61c4878aSAndroid Build Coastguard Worker TransferType::kTransmit,
552*61c4878aSAndroid Build Coastguard Worker EventType::kClientEndTransfer,
553*61c4878aSAndroid Build Coastguard Worker Status::Aborted());
554*61c4878aSAndroid Build Coastguard Worker client_write_stream_ = std::move(staged_client_stream_);
555*61c4878aSAndroid Build Coastguard Worker client_write_stream_.set_on_next(std::move(staged_client_on_next_));
556*61c4878aSAndroid Build Coastguard Worker client_write_stream_.set_on_error([](Status status) {
557*61c4878aSAndroid Build Coastguard Worker PW_LOG_WARN("Client write stream closed unexpectedly: %s",
558*61c4878aSAndroid Build Coastguard Worker status.str());
559*61c4878aSAndroid Build Coastguard Worker });
560*61c4878aSAndroid Build Coastguard Worker break;
561*61c4878aSAndroid Build Coastguard Worker case TransferStream::kServerRead:
562*61c4878aSAndroid Build Coastguard Worker TerminateTransfers(server_transfers_,
563*61c4878aSAndroid Build Coastguard Worker TransferType::kTransmit,
564*61c4878aSAndroid Build Coastguard Worker EventType::kServerEndTransfer,
565*61c4878aSAndroid Build Coastguard Worker Status::Aborted());
566*61c4878aSAndroid Build Coastguard Worker server_read_stream_ = std::move(staged_server_stream_);
567*61c4878aSAndroid Build Coastguard Worker server_read_stream_.set_on_next(std::move(staged_server_on_next_));
568*61c4878aSAndroid Build Coastguard Worker server_read_stream_.set_on_error([](Status status) {
569*61c4878aSAndroid Build Coastguard Worker PW_LOG_WARN("Server read stream closed unexpectedly: %s", status.str());
570*61c4878aSAndroid Build Coastguard Worker });
571*61c4878aSAndroid Build Coastguard Worker break;
572*61c4878aSAndroid Build Coastguard Worker case TransferStream::kServerWrite:
573*61c4878aSAndroid Build Coastguard Worker TerminateTransfers(server_transfers_,
574*61c4878aSAndroid Build Coastguard Worker TransferType::kReceive,
575*61c4878aSAndroid Build Coastguard Worker EventType::kServerEndTransfer,
576*61c4878aSAndroid Build Coastguard Worker Status::Aborted());
577*61c4878aSAndroid Build Coastguard Worker server_write_stream_ = std::move(staged_server_stream_);
578*61c4878aSAndroid Build Coastguard Worker server_write_stream_.set_on_next(std::move(staged_server_on_next_));
579*61c4878aSAndroid Build Coastguard Worker server_write_stream_.set_on_error([](Status status) {
580*61c4878aSAndroid Build Coastguard Worker PW_LOG_WARN("Server write stream closed unexpectedly: %s",
581*61c4878aSAndroid Build Coastguard Worker status.str());
582*61c4878aSAndroid Build Coastguard Worker });
583*61c4878aSAndroid Build Coastguard Worker break;
584*61c4878aSAndroid Build Coastguard Worker }
585*61c4878aSAndroid Build Coastguard Worker }
586*61c4878aSAndroid Build Coastguard Worker
587*61c4878aSAndroid Build Coastguard Worker // Adds GetResourceStatusEvent to the queue. Will fail if there is already a
588*61c4878aSAndroid Build Coastguard Worker // GetResourceStatusEvent in process.
EnqueueResourceEvent(uint32_t resource_id,ResourceStatusCallback && callback)589*61c4878aSAndroid Build Coastguard Worker void TransferThread::EnqueueResourceEvent(uint32_t resource_id,
590*61c4878aSAndroid Build Coastguard Worker ResourceStatusCallback&& callback) {
591*61c4878aSAndroid Build Coastguard Worker if (!TryWaitForEventToProcess()) {
592*61c4878aSAndroid Build Coastguard Worker return;
593*61c4878aSAndroid Build Coastguard Worker }
594*61c4878aSAndroid Build Coastguard Worker
595*61c4878aSAndroid Build Coastguard Worker next_event_.type = EventType::kGetResourceStatus;
596*61c4878aSAndroid Build Coastguard Worker
597*61c4878aSAndroid Build Coastguard Worker resource_status_callback_ = std::move(callback);
598*61c4878aSAndroid Build Coastguard Worker
599*61c4878aSAndroid Build Coastguard Worker next_event_.resource_status.resource_id = resource_id;
600*61c4878aSAndroid Build Coastguard Worker
601*61c4878aSAndroid Build Coastguard Worker event_notification_.release();
602*61c4878aSAndroid Build Coastguard Worker }
603*61c4878aSAndroid Build Coastguard Worker
604*61c4878aSAndroid Build Coastguard Worker // Should only be called when we got a valid callback and RPC responder from
605*61c4878aSAndroid Build Coastguard Worker // GetResourceStatus transfer RPC.
GetResourceState(uint32_t resource_id)606*61c4878aSAndroid Build Coastguard Worker void TransferThread::GetResourceState(uint32_t resource_id) {
607*61c4878aSAndroid Build Coastguard Worker PW_ASSERT(resource_status_callback_ != nullptr);
608*61c4878aSAndroid Build Coastguard Worker
609*61c4878aSAndroid Build Coastguard Worker auto handler = std::find_if(handlers_.begin(), handlers_.end(), [&](auto& h) {
610*61c4878aSAndroid Build Coastguard Worker return h.id() == resource_id;
611*61c4878aSAndroid Build Coastguard Worker });
612*61c4878aSAndroid Build Coastguard Worker internal::ResourceStatus stats;
613*61c4878aSAndroid Build Coastguard Worker stats.resource_id = resource_id;
614*61c4878aSAndroid Build Coastguard Worker
615*61c4878aSAndroid Build Coastguard Worker if (handler != handlers_.end()) {
616*61c4878aSAndroid Build Coastguard Worker Status status = handler->GetStatus(stats.readable_offset,
617*61c4878aSAndroid Build Coastguard Worker stats.writeable_offset,
618*61c4878aSAndroid Build Coastguard Worker stats.read_checksum,
619*61c4878aSAndroid Build Coastguard Worker stats.write_checksum);
620*61c4878aSAndroid Build Coastguard Worker
621*61c4878aSAndroid Build Coastguard Worker resource_status_callback_(status, stats);
622*61c4878aSAndroid Build Coastguard Worker } else {
623*61c4878aSAndroid Build Coastguard Worker resource_status_callback_(Status::NotFound(), stats);
624*61c4878aSAndroid Build Coastguard Worker }
625*61c4878aSAndroid Build Coastguard Worker }
626*61c4878aSAndroid Build Coastguard Worker
627*61c4878aSAndroid Build Coastguard Worker } // namespace pw::transfer::internal
628*61c4878aSAndroid Build Coastguard Worker
629*61c4878aSAndroid Build Coastguard Worker PW_MODIFY_DIAGNOSTICS_POP();
630