1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 // This file defines the ServerReaderWriter, ServerReader, and ServerWriter 16 // classes for the Nanopb RPC interface. These classes are used for 17 // bidirectional, client, and server streaming RPCs. 18 #pragma once 19 20 #include "pw_rpc/channel.h" 21 #include "pw_rpc/internal/client_call.h" 22 #include "pw_rpc/internal/lock.h" 23 #include "pw_rpc/nanopb/internal/common.h" 24 25 namespace pw::rpc { 26 namespace internal { 27 28 // Base class for unary and client streaming calls. 29 template <typename Response> 30 class NanopbUnaryResponseClientCall : public UnaryResponseClientCall { 31 public: 32 template <typename CallType, typename... Request> Start(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,const NanopbMethodSerde & serde,Function<void (const Response &,Status)> && on_completed,Function<void (Status)> && on_error,const Request &...request)33 static CallType Start(Endpoint& client, 34 uint32_t channel_id, 35 uint32_t service_id, 36 uint32_t method_id, 37 const NanopbMethodSerde& serde, 38 Function<void(const Response&, Status)>&& on_completed, 39 Function<void(Status)>&& on_error, 40 const Request&... request) 41 PW_LOCKS_EXCLUDED(rpc_lock()) { 42 rpc_lock().lock(); 43 CallType call( 44 client.ClaimLocked(), channel_id, service_id, method_id, serde); 45 46 call.set_nanopb_on_completed_locked(std::move(on_completed)); 47 call.set_on_error_locked(std::move(on_error)); 48 49 if constexpr (sizeof...(Request) == 0u) { 50 call.SendInitialClientRequest({}); 51 } else { 52 NanopbSendInitialRequest(call, serde.request(), &request...); 53 } 54 client.CleanUpCalls(); 55 return call; 56 } 57 ~NanopbUnaryResponseClientCall()58 ~NanopbUnaryResponseClientCall() { DestroyClientCall(); } 59 60 protected: NanopbUnaryResponseClientCall()61 constexpr NanopbUnaryResponseClientCall() : serde_(nullptr) {} 62 NanopbUnaryResponseClientCall(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type,const NanopbMethodSerde & serde)63 NanopbUnaryResponseClientCall(LockedEndpoint& client, 64 uint32_t channel_id, 65 uint32_t service_id, 66 uint32_t method_id, 67 MethodType type, 68 const NanopbMethodSerde& serde) 69 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) 70 : UnaryResponseClientCall( 71 client, channel_id, service_id, method_id, StructCallProps(type)), 72 serde_(&serde) {} 73 74 NanopbUnaryResponseClientCall(NanopbUnaryResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock ())75 PW_LOCKS_EXCLUDED(rpc_lock()) { 76 *this = std::move(other); 77 } 78 79 NanopbUnaryResponseClientCall& operator=( PW_LOCKS_EXCLUDED(rpc_lock ())80 NanopbUnaryResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) { 81 RpcLockGuard lock; 82 MoveUnaryResponseClientCallFrom(other); 83 serde_ = other.serde_; 84 set_nanopb_on_completed_locked(std::move(other.nanopb_on_completed_)); 85 return *this; 86 } 87 set_on_completed(Function<void (const Response & response,Status)> && on_completed)88 void set_on_completed( 89 Function<void(const Response& response, Status)>&& on_completed) 90 PW_LOCKS_EXCLUDED(rpc_lock()) { 91 RpcLockGuard lock; 92 set_nanopb_on_completed_locked(std::move(on_completed)); 93 } 94 SendClientStream(const void * payload)95 Status SendClientStream(const void* payload) PW_LOCKS_EXCLUDED(rpc_lock()) { 96 RpcLockGuard lock; 97 return NanopbSendStream(*this, payload, serde_); 98 } 99 100 private: set_nanopb_on_completed_locked(Function<void (const Response & response,Status)> && on_completed)101 void set_nanopb_on_completed_locked( 102 Function<void(const Response& response, Status)>&& on_completed) 103 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 104 nanopb_on_completed_ = std::move(on_completed); 105 106 UnaryResponseClientCall::set_on_completed_locked( 107 [this](ConstByteSpan payload, Status status) 108 PW_NO_LOCK_SAFETY_ANALYSIS { 109 DecodeToStructAndInvokeOnCompleted( 110 payload, serde_->response(), nanopb_on_completed_, status); 111 }); 112 } 113 114 const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock()); 115 Function<void(const Response&, Status)> nanopb_on_completed_ 116 PW_GUARDED_BY(rpc_lock()); 117 }; 118 119 // Base class for server and bidirectional streaming calls. 120 template <typename Response> 121 class NanopbStreamResponseClientCall : public StreamResponseClientCall { 122 public: 123 template <typename CallType, typename... Request> Start(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,const NanopbMethodSerde & serde,Function<void (const Response &)> && on_next,Function<void (Status)> && on_completed,Function<void (Status)> && on_error,const Request &...request)124 static CallType Start(Endpoint& client, 125 uint32_t channel_id, 126 uint32_t service_id, 127 uint32_t method_id, 128 const NanopbMethodSerde& serde, 129 Function<void(const Response&)>&& on_next, 130 Function<void(Status)>&& on_completed, 131 Function<void(Status)>&& on_error, 132 const Request&... request) 133 PW_LOCKS_EXCLUDED(rpc_lock()) { 134 rpc_lock().lock(); 135 CallType call( 136 client.ClaimLocked(), channel_id, service_id, method_id, serde); 137 138 call.set_nanopb_on_next_locked(std::move(on_next)); 139 call.set_on_completed_locked(std::move(on_completed)); 140 call.set_on_error_locked(std::move(on_error)); 141 142 if constexpr (sizeof...(Request) == 0u) { 143 call.SendInitialClientRequest({}); 144 } else { 145 NanopbSendInitialRequest(call, serde.request(), &request...); 146 } 147 client.CleanUpCalls(); 148 return call; 149 } 150 ~NanopbStreamResponseClientCall()151 ~NanopbStreamResponseClientCall() { DestroyClientCall(); } 152 153 protected: NanopbStreamResponseClientCall()154 constexpr NanopbStreamResponseClientCall() : serde_(nullptr) {} 155 156 NanopbStreamResponseClientCall(NanopbStreamResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock ())157 PW_LOCKS_EXCLUDED(rpc_lock()) { 158 *this = std::move(other); 159 } 160 161 NanopbStreamResponseClientCall& operator=( PW_LOCKS_EXCLUDED(rpc_lock ())162 NanopbStreamResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) { 163 RpcLockGuard lock; 164 MoveStreamResponseClientCallFrom(other); 165 serde_ = other.serde_; 166 set_nanopb_on_next_locked(std::move(other.nanopb_on_next_)); 167 return *this; 168 } 169 NanopbStreamResponseClientCall(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type,const NanopbMethodSerde & serde)170 NanopbStreamResponseClientCall(LockedEndpoint& client, 171 uint32_t channel_id, 172 uint32_t service_id, 173 uint32_t method_id, 174 MethodType type, 175 const NanopbMethodSerde& serde) 176 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) 177 : StreamResponseClientCall( 178 client, channel_id, service_id, method_id, StructCallProps(type)), 179 serde_(&serde) {} 180 SendClientStream(const void * payload)181 Status SendClientStream(const void* payload) PW_LOCKS_EXCLUDED(rpc_lock()) { 182 RpcLockGuard lock; 183 return NanopbSendStream(*this, payload, serde_); 184 } 185 set_on_next(Function<void (const Response & response)> && on_next)186 void set_on_next(Function<void(const Response& response)>&& on_next) 187 PW_LOCKS_EXCLUDED(rpc_lock()) { 188 RpcLockGuard lock; 189 set_nanopb_on_next_locked(std::move(on_next)); 190 } 191 192 private: set_nanopb_on_next_locked(Function<void (const Response & response)> && on_next)193 void set_nanopb_on_next_locked( 194 Function<void(const Response& response)>&& on_next) 195 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 196 nanopb_on_next_ = std::move(on_next); 197 198 Call::set_on_next_locked( 199 [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS { 200 DecodeToStructAndInvokeOnNext( 201 payload, serde_->response(), nanopb_on_next_); 202 }); 203 } 204 205 const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock()); 206 Function<void(const Response&)> nanopb_on_next_ PW_GUARDED_BY(rpc_lock()); 207 }; 208 209 } // namespace internal 210 211 // The NanopbClientReaderWriter is used to send and receive messages in a 212 // bidirectional streaming RPC. 213 template <typename Request, typename ResponseType> 214 class NanopbClientReaderWriter 215 : private internal::NanopbStreamResponseClientCall<ResponseType> { 216 public: 217 using Response = ResponseType; 218 219 constexpr NanopbClientReaderWriter() = default; 220 221 NanopbClientReaderWriter(NanopbClientReaderWriter&&) = default; 222 NanopbClientReaderWriter& operator=(NanopbClientReaderWriter&&) = default; 223 224 using internal::Call::active; 225 using internal::Call::channel_id; 226 227 using internal::ClientCall::id; 228 229 // Writes a response struct. Returns the following Status codes: 230 // 231 // OK - the response was successfully sent 232 // FAILED_PRECONDITION - the writer is closed 233 // INTERNAL - pw_rpc was unable to encode the Nanopb protobuf 234 // other errors - the ChannelOutput failed to send the packet; the error 235 // codes are determined by the ChannelOutput implementation 236 // Write(const Request & request)237 Status Write(const Request& request) { 238 return internal::NanopbStreamResponseClientCall<Response>::SendClientStream( 239 &request); 240 } 241 242 // Notifies the server that the client has requested to stop communication by 243 // sending CLIENT_REQUEST_COMPLETION. 244 using internal::ClientCall::RequestCompletion; 245 246 // Cancels this RPC. Closes the call locally and sends a CANCELLED error to 247 // the server. 248 using internal::Call::Cancel; 249 250 // Closes this RPC locally. Sends a CLIENT_REQUEST_COMPLETION, but no 251 // cancellation packet. Future packets for this RPC are dropped, and the 252 // client sends a FAILED_PRECONDITION error in response because the call is 253 // not active. 254 using internal::ClientCall::Abandon; 255 256 // Closes this RPC locally and waits for any running callbacks to complete. 257 // Sends a CLIENT_REQUEST_COMPLETION, but no cancellation packet. Future 258 // packets for this RPC are dropped, and the client sends a 259 // FAILED_PRECONDITION error in response because the call is not active. 260 using internal::ClientCall::CloseAndWaitForCallbacks; 261 262 // Functions for setting RPC event callbacks. 263 using internal::Call::set_on_error; 264 using internal::StreamResponseClientCall::set_on_completed; 265 using internal::NanopbStreamResponseClientCall<Response>::set_on_next; 266 267 private: 268 friend class internal::NanopbStreamResponseClientCall<Response>; 269 NanopbClientReaderWriter(internal::LockedEndpoint & client,uint32_t channel_id_value,uint32_t service_id,uint32_t method_id,const internal::NanopbMethodSerde & serde)270 NanopbClientReaderWriter(internal::LockedEndpoint& client, 271 uint32_t channel_id_value, 272 uint32_t service_id, 273 uint32_t method_id, 274 const internal::NanopbMethodSerde& serde) 275 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 276 : internal::NanopbStreamResponseClientCall<Response>( 277 client, 278 channel_id_value, 279 service_id, 280 method_id, 281 MethodType::kBidirectionalStreaming, 282 serde) {} 283 }; 284 285 // The NanopbClientReader is used to receive messages and send a response in a 286 // client streaming RPC. 287 template <typename ResponseType> 288 class NanopbClientReader 289 : private internal::NanopbStreamResponseClientCall<ResponseType> { 290 public: 291 using Response = ResponseType; 292 293 constexpr NanopbClientReader() = default; 294 295 NanopbClientReader(NanopbClientReader&&) = default; 296 NanopbClientReader& operator=(NanopbClientReader&&) = default; 297 298 using internal::Call::active; 299 using internal::Call::channel_id; 300 301 using internal::ClientCall::id; 302 303 // Functions for setting RPC event callbacks. 304 using internal::NanopbStreamResponseClientCall<Response>::set_on_next; 305 using internal::Call::set_on_error; 306 using internal::StreamResponseClientCall::set_on_completed; 307 308 using internal::Call::Cancel; 309 using internal::Call::RequestCompletion; 310 using internal::ClientCall::Abandon; 311 using internal::ClientCall::CloseAndWaitForCallbacks; 312 313 private: 314 friend class internal::NanopbStreamResponseClientCall<Response>; 315 NanopbClientReader(internal::LockedEndpoint & client,uint32_t channel_id_value,uint32_t service_id,uint32_t method_id,const internal::NanopbMethodSerde & serde)316 NanopbClientReader(internal::LockedEndpoint& client, 317 uint32_t channel_id_value, 318 uint32_t service_id, 319 uint32_t method_id, 320 const internal::NanopbMethodSerde& serde) 321 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 322 : internal::NanopbStreamResponseClientCall<Response>( 323 client, 324 channel_id_value, 325 service_id, 326 method_id, 327 MethodType::kServerStreaming, 328 serde) {} 329 }; 330 331 // The NanopbClientWriter is used to send responses in a server streaming RPC. 332 template <typename Request, typename ResponseType> 333 class NanopbClientWriter 334 : private internal::NanopbUnaryResponseClientCall<ResponseType> { 335 public: 336 using Response = ResponseType; 337 338 constexpr NanopbClientWriter() = default; 339 340 NanopbClientWriter(NanopbClientWriter&&) = default; 341 NanopbClientWriter& operator=(NanopbClientWriter&&) = default; 342 343 using internal::Call::active; 344 using internal::Call::channel_id; 345 346 using internal::ClientCall::id; 347 348 // Functions for setting RPC event callbacks. 349 using internal::NanopbUnaryResponseClientCall<Response>::set_on_completed; 350 using internal::Call::set_on_error; 351 Write(const Request & request)352 Status Write(const Request& request) { 353 return internal::NanopbUnaryResponseClientCall<Response>::SendClientStream( 354 &request); 355 } 356 357 using internal::Call::Cancel; 358 using internal::Call::RequestCompletion; 359 using internal::ClientCall::Abandon; 360 using internal::ClientCall::CloseAndWaitForCallbacks; 361 362 private: 363 friend class internal::NanopbUnaryResponseClientCall<Response>; 364 NanopbClientWriter(internal::LockedEndpoint & client,uint32_t channel_id_value,uint32_t service_id,uint32_t method_id,const internal::NanopbMethodSerde & serde)365 NanopbClientWriter(internal::LockedEndpoint& client, 366 uint32_t channel_id_value, 367 uint32_t service_id, 368 uint32_t method_id, 369 const internal::NanopbMethodSerde& serde) 370 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 371 : internal::NanopbUnaryResponseClientCall<Response>( 372 client, 373 channel_id_value, 374 service_id, 375 method_id, 376 MethodType::kClientStreaming, 377 serde) {} 378 }; 379 380 // The NanopbUnaryReceiver is used to receive a response in a unary RPC. 381 template <typename ResponseType> 382 class NanopbUnaryReceiver 383 : private internal::NanopbUnaryResponseClientCall<ResponseType> { 384 public: 385 using Response = ResponseType; 386 387 constexpr NanopbUnaryReceiver() = default; 388 389 NanopbUnaryReceiver(NanopbUnaryReceiver&&) = default; 390 NanopbUnaryReceiver& operator=(NanopbUnaryReceiver&&) = default; 391 392 using internal::Call::active; 393 using internal::Call::channel_id; 394 395 using internal::ClientCall::id; 396 397 // Functions for setting RPC event callbacks. 398 using internal::NanopbUnaryResponseClientCall<Response>::set_on_completed; 399 using internal::Call::set_on_error; 400 401 using internal::Call::Cancel; 402 using internal::ClientCall::Abandon; 403 using internal::ClientCall::CloseAndWaitForCallbacks; 404 405 private: 406 friend class internal::NanopbUnaryResponseClientCall<Response>; 407 NanopbUnaryReceiver(internal::LockedEndpoint & client,uint32_t channel_id_value,uint32_t service_id,uint32_t method_id,const internal::NanopbMethodSerde & serde)408 NanopbUnaryReceiver(internal::LockedEndpoint& client, 409 uint32_t channel_id_value, 410 uint32_t service_id, 411 uint32_t method_id, 412 const internal::NanopbMethodSerde& serde) 413 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 414 : internal::NanopbUnaryResponseClientCall<Response>(client, 415 channel_id_value, 416 service_id, 417 method_id, 418 MethodType::kUnary, 419 serde) {} 420 }; 421 422 } // namespace pw::rpc 423