xref: /aosp_15_r20/external/pigweed/pw_rpc/nanopb/public/pw_rpc/nanopb/server_reader_writer.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
// This file defines the NanopbServerReaderWriter, NanopbServerReader,
// NanopbServerWriter, and NanopbUnaryResponder classes for the Nanopb RPC
// interface. These classes are used for bidirectional streaming, client
// streaming, server streaming, and unary RPCs, respectively.
18 #pragma once
19 
20 #include "pw_bytes/span.h"
21 #include "pw_rpc/channel.h"
22 #include "pw_rpc/internal/lock.h"
23 #include "pw_rpc/internal/method_info.h"
24 #include "pw_rpc/internal/method_lookup.h"
25 #include "pw_rpc/internal/server_call.h"
26 #include "pw_rpc/nanopb/internal/common.h"
27 #include "pw_rpc/server.h"
28 
29 namespace pw::rpc {
30 namespace internal {
31 
32 // Forward declarations for internal classes needed in friend statements.
33 class NanopbMethod;
34 
35 namespace test {
36 
37 template <typename, typename, uint32_t>
38 class InvocationContext;
39 
40 }  // namespace test
41 
42 class NanopbServerCall : public ServerCall {
43  public:
NanopbServerCall()44   constexpr NanopbServerCall() : serde_(nullptr) {}
45 
46   NanopbServerCall(const LockedCallContext& context, MethodType type)
47       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
48 
~NanopbServerCall()49   ~NanopbServerCall() { DestroyServerCall(); }
50 
SendUnaryResponse(const void * payload,Status status)51   Status SendUnaryResponse(const void* payload, Status status)
52       PW_LOCKS_EXCLUDED(rpc_lock()) {
53     return SendFinalResponse(*this, payload, status);
54   }
55 
TrySendUnaryResponse(const void * payload,Status status)56   Status TrySendUnaryResponse(const void* payload, Status status)
57       PW_LOCKS_EXCLUDED(rpc_lock()) {
58     return TrySendFinalResponse(*this, payload, status);
59   }
60 
serde()61   const NanopbMethodSerde& serde() const
62       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
63     return *serde_;
64   }
65 
66  protected:
NanopbServerCall(NanopbServerCall && other)67   NanopbServerCall(NanopbServerCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) {
68     *this = std::move(other);
69   }
70 
71   NanopbServerCall& operator=(NanopbServerCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())72       PW_LOCKS_EXCLUDED(rpc_lock()) {
73     RpcLockGuard lock;
74     MoveNanopbServerCallFrom(other);
75     return *this;
76   }
77 
MoveNanopbServerCallFrom(NanopbServerCall & other)78   void MoveNanopbServerCallFrom(NanopbServerCall& other)
79       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
80     MoveServerCallFrom(other);
81     serde_ = other.serde_;
82   }
83 
SendServerStream(const void * payload)84   Status SendServerStream(const void* payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
85     RpcLockGuard lock;
86     return NanopbSendStream(*this, payload, serde_);
87   }
88 
89  private:
90   const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
91 };
92 
93 // The BaseNanopbServerReader serves as the base for the ServerReader and
94 // ServerReaderWriter classes. It adds a callback templated on the request
95 // struct type. It is templated on the Request type only.
96 template <typename Request>
97 class BaseNanopbServerReader : public NanopbServerCall {
98  public:
BaseNanopbServerReader(const LockedCallContext & context,MethodType type)99   BaseNanopbServerReader(const LockedCallContext& context, MethodType type)
100       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
101       : NanopbServerCall(context, type) {}
102 
~BaseNanopbServerReader()103   ~BaseNanopbServerReader() { DestroyServerCall(); }
104 
105  protected:
106   constexpr BaseNanopbServerReader() = default;
107 
108   BaseNanopbServerReader(BaseNanopbServerReader&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())109       PW_LOCKS_EXCLUDED(rpc_lock()) {
110     *this = std::move(other);
111   }
112 
113   BaseNanopbServerReader& operator=(BaseNanopbServerReader&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())114       PW_LOCKS_EXCLUDED(rpc_lock()) {
115     RpcLockGuard lock;
116     MoveNanopbServerCallFrom(other);
117     set_nanopb_on_next_locked(std::move(other.nanopb_on_next_));
118     return *this;
119   }
120 
set_on_next(Function<void (const Request & request)> && on_next)121   void set_on_next(Function<void(const Request& request)>&& on_next)
122       PW_LOCKS_EXCLUDED(rpc_lock()) {
123     RpcLockGuard lock;
124     set_nanopb_on_next_locked(std::move(on_next));
125   }
126 
127  private:
set_nanopb_on_next_locked(Function<void (const Request & request)> && on_next)128   void set_nanopb_on_next_locked(
129       Function<void(const Request& request)>&& on_next)
130       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
131     nanopb_on_next_ = std::move(on_next);
132 
133     Call::set_on_next_locked(
134         [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
135           DecodeToStructAndInvokeOnNext(
136               payload, serde().request(), nanopb_on_next_);
137         });
138   }
139 
140   Function<void(const Request&)> nanopb_on_next_ PW_GUARDED_BY(rpc_lock());
141 };
142 
143 }  // namespace internal
144 
145 // The NanopbServerReaderWriter is used to send and receive messages in a Nanopb
146 // bidirectional streaming RPC.
147 //
148 // These classes use private inheritance to hide the internal::Call API while
149 // allow direct use of its public and protected functions.
150 template <typename Request, typename Response>
151 class NanopbServerReaderWriter
152     : private internal::BaseNanopbServerReader<Request> {
153  public:
154   // Creates a NanopbServerReaderWriter that is ready to send responses for a
155   // particular RPC. This can be used for testing or to send responses to an RPC
156   // that has not been started by a client.
157   template <auto kMethod, typename ServiceImpl>
Open(Server & server,uint32_t channel_id,ServiceImpl & service)158   [[nodiscard]] static NanopbServerReaderWriter Open(Server& server,
159                                                      uint32_t channel_id,
160                                                      ServiceImpl& service)
161       PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
162     using Info = internal::MethodInfo<kMethod>;
163     static_assert(std::is_same_v<Request, typename Info::Request>,
164                   "The request type of a NanopbServerReaderWriter must match "
165                   "the method.");
166     static_assert(std::is_same_v<Response, typename Info::Response>,
167                   "The response type of a NanopbServerReaderWriter must match "
168                   "the method.");
169     return server.OpenCall<NanopbServerReaderWriter<Request, Response>,
170                            kMethod,
171                            MethodType::kBidirectionalStreaming>(
172         channel_id,
173         service,
174         internal::MethodLookup::GetNanopbMethod<ServiceImpl,
175                                                 Info::kMethodId>());
176   }
177 
178   constexpr NanopbServerReaderWriter() = default;
179 
180   NanopbServerReaderWriter(NanopbServerReaderWriter&&) = default;
181   NanopbServerReaderWriter& operator=(NanopbServerReaderWriter&&) = default;
182 
183   using internal::Call::active;
184   using internal::Call::channel_id;
185 
186   // Writes a response struct. Returns the following Status codes:
187   //
188   //   OK - the response was successfully sent
189   //   FAILED_PRECONDITION - the writer is closed
190   //   INTERNAL - pw_rpc was unable to encode the Nanopb protobuf
191   //   other errors - the ChannelOutput failed to send the packet; the error
192   //       codes are determined by the ChannelOutput implementation
193   //
Write(const Response & response)194   Status Write(const Response& response) {
195     return internal::NanopbServerCall::SendServerStream(&response);
196   }
197 
198   Status Finish(Status status = OkStatus()) {
199     return internal::Call::CloseAndSendResponse(status);
200   }
201 
202   Status TryFinish(Status status = OkStatus()) {
203     return internal::Call::TryCloseAndSendResponse(status);
204   }
205 
206   // Functions for setting RPC event callbacks.
207   using internal::Call::set_on_error;
208   using internal::ServerCall::set_on_completion_requested;
209   using internal::ServerCall::set_on_completion_requested_if_enabled;
210   using internal::BaseNanopbServerReader<Request>::set_on_next;
211 
212  private:
213   friend class internal::NanopbMethod;
214   friend class Server;
215 
216   template <typename, typename, uint32_t>
217   friend class internal::test::InvocationContext;
218 
219   NanopbServerReaderWriter(const internal::LockedCallContext& context)
PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())220       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
221       : internal::BaseNanopbServerReader<Request>(
222             context, MethodType::kBidirectionalStreaming) {}
223 };
224 
225 // The NanopbServerReader is used to receive messages and send a response in a
226 // Nanopb client streaming RPC.
227 template <typename Request, typename Response>
228 class NanopbServerReader : private internal::BaseNanopbServerReader<Request> {
229  public:
230   // Creates a NanopbServerReader that is ready to send a response to a
231   // particular RPC. This can be used for testing or to finish an RPC that has
232   // not been started by the client.
233   template <auto kMethod, typename ServiceImpl>
Open(Server & server,uint32_t channel_id,ServiceImpl & service)234   [[nodiscard]] static NanopbServerReader Open(Server& server,
235                                                uint32_t channel_id,
236                                                ServiceImpl& service)
237       PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
238     using Info = internal::MethodInfo<kMethod>;
239     static_assert(
240         std::is_same_v<Request, typename Info::Request>,
241         "The request type of a NanopbServerReader must match the method.");
242     static_assert(
243         std::is_same_v<Response, typename Info::Response>,
244         "The response type of a NanopbServerReader must match the method.");
245     return server.OpenCall<NanopbServerReader<Request, Response>,
246                            kMethod,
247                            MethodType::kClientStreaming>(
248         channel_id,
249         service,
250         internal::MethodLookup::GetNanopbMethod<ServiceImpl,
251                                                 Info::kMethodId>());
252   }
253 
254   // Allow default construction so that users can declare a variable into which
255   // to move NanopbServerReaders from RPC calls.
256   constexpr NanopbServerReader() = default;
257 
258   NanopbServerReader(NanopbServerReader&&) = default;
259   NanopbServerReader& operator=(NanopbServerReader&&) = default;
260 
261   using internal::Call::active;
262   using internal::Call::channel_id;
263 
264   // Functions for setting RPC event callbacks.
265   using internal::Call::set_on_error;
266   using internal::ServerCall::set_on_completion_requested;
267   using internal::ServerCall::set_on_completion_requested_if_enabled;
268   using internal::BaseNanopbServerReader<Request>::set_on_next;
269 
270   Status Finish(const Response& response, Status status = OkStatus()) {
271     return internal::NanopbServerCall::SendUnaryResponse(&response, status);
272   }
273 
274   Status TryFinish(const Response& response, Status status = OkStatus()) {
275     return internal::NanopbServerCall::TrySendUnaryResponse(&response, status);
276   }
277 
278  private:
279   friend class internal::NanopbMethod;
280   friend class Server;
281 
282   template <typename, typename, uint32_t>
283   friend class internal::test::InvocationContext;
284 
285   NanopbServerReader(const internal::LockedCallContext& context)
PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())286       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
287       : internal::BaseNanopbServerReader<Request>(
288             context, MethodType::kClientStreaming) {}
289 };
290 
291 // The NanopbServerWriter is used to send responses in a Nanopb server streaming
292 // RPC.
293 template <typename Response>
294 class NanopbServerWriter : private internal::NanopbServerCall {
295  public:
296   // Creates a NanopbServerWriter that is ready to send responses for a
297   // particular RPC. This can be used for testing or to send responses to an RPC
298   // that has not been started by a client.
299   template <auto kMethod, typename ServiceImpl>
Open(Server & server,uint32_t channel_id,ServiceImpl & service)300   [[nodiscard]] static NanopbServerWriter Open(Server& server,
301                                                uint32_t channel_id,
302                                                ServiceImpl& service)
303       PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
304     using Info = internal::MethodInfo<kMethod>;
305     static_assert(
306         std::is_same_v<Response, typename Info::Response>,
307         "The response type of a NanopbServerWriter must match the method.");
308     return server.OpenCall<NanopbServerWriter<Response>,
309                            kMethod,
310                            MethodType::kServerStreaming>(
311         channel_id,
312         service,
313         internal::MethodLookup::GetNanopbMethod<ServiceImpl,
314                                                 Info::kMethodId>());
315   }
316 
317   // Allow default construction so that users can declare a variable into which
318   // to move ServerWriters from RPC calls.
319   constexpr NanopbServerWriter() = default;
320 
321   NanopbServerWriter(NanopbServerWriter&&) = default;
322   NanopbServerWriter& operator=(NanopbServerWriter&&) = default;
323 
324   using internal::Call::active;
325   using internal::Call::channel_id;
326 
327   // Writes a response struct. Returns the following Status codes:
328   //
329   //   OK - the response was successfully sent
330   //   FAILED_PRECONDITION - the writer is closed
331   //   INTERNAL - pw_rpc was unable to encode the Nanopb protobuf
332   //   other errors - the ChannelOutput failed to send the packet; the error
333   //       codes are determined by the ChannelOutput implementation
334   //
Write(const Response & response)335   Status Write(const Response& response) {
336     return internal::NanopbServerCall::SendServerStream(&response);
337   }
338 
339   Status Finish(Status status = OkStatus()) {
340     return internal::Call::CloseAndSendResponse(status);
341   }
342 
343   Status TryFinish(Status status = OkStatus()) {
344     return internal::Call::TryCloseAndSendResponse(status);
345   }
346 
347   using internal::Call::set_on_error;
348   using internal::ServerCall::set_on_completion_requested;
349   using internal::ServerCall::set_on_completion_requested_if_enabled;
350 
351  private:
352   friend class internal::NanopbMethod;
353   friend class Server;
354 
355   template <typename, typename, uint32_t>
356   friend class internal::test::InvocationContext;
357 
358   NanopbServerWriter(const internal::LockedCallContext& context)
PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())359       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
360       : internal::NanopbServerCall(context, MethodType::kServerStreaming) {}
361 };
362 
363 template <typename Response>
364 class NanopbUnaryResponder : private internal::NanopbServerCall {
365  public:
366   // Creates a NanopbUnaryResponder that is ready to send a response for a
367   // particular RPC. This can be used for testing or to send responses to an RPC
368   // that has not been started by a client.
369   template <auto kMethod, typename ServiceImpl>
Open(Server & server,uint32_t channel_id,ServiceImpl & service)370   [[nodiscard]] static NanopbUnaryResponder Open(Server& server,
371                                                  uint32_t channel_id,
372                                                  ServiceImpl& service)
373       PW_LOCKS_EXCLUDED(internal::rpc_lock()) {
374     using Info = internal::MethodInfo<kMethod>;
375     static_assert(
376         std::is_same_v<Response, typename Info::Response>,
377         "The response type of a NanopbUnaryResponder must match the method.");
378     return server
379         .OpenCall<NanopbUnaryResponder<Response>, kMethod, MethodType::kUnary>(
380             channel_id,
381             service,
382             internal::MethodLookup::GetNanopbMethod<ServiceImpl,
383                                                     Info::kMethodId>());
384   }
385 
386   // Allow default construction so that users can declare a variable into which
387   // to move ServerWriters from RPC calls.
388   constexpr NanopbUnaryResponder() = default;
389 
390   NanopbUnaryResponder(NanopbUnaryResponder&&) = default;
391   NanopbUnaryResponder& operator=(NanopbUnaryResponder&&) = default;
392 
393   using internal::Call::active;
394   using internal::Call::channel_id;
395 
396   // Sends the response. Returns the following Status codes:
397   //
398   //   OK - the response was successfully sent
399   //   FAILED_PRECONDITION - the writer is closed
400   //   INTERNAL - pw_rpc was unable to encode the Nanopb protobuf
401   //   other errors - the ChannelOutput failed to send the packet; the error
402   //       codes are determined by the ChannelOutput implementation
403   //
404   Status Finish(const Response& response, Status status = OkStatus()) {
405     return internal::NanopbServerCall::SendUnaryResponse(&response, status);
406   }
407 
408   using internal::Call::set_on_error;
409 
410  private:
411   friend class internal::NanopbMethod;
412   friend class Server;
413 
414   template <typename, typename, uint32_t>
415   friend class internal::test::InvocationContext;
416 
417   NanopbUnaryResponder(const internal::LockedCallContext& context)
PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())418       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
419       : internal::NanopbServerCall(context, MethodType::kUnary) {}
420 };
421 
422 }  // namespace pw::rpc
423