1*61c4878aSAndroid Build Coastguard Worker // Copyright 2024 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker // https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker
15*61c4878aSAndroid Build Coastguard Worker #define PW_LOG_MODULE_NAME "TRN"
16*61c4878aSAndroid Build Coastguard Worker #define PW_LOG_LEVEL PW_TRANSFER_CONFIG_LOG_LEVEL
17*61c4878aSAndroid Build Coastguard Worker
18*61c4878aSAndroid Build Coastguard Worker #include "pw_transfer/client.h"
19*61c4878aSAndroid Build Coastguard Worker
20*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
21*61c4878aSAndroid Build Coastguard Worker
22*61c4878aSAndroid Build Coastguard Worker namespace pw::transfer {
23*61c4878aSAndroid Build Coastguard Worker
Read(uint32_t resource_id,stream::Writer & output,CompletionFunc && on_completion,ProtocolVersion protocol_version,chrono::SystemClock::duration timeout,chrono::SystemClock::duration initial_chunk_timeout,uint32_t initial_offset)24*61c4878aSAndroid Build Coastguard Worker Result<Client::Handle> Client::Read(
25*61c4878aSAndroid Build Coastguard Worker uint32_t resource_id,
26*61c4878aSAndroid Build Coastguard Worker stream::Writer& output,
27*61c4878aSAndroid Build Coastguard Worker CompletionFunc&& on_completion,
28*61c4878aSAndroid Build Coastguard Worker ProtocolVersion protocol_version,
29*61c4878aSAndroid Build Coastguard Worker chrono::SystemClock::duration timeout,
30*61c4878aSAndroid Build Coastguard Worker chrono::SystemClock::duration initial_chunk_timeout,
31*61c4878aSAndroid Build Coastguard Worker uint32_t initial_offset) {
32*61c4878aSAndroid Build Coastguard Worker if (on_completion == nullptr ||
33*61c4878aSAndroid Build Coastguard Worker protocol_version == ProtocolVersion::kUnknown) {
34*61c4878aSAndroid Build Coastguard Worker return Status::InvalidArgument();
35*61c4878aSAndroid Build Coastguard Worker }
36*61c4878aSAndroid Build Coastguard Worker
37*61c4878aSAndroid Build Coastguard Worker if (protocol_version < ProtocolVersion::kVersionTwo && initial_offset != 0) {
38*61c4878aSAndroid Build Coastguard Worker return Status::InvalidArgument();
39*61c4878aSAndroid Build Coastguard Worker }
40*61c4878aSAndroid Build Coastguard Worker
41*61c4878aSAndroid Build Coastguard Worker if (!has_read_stream_) {
42*61c4878aSAndroid Build Coastguard Worker rpc::RawClientReaderWriter read_stream =
43*61c4878aSAndroid Build Coastguard Worker client_.Read(nullptr, // on_next will be set by the transfer_thread.
44*61c4878aSAndroid Build Coastguard Worker [this](Status status) {
45*61c4878aSAndroid Build Coastguard Worker OnRpcError(status, internal::TransferType::kReceive);
46*61c4878aSAndroid Build Coastguard Worker });
47*61c4878aSAndroid Build Coastguard Worker transfer_thread_.SetClientReadStream(
48*61c4878aSAndroid Build Coastguard Worker read_stream, [this](ConstByteSpan chunk) {
49*61c4878aSAndroid Build Coastguard Worker transfer_thread_.ProcessClientChunk(chunk);
50*61c4878aSAndroid Build Coastguard Worker });
51*61c4878aSAndroid Build Coastguard Worker has_read_stream_ = true;
52*61c4878aSAndroid Build Coastguard Worker }
53*61c4878aSAndroid Build Coastguard Worker
54*61c4878aSAndroid Build Coastguard Worker Handle handle = AssignHandle();
55*61c4878aSAndroid Build Coastguard Worker
56*61c4878aSAndroid Build Coastguard Worker transfer_thread_.StartClientTransfer(internal::TransferType::kReceive,
57*61c4878aSAndroid Build Coastguard Worker protocol_version,
58*61c4878aSAndroid Build Coastguard Worker resource_id,
59*61c4878aSAndroid Build Coastguard Worker handle.id(),
60*61c4878aSAndroid Build Coastguard Worker &output,
61*61c4878aSAndroid Build Coastguard Worker max_parameters_,
62*61c4878aSAndroid Build Coastguard Worker std::move(on_completion),
63*61c4878aSAndroid Build Coastguard Worker timeout,
64*61c4878aSAndroid Build Coastguard Worker initial_chunk_timeout,
65*61c4878aSAndroid Build Coastguard Worker max_retries_,
66*61c4878aSAndroid Build Coastguard Worker max_lifetime_retries_,
67*61c4878aSAndroid Build Coastguard Worker initial_offset);
68*61c4878aSAndroid Build Coastguard Worker return handle;
69*61c4878aSAndroid Build Coastguard Worker }
70*61c4878aSAndroid Build Coastguard Worker
Write(uint32_t resource_id,stream::Reader & input,CompletionFunc && on_completion,ProtocolVersion protocol_version,chrono::SystemClock::duration timeout,chrono::SystemClock::duration initial_chunk_timeout,uint32_t initial_offset)71*61c4878aSAndroid Build Coastguard Worker Result<Client::Handle> Client::Write(
72*61c4878aSAndroid Build Coastguard Worker uint32_t resource_id,
73*61c4878aSAndroid Build Coastguard Worker stream::Reader& input,
74*61c4878aSAndroid Build Coastguard Worker CompletionFunc&& on_completion,
75*61c4878aSAndroid Build Coastguard Worker ProtocolVersion protocol_version,
76*61c4878aSAndroid Build Coastguard Worker chrono::SystemClock::duration timeout,
77*61c4878aSAndroid Build Coastguard Worker chrono::SystemClock::duration initial_chunk_timeout,
78*61c4878aSAndroid Build Coastguard Worker uint32_t initial_offset) {
79*61c4878aSAndroid Build Coastguard Worker if (on_completion == nullptr ||
80*61c4878aSAndroid Build Coastguard Worker protocol_version == ProtocolVersion::kUnknown) {
81*61c4878aSAndroid Build Coastguard Worker return Status::InvalidArgument();
82*61c4878aSAndroid Build Coastguard Worker }
83*61c4878aSAndroid Build Coastguard Worker
84*61c4878aSAndroid Build Coastguard Worker if (protocol_version < ProtocolVersion::kVersionTwo && initial_offset != 0) {
85*61c4878aSAndroid Build Coastguard Worker return Status::InvalidArgument();
86*61c4878aSAndroid Build Coastguard Worker }
87*61c4878aSAndroid Build Coastguard Worker
88*61c4878aSAndroid Build Coastguard Worker if (!has_write_stream_) {
89*61c4878aSAndroid Build Coastguard Worker rpc::RawClientReaderWriter write_stream =
90*61c4878aSAndroid Build Coastguard Worker client_.Write(nullptr, // on_next will be set by the transfer thread.
91*61c4878aSAndroid Build Coastguard Worker [this](Status status) {
92*61c4878aSAndroid Build Coastguard Worker OnRpcError(status, internal::TransferType::kTransmit);
93*61c4878aSAndroid Build Coastguard Worker });
94*61c4878aSAndroid Build Coastguard Worker transfer_thread_.SetClientWriteStream(
95*61c4878aSAndroid Build Coastguard Worker write_stream, [this](ConstByteSpan chunk) {
96*61c4878aSAndroid Build Coastguard Worker transfer_thread_.ProcessClientChunk(chunk);
97*61c4878aSAndroid Build Coastguard Worker });
98*61c4878aSAndroid Build Coastguard Worker has_write_stream_ = true;
99*61c4878aSAndroid Build Coastguard Worker }
100*61c4878aSAndroid Build Coastguard Worker
101*61c4878aSAndroid Build Coastguard Worker Handle handle = AssignHandle();
102*61c4878aSAndroid Build Coastguard Worker
103*61c4878aSAndroid Build Coastguard Worker transfer_thread_.StartClientTransfer(internal::TransferType::kTransmit,
104*61c4878aSAndroid Build Coastguard Worker protocol_version,
105*61c4878aSAndroid Build Coastguard Worker resource_id,
106*61c4878aSAndroid Build Coastguard Worker handle.id(),
107*61c4878aSAndroid Build Coastguard Worker &input,
108*61c4878aSAndroid Build Coastguard Worker max_parameters_,
109*61c4878aSAndroid Build Coastguard Worker std::move(on_completion),
110*61c4878aSAndroid Build Coastguard Worker timeout,
111*61c4878aSAndroid Build Coastguard Worker initial_chunk_timeout,
112*61c4878aSAndroid Build Coastguard Worker max_retries_,
113*61c4878aSAndroid Build Coastguard Worker max_lifetime_retries_,
114*61c4878aSAndroid Build Coastguard Worker initial_offset);
115*61c4878aSAndroid Build Coastguard Worker
116*61c4878aSAndroid Build Coastguard Worker return handle;
117*61c4878aSAndroid Build Coastguard Worker }
118*61c4878aSAndroid Build Coastguard Worker
AssignHandle()119*61c4878aSAndroid Build Coastguard Worker Client::Handle Client::AssignHandle() {
120*61c4878aSAndroid Build Coastguard Worker uint32_t handle_id = next_handle_id_++;
121*61c4878aSAndroid Build Coastguard Worker if (handle_id == Handle::kUnassignedHandleId) {
122*61c4878aSAndroid Build Coastguard Worker handle_id = next_handle_id_++;
123*61c4878aSAndroid Build Coastguard Worker }
124*61c4878aSAndroid Build Coastguard Worker
125*61c4878aSAndroid Build Coastguard Worker return Handle(this, handle_id);
126*61c4878aSAndroid Build Coastguard Worker }
127*61c4878aSAndroid Build Coastguard Worker
OnRpcError(Status status,internal::TransferType type)128*61c4878aSAndroid Build Coastguard Worker void Client::OnRpcError(Status status, internal::TransferType type) {
129*61c4878aSAndroid Build Coastguard Worker bool is_write_error = type == internal::TransferType::kTransmit;
130*61c4878aSAndroid Build Coastguard Worker
131*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Client %s stream terminated with status %d",
132*61c4878aSAndroid Build Coastguard Worker is_write_error ? "Write()" : "Read()",
133*61c4878aSAndroid Build Coastguard Worker status.code());
134*61c4878aSAndroid Build Coastguard Worker
135*61c4878aSAndroid Build Coastguard Worker if (is_write_error) {
136*61c4878aSAndroid Build Coastguard Worker has_write_stream_ = false;
137*61c4878aSAndroid Build Coastguard Worker } else {
138*61c4878aSAndroid Build Coastguard Worker has_read_stream_ = false;
139*61c4878aSAndroid Build Coastguard Worker }
140*61c4878aSAndroid Build Coastguard Worker }
141*61c4878aSAndroid Build Coastguard Worker
142*61c4878aSAndroid Build Coastguard Worker } // namespace pw::transfer
143