xref: /aosp_15_r20/external/pigweed/pw_rpc/nanopb/public/pw_rpc/nanopb/client_reader_writer.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 // This file defines the ServerReaderWriter, ServerReader, and ServerWriter
16 // classes for the Nanopb RPC interface. These classes are used for
17 // bidirectional, client, and server streaming RPCs.
18 #pragma once
19 
20 #include "pw_rpc/channel.h"
21 #include "pw_rpc/internal/client_call.h"
22 #include "pw_rpc/internal/lock.h"
23 #include "pw_rpc/nanopb/internal/common.h"
24 
25 namespace pw::rpc {
26 namespace internal {
27 
28 // Base class for unary and client streaming calls.
29 template <typename Response>
30 class NanopbUnaryResponseClientCall : public UnaryResponseClientCall {
31  public:
32   template <typename CallType, typename... Request>
Start(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,const NanopbMethodSerde & serde,Function<void (const Response &,Status)> && on_completed,Function<void (Status)> && on_error,const Request &...request)33   static CallType Start(Endpoint& client,
34                         uint32_t channel_id,
35                         uint32_t service_id,
36                         uint32_t method_id,
37                         const NanopbMethodSerde& serde,
38                         Function<void(const Response&, Status)>&& on_completed,
39                         Function<void(Status)>&& on_error,
40                         const Request&... request)
41       PW_LOCKS_EXCLUDED(rpc_lock()) {
42     rpc_lock().lock();
43     CallType call(
44         client.ClaimLocked(), channel_id, service_id, method_id, serde);
45 
46     call.set_nanopb_on_completed_locked(std::move(on_completed));
47     call.set_on_error_locked(std::move(on_error));
48 
49     if constexpr (sizeof...(Request) == 0u) {
50       call.SendInitialClientRequest({});
51     } else {
52       NanopbSendInitialRequest(call, serde.request(), &request...);
53     }
54     client.CleanUpCalls();
55     return call;
56   }
57 
~NanopbUnaryResponseClientCall()58   ~NanopbUnaryResponseClientCall() { DestroyClientCall(); }
59 
60  protected:
NanopbUnaryResponseClientCall()61   constexpr NanopbUnaryResponseClientCall() : serde_(nullptr) {}
62 
NanopbUnaryResponseClientCall(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type,const NanopbMethodSerde & serde)63   NanopbUnaryResponseClientCall(LockedEndpoint& client,
64                                 uint32_t channel_id,
65                                 uint32_t service_id,
66                                 uint32_t method_id,
67                                 MethodType type,
68                                 const NanopbMethodSerde& serde)
69       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
70       : UnaryResponseClientCall(
71             client, channel_id, service_id, method_id, StructCallProps(type)),
72         serde_(&serde) {}
73 
74   NanopbUnaryResponseClientCall(NanopbUnaryResponseClientCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())75       PW_LOCKS_EXCLUDED(rpc_lock()) {
76     *this = std::move(other);
77   }
78 
79   NanopbUnaryResponseClientCall& operator=(
PW_LOCKS_EXCLUDED(rpc_lock ())80       NanopbUnaryResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) {
81     RpcLockGuard lock;
82     MoveUnaryResponseClientCallFrom(other);
83     serde_ = other.serde_;
84     set_nanopb_on_completed_locked(std::move(other.nanopb_on_completed_));
85     return *this;
86   }
87 
set_on_completed(Function<void (const Response & response,Status)> && on_completed)88   void set_on_completed(
89       Function<void(const Response& response, Status)>&& on_completed)
90       PW_LOCKS_EXCLUDED(rpc_lock()) {
91     RpcLockGuard lock;
92     set_nanopb_on_completed_locked(std::move(on_completed));
93   }
94 
SendClientStream(const void * payload)95   Status SendClientStream(const void* payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
96     RpcLockGuard lock;
97     return NanopbSendStream(*this, payload, serde_);
98   }
99 
100  private:
set_nanopb_on_completed_locked(Function<void (const Response & response,Status)> && on_completed)101   void set_nanopb_on_completed_locked(
102       Function<void(const Response& response, Status)>&& on_completed)
103       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
104     nanopb_on_completed_ = std::move(on_completed);
105 
106     UnaryResponseClientCall::set_on_completed_locked(
107         [this](ConstByteSpan payload, Status status)
108             PW_NO_LOCK_SAFETY_ANALYSIS {
109               DecodeToStructAndInvokeOnCompleted(
110                   payload, serde_->response(), nanopb_on_completed_, status);
111             });
112   }
113 
114   const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
115   Function<void(const Response&, Status)> nanopb_on_completed_
116       PW_GUARDED_BY(rpc_lock());
117 };
118 
119 // Base class for server and bidirectional streaming calls.
120 template <typename Response>
121 class NanopbStreamResponseClientCall : public StreamResponseClientCall {
122  public:
123   template <typename CallType, typename... Request>
Start(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,const NanopbMethodSerde & serde,Function<void (const Response &)> && on_next,Function<void (Status)> && on_completed,Function<void (Status)> && on_error,const Request &...request)124   static CallType Start(Endpoint& client,
125                         uint32_t channel_id,
126                         uint32_t service_id,
127                         uint32_t method_id,
128                         const NanopbMethodSerde& serde,
129                         Function<void(const Response&)>&& on_next,
130                         Function<void(Status)>&& on_completed,
131                         Function<void(Status)>&& on_error,
132                         const Request&... request)
133       PW_LOCKS_EXCLUDED(rpc_lock()) {
134     rpc_lock().lock();
135     CallType call(
136         client.ClaimLocked(), channel_id, service_id, method_id, serde);
137 
138     call.set_nanopb_on_next_locked(std::move(on_next));
139     call.set_on_completed_locked(std::move(on_completed));
140     call.set_on_error_locked(std::move(on_error));
141 
142     if constexpr (sizeof...(Request) == 0u) {
143       call.SendInitialClientRequest({});
144     } else {
145       NanopbSendInitialRequest(call, serde.request(), &request...);
146     }
147     client.CleanUpCalls();
148     return call;
149   }
150 
~NanopbStreamResponseClientCall()151   ~NanopbStreamResponseClientCall() { DestroyClientCall(); }
152 
153  protected:
NanopbStreamResponseClientCall()154   constexpr NanopbStreamResponseClientCall() : serde_(nullptr) {}
155 
156   NanopbStreamResponseClientCall(NanopbStreamResponseClientCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())157       PW_LOCKS_EXCLUDED(rpc_lock()) {
158     *this = std::move(other);
159   }
160 
161   NanopbStreamResponseClientCall& operator=(
PW_LOCKS_EXCLUDED(rpc_lock ())162       NanopbStreamResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) {
163     RpcLockGuard lock;
164     MoveStreamResponseClientCallFrom(other);
165     serde_ = other.serde_;
166     set_nanopb_on_next_locked(std::move(other.nanopb_on_next_));
167     return *this;
168   }
169 
NanopbStreamResponseClientCall(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type,const NanopbMethodSerde & serde)170   NanopbStreamResponseClientCall(LockedEndpoint& client,
171                                  uint32_t channel_id,
172                                  uint32_t service_id,
173                                  uint32_t method_id,
174                                  MethodType type,
175                                  const NanopbMethodSerde& serde)
176       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
177       : StreamResponseClientCall(
178             client, channel_id, service_id, method_id, StructCallProps(type)),
179         serde_(&serde) {}
180 
SendClientStream(const void * payload)181   Status SendClientStream(const void* payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
182     RpcLockGuard lock;
183     return NanopbSendStream(*this, payload, serde_);
184   }
185 
set_on_next(Function<void (const Response & response)> && on_next)186   void set_on_next(Function<void(const Response& response)>&& on_next)
187       PW_LOCKS_EXCLUDED(rpc_lock()) {
188     RpcLockGuard lock;
189     set_nanopb_on_next_locked(std::move(on_next));
190   }
191 
192  private:
set_nanopb_on_next_locked(Function<void (const Response & response)> && on_next)193   void set_nanopb_on_next_locked(
194       Function<void(const Response& response)>&& on_next)
195       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
196     nanopb_on_next_ = std::move(on_next);
197 
198     Call::set_on_next_locked(
199         [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
200           DecodeToStructAndInvokeOnNext(
201               payload, serde_->response(), nanopb_on_next_);
202         });
203   }
204 
205   const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
206   Function<void(const Response&)> nanopb_on_next_ PW_GUARDED_BY(rpc_lock());
207 };
208 
209 }  // namespace internal
210 
211 // The NanopbClientReaderWriter is used to send and receive messages in a
212 // bidirectional streaming RPC.
213 template <typename Request, typename ResponseType>
214 class NanopbClientReaderWriter
215     : private internal::NanopbStreamResponseClientCall<ResponseType> {
216  public:
217   using Response = ResponseType;
218 
219   constexpr NanopbClientReaderWriter() = default;
220 
221   NanopbClientReaderWriter(NanopbClientReaderWriter&&) = default;
222   NanopbClientReaderWriter& operator=(NanopbClientReaderWriter&&) = default;
223 
224   using internal::Call::active;
225   using internal::Call::channel_id;
226 
227   using internal::ClientCall::id;
228 
229   // Writes a response struct. Returns the following Status codes:
230   //
231   //   OK - the response was successfully sent
232   //   FAILED_PRECONDITION - the writer is closed
233   //   INTERNAL - pw_rpc was unable to encode the Nanopb protobuf
234   //   other errors - the ChannelOutput failed to send the packet; the error
235   //       codes are determined by the ChannelOutput implementation
236   //
Write(const Request & request)237   Status Write(const Request& request) {
238     return internal::NanopbStreamResponseClientCall<Response>::SendClientStream(
239         &request);
240   }
241 
242   // Notifies the server that the client has requested to stop communication by
243   // sending CLIENT_REQUEST_COMPLETION.
244   using internal::ClientCall::RequestCompletion;
245 
246   // Cancels this RPC. Closes the call locally and sends a CANCELLED error to
247   // the server.
248   using internal::Call::Cancel;
249 
250   // Closes this RPC locally. Sends a CLIENT_REQUEST_COMPLETION, but no
251   // cancellation packet. Future packets for this RPC are dropped, and the
252   // client sends a FAILED_PRECONDITION error in response because the call is
253   // not active.
254   using internal::ClientCall::Abandon;
255 
256   // Closes this RPC locally and waits for any running callbacks to complete.
257   // Sends a CLIENT_REQUEST_COMPLETION, but no cancellation packet. Future
258   // packets for this RPC are dropped, and the client sends a
259   // FAILED_PRECONDITION error in response because the call is not active.
260   using internal::ClientCall::CloseAndWaitForCallbacks;
261 
262   // Functions for setting RPC event callbacks.
263   using internal::Call::set_on_error;
264   using internal::StreamResponseClientCall::set_on_completed;
265   using internal::NanopbStreamResponseClientCall<Response>::set_on_next;
266 
267  private:
268   friend class internal::NanopbStreamResponseClientCall<Response>;
269 
NanopbClientReaderWriter(internal::LockedEndpoint & client,uint32_t channel_id_value,uint32_t service_id,uint32_t method_id,const internal::NanopbMethodSerde & serde)270   NanopbClientReaderWriter(internal::LockedEndpoint& client,
271                            uint32_t channel_id_value,
272                            uint32_t service_id,
273                            uint32_t method_id,
274                            const internal::NanopbMethodSerde& serde)
275       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
276       : internal::NanopbStreamResponseClientCall<Response>(
277             client,
278             channel_id_value,
279             service_id,
280             method_id,
281             MethodType::kBidirectionalStreaming,
282             serde) {}
283 };
284 
285 // The NanopbClientReader is used to receive messages and send a response in a
286 // client streaming RPC.
287 template <typename ResponseType>
288 class NanopbClientReader
289     : private internal::NanopbStreamResponseClientCall<ResponseType> {
290  public:
291   using Response = ResponseType;
292 
293   constexpr NanopbClientReader() = default;
294 
295   NanopbClientReader(NanopbClientReader&&) = default;
296   NanopbClientReader& operator=(NanopbClientReader&&) = default;
297 
298   using internal::Call::active;
299   using internal::Call::channel_id;
300 
301   using internal::ClientCall::id;
302 
303   // Functions for setting RPC event callbacks.
304   using internal::NanopbStreamResponseClientCall<Response>::set_on_next;
305   using internal::Call::set_on_error;
306   using internal::StreamResponseClientCall::set_on_completed;
307 
308   using internal::Call::Cancel;
309   using internal::Call::RequestCompletion;
310   using internal::ClientCall::Abandon;
311   using internal::ClientCall::CloseAndWaitForCallbacks;
312 
313  private:
314   friend class internal::NanopbStreamResponseClientCall<Response>;
315 
NanopbClientReader(internal::LockedEndpoint & client,uint32_t channel_id_value,uint32_t service_id,uint32_t method_id,const internal::NanopbMethodSerde & serde)316   NanopbClientReader(internal::LockedEndpoint& client,
317                      uint32_t channel_id_value,
318                      uint32_t service_id,
319                      uint32_t method_id,
320                      const internal::NanopbMethodSerde& serde)
321       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
322       : internal::NanopbStreamResponseClientCall<Response>(
323             client,
324             channel_id_value,
325             service_id,
326             method_id,
327             MethodType::kServerStreaming,
328             serde) {}
329 };
330 
331 // The NanopbClientWriter is used to send responses in a server streaming RPC.
332 template <typename Request, typename ResponseType>
333 class NanopbClientWriter
334     : private internal::NanopbUnaryResponseClientCall<ResponseType> {
335  public:
336   using Response = ResponseType;
337 
338   constexpr NanopbClientWriter() = default;
339 
340   NanopbClientWriter(NanopbClientWriter&&) = default;
341   NanopbClientWriter& operator=(NanopbClientWriter&&) = default;
342 
343   using internal::Call::active;
344   using internal::Call::channel_id;
345 
346   using internal::ClientCall::id;
347 
348   // Functions for setting RPC event callbacks.
349   using internal::NanopbUnaryResponseClientCall<Response>::set_on_completed;
350   using internal::Call::set_on_error;
351 
Write(const Request & request)352   Status Write(const Request& request) {
353     return internal::NanopbUnaryResponseClientCall<Response>::SendClientStream(
354         &request);
355   }
356 
357   using internal::Call::Cancel;
358   using internal::Call::RequestCompletion;
359   using internal::ClientCall::Abandon;
360   using internal::ClientCall::CloseAndWaitForCallbacks;
361 
362  private:
363   friend class internal::NanopbUnaryResponseClientCall<Response>;
364 
NanopbClientWriter(internal::LockedEndpoint & client,uint32_t channel_id_value,uint32_t service_id,uint32_t method_id,const internal::NanopbMethodSerde & serde)365   NanopbClientWriter(internal::LockedEndpoint& client,
366                      uint32_t channel_id_value,
367                      uint32_t service_id,
368                      uint32_t method_id,
369                      const internal::NanopbMethodSerde& serde)
370       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
371       : internal::NanopbUnaryResponseClientCall<Response>(
372             client,
373             channel_id_value,
374             service_id,
375             method_id,
376             MethodType::kClientStreaming,
377             serde) {}
378 };
379 
380 // The NanopbUnaryReceiver is used to receive a response in a unary RPC.
381 template <typename ResponseType>
382 class NanopbUnaryReceiver
383     : private internal::NanopbUnaryResponseClientCall<ResponseType> {
384  public:
385   using Response = ResponseType;
386 
387   constexpr NanopbUnaryReceiver() = default;
388 
389   NanopbUnaryReceiver(NanopbUnaryReceiver&&) = default;
390   NanopbUnaryReceiver& operator=(NanopbUnaryReceiver&&) = default;
391 
392   using internal::Call::active;
393   using internal::Call::channel_id;
394 
395   using internal::ClientCall::id;
396 
397   // Functions for setting RPC event callbacks.
398   using internal::NanopbUnaryResponseClientCall<Response>::set_on_completed;
399   using internal::Call::set_on_error;
400 
401   using internal::Call::Cancel;
402   using internal::ClientCall::Abandon;
403   using internal::ClientCall::CloseAndWaitForCallbacks;
404 
405  private:
406   friend class internal::NanopbUnaryResponseClientCall<Response>;
407 
NanopbUnaryReceiver(internal::LockedEndpoint & client,uint32_t channel_id_value,uint32_t service_id,uint32_t method_id,const internal::NanopbMethodSerde & serde)408   NanopbUnaryReceiver(internal::LockedEndpoint& client,
409                       uint32_t channel_id_value,
410                       uint32_t service_id,
411                       uint32_t method_id,
412                       const internal::NanopbMethodSerde& serde)
413       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
414       : internal::NanopbUnaryResponseClientCall<Response>(client,
415                                                           channel_id_value,
416                                                           service_id,
417                                                           method_id,
418                                                           MethodType::kUnary,
419                                                           serde) {}
420 };
421 
422 }  // namespace pw::rpc
423