1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SUPPORT_SYNC_STREAM_H 20 #define GRPCPP_SUPPORT_SYNC_STREAM_H 21 22 #include <grpc/support/log.h> 23 #include <grpcpp/client_context.h> 24 #include <grpcpp/completion_queue.h> 25 #include <grpcpp/impl/call.h> 26 #include <grpcpp/impl/codegen/channel_interface.h> 27 #include <grpcpp/impl/service_type.h> 28 #include <grpcpp/server_context.h> 29 #include <grpcpp/support/status.h> 30 31 namespace grpc { 32 33 namespace internal { 34 /// Common interface for all synchronous client side streaming. 35 class ClientStreamingInterface { 36 public: ~ClientStreamingInterface()37 virtual ~ClientStreamingInterface() {} 38 39 /// Block waiting until the stream finishes and a final status of the call is 40 /// available. 41 /// 42 /// It is appropriate to call this method exactly once when both: 43 /// * the calling code (client-side) has no more message to send 44 /// (this can be declared implicitly by calling this method, or 45 /// explicitly through an earlier call to <i>WritesDone</i> method of the 46 /// class in use, e.g. \a ClientWriterInterface::WritesDone or 47 /// \a ClientReaderWriterInterface::WritesDone). 48 /// * there are no more messages to be received from the server (which can 49 /// be known implicitly, or explicitly from an earlier call to \a 50 /// ReaderInterface::Read that returned "false"). 51 /// 52 /// This function will return either: 53 /// - when all incoming messages have been read and the server has 54 /// returned status. 55 /// - when the server has returned a non-OK status. 56 /// - OR when the call failed for some reason and the library generated a 57 /// status. 58 /// 59 /// Return values: 60 /// - \a Status contains the status code, message and details for the call 61 /// - the \a ClientContext associated with this call is updated with 62 /// possible trailing metadata sent from the server. 63 virtual grpc::Status Finish() = 0; 64 }; 65 66 /// Common interface for all synchronous server side streaming. 67 class ServerStreamingInterface { 68 public: ~ServerStreamingInterface()69 virtual ~ServerStreamingInterface() {} 70 71 /// Block to send initial metadata to client. 72 /// This call is optional, but if it is used, it cannot be used concurrently 73 /// with or after the \a Finish method. 74 /// 75 /// The initial metadata that will be sent to the client will be 76 /// taken from the \a ServerContext associated with the call. 77 virtual void SendInitialMetadata() = 0; 78 }; 79 80 /// An interface that yields a sequence of messages of type \a R. 81 template <class R> 82 class ReaderInterface { 83 public: ~ReaderInterface()84 virtual ~ReaderInterface() {} 85 86 /// Get an upper bound on the next message size available for reading on this 87 /// stream. 88 virtual bool NextMessageSize(uint32_t* sz) = 0; 89 90 /// Block to read a message and parse to \a msg. Returns \a true on success. 91 /// This is thread-safe with respect to \a Write or \WritesDone methods on 92 /// the same stream. It should not be called concurrently with another \a 93 /// Read on the same stream as the order of delivery will not be defined. 94 /// 95 /// \param[out] msg The read message. 96 /// 97 /// \return \a false when there will be no more incoming messages, either 98 /// because the other side has called \a WritesDone() or the stream has failed 99 /// (or been cancelled). 100 virtual bool Read(R* msg) = 0; 101 }; 102 103 /// An interface that can be fed a sequence of messages of type \a W. 104 template <class W> 105 class WriterInterface { 106 public: ~WriterInterface()107 virtual ~WriterInterface() {} 108 109 /// Block to write \a msg to the stream with WriteOptions \a options. 110 /// This is thread-safe with respect to \a ReaderInterface::Read 111 /// 112 /// \param msg The message to be written to the stream. 113 /// \param options The WriteOptions affecting the write operation. 114 /// 115 /// \return \a true on success, \a false when the stream has been closed. 116 virtual bool Write(const W& msg, grpc::WriteOptions options) = 0; 117 118 /// Block to write \a msg to the stream with default write options. 119 /// This is thread-safe with respect to \a ReaderInterface::Read 120 /// 121 /// \param msg The message to be written to the stream. 122 /// 123 /// \return \a true on success, \a false when the stream has been closed. Write(const W & msg)124 inline bool Write(const W& msg) { return Write(msg, grpc::WriteOptions()); } 125 126 /// Write \a msg and coalesce it with the writing of trailing metadata, using 127 /// WriteOptions \a options. 128 /// 129 /// For client, WriteLast is equivalent of performing Write and WritesDone in 130 /// a single step. \a msg and trailing metadata are coalesced and sent on wire 131 /// by calling this function. For server, WriteLast buffers the \a msg. 132 /// The writing of \a msg is held until the service handler returns, 133 /// where \a msg and trailing metadata are coalesced and sent on wire. 134 /// Note that WriteLast can only buffer \a msg up to the flow control window 135 /// size. If \a msg size is larger than the window size, it will be sent on 136 /// wire without buffering. 137 /// 138 /// \param[in] msg The message to be written to the stream. 139 /// \param[in] options The WriteOptions to be used to write this message. WriteLast(const W & msg,grpc::WriteOptions options)140 void WriteLast(const W& msg, grpc::WriteOptions options) { 141 Write(msg, options.set_last_message()); 142 } 143 }; 144 145 } // namespace internal 146 147 /// Client-side interface for streaming reads of message of type \a R. 148 template <class R> 149 class ClientReaderInterface : public internal::ClientStreamingInterface, 150 public internal::ReaderInterface<R> { 151 public: 152 /// Block to wait for initial metadata from server. The received metadata 153 /// can only be accessed after this call returns. Should only be called before 154 /// the first read. Calling this method is optional, and if it is not called 155 /// the metadata will be available in ClientContext after the first read. 156 virtual void WaitForInitialMetadata() = 0; 157 }; 158 159 namespace internal { 160 template <class R> 161 class ClientReaderFactory { 162 public: 163 template <class W> Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)164 static ClientReader<R>* Create(grpc::ChannelInterface* channel, 165 const grpc::internal::RpcMethod& method, 166 grpc::ClientContext* context, 167 const W& request) { 168 return new ClientReader<R>(channel, method, context, request); 169 } 170 }; 171 } // namespace internal 172 173 /// Synchronous (blocking) client-side API for doing server-streaming RPCs, 174 /// where the stream of messages coming from the server has messages 175 /// of type \a R. 176 template <class R> 177 class ClientReader final : public ClientReaderInterface<R> { 178 public: 179 /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for 180 /// semantics. 181 /// 182 // Side effect: 183 /// Once complete, the initial metadata read from 184 /// the server will be accessible through the \a ClientContext used to 185 /// construct this object. WaitForInitialMetadata()186 void WaitForInitialMetadata() override { 187 GPR_ASSERT(!context_->initial_metadata_received_); 188 189 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops; 190 ops.RecvInitialMetadata(context_); 191 call_.PerformOps(&ops); 192 cq_.Pluck(&ops); /// status ignored 193 } 194 NextMessageSize(uint32_t * sz)195 bool NextMessageSize(uint32_t* sz) override { 196 int result = call_.max_receive_message_size(); 197 *sz = (result > 0) ? result : UINT32_MAX; 198 return true; 199 } 200 201 /// See the \a ReaderInterface.Read method for semantics. 202 /// Side effect: 203 /// This also receives initial metadata from the server, if not 204 /// already received (if initial metadata is received, it can be then 205 /// accessed through the \a ClientContext associated with this call). Read(R * msg)206 bool Read(R* msg) override { 207 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 208 grpc::internal::CallOpRecvMessage<R>> 209 ops; 210 if (!context_->initial_metadata_received_) { 211 ops.RecvInitialMetadata(context_); 212 } 213 ops.RecvMessage(msg); 214 call_.PerformOps(&ops); 215 return cq_.Pluck(&ops) && ops.got_message; 216 } 217 218 /// See the \a ClientStreamingInterface.Finish method for semantics. 219 /// 220 /// Side effect: 221 /// The \a ClientContext associated with this call is updated with 222 /// possible metadata received from the server. Finish()223 grpc::Status Finish() override { 224 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 225 grpc::internal::CallOpClientRecvStatus> 226 ops; 227 if (!context_->initial_metadata_received_) { 228 ops.RecvInitialMetadata(context_); 229 } 230 grpc::Status status; 231 ops.ClientRecvStatus(context_, &status); 232 call_.PerformOps(&ops); 233 GPR_ASSERT(cq_.Pluck(&ops)); 234 return status; 235 } 236 237 private: 238 friend class internal::ClientReaderFactory<R>; 239 grpc::ClientContext* context_; 240 grpc::CompletionQueue cq_; 241 grpc::internal::Call call_; 242 243 /// Block to create a stream and write the initial metadata and \a request 244 /// out. Note that \a context will be used to fill in custom initial 245 /// metadata used to send to the server when starting the call. 246 template <class W> ClientReader(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)247 ClientReader(grpc::ChannelInterface* channel, 248 const grpc::internal::RpcMethod& method, 249 grpc::ClientContext* context, const W& request) 250 : context_(context), 251 cq_(grpc_completion_queue_attributes{ 252 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 253 nullptr}), // Pluckable cq 254 call_(channel->CreateCall(method, context, &cq_)) { 255 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 256 grpc::internal::CallOpSendMessage, 257 grpc::internal::CallOpClientSendClose> 258 ops; 259 ops.SendInitialMetadata(&context->send_initial_metadata_, 260 context->initial_metadata_flags()); 261 // TODO(ctiller): don't assert 262 GPR_ASSERT(ops.SendMessagePtr(&request).ok()); 263 ops.ClientSendClose(); 264 call_.PerformOps(&ops); 265 cq_.Pluck(&ops); 266 } 267 }; 268 269 /// Client-side interface for streaming writes of message type \a W. 270 template <class W> 271 class ClientWriterInterface : public internal::ClientStreamingInterface, 272 public internal::WriterInterface<W> { 273 public: 274 /// Half close writing from the client. (signal that the stream of messages 275 /// coming from the client is complete). 276 /// Blocks until currently-pending writes are completed. 277 /// Thread safe with respect to \a ReaderInterface::Read operations only 278 /// 279 /// \return Whether the writes were successful. 280 virtual bool WritesDone() = 0; 281 }; 282 283 namespace internal { 284 template <class W> 285 class ClientWriterFactory { 286 public: 287 template <class R> Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response)288 static ClientWriter<W>* Create(grpc::ChannelInterface* channel, 289 const grpc::internal::RpcMethod& method, 290 grpc::ClientContext* context, R* response) { 291 return new ClientWriter<W>(channel, method, context, response); 292 } 293 }; 294 } // namespace internal 295 296 /// Synchronous (blocking) client-side API for doing client-streaming RPCs, 297 /// where the outgoing message stream coming from the client has messages of 298 /// type \a W. 299 template <class W> 300 class ClientWriter : public ClientWriterInterface<W> { 301 public: 302 /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for 303 /// semantics. 304 /// 305 // Side effect: 306 /// Once complete, the initial metadata read from the server will be 307 /// accessible through the \a ClientContext used to construct this object. WaitForInitialMetadata()308 void WaitForInitialMetadata() { 309 GPR_ASSERT(!context_->initial_metadata_received_); 310 311 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops; 312 ops.RecvInitialMetadata(context_); 313 call_.PerformOps(&ops); 314 cq_.Pluck(&ops); // status ignored 315 } 316 317 /// See the WriterInterface.Write(const W& msg, WriteOptions options) method 318 /// for semantics. 319 /// 320 /// Side effect: 321 /// Also sends initial metadata if not already sent (using the 322 /// \a ClientContext associated with this call). 323 using internal::WriterInterface<W>::Write; Write(const W & msg,grpc::WriteOptions options)324 bool Write(const W& msg, grpc::WriteOptions options) override { 325 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 326 grpc::internal::CallOpSendMessage, 327 grpc::internal::CallOpClientSendClose> 328 ops; 329 330 if (options.is_last_message()) { 331 options.set_buffer_hint(); 332 ops.ClientSendClose(); 333 } 334 if (context_->initial_metadata_corked_) { 335 ops.SendInitialMetadata(&context_->send_initial_metadata_, 336 context_->initial_metadata_flags()); 337 context_->set_initial_metadata_corked(false); 338 } 339 if (!ops.SendMessagePtr(&msg, options).ok()) { 340 return false; 341 } 342 343 call_.PerformOps(&ops); 344 return cq_.Pluck(&ops); 345 } 346 WritesDone()347 bool WritesDone() override { 348 grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops; 349 ops.ClientSendClose(); 350 call_.PerformOps(&ops); 351 return cq_.Pluck(&ops); 352 } 353 354 /// See the ClientStreamingInterface.Finish method for semantics. 355 /// Side effects: 356 /// - Also receives initial metadata if not already received. 357 /// - Attempts to fill in the \a response parameter passed 358 /// to the constructor of this instance with the response 359 /// message from the server. Finish()360 grpc::Status Finish() override { 361 grpc::Status status; 362 if (!context_->initial_metadata_received_) { 363 finish_ops_.RecvInitialMetadata(context_); 364 } 365 finish_ops_.ClientRecvStatus(context_, &status); 366 call_.PerformOps(&finish_ops_); 367 GPR_ASSERT(cq_.Pluck(&finish_ops_)); 368 return status; 369 } 370 371 private: 372 friend class internal::ClientWriterFactory<W>; 373 374 /// Block to create a stream (i.e. send request headers and other initial 375 /// metadata to the server). Note that \a context will be used to fill 376 /// in custom initial metadata. \a response will be filled in with the 377 /// single expected response message from the server upon a successful 378 /// call to the \a Finish method of this instance. 379 template <class R> ClientWriter(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response)380 ClientWriter(grpc::ChannelInterface* channel, 381 const grpc::internal::RpcMethod& method, 382 grpc::ClientContext* context, R* response) 383 : context_(context), 384 cq_(grpc_completion_queue_attributes{ 385 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 386 nullptr}), // Pluckable cq 387 call_(channel->CreateCall(method, context, &cq_)) { 388 finish_ops_.RecvMessage(response); 389 finish_ops_.AllowNoMessage(); 390 391 if (!context_->initial_metadata_corked_) { 392 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 393 ops.SendInitialMetadata(&context->send_initial_metadata_, 394 context->initial_metadata_flags()); 395 call_.PerformOps(&ops); 396 cq_.Pluck(&ops); 397 } 398 } 399 400 grpc::ClientContext* context_; 401 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 402 grpc::internal::CallOpGenericRecvMessage, 403 grpc::internal::CallOpClientRecvStatus> 404 finish_ops_; 405 grpc::CompletionQueue cq_; 406 grpc::internal::Call call_; 407 }; 408 409 /// Client-side interface for bi-directional streaming with 410 /// client-to-server stream messages of type \a W and 411 /// server-to-client stream messages of type \a R. 412 template <class W, class R> 413 class ClientReaderWriterInterface : public internal::ClientStreamingInterface, 414 public internal::WriterInterface<W>, 415 public internal::ReaderInterface<R> { 416 public: 417 /// Block to wait for initial metadata from server. The received metadata 418 /// can only be accessed after this call returns. Should only be called before 419 /// the first read. Calling this method is optional, and if it is not called 420 /// the metadata will be available in ClientContext after the first read. 421 virtual void WaitForInitialMetadata() = 0; 422 423 /// Half close writing from the client. (signal that the stream of messages 424 /// coming from the client is complete). 425 /// Blocks until currently-pending writes are completed. 426 /// Thread-safe with respect to \a ReaderInterface::Read 427 /// 428 /// \return Whether the writes were successful. 429 virtual bool WritesDone() = 0; 430 }; 431 432 namespace internal { 433 template <class W, class R> 434 class ClientReaderWriterFactory { 435 public: Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context)436 static ClientReaderWriter<W, R>* Create( 437 grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method, 438 grpc::ClientContext* context) { 439 return new ClientReaderWriter<W, R>(channel, method, context); 440 } 441 }; 442 } // namespace internal 443 444 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs, 445 /// where the outgoing message stream coming from the client has messages of 446 /// type \a W, and the incoming messages stream coming from the server has 447 /// messages of type \a R. 448 template <class W, class R> 449 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { 450 public: 451 /// Block waiting to read initial metadata from the server. 452 /// This call is optional, but if it is used, it cannot be used concurrently 453 /// with or after the \a Finish method. 454 /// 455 /// Once complete, the initial metadata read from the server will be 456 /// accessible through the \a ClientContext used to construct this object. WaitForInitialMetadata()457 void WaitForInitialMetadata() override { 458 GPR_ASSERT(!context_->initial_metadata_received_); 459 460 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops; 461 ops.RecvInitialMetadata(context_); 462 call_.PerformOps(&ops); 463 cq_.Pluck(&ops); // status ignored 464 } 465 NextMessageSize(uint32_t * sz)466 bool NextMessageSize(uint32_t* sz) override { 467 int result = call_.max_receive_message_size(); 468 *sz = (result > 0) ? result : UINT32_MAX; 469 return true; 470 } 471 472 /// See the \a ReaderInterface.Read method for semantics. 473 /// Side effect: 474 /// Also receives initial metadata if not already received (updates the \a 475 /// ClientContext associated with this call in that case). Read(R * msg)476 bool Read(R* msg) override { 477 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 478 grpc::internal::CallOpRecvMessage<R>> 479 ops; 480 if (!context_->initial_metadata_received_) { 481 ops.RecvInitialMetadata(context_); 482 } 483 ops.RecvMessage(msg); 484 call_.PerformOps(&ops); 485 return cq_.Pluck(&ops) && ops.got_message; 486 } 487 488 /// See the \a WriterInterface.Write method for semantics. 489 /// 490 /// Side effect: 491 /// Also sends initial metadata if not already sent (using the 492 /// \a ClientContext associated with this call to fill in values). 493 using internal::WriterInterface<W>::Write; Write(const W & msg,grpc::WriteOptions options)494 bool Write(const W& msg, grpc::WriteOptions options) override { 495 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 496 grpc::internal::CallOpSendMessage, 497 grpc::internal::CallOpClientSendClose> 498 ops; 499 500 if (options.is_last_message()) { 501 options.set_buffer_hint(); 502 ops.ClientSendClose(); 503 } 504 if (context_->initial_metadata_corked_) { 505 ops.SendInitialMetadata(&context_->send_initial_metadata_, 506 context_->initial_metadata_flags()); 507 context_->set_initial_metadata_corked(false); 508 } 509 if (!ops.SendMessagePtr(&msg, options).ok()) { 510 return false; 511 } 512 513 call_.PerformOps(&ops); 514 return cq_.Pluck(&ops); 515 } 516 WritesDone()517 bool WritesDone() override { 518 grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops; 519 ops.ClientSendClose(); 520 call_.PerformOps(&ops); 521 return cq_.Pluck(&ops); 522 } 523 524 /// See the ClientStreamingInterface.Finish method for semantics. 525 /// 526 /// Side effect: 527 /// - the \a ClientContext associated with this call is updated with 528 /// possible trailing metadata sent from the server. Finish()529 grpc::Status Finish() override { 530 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 531 grpc::internal::CallOpClientRecvStatus> 532 ops; 533 if (!context_->initial_metadata_received_) { 534 ops.RecvInitialMetadata(context_); 535 } 536 grpc::Status status; 537 ops.ClientRecvStatus(context_, &status); 538 call_.PerformOps(&ops); 539 GPR_ASSERT(cq_.Pluck(&ops)); 540 return status; 541 } 542 543 private: 544 friend class internal::ClientReaderWriterFactory<W, R>; 545 546 grpc::ClientContext* context_; 547 grpc::CompletionQueue cq_; 548 grpc::internal::Call call_; 549 550 /// Block to create a stream and write the initial metadata and \a request 551 /// out. Note that \a context will be used to fill in custom initial metadata 552 /// used to send to the server when starting the call. ClientReaderWriter(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context)553 ClientReaderWriter(grpc::ChannelInterface* channel, 554 const grpc::internal::RpcMethod& method, 555 grpc::ClientContext* context) 556 : context_(context), 557 cq_(grpc_completion_queue_attributes{ 558 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 559 nullptr}), // Pluckable cq 560 call_(channel->CreateCall(method, context, &cq_)) { 561 if (!context_->initial_metadata_corked_) { 562 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 563 ops.SendInitialMetadata(&context->send_initial_metadata_, 564 context->initial_metadata_flags()); 565 call_.PerformOps(&ops); 566 cq_.Pluck(&ops); 567 } 568 } 569 }; 570 571 /// Server-side interface for streaming reads of message of type \a R. 572 template <class R> 573 class ServerReaderInterface : public internal::ServerStreamingInterface, 574 public internal::ReaderInterface<R> {}; 575 576 /// Synchronous (blocking) server-side API for doing client-streaming RPCs, 577 /// where the incoming message stream coming from the client has messages of 578 /// type \a R. 579 template <class R> 580 class ServerReader final : public ServerReaderInterface<R> { 581 public: 582 /// See the \a ServerStreamingInterface.SendInitialMetadata method 583 /// for semantics. Note that initial metadata will be affected by the 584 /// \a ServerContext associated with this call. SendInitialMetadata()585 void SendInitialMetadata() override { 586 GPR_ASSERT(!ctx_->sent_initial_metadata_); 587 588 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 589 ops.SendInitialMetadata(&ctx_->initial_metadata_, 590 ctx_->initial_metadata_flags()); 591 if (ctx_->compression_level_set()) { 592 ops.set_compression_level(ctx_->compression_level()); 593 } 594 ctx_->sent_initial_metadata_ = true; 595 call_->PerformOps(&ops); 596 call_->cq()->Pluck(&ops); 597 } 598 NextMessageSize(uint32_t * sz)599 bool NextMessageSize(uint32_t* sz) override { 600 int result = call_->max_receive_message_size(); 601 *sz = (result > 0) ? result : UINT32_MAX; 602 return true; 603 } 604 Read(R * msg)605 bool Read(R* msg) override { 606 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops; 607 ops.RecvMessage(msg); 608 call_->PerformOps(&ops); 609 bool ok = call_->cq()->Pluck(&ops) && ops.got_message; 610 if (!ok) { 611 ctx_->MaybeMarkCancelledOnRead(); 612 } 613 return ok; 614 } 615 616 private: 617 grpc::internal::Call* const call_; 618 ServerContext* const ctx_; 619 620 template <class ServiceType, class RequestType, class ResponseType> 621 friend class internal::ClientStreamingHandler; 622 ServerReader(grpc::internal::Call * call,grpc::ServerContext * ctx)623 ServerReader(grpc::internal::Call* call, grpc::ServerContext* ctx) 624 : call_(call), ctx_(ctx) {} 625 }; 626 627 /// Server-side interface for streaming writes of message of type \a W. 628 template <class W> 629 class ServerWriterInterface : public internal::ServerStreamingInterface, 630 public internal::WriterInterface<W> {}; 631 632 /// Synchronous (blocking) server-side API for doing for doing a 633 /// server-streaming RPCs, where the outgoing message stream coming from the 634 /// server has messages of type \a W. 635 template <class W> 636 class ServerWriter final : public ServerWriterInterface<W> { 637 public: 638 /// See the \a ServerStreamingInterface.SendInitialMetadata method 639 /// for semantics. 640 /// Note that initial metadata will be affected by the 641 /// \a ServerContext associated with this call. SendInitialMetadata()642 void SendInitialMetadata() override { 643 GPR_ASSERT(!ctx_->sent_initial_metadata_); 644 645 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 646 ops.SendInitialMetadata(&ctx_->initial_metadata_, 647 ctx_->initial_metadata_flags()); 648 if (ctx_->compression_level_set()) { 649 ops.set_compression_level(ctx_->compression_level()); 650 } 651 ctx_->sent_initial_metadata_ = true; 652 call_->PerformOps(&ops); 653 call_->cq()->Pluck(&ops); 654 } 655 656 /// See the \a WriterInterface.Write method for semantics. 657 /// 658 /// Side effect: 659 /// Also sends initial metadata if not already sent (using the 660 /// \a ClientContext associated with this call to fill in values). 661 using internal::WriterInterface<W>::Write; Write(const W & msg,grpc::WriteOptions options)662 bool Write(const W& msg, grpc::WriteOptions options) override { 663 if (options.is_last_message()) { 664 options.set_buffer_hint(); 665 } 666 667 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { 668 return false; 669 } 670 if (!ctx_->sent_initial_metadata_) { 671 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 672 ctx_->initial_metadata_flags()); 673 if (ctx_->compression_level_set()) { 674 ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); 675 } 676 ctx_->sent_initial_metadata_ = true; 677 } 678 call_->PerformOps(&ctx_->pending_ops_); 679 // if this is the last message we defer the pluck until AFTER we start 680 // the trailing md op. This prevents hangs. See 681 // https://github.com/grpc/grpc/issues/11546 682 if (options.is_last_message()) { 683 ctx_->has_pending_ops_ = true; 684 return true; 685 } 686 ctx_->has_pending_ops_ = false; 687 return call_->cq()->Pluck(&ctx_->pending_ops_); 688 } 689 690 private: 691 grpc::internal::Call* const call_; 692 grpc::ServerContext* const ctx_; 693 694 template <class ServiceType, class RequestType, class ResponseType> 695 friend class internal::ServerStreamingHandler; 696 ServerWriter(grpc::internal::Call * call,grpc::ServerContext * ctx)697 ServerWriter(grpc::internal::Call* call, grpc::ServerContext* ctx) 698 : call_(call), ctx_(ctx) {} 699 }; 700 701 /// Server-side interface for bi-directional streaming. 702 template <class W, class R> 703 class ServerReaderWriterInterface : public internal::ServerStreamingInterface, 704 public internal::WriterInterface<W>, 705 public internal::ReaderInterface<R> {}; 706 707 /// Actual implementation of bi-directional streaming 708 namespace internal { 709 template <class W, class R> 710 class ServerReaderWriterBody final { 711 public: ServerReaderWriterBody(grpc::internal::Call * call,grpc::ServerContext * ctx)712 ServerReaderWriterBody(grpc::internal::Call* call, grpc::ServerContext* ctx) 713 : call_(call), ctx_(ctx) {} 714 SendInitialMetadata()715 void SendInitialMetadata() { 716 GPR_ASSERT(!ctx_->sent_initial_metadata_); 717 718 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 719 ops.SendInitialMetadata(&ctx_->initial_metadata_, 720 ctx_->initial_metadata_flags()); 721 if (ctx_->compression_level_set()) { 722 ops.set_compression_level(ctx_->compression_level()); 723 } 724 ctx_->sent_initial_metadata_ = true; 725 call_->PerformOps(&ops); 726 call_->cq()->Pluck(&ops); 727 } 728 NextMessageSize(uint32_t * sz)729 bool NextMessageSize(uint32_t* sz) { 730 int result = call_->max_receive_message_size(); 731 *sz = (result > 0) ? result : UINT32_MAX; 732 return true; 733 } 734 Read(R * msg)735 bool Read(R* msg) { 736 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops; 737 ops.RecvMessage(msg); 738 call_->PerformOps(&ops); 739 bool ok = call_->cq()->Pluck(&ops) && ops.got_message; 740 if (!ok) { 741 ctx_->MaybeMarkCancelledOnRead(); 742 } 743 return ok; 744 } 745 Write(const W & msg,grpc::WriteOptions options)746 bool Write(const W& msg, grpc::WriteOptions options) { 747 if (options.is_last_message()) { 748 options.set_buffer_hint(); 749 } 750 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { 751 return false; 752 } 753 if (!ctx_->sent_initial_metadata_) { 754 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 755 ctx_->initial_metadata_flags()); 756 if (ctx_->compression_level_set()) { 757 ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); 758 } 759 ctx_->sent_initial_metadata_ = true; 760 } 761 call_->PerformOps(&ctx_->pending_ops_); 762 // if this is the last message we defer the pluck until AFTER we start 763 // the trailing md op. This prevents hangs. See 764 // https://github.com/grpc/grpc/issues/11546 765 if (options.is_last_message()) { 766 ctx_->has_pending_ops_ = true; 767 return true; 768 } 769 ctx_->has_pending_ops_ = false; 770 return call_->cq()->Pluck(&ctx_->pending_ops_); 771 } 772 773 private: 774 grpc::internal::Call* const call_; 775 grpc::ServerContext* const ctx_; 776 }; 777 778 } // namespace internal 779 780 /// Synchronous (blocking) server-side API for a bidirectional 781 /// streaming call, where the incoming message stream coming from the client has 782 /// messages of type \a R, and the outgoing message streaming coming from 783 /// the server has messages of type \a W. 784 template <class W, class R> 785 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { 786 public: 787 /// See the \a ServerStreamingInterface.SendInitialMetadata method 788 /// for semantics. Note that initial metadata will be affected by the 789 /// \a ServerContext associated with this call. SendInitialMetadata()790 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 791 NextMessageSize(uint32_t * sz)792 bool NextMessageSize(uint32_t* sz) override { 793 return body_.NextMessageSize(sz); 794 } 795 Read(R * msg)796 bool Read(R* msg) override { return body_.Read(msg); } 797 798 /// See the \a WriterInterface.Write(const W& msg, WriteOptions options) 799 /// method for semantics. 800 /// Side effect: 801 /// Also sends initial metadata if not already sent (using the \a 802 /// ServerContext associated with this call). 803 using internal::WriterInterface<W>::Write; Write(const W & msg,grpc::WriteOptions options)804 bool Write(const W& msg, grpc::WriteOptions options) override { 805 return body_.Write(msg, options); 806 } 807 808 private: 809 internal::ServerReaderWriterBody<W, R> body_; 810 811 friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, 812 false>; ServerReaderWriter(grpc::internal::Call * call,grpc::ServerContext * ctx)813 ServerReaderWriter(grpc::internal::Call* call, grpc::ServerContext* ctx) 814 : body_(call, ctx) {} 815 }; 816 817 /// A class to represent a flow-controlled unary call. This is something 818 /// of a hybrid between conventional unary and streaming. This is invoked 819 /// through a unary call on the client side, but the server responds to it 820 /// as though it were a single-ping-pong streaming call. The server can use 821 /// the \a NextMessageSize method to determine an upper-bound on the size of 822 /// the message. A key difference relative to streaming: ServerUnaryStreamer 823 /// must have exactly 1 Read and exactly 1 Write, in that order, to function 824 /// correctly. Otherwise, the RPC is in error. 825 template <class RequestType, class ResponseType> 826 class ServerUnaryStreamer final 827 : public ServerReaderWriterInterface<ResponseType, RequestType> { 828 public: 829 /// Block to send initial metadata to client. 830 /// Implicit input parameter: 831 /// - the \a ServerContext associated with this call will be used for 832 /// sending initial metadata. SendInitialMetadata()833 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 834 835 /// Get an upper bound on the request message size from the client. NextMessageSize(uint32_t * sz)836 bool NextMessageSize(uint32_t* sz) override { 837 return body_.NextMessageSize(sz); 838 } 839 840 /// Read a message of type \a R into \a msg. Completion will be notified by \a 841 /// tag on the associated completion queue. 842 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 843 /// should not be called concurrently with other streaming APIs 844 /// on the same stream. It is not meaningful to call it concurrently 845 /// with another \a ReaderInterface::Read on the same stream since reads on 846 /// the same stream are delivered in order. 847 /// 848 /// \param[out] msg Where to eventually store the read message. 849 /// \param[in] tag The tag identifying the operation. Read(RequestType * request)850 bool Read(RequestType* request) override { 851 if (read_done_) { 852 return false; 853 } 854 read_done_ = true; 855 return body_.Read(request); 856 } 857 858 /// Block to write \a msg to the stream with WriteOptions \a options. 859 /// This is thread-safe with respect to \a ReaderInterface::Read 860 /// 861 /// \param msg The message to be written to the stream. 862 /// \param options The WriteOptions affecting the write operation. 863 /// 864 /// \return \a true on success, \a false when the stream has been closed. 865 using internal::WriterInterface<ResponseType>::Write; Write(const ResponseType & response,grpc::WriteOptions options)866 bool Write(const ResponseType& response, 867 grpc::WriteOptions options) override { 868 if (write_done_ || !read_done_) { 869 return false; 870 } 871 write_done_ = true; 872 return body_.Write(response, options); 873 } 874 875 private: 876 internal::ServerReaderWriterBody<ResponseType, RequestType> body_; 877 bool read_done_; 878 bool write_done_; 879 880 friend class internal::TemplatedBidiStreamingHandler< 881 ServerUnaryStreamer<RequestType, ResponseType>, true>; ServerUnaryStreamer(grpc::internal::Call * call,grpc::ServerContext * ctx)882 ServerUnaryStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx) 883 : body_(call, ctx), read_done_(false), write_done_(false) {} 884 }; 885 886 /// A class to represent a flow-controlled server-side streaming call. 887 /// This is something of a hybrid between server-side and bidi streaming. 888 /// This is invoked through a server-side streaming call on the client side, 889 /// but the server responds to it as though it were a bidi streaming call that 890 /// must first have exactly 1 Read and then any number of Writes. 891 template <class RequestType, class ResponseType> 892 class ServerSplitStreamer final 893 : public ServerReaderWriterInterface<ResponseType, RequestType> { 894 public: 895 /// Block to send initial metadata to client. 896 /// Implicit input parameter: 897 /// - the \a ServerContext associated with this call will be used for 898 /// sending initial metadata. SendInitialMetadata()899 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 900 901 /// Get an upper bound on the request message size from the client. NextMessageSize(uint32_t * sz)902 bool NextMessageSize(uint32_t* sz) override { 903 return body_.NextMessageSize(sz); 904 } 905 906 /// Read a message of type \a R into \a msg. Completion will be notified by \a 907 /// tag on the associated completion queue. 908 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 909 /// should not be called concurrently with other streaming APIs 910 /// on the same stream. It is not meaningful to call it concurrently 911 /// with another \a ReaderInterface::Read on the same stream since reads on 912 /// the same stream are delivered in order. 913 /// 914 /// \param[out] msg Where to eventually store the read message. 915 /// \param[in] tag The tag identifying the operation. Read(RequestType * request)916 bool Read(RequestType* request) override { 917 if (read_done_) { 918 return false; 919 } 920 read_done_ = true; 921 return body_.Read(request); 922 } 923 924 /// Block to write \a msg to the stream with WriteOptions \a options. 925 /// This is thread-safe with respect to \a ReaderInterface::Read 926 /// 927 /// \param msg The message to be written to the stream. 928 /// \param options The WriteOptions affecting the write operation. 929 /// 930 /// \return \a true on success, \a false when the stream has been closed. 931 using internal::WriterInterface<ResponseType>::Write; Write(const ResponseType & response,grpc::WriteOptions options)932 bool Write(const ResponseType& response, 933 grpc::WriteOptions options) override { 934 return read_done_ && body_.Write(response, options); 935 } 936 937 private: 938 internal::ServerReaderWriterBody<ResponseType, RequestType> body_; 939 bool read_done_; 940 941 friend class internal::TemplatedBidiStreamingHandler< 942 ServerSplitStreamer<RequestType, ResponseType>, false>; ServerSplitStreamer(grpc::internal::Call * call,grpc::ServerContext * ctx)943 ServerSplitStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx) 944 : body_(call, ctx), read_done_(false) {} 945 }; 946 947 } // namespace grpc 948 949 #endif // GRPCPP_SUPPORT_SYNC_STREAM_H 950