1*61c4878aSAndroid Build Coastguard Worker // Copyright 2020 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker // https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker
15*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/internal/call.h"
16*61c4878aSAndroid Build Coastguard Worker
17*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
18*61c4878aSAndroid Build Coastguard Worker #include "pw_bytes/span.h"
19*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
20*61c4878aSAndroid Build Coastguard Worker #include "pw_preprocessor/util.h"
21*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/channel.h"
22*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/client.h"
23*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/internal/encoding_buffer.h"
24*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/internal/endpoint.h"
25*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/internal/method.h"
26*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/internal/packet.pwpb.h"
27*61c4878aSAndroid Build Coastguard Worker #include "pw_rpc/server.h"
28*61c4878aSAndroid Build Coastguard Worker #include "pw_status/status_with_size.h"
29*61c4878aSAndroid Build Coastguard Worker #include "pw_status/try.h"
30*61c4878aSAndroid Build Coastguard Worker
31*61c4878aSAndroid Build Coastguard Worker // If the callback timeout is enabled, count the number of iterations of the
32*61c4878aSAndroid Build Coastguard Worker // waiting loop and crash if it exceeds PW_RPC_CALLBACK_TIMEOUT_TICKS.
33*61c4878aSAndroid Build Coastguard Worker #if PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
34*61c4878aSAndroid Build Coastguard Worker #define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source, call) \
35*61c4878aSAndroid Build Coastguard Worker iterations += 1; \
36*61c4878aSAndroid Build Coastguard Worker PW_CHECK( \
37*61c4878aSAndroid Build Coastguard Worker iterations < PW_RPC_CALLBACK_TIMEOUT_TICKS, \
38*61c4878aSAndroid Build Coastguard Worker "A callback for RPC %u:%08x/%08x has not finished after " \
39*61c4878aSAndroid Build Coastguard Worker PW_STRINGIFY(PW_RPC_CALLBACK_TIMEOUT_TICKS) \
40*61c4878aSAndroid Build Coastguard Worker " ticks. This may indicate that an RPC callback attempted to " \
41*61c4878aSAndroid Build Coastguard Worker timeout_source \
42*61c4878aSAndroid Build Coastguard Worker " its own call object, which is not permitted. Fix this condition or " \
43*61c4878aSAndroid Build Coastguard Worker "change the value of PW_RPC_CALLBACK_TIMEOUT_TICKS to avoid this " \
44*61c4878aSAndroid Build Coastguard Worker "crash. See https://pigweed.dev/pw_rpc" \
45*61c4878aSAndroid Build Coastguard Worker "#destructors-moves-wait-for-callbacks-to-complete for details.", \
46*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>((call).channel_id_), \
47*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>((call).service_id_), \
48*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>((call).method_id_))
49*61c4878aSAndroid Build Coastguard Worker #else
50*61c4878aSAndroid Build Coastguard Worker #define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source, call) \
51*61c4878aSAndroid Build Coastguard Worker static_cast<void>(iterations)
52*61c4878aSAndroid Build Coastguard Worker #endif // PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
53*61c4878aSAndroid Build Coastguard Worker
54*61c4878aSAndroid Build Coastguard Worker namespace pw::rpc::internal {
55*61c4878aSAndroid Build Coastguard Worker
56*61c4878aSAndroid Build Coastguard Worker using pwpb::PacketType;
57*61c4878aSAndroid Build Coastguard Worker
EncodeCallbackToPayloadBuffer(const Function<StatusWithSize (ByteSpan)> & callback)58*61c4878aSAndroid Build Coastguard Worker Result<ConstByteSpan> EncodeCallbackToPayloadBuffer(
59*61c4878aSAndroid Build Coastguard Worker const Function<StatusWithSize(ByteSpan)>& callback)
60*61c4878aSAndroid Build Coastguard Worker PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
61*61c4878aSAndroid Build Coastguard Worker if (callback == nullptr) {
62*61c4878aSAndroid Build Coastguard Worker return Status::InvalidArgument();
63*61c4878aSAndroid Build Coastguard Worker }
64*61c4878aSAndroid Build Coastguard Worker
65*61c4878aSAndroid Build Coastguard Worker ByteSpan payload_buffer =
66*61c4878aSAndroid Build Coastguard Worker encoding_buffer.AllocatePayloadBuffer(MaxSafePayloadSize());
67*61c4878aSAndroid Build Coastguard Worker PW_TRY_ASSIGN(const size_t payload_size, callback(payload_buffer));
68*61c4878aSAndroid Build Coastguard Worker
69*61c4878aSAndroid Build Coastguard Worker return payload_buffer.first(payload_size);
70*61c4878aSAndroid Build Coastguard Worker }
71*61c4878aSAndroid Build Coastguard Worker
72*61c4878aSAndroid Build Coastguard Worker // Creates an active server-side Call.
Call(const LockedCallContext & context,CallProperties properties)73*61c4878aSAndroid Build Coastguard Worker Call::Call(const LockedCallContext& context, CallProperties properties)
74*61c4878aSAndroid Build Coastguard Worker : Call(context.server().ClaimLocked(),
75*61c4878aSAndroid Build Coastguard Worker context.call_id(),
76*61c4878aSAndroid Build Coastguard Worker context.channel_id(),
77*61c4878aSAndroid Build Coastguard Worker UnwrapServiceId(context.service().service_id()),
78*61c4878aSAndroid Build Coastguard Worker context.method().id(),
79*61c4878aSAndroid Build Coastguard Worker properties) {}
80*61c4878aSAndroid Build Coastguard Worker
81*61c4878aSAndroid Build Coastguard Worker // Creates an active client-side call, assigning it a new ID.
Call(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,CallProperties properties)82*61c4878aSAndroid Build Coastguard Worker Call::Call(LockedEndpoint& client,
83*61c4878aSAndroid Build Coastguard Worker uint32_t channel_id,
84*61c4878aSAndroid Build Coastguard Worker uint32_t service_id,
85*61c4878aSAndroid Build Coastguard Worker uint32_t method_id,
86*61c4878aSAndroid Build Coastguard Worker CallProperties properties)
87*61c4878aSAndroid Build Coastguard Worker : Call(client,
88*61c4878aSAndroid Build Coastguard Worker client.NewCallId(),
89*61c4878aSAndroid Build Coastguard Worker channel_id,
90*61c4878aSAndroid Build Coastguard Worker service_id,
91*61c4878aSAndroid Build Coastguard Worker method_id,
92*61c4878aSAndroid Build Coastguard Worker properties) {}
93*61c4878aSAndroid Build Coastguard Worker
Call(LockedEndpoint & endpoint_ref,uint32_t call_id,uint32_t channel_id,uint32_t service_id,uint32_t method_id,CallProperties properties)94*61c4878aSAndroid Build Coastguard Worker Call::Call(LockedEndpoint& endpoint_ref,
95*61c4878aSAndroid Build Coastguard Worker uint32_t call_id,
96*61c4878aSAndroid Build Coastguard Worker uint32_t channel_id,
97*61c4878aSAndroid Build Coastguard Worker uint32_t service_id,
98*61c4878aSAndroid Build Coastguard Worker uint32_t method_id,
99*61c4878aSAndroid Build Coastguard Worker CallProperties properties)
100*61c4878aSAndroid Build Coastguard Worker : endpoint_(&endpoint_ref),
101*61c4878aSAndroid Build Coastguard Worker channel_id_(channel_id),
102*61c4878aSAndroid Build Coastguard Worker id_(call_id),
103*61c4878aSAndroid Build Coastguard Worker service_id_(service_id),
104*61c4878aSAndroid Build Coastguard Worker method_id_(method_id),
105*61c4878aSAndroid Build Coastguard Worker // Note: Bit kActive set to 1 and kClientRequestedCompletion is set to 0.
106*61c4878aSAndroid Build Coastguard Worker state_(kActive),
107*61c4878aSAndroid Build Coastguard Worker awaiting_cleanup_(OkStatus().code()),
108*61c4878aSAndroid Build Coastguard Worker callbacks_executing_(0),
109*61c4878aSAndroid Build Coastguard Worker properties_(properties) {
110*61c4878aSAndroid Build Coastguard Worker PW_CHECK_UINT_NE(channel_id,
111*61c4878aSAndroid Build Coastguard Worker Channel::kUnassignedChannelId,
112*61c4878aSAndroid Build Coastguard Worker "Calls cannot be created with channel ID 0 "
113*61c4878aSAndroid Build Coastguard Worker "(Channel::kUnassignedChannelId)");
114*61c4878aSAndroid Build Coastguard Worker endpoint().RegisterCall(*this);
115*61c4878aSAndroid Build Coastguard Worker }
116*61c4878aSAndroid Build Coastguard Worker
DestroyServerCall()117*61c4878aSAndroid Build Coastguard Worker void Call::DestroyServerCall() {
118*61c4878aSAndroid Build Coastguard Worker RpcLockGuard lock;
119*61c4878aSAndroid Build Coastguard Worker // Any errors are logged in Channel::Send.
120*61c4878aSAndroid Build Coastguard Worker CloseAndSendResponseLocked(OkStatus()).IgnoreError();
121*61c4878aSAndroid Build Coastguard Worker WaitForCallbacksToComplete();
122*61c4878aSAndroid Build Coastguard Worker state_ |= kHasBeenDestroyed;
123*61c4878aSAndroid Build Coastguard Worker }
124*61c4878aSAndroid Build Coastguard Worker
DestroyClientCall()125*61c4878aSAndroid Build Coastguard Worker void Call::DestroyClientCall() {
126*61c4878aSAndroid Build Coastguard Worker RpcLockGuard lock;
127*61c4878aSAndroid Build Coastguard Worker CloseClientCall();
128*61c4878aSAndroid Build Coastguard Worker WaitForCallbacksToComplete();
129*61c4878aSAndroid Build Coastguard Worker state_ |= kHasBeenDestroyed;
130*61c4878aSAndroid Build Coastguard Worker }
131*61c4878aSAndroid Build Coastguard Worker
WaitForCallbacksToComplete()132*61c4878aSAndroid Build Coastguard Worker void Call::WaitForCallbacksToComplete() {
133*61c4878aSAndroid Build Coastguard Worker do {
134*61c4878aSAndroid Build Coastguard Worker int iterations = 0;
135*61c4878aSAndroid Build Coastguard Worker while (CallbacksAreRunning()) {
136*61c4878aSAndroid Build Coastguard Worker PW_RPC_CHECK_FOR_DEADLOCK("destroy", *this);
137*61c4878aSAndroid Build Coastguard Worker YieldRpcLock();
138*61c4878aSAndroid Build Coastguard Worker }
139*61c4878aSAndroid Build Coastguard Worker
140*61c4878aSAndroid Build Coastguard Worker } while (CleanUpIfRequired());
141*61c4878aSAndroid Build Coastguard Worker }
142*61c4878aSAndroid Build Coastguard Worker
MoveFrom(Call & other)143*61c4878aSAndroid Build Coastguard Worker void Call::MoveFrom(Call& other) {
144*61c4878aSAndroid Build Coastguard Worker PW_DCHECK(!active_locked());
145*61c4878aSAndroid Build Coastguard Worker PW_DCHECK(!awaiting_cleanup() && !other.awaiting_cleanup());
146*61c4878aSAndroid Build Coastguard Worker
147*61c4878aSAndroid Build Coastguard Worker // An active call with an executing callback cannot be moved. Derived call
148*61c4878aSAndroid Build Coastguard Worker // classes must wait for callbacks to finish before calling MoveFrom.
149*61c4878aSAndroid Build Coastguard Worker PW_DCHECK(!other.active_locked() || !other.CallbacksAreRunning());
150*61c4878aSAndroid Build Coastguard Worker
151*61c4878aSAndroid Build Coastguard Worker // Copy all members from the other call.
152*61c4878aSAndroid Build Coastguard Worker endpoint_ = other.endpoint_;
153*61c4878aSAndroid Build Coastguard Worker channel_id_ = other.channel_id_;
154*61c4878aSAndroid Build Coastguard Worker id_ = other.id_;
155*61c4878aSAndroid Build Coastguard Worker service_id_ = other.service_id_;
156*61c4878aSAndroid Build Coastguard Worker method_id_ = other.method_id_;
157*61c4878aSAndroid Build Coastguard Worker
158*61c4878aSAndroid Build Coastguard Worker state_ = other.state_;
159*61c4878aSAndroid Build Coastguard Worker
160*61c4878aSAndroid Build Coastguard Worker // No need to move awaiting_cleanup_, since it is 0 in both calls here.
161*61c4878aSAndroid Build Coastguard Worker
162*61c4878aSAndroid Build Coastguard Worker properties_ = other.properties_;
163*61c4878aSAndroid Build Coastguard Worker
164*61c4878aSAndroid Build Coastguard Worker // callbacks_executing_ is not moved since it is associated with the object in
165*61c4878aSAndroid Build Coastguard Worker // memory, not the call.
166*61c4878aSAndroid Build Coastguard Worker
167*61c4878aSAndroid Build Coastguard Worker on_error_ = std::move(other.on_error_);
168*61c4878aSAndroid Build Coastguard Worker on_next_ = std::move(other.on_next_);
169*61c4878aSAndroid Build Coastguard Worker
170*61c4878aSAndroid Build Coastguard Worker if (other.active_locked()) {
171*61c4878aSAndroid Build Coastguard Worker // Mark the other call inactive, unregister it, and register this one.
172*61c4878aSAndroid Build Coastguard Worker other.MarkClosed();
173*61c4878aSAndroid Build Coastguard Worker endpoint().UnregisterCall(other);
174*61c4878aSAndroid Build Coastguard Worker endpoint().RegisterUniqueCall(*this);
175*61c4878aSAndroid Build Coastguard Worker }
176*61c4878aSAndroid Build Coastguard Worker }
177*61c4878aSAndroid Build Coastguard Worker
WaitUntilReadyForMove(Call & destination,Call & source)178*61c4878aSAndroid Build Coastguard Worker void Call::WaitUntilReadyForMove(Call& destination, Call& source) {
179*61c4878aSAndroid Build Coastguard Worker do {
180*61c4878aSAndroid Build Coastguard Worker // Wait for the source's callbacks to finish if it is active.
181*61c4878aSAndroid Build Coastguard Worker int iterations = 0;
182*61c4878aSAndroid Build Coastguard Worker while (source.active_locked() && source.CallbacksAreRunning()) {
183*61c4878aSAndroid Build Coastguard Worker PW_RPC_CHECK_FOR_DEADLOCK("move", source);
184*61c4878aSAndroid Build Coastguard Worker YieldRpcLock();
185*61c4878aSAndroid Build Coastguard Worker }
186*61c4878aSAndroid Build Coastguard Worker
187*61c4878aSAndroid Build Coastguard Worker // At this point, no callbacks are running in the source call. If cleanup
188*61c4878aSAndroid Build Coastguard Worker // is required for the destination call, perform it and retry since
189*61c4878aSAndroid Build Coastguard Worker // cleanup releases and reacquires the RPC lock.
190*61c4878aSAndroid Build Coastguard Worker } while (source.CleanUpIfRequired() || destination.CleanUpIfRequired());
191*61c4878aSAndroid Build Coastguard Worker }
192*61c4878aSAndroid Build Coastguard Worker
CallOnError(Status error)193*61c4878aSAndroid Build Coastguard Worker void Call::CallOnError(Status error) {
194*61c4878aSAndroid Build Coastguard Worker auto on_error_local = std::move(on_error_);
195*61c4878aSAndroid Build Coastguard Worker
196*61c4878aSAndroid Build Coastguard Worker CallbackStarted();
197*61c4878aSAndroid Build Coastguard Worker
198*61c4878aSAndroid Build Coastguard Worker rpc_lock().unlock();
199*61c4878aSAndroid Build Coastguard Worker if (on_error_local) {
200*61c4878aSAndroid Build Coastguard Worker on_error_local(error);
201*61c4878aSAndroid Build Coastguard Worker }
202*61c4878aSAndroid Build Coastguard Worker
203*61c4878aSAndroid Build Coastguard Worker // This mutex lock could be avoided by making callbacks_executing_ atomic.
204*61c4878aSAndroid Build Coastguard Worker RpcLockGuard lock;
205*61c4878aSAndroid Build Coastguard Worker CallbackFinished();
206*61c4878aSAndroid Build Coastguard Worker }
207*61c4878aSAndroid Build Coastguard Worker
CleanUpIfRequired()208*61c4878aSAndroid Build Coastguard Worker bool Call::CleanUpIfRequired() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
209*61c4878aSAndroid Build Coastguard Worker if (!awaiting_cleanup()) {
210*61c4878aSAndroid Build Coastguard Worker return false;
211*61c4878aSAndroid Build Coastguard Worker }
212*61c4878aSAndroid Build Coastguard Worker endpoint_->CleanUpCall(*this);
213*61c4878aSAndroid Build Coastguard Worker rpc_lock().lock();
214*61c4878aSAndroid Build Coastguard Worker return true;
215*61c4878aSAndroid Build Coastguard Worker }
216*61c4878aSAndroid Build Coastguard Worker
SendPacket(PacketType type,ConstByteSpan payload,Status status)217*61c4878aSAndroid Build Coastguard Worker Status Call::SendPacket(PacketType type, ConstByteSpan payload, Status status) {
218*61c4878aSAndroid Build Coastguard Worker if (!active_locked()) {
219*61c4878aSAndroid Build Coastguard Worker encoding_buffer.ReleaseIfAllocated();
220*61c4878aSAndroid Build Coastguard Worker return Status::FailedPrecondition();
221*61c4878aSAndroid Build Coastguard Worker }
222*61c4878aSAndroid Build Coastguard Worker
223*61c4878aSAndroid Build Coastguard Worker ChannelBase* channel = endpoint_->GetInternalChannel(channel_id_);
224*61c4878aSAndroid Build Coastguard Worker if (channel == nullptr) {
225*61c4878aSAndroid Build Coastguard Worker encoding_buffer.ReleaseIfAllocated();
226*61c4878aSAndroid Build Coastguard Worker return Status::Unavailable();
227*61c4878aSAndroid Build Coastguard Worker }
228*61c4878aSAndroid Build Coastguard Worker return channel->Send(MakePacket(type, payload, status));
229*61c4878aSAndroid Build Coastguard Worker }
230*61c4878aSAndroid Build Coastguard Worker
CloseAndSendResponseCallbackLocked(const Function<StatusWithSize (ByteSpan)> & callback,Status status)231*61c4878aSAndroid Build Coastguard Worker Status Call::CloseAndSendResponseCallbackLocked(
232*61c4878aSAndroid Build Coastguard Worker const Function<StatusWithSize(ByteSpan)>& callback, Status status) {
233*61c4878aSAndroid Build Coastguard Worker PW_TRY_ASSIGN(ConstByteSpan payload, EncodeCallbackToPayloadBuffer(callback));
234*61c4878aSAndroid Build Coastguard Worker return CloseAndSendFinalPacketLocked(
235*61c4878aSAndroid Build Coastguard Worker pwpb::PacketType::RESPONSE, payload, status);
236*61c4878aSAndroid Build Coastguard Worker }
237*61c4878aSAndroid Build Coastguard Worker
TryCloseAndSendResponseCallbackLocked(const Function<StatusWithSize (ByteSpan)> & callback,Status status)238*61c4878aSAndroid Build Coastguard Worker Status Call::TryCloseAndSendResponseCallbackLocked(
239*61c4878aSAndroid Build Coastguard Worker const Function<StatusWithSize(ByteSpan)>& callback, Status status) {
240*61c4878aSAndroid Build Coastguard Worker PW_TRY_ASSIGN(ConstByteSpan payload, EncodeCallbackToPayloadBuffer(callback));
241*61c4878aSAndroid Build Coastguard Worker return TryCloseAndSendFinalPacketLocked(
242*61c4878aSAndroid Build Coastguard Worker pwpb::PacketType::RESPONSE, payload, status);
243*61c4878aSAndroid Build Coastguard Worker }
244*61c4878aSAndroid Build Coastguard Worker
CloseAndSendFinalPacketLocked(PacketType type,ConstByteSpan response,Status status)245*61c4878aSAndroid Build Coastguard Worker Status Call::CloseAndSendFinalPacketLocked(PacketType type,
246*61c4878aSAndroid Build Coastguard Worker ConstByteSpan response,
247*61c4878aSAndroid Build Coastguard Worker Status status) {
248*61c4878aSAndroid Build Coastguard Worker const Status send_status = SendPacket(type, response, status);
249*61c4878aSAndroid Build Coastguard Worker UnregisterAndMarkClosed();
250*61c4878aSAndroid Build Coastguard Worker return send_status;
251*61c4878aSAndroid Build Coastguard Worker }
252*61c4878aSAndroid Build Coastguard Worker
TryCloseAndSendFinalPacketLocked(PacketType type,ConstByteSpan response,Status status)253*61c4878aSAndroid Build Coastguard Worker Status Call::TryCloseAndSendFinalPacketLocked(PacketType type,
254*61c4878aSAndroid Build Coastguard Worker ConstByteSpan response,
255*61c4878aSAndroid Build Coastguard Worker Status status) {
256*61c4878aSAndroid Build Coastguard Worker const Status send_status = SendPacket(type, response, status);
257*61c4878aSAndroid Build Coastguard Worker // Only close the call if the final packet gets sent out successfully.
258*61c4878aSAndroid Build Coastguard Worker if (send_status.ok()) {
259*61c4878aSAndroid Build Coastguard Worker UnregisterAndMarkClosed();
260*61c4878aSAndroid Build Coastguard Worker }
261*61c4878aSAndroid Build Coastguard Worker return send_status;
262*61c4878aSAndroid Build Coastguard Worker }
263*61c4878aSAndroid Build Coastguard Worker
WriteLocked(ConstByteSpan payload)264*61c4878aSAndroid Build Coastguard Worker Status Call::WriteLocked(ConstByteSpan payload) {
265*61c4878aSAndroid Build Coastguard Worker return SendPacket(properties_.call_type() == kServerCall
266*61c4878aSAndroid Build Coastguard Worker ? PacketType::SERVER_STREAM
267*61c4878aSAndroid Build Coastguard Worker : PacketType::CLIENT_STREAM,
268*61c4878aSAndroid Build Coastguard Worker payload);
269*61c4878aSAndroid Build Coastguard Worker }
270*61c4878aSAndroid Build Coastguard Worker
WriteCallbackLocked(const Function<StatusWithSize (ByteSpan)> & callback)271*61c4878aSAndroid Build Coastguard Worker Status Call::WriteCallbackLocked(
272*61c4878aSAndroid Build Coastguard Worker const Function<StatusWithSize(ByteSpan)>& callback) {
273*61c4878aSAndroid Build Coastguard Worker PW_TRY_ASSIGN(ConstByteSpan payload, EncodeCallbackToPayloadBuffer(callback));
274*61c4878aSAndroid Build Coastguard Worker return SendPacket(properties_.call_type() == kServerCall
275*61c4878aSAndroid Build Coastguard Worker ? PacketType::SERVER_STREAM
276*61c4878aSAndroid Build Coastguard Worker : PacketType::CLIENT_STREAM,
277*61c4878aSAndroid Build Coastguard Worker payload);
278*61c4878aSAndroid Build Coastguard Worker }
279*61c4878aSAndroid Build Coastguard Worker
280*61c4878aSAndroid Build Coastguard Worker // This definition is in the .cc file because the Endpoint class is not defined
281*61c4878aSAndroid Build Coastguard Worker // in the Call header, due to circular dependencies between the two.
CloseAndMarkForCleanup(Status error)282*61c4878aSAndroid Build Coastguard Worker void Call::CloseAndMarkForCleanup(Status error) {
283*61c4878aSAndroid Build Coastguard Worker endpoint_->CloseCallAndMarkForCleanup(*this, error);
284*61c4878aSAndroid Build Coastguard Worker }
285*61c4878aSAndroid Build Coastguard Worker
HandlePayload(ConstByteSpan payload)286*61c4878aSAndroid Build Coastguard Worker void Call::HandlePayload(ConstByteSpan payload) {
287*61c4878aSAndroid Build Coastguard Worker // pw_rpc only supports handling packets for a particular RPC one at a time.
288*61c4878aSAndroid Build Coastguard Worker // Check if any callbacks are running and drop the packet if they are.
289*61c4878aSAndroid Build Coastguard Worker //
290*61c4878aSAndroid Build Coastguard Worker // The on_next callback cannot support multiple packets at once since it is
291*61c4878aSAndroid Build Coastguard Worker // moved before it is invoked. on_error and on_completed are only called
292*61c4878aSAndroid Build Coastguard Worker // after the call is closed.
293*61c4878aSAndroid Build Coastguard Worker if (CallbacksAreRunning()) {
294*61c4878aSAndroid Build Coastguard Worker PW_LOG_WARN(
295*61c4878aSAndroid Build Coastguard Worker "Received stream packet for %u:%08x/%08x before the callback for a "
296*61c4878aSAndroid Build Coastguard Worker "previous packet completed! This packet will be dropped. This can be "
297*61c4878aSAndroid Build Coastguard Worker "avoided by handling packets for a particular RPC on only one thread.",
298*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(channel_id_),
299*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(service_id_),
300*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(method_id_));
301*61c4878aSAndroid Build Coastguard Worker rpc_lock().unlock();
302*61c4878aSAndroid Build Coastguard Worker return;
303*61c4878aSAndroid Build Coastguard Worker }
304*61c4878aSAndroid Build Coastguard Worker
305*61c4878aSAndroid Build Coastguard Worker if (on_next_ == nullptr) {
306*61c4878aSAndroid Build Coastguard Worker rpc_lock().unlock();
307*61c4878aSAndroid Build Coastguard Worker return;
308*61c4878aSAndroid Build Coastguard Worker }
309*61c4878aSAndroid Build Coastguard Worker
310*61c4878aSAndroid Build Coastguard Worker const uint32_t original_id = id();
311*61c4878aSAndroid Build Coastguard Worker auto on_next_local = std::move(on_next_);
312*61c4878aSAndroid Build Coastguard Worker CallbackStarted();
313*61c4878aSAndroid Build Coastguard Worker
314*61c4878aSAndroid Build Coastguard Worker if (hold_lock_while_invoking_callback_with_payload()) {
315*61c4878aSAndroid Build Coastguard Worker on_next_local(payload);
316*61c4878aSAndroid Build Coastguard Worker } else {
317*61c4878aSAndroid Build Coastguard Worker rpc_lock().unlock();
318*61c4878aSAndroid Build Coastguard Worker on_next_local(payload);
319*61c4878aSAndroid Build Coastguard Worker rpc_lock().lock();
320*61c4878aSAndroid Build Coastguard Worker }
321*61c4878aSAndroid Build Coastguard Worker
322*61c4878aSAndroid Build Coastguard Worker CallbackFinished();
323*61c4878aSAndroid Build Coastguard Worker
324*61c4878aSAndroid Build Coastguard Worker // Restore the original callback if the original call is still active and
325*61c4878aSAndroid Build Coastguard Worker // the callback has not been replaced.
326*61c4878aSAndroid Build Coastguard Worker // NOLINTNEXTLINE(bugprone-use-after-move)
327*61c4878aSAndroid Build Coastguard Worker if (active_locked() && id() == original_id && on_next_ == nullptr) {
328*61c4878aSAndroid Build Coastguard Worker on_next_ = std::move(on_next_local);
329*61c4878aSAndroid Build Coastguard Worker }
330*61c4878aSAndroid Build Coastguard Worker
331*61c4878aSAndroid Build Coastguard Worker // The call could have been reinitialized and cleaned up already by another
332*61c4878aSAndroid Build Coastguard Worker // thread that acquired the rpc_lock() while on_next_local was executing
333*61c4878aSAndroid Build Coastguard Worker // without lock held.
334*61c4878aSAndroid Build Coastguard Worker if (endpoint_ != nullptr) {
335*61c4878aSAndroid Build Coastguard Worker // Clean up calls in case decoding failed.
336*61c4878aSAndroid Build Coastguard Worker endpoint_->CleanUpCalls();
337*61c4878aSAndroid Build Coastguard Worker } else {
338*61c4878aSAndroid Build Coastguard Worker rpc_lock().unlock();
339*61c4878aSAndroid Build Coastguard Worker }
340*61c4878aSAndroid Build Coastguard Worker }
341*61c4878aSAndroid Build Coastguard Worker
CloseClientCall()342*61c4878aSAndroid Build Coastguard Worker void Call::CloseClientCall() {
343*61c4878aSAndroid Build Coastguard Worker // When a client call is closed, for bidirectional and client streaming RPCs,
344*61c4878aSAndroid Build Coastguard Worker // the server may be waiting for client stream messages, so we need to notify
345*61c4878aSAndroid Build Coastguard Worker // the server that the client has requested for completion and no further
346*61c4878aSAndroid Build Coastguard Worker // requests should be expected from the client. For unary and server streaming
347*61c4878aSAndroid Build Coastguard Worker // RPCs, since the client is not sending messages, server does not need to be
348*61c4878aSAndroid Build Coastguard Worker // notified.
349*61c4878aSAndroid Build Coastguard Worker if (has_client_stream() && !client_requested_completion()) {
350*61c4878aSAndroid Build Coastguard Worker RequestCompletionLocked().IgnoreError();
351*61c4878aSAndroid Build Coastguard Worker }
352*61c4878aSAndroid Build Coastguard Worker UnregisterAndMarkClosed();
353*61c4878aSAndroid Build Coastguard Worker }
354*61c4878aSAndroid Build Coastguard Worker
UnregisterAndMarkClosed()355*61c4878aSAndroid Build Coastguard Worker void Call::UnregisterAndMarkClosed() {
356*61c4878aSAndroid Build Coastguard Worker if (active_locked()) {
357*61c4878aSAndroid Build Coastguard Worker endpoint().UnregisterCall(*this);
358*61c4878aSAndroid Build Coastguard Worker MarkClosed();
359*61c4878aSAndroid Build Coastguard Worker }
360*61c4878aSAndroid Build Coastguard Worker }
361*61c4878aSAndroid Build Coastguard Worker
DebugLog() const362*61c4878aSAndroid Build Coastguard Worker void Call::DebugLog() const PW_NO_LOCK_SAFETY_ANALYSIS {
363*61c4878aSAndroid Build Coastguard Worker PW_LOG_INFO(
364*61c4878aSAndroid Build Coastguard Worker "Call %p\n"
365*61c4878aSAndroid Build Coastguard Worker "\tEndpoint: %p\n"
366*61c4878aSAndroid Build Coastguard Worker "\tCall ID: %8u\n"
367*61c4878aSAndroid Build Coastguard Worker "\tChannel: %8u\n"
368*61c4878aSAndroid Build Coastguard Worker "\tService: %08x\n"
369*61c4878aSAndroid Build Coastguard Worker "\tMethod: %08x\n"
370*61c4878aSAndroid Build Coastguard Worker "\tState: %8x\n"
371*61c4878aSAndroid Build Coastguard Worker "\tCleanup: %8s\n"
372*61c4878aSAndroid Build Coastguard Worker "\tBusy CBs: %8x\n"
373*61c4878aSAndroid Build Coastguard Worker "\tType: %8d\n"
374*61c4878aSAndroid Build Coastguard Worker "\tClient: %8d\n"
375*61c4878aSAndroid Build Coastguard Worker "\tWrapped: %8d\n"
376*61c4878aSAndroid Build Coastguard Worker "\ton_error: %8d\n"
377*61c4878aSAndroid Build Coastguard Worker "\ton_next: %8d\n",
378*61c4878aSAndroid Build Coastguard Worker static_cast<const void*>(this),
379*61c4878aSAndroid Build Coastguard Worker static_cast<const void*>(endpoint_),
380*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(id_),
381*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(channel_id_),
382*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(service_id_),
383*61c4878aSAndroid Build Coastguard Worker static_cast<unsigned>(method_id_),
384*61c4878aSAndroid Build Coastguard Worker static_cast<int>(state_),
385*61c4878aSAndroid Build Coastguard Worker Status(static_cast<Status::Code>(awaiting_cleanup_)).str(),
386*61c4878aSAndroid Build Coastguard Worker static_cast<int>(callbacks_executing_),
387*61c4878aSAndroid Build Coastguard Worker static_cast<int>(properties_.method_type()),
388*61c4878aSAndroid Build Coastguard Worker static_cast<int>(properties_.call_type()),
389*61c4878aSAndroid Build Coastguard Worker static_cast<int>(hold_lock_while_invoking_callback_with_payload()),
390*61c4878aSAndroid Build Coastguard Worker static_cast<int>(on_error_ == nullptr),
391*61c4878aSAndroid Build Coastguard Worker static_cast<int>(on_next_ == nullptr));
392*61c4878aSAndroid Build Coastguard Worker }
393*61c4878aSAndroid Build Coastguard Worker
394*61c4878aSAndroid Build Coastguard Worker } // namespace pw::rpc::internal
395