xref: /aosp_15_r20/external/grpc-grpc/include/grpcpp/support/sync_stream.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_SYNC_STREAM_H
20 #define GRPCPP_SUPPORT_SYNC_STREAM_H
21 
22 #include <grpc/support/log.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/completion_queue.h>
25 #include <grpcpp/impl/call.h>
26 #include <grpcpp/impl/codegen/channel_interface.h>
27 #include <grpcpp/impl/service_type.h>
28 #include <grpcpp/server_context.h>
29 #include <grpcpp/support/status.h>
30 
31 namespace grpc {
32 
33 namespace internal {
34 /// Common interface for all synchronous client side streaming.
35 class ClientStreamingInterface {
36  public:
~ClientStreamingInterface()37   virtual ~ClientStreamingInterface() {}
38 
39   /// Block waiting until the stream finishes and a final status of the call is
40   /// available.
41   ///
42   /// It is appropriate to call this method exactly once when both:
43   ///   * the calling code (client-side) has no more message to send
44   ///     (this can be declared implicitly by calling this method, or
45   ///     explicitly through an earlier call to <i>WritesDone</i> method of the
46   ///     class in use, e.g. \a ClientWriterInterface::WritesDone or
47   ///     \a ClientReaderWriterInterface::WritesDone).
48   ///   * there are no more messages to be received from the server (which can
49   ///     be known implicitly, or explicitly from an earlier call to \a
50   ///     ReaderInterface::Read that returned "false").
51   ///
52   /// This function will return either:
53   /// - when all incoming messages have been read and the server has
54   ///   returned status.
55   /// - when the server has returned a non-OK status.
56   /// - OR when the call failed for some reason and the library generated a
57   ///   status.
58   ///
59   /// Return values:
60   ///   - \a Status contains the status code, message and details for the call
61   ///   - the \a ClientContext associated with this call is updated with
62   ///     possible trailing metadata sent from the server.
63   virtual grpc::Status Finish() = 0;
64 };
65 
66 /// Common interface for all synchronous server side streaming.
class ServerStreamingInterface {
 public:
  virtual ~ServerStreamingInterface() = default;

  /// Blocks while the initial metadata is sent to the client.
  ///
  /// Calling this method is optional. When it is used, it must not run
  /// concurrently with, or after, the \a Finish method.
  ///
  /// The metadata that gets sent is taken from the \a ServerContext
  /// associated with the call.
  virtual void SendInitialMetadata() = 0;
};
79 
80 /// An interface that yields a sequence of messages of type \a R.
template <class R>
class ReaderInterface {
 public:
  virtual ~ReaderInterface() = default;

  /// Reports an upper bound on the size of the next message available for
  /// reading on this stream.
  ///
  /// \param[out] sz Receives the upper bound.
  virtual bool NextMessageSize(uint32_t* sz) = 0;

  /// Blocks to read one message and parse it into \a msg.
  ///
  /// This is thread-safe with respect to the \a Write or \a WritesDone
  /// methods on the same stream. Do not call it concurrently with another
  /// \a Read on the same stream: the order of delivery would be undefined.
  ///
  /// \param[out] msg The read message.
  ///
  /// \return \a true on success. \a false when there will be no more
  /// incoming messages, either because the other side has called
  /// \a WritesDone() or because the stream has failed (or been cancelled).
  virtual bool Read(R* msg) = 0;
};
102 
103 /// An interface that can be fed a sequence of messages of type \a W.
104 template <class W>
105 class WriterInterface {
106  public:
~WriterInterface()107   virtual ~WriterInterface() {}
108 
109   /// Block to write \a msg to the stream with WriteOptions \a options.
110   /// This is thread-safe with respect to \a ReaderInterface::Read
111   ///
112   /// \param msg The message to be written to the stream.
113   /// \param options The WriteOptions affecting the write operation.
114   ///
115   /// \return \a true on success, \a false when the stream has been closed.
116   virtual bool Write(const W& msg, grpc::WriteOptions options) = 0;
117 
118   /// Block to write \a msg to the stream with default write options.
119   /// This is thread-safe with respect to \a ReaderInterface::Read
120   ///
121   /// \param msg The message to be written to the stream.
122   ///
123   /// \return \a true on success, \a false when the stream has been closed.
Write(const W & msg)124   inline bool Write(const W& msg) { return Write(msg, grpc::WriteOptions()); }
125 
126   /// Write \a msg and coalesce it with the writing of trailing metadata, using
127   /// WriteOptions \a options.
128   ///
129   /// For client, WriteLast is equivalent of performing Write and WritesDone in
130   /// a single step. \a msg and trailing metadata are coalesced and sent on wire
131   /// by calling this function. For server, WriteLast buffers the \a msg.
132   /// The writing of \a msg is held until the service handler returns,
133   /// where \a msg and trailing metadata are coalesced and sent on wire.
134   /// Note that WriteLast can only buffer \a msg up to the flow control window
135   /// size. If \a msg size is larger than the window size, it will be sent on
136   /// wire without buffering.
137   ///
138   /// \param[in] msg The message to be written to the stream.
139   /// \param[in] options The WriteOptions to be used to write this message.
WriteLast(const W & msg,grpc::WriteOptions options)140   void WriteLast(const W& msg, grpc::WriteOptions options) {
141     Write(msg, options.set_last_message());
142   }
143 };
144 
145 }  // namespace internal
146 
147 /// Client-side interface for streaming reads of message of type \a R.
148 template <class R>
149 class ClientReaderInterface : public internal::ClientStreamingInterface,
150                               public internal::ReaderInterface<R> {
151  public:
152   /// Block to wait for initial metadata from server. The received metadata
153   /// can only be accessed after this call returns. Should only be called before
154   /// the first read. Calling this method is optional, and if it is not called
155   /// the metadata will be available in ClientContext after the first read.
156   virtual void WaitForInitialMetadata() = 0;
157 };
158 
159 namespace internal {
160 template <class R>
161 class ClientReaderFactory {
162  public:
163   template <class W>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)164   static ClientReader<R>* Create(grpc::ChannelInterface* channel,
165                                  const grpc::internal::RpcMethod& method,
166                                  grpc::ClientContext* context,
167                                  const W& request) {
168     return new ClientReader<R>(channel, method, context, request);
169   }
170 };
171 }  // namespace internal
172 
173 /// Synchronous (blocking) client-side API for doing server-streaming RPCs,
174 /// where the stream of messages coming from the server has messages
175 /// of type \a R.
176 template <class R>
177 class ClientReader final : public ClientReaderInterface<R> {
178  public:
179   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
180   /// semantics.
181   ///
182   //  Side effect:
183   ///   Once complete, the initial metadata read from
184   ///   the server will be accessible through the \a ClientContext used to
185   ///   construct this object.
WaitForInitialMetadata()186   void WaitForInitialMetadata() override {
187     GPR_ASSERT(!context_->initial_metadata_received_);
188 
189     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops;
190     ops.RecvInitialMetadata(context_);
191     call_.PerformOps(&ops);
192     cq_.Pluck(&ops);  /// status ignored
193   }
194 
NextMessageSize(uint32_t * sz)195   bool NextMessageSize(uint32_t* sz) override {
196     int result = call_.max_receive_message_size();
197     *sz = (result > 0) ? result : UINT32_MAX;
198     return true;
199   }
200 
201   /// See the \a ReaderInterface.Read method for semantics.
202   /// Side effect:
203   ///   This also receives initial metadata from the server, if not
204   ///   already received (if initial metadata is received, it can be then
205   ///   accessed through the \a ClientContext associated with this call).
Read(R * msg)206   bool Read(R* msg) override {
207     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
208                               grpc::internal::CallOpRecvMessage<R>>
209         ops;
210     if (!context_->initial_metadata_received_) {
211       ops.RecvInitialMetadata(context_);
212     }
213     ops.RecvMessage(msg);
214     call_.PerformOps(&ops);
215     return cq_.Pluck(&ops) && ops.got_message;
216   }
217 
218   /// See the \a ClientStreamingInterface.Finish method for semantics.
219   ///
220   /// Side effect:
221   ///   The \a ClientContext associated with this call is updated with
222   ///   possible metadata received from the server.
Finish()223   grpc::Status Finish() override {
224     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
225                               grpc::internal::CallOpClientRecvStatus>
226         ops;
227     if (!context_->initial_metadata_received_) {
228       ops.RecvInitialMetadata(context_);
229     }
230     grpc::Status status;
231     ops.ClientRecvStatus(context_, &status);
232     call_.PerformOps(&ops);
233     GPR_ASSERT(cq_.Pluck(&ops));
234     return status;
235   }
236 
237  private:
238   friend class internal::ClientReaderFactory<R>;
239   grpc::ClientContext* context_;
240   grpc::CompletionQueue cq_;
241   grpc::internal::Call call_;
242 
243   /// Block to create a stream and write the initial metadata and \a request
244   /// out. Note that \a context will be used to fill in custom initial
245   /// metadata used to send to the server when starting the call.
246   template <class W>
ClientReader(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)247   ClientReader(grpc::ChannelInterface* channel,
248                const grpc::internal::RpcMethod& method,
249                grpc::ClientContext* context, const W& request)
250       : context_(context),
251         cq_(grpc_completion_queue_attributes{
252             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
253             nullptr}),  // Pluckable cq
254         call_(channel->CreateCall(method, context, &cq_)) {
255     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
256                               grpc::internal::CallOpSendMessage,
257                               grpc::internal::CallOpClientSendClose>
258         ops;
259     ops.SendInitialMetadata(&context->send_initial_metadata_,
260                             context->initial_metadata_flags());
261     // TODO(ctiller): don't assert
262     GPR_ASSERT(ops.SendMessagePtr(&request).ok());
263     ops.ClientSendClose();
264     call_.PerformOps(&ops);
265     cq_.Pluck(&ops);
266   }
267 };
268 
269 /// Client-side interface for streaming writes of message type \a W.
270 template <class W>
271 class ClientWriterInterface : public internal::ClientStreamingInterface,
272                               public internal::WriterInterface<W> {
273  public:
274   /// Half close writing from the client. (signal that the stream of messages
275   /// coming from the client is complete).
276   /// Blocks until currently-pending writes are completed.
277   /// Thread safe with respect to \a ReaderInterface::Read operations only
278   ///
279   /// \return Whether the writes were successful.
280   virtual bool WritesDone() = 0;
281 };
282 
283 namespace internal {
284 template <class W>
285 class ClientWriterFactory {
286  public:
287   template <class R>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response)288   static ClientWriter<W>* Create(grpc::ChannelInterface* channel,
289                                  const grpc::internal::RpcMethod& method,
290                                  grpc::ClientContext* context, R* response) {
291     return new ClientWriter<W>(channel, method, context, response);
292   }
293 };
294 }  // namespace internal
295 
296 /// Synchronous (blocking) client-side API for doing client-streaming RPCs,
297 /// where the outgoing message stream coming from the client has messages of
298 /// type \a W.
299 template <class W>
300 class ClientWriter : public ClientWriterInterface<W> {
301  public:
302   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
303   /// semantics.
304   ///
305   //  Side effect:
306   ///   Once complete, the initial metadata read from the server will be
307   ///   accessible through the \a ClientContext used to construct this object.
WaitForInitialMetadata()308   void WaitForInitialMetadata() {
309     GPR_ASSERT(!context_->initial_metadata_received_);
310 
311     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops;
312     ops.RecvInitialMetadata(context_);
313     call_.PerformOps(&ops);
314     cq_.Pluck(&ops);  // status ignored
315   }
316 
317   /// See the WriterInterface.Write(const W& msg, WriteOptions options) method
318   /// for semantics.
319   ///
320   /// Side effect:
321   ///   Also sends initial metadata if not already sent (using the
322   ///   \a ClientContext associated with this call).
323   using internal::WriterInterface<W>::Write;
Write(const W & msg,grpc::WriteOptions options)324   bool Write(const W& msg, grpc::WriteOptions options) override {
325     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
326                               grpc::internal::CallOpSendMessage,
327                               grpc::internal::CallOpClientSendClose>
328         ops;
329 
330     if (options.is_last_message()) {
331       options.set_buffer_hint();
332       ops.ClientSendClose();
333     }
334     if (context_->initial_metadata_corked_) {
335       ops.SendInitialMetadata(&context_->send_initial_metadata_,
336                               context_->initial_metadata_flags());
337       context_->set_initial_metadata_corked(false);
338     }
339     if (!ops.SendMessagePtr(&msg, options).ok()) {
340       return false;
341     }
342 
343     call_.PerformOps(&ops);
344     return cq_.Pluck(&ops);
345   }
346 
WritesDone()347   bool WritesDone() override {
348     grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops;
349     ops.ClientSendClose();
350     call_.PerformOps(&ops);
351     return cq_.Pluck(&ops);
352   }
353 
354   /// See the ClientStreamingInterface.Finish method for semantics.
355   /// Side effects:
356   ///   - Also receives initial metadata if not already received.
357   ///   - Attempts to fill in the \a response parameter passed
358   ///     to the constructor of this instance with the response
359   ///     message from the server.
Finish()360   grpc::Status Finish() override {
361     grpc::Status status;
362     if (!context_->initial_metadata_received_) {
363       finish_ops_.RecvInitialMetadata(context_);
364     }
365     finish_ops_.ClientRecvStatus(context_, &status);
366     call_.PerformOps(&finish_ops_);
367     GPR_ASSERT(cq_.Pluck(&finish_ops_));
368     return status;
369   }
370 
371  private:
372   friend class internal::ClientWriterFactory<W>;
373 
374   /// Block to create a stream (i.e. send request headers and other initial
375   /// metadata to the server). Note that \a context will be used to fill
376   /// in custom initial metadata. \a response will be filled in with the
377   /// single expected response message from the server upon a successful
378   /// call to the \a Finish method of this instance.
379   template <class R>
ClientWriter(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response)380   ClientWriter(grpc::ChannelInterface* channel,
381                const grpc::internal::RpcMethod& method,
382                grpc::ClientContext* context, R* response)
383       : context_(context),
384         cq_(grpc_completion_queue_attributes{
385             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
386             nullptr}),  // Pluckable cq
387         call_(channel->CreateCall(method, context, &cq_)) {
388     finish_ops_.RecvMessage(response);
389     finish_ops_.AllowNoMessage();
390 
391     if (!context_->initial_metadata_corked_) {
392       grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
393       ops.SendInitialMetadata(&context->send_initial_metadata_,
394                               context->initial_metadata_flags());
395       call_.PerformOps(&ops);
396       cq_.Pluck(&ops);
397     }
398   }
399 
400   grpc::ClientContext* context_;
401   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
402                             grpc::internal::CallOpGenericRecvMessage,
403                             grpc::internal::CallOpClientRecvStatus>
404       finish_ops_;
405   grpc::CompletionQueue cq_;
406   grpc::internal::Call call_;
407 };
408 
409 /// Client-side interface for bi-directional streaming with
410 /// client-to-server stream messages of type \a W and
411 /// server-to-client stream messages of type \a R.
412 template <class W, class R>
413 class ClientReaderWriterInterface : public internal::ClientStreamingInterface,
414                                     public internal::WriterInterface<W>,
415                                     public internal::ReaderInterface<R> {
416  public:
417   /// Block to wait for initial metadata from server. The received metadata
418   /// can only be accessed after this call returns. Should only be called before
419   /// the first read. Calling this method is optional, and if it is not called
420   /// the metadata will be available in ClientContext after the first read.
421   virtual void WaitForInitialMetadata() = 0;
422 
423   /// Half close writing from the client. (signal that the stream of messages
424   /// coming from the client is complete).
425   /// Blocks until currently-pending writes are completed.
426   /// Thread-safe with respect to \a ReaderInterface::Read
427   ///
428   /// \return Whether the writes were successful.
429   virtual bool WritesDone() = 0;
430 };
431 
432 namespace internal {
433 template <class W, class R>
434 class ClientReaderWriterFactory {
435  public:
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context)436   static ClientReaderWriter<W, R>* Create(
437       grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method,
438       grpc::ClientContext* context) {
439     return new ClientReaderWriter<W, R>(channel, method, context);
440   }
441 };
442 }  // namespace internal
443 
444 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs,
445 /// where the outgoing message stream coming from the client has messages of
446 /// type \a W, and the incoming messages stream coming from the server has
447 /// messages of type \a R.
448 template <class W, class R>
449 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
450  public:
451   /// Block waiting to read initial metadata from the server.
452   /// This call is optional, but if it is used, it cannot be used concurrently
453   /// with or after the \a Finish method.
454   ///
455   /// Once complete, the initial metadata read from the server will be
456   /// accessible through the \a ClientContext used to construct this object.
WaitForInitialMetadata()457   void WaitForInitialMetadata() override {
458     GPR_ASSERT(!context_->initial_metadata_received_);
459 
460     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops;
461     ops.RecvInitialMetadata(context_);
462     call_.PerformOps(&ops);
463     cq_.Pluck(&ops);  // status ignored
464   }
465 
NextMessageSize(uint32_t * sz)466   bool NextMessageSize(uint32_t* sz) override {
467     int result = call_.max_receive_message_size();
468     *sz = (result > 0) ? result : UINT32_MAX;
469     return true;
470   }
471 
472   /// See the \a ReaderInterface.Read method for semantics.
473   /// Side effect:
474   ///   Also receives initial metadata if not already received (updates the \a
475   ///   ClientContext associated with this call in that case).
Read(R * msg)476   bool Read(R* msg) override {
477     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
478                               grpc::internal::CallOpRecvMessage<R>>
479         ops;
480     if (!context_->initial_metadata_received_) {
481       ops.RecvInitialMetadata(context_);
482     }
483     ops.RecvMessage(msg);
484     call_.PerformOps(&ops);
485     return cq_.Pluck(&ops) && ops.got_message;
486   }
487 
488   /// See the \a WriterInterface.Write method for semantics.
489   ///
490   /// Side effect:
491   ///   Also sends initial metadata if not already sent (using the
492   ///   \a ClientContext associated with this call to fill in values).
493   using internal::WriterInterface<W>::Write;
Write(const W & msg,grpc::WriteOptions options)494   bool Write(const W& msg, grpc::WriteOptions options) override {
495     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
496                               grpc::internal::CallOpSendMessage,
497                               grpc::internal::CallOpClientSendClose>
498         ops;
499 
500     if (options.is_last_message()) {
501       options.set_buffer_hint();
502       ops.ClientSendClose();
503     }
504     if (context_->initial_metadata_corked_) {
505       ops.SendInitialMetadata(&context_->send_initial_metadata_,
506                               context_->initial_metadata_flags());
507       context_->set_initial_metadata_corked(false);
508     }
509     if (!ops.SendMessagePtr(&msg, options).ok()) {
510       return false;
511     }
512 
513     call_.PerformOps(&ops);
514     return cq_.Pluck(&ops);
515   }
516 
WritesDone()517   bool WritesDone() override {
518     grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops;
519     ops.ClientSendClose();
520     call_.PerformOps(&ops);
521     return cq_.Pluck(&ops);
522   }
523 
524   /// See the ClientStreamingInterface.Finish method for semantics.
525   ///
526   /// Side effect:
527   ///   - the \a ClientContext associated with this call is updated with
528   ///     possible trailing metadata sent from the server.
Finish()529   grpc::Status Finish() override {
530     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
531                               grpc::internal::CallOpClientRecvStatus>
532         ops;
533     if (!context_->initial_metadata_received_) {
534       ops.RecvInitialMetadata(context_);
535     }
536     grpc::Status status;
537     ops.ClientRecvStatus(context_, &status);
538     call_.PerformOps(&ops);
539     GPR_ASSERT(cq_.Pluck(&ops));
540     return status;
541   }
542 
543  private:
544   friend class internal::ClientReaderWriterFactory<W, R>;
545 
546   grpc::ClientContext* context_;
547   grpc::CompletionQueue cq_;
548   grpc::internal::Call call_;
549 
550   /// Block to create a stream and write the initial metadata and \a request
551   /// out. Note that \a context will be used to fill in custom initial metadata
552   /// used to send to the server when starting the call.
ClientReaderWriter(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context)553   ClientReaderWriter(grpc::ChannelInterface* channel,
554                      const grpc::internal::RpcMethod& method,
555                      grpc::ClientContext* context)
556       : context_(context),
557         cq_(grpc_completion_queue_attributes{
558             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
559             nullptr}),  // Pluckable cq
560         call_(channel->CreateCall(method, context, &cq_)) {
561     if (!context_->initial_metadata_corked_) {
562       grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
563       ops.SendInitialMetadata(&context->send_initial_metadata_,
564                               context->initial_metadata_flags());
565       call_.PerformOps(&ops);
566       cq_.Pluck(&ops);
567     }
568   }
569 };
570 
571 /// Server-side interface for streaming reads of message of type \a R.
572 template <class R>
573 class ServerReaderInterface : public internal::ServerStreamingInterface,
574                               public internal::ReaderInterface<R> {};
575 
576 /// Synchronous (blocking) server-side API for doing client-streaming RPCs,
577 /// where the incoming message stream coming from the client has messages of
578 /// type \a R.
579 template <class R>
580 class ServerReader final : public ServerReaderInterface<R> {
581  public:
582   /// See the \a ServerStreamingInterface.SendInitialMetadata method
583   /// for semantics. Note that initial metadata will be affected by the
584   /// \a ServerContext associated with this call.
SendInitialMetadata()585   void SendInitialMetadata() override {
586     GPR_ASSERT(!ctx_->sent_initial_metadata_);
587 
588     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
589     ops.SendInitialMetadata(&ctx_->initial_metadata_,
590                             ctx_->initial_metadata_flags());
591     if (ctx_->compression_level_set()) {
592       ops.set_compression_level(ctx_->compression_level());
593     }
594     ctx_->sent_initial_metadata_ = true;
595     call_->PerformOps(&ops);
596     call_->cq()->Pluck(&ops);
597   }
598 
NextMessageSize(uint32_t * sz)599   bool NextMessageSize(uint32_t* sz) override {
600     int result = call_->max_receive_message_size();
601     *sz = (result > 0) ? result : UINT32_MAX;
602     return true;
603   }
604 
Read(R * msg)605   bool Read(R* msg) override {
606     grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops;
607     ops.RecvMessage(msg);
608     call_->PerformOps(&ops);
609     bool ok = call_->cq()->Pluck(&ops) && ops.got_message;
610     if (!ok) {
611       ctx_->MaybeMarkCancelledOnRead();
612     }
613     return ok;
614   }
615 
616  private:
617   grpc::internal::Call* const call_;
618   ServerContext* const ctx_;
619 
620   template <class ServiceType, class RequestType, class ResponseType>
621   friend class internal::ClientStreamingHandler;
622 
ServerReader(grpc::internal::Call * call,grpc::ServerContext * ctx)623   ServerReader(grpc::internal::Call* call, grpc::ServerContext* ctx)
624       : call_(call), ctx_(ctx) {}
625 };
626 
627 /// Server-side interface for streaming writes of message of type \a W.
628 template <class W>
629 class ServerWriterInterface : public internal::ServerStreamingInterface,
630                               public internal::WriterInterface<W> {};
631 
/// Synchronous (blocking) server-side API for doing server-streaming RPCs,
/// where the outgoing message stream coming from the server has messages of
/// type \a W.
635 template <class W>
636 class ServerWriter final : public ServerWriterInterface<W> {
637  public:
638   /// See the \a ServerStreamingInterface.SendInitialMetadata method
639   /// for semantics.
640   /// Note that initial metadata will be affected by the
641   /// \a ServerContext associated with this call.
SendInitialMetadata()642   void SendInitialMetadata() override {
643     GPR_ASSERT(!ctx_->sent_initial_metadata_);
644 
645     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
646     ops.SendInitialMetadata(&ctx_->initial_metadata_,
647                             ctx_->initial_metadata_flags());
648     if (ctx_->compression_level_set()) {
649       ops.set_compression_level(ctx_->compression_level());
650     }
651     ctx_->sent_initial_metadata_ = true;
652     call_->PerformOps(&ops);
653     call_->cq()->Pluck(&ops);
654   }
655 
656   /// See the \a WriterInterface.Write method for semantics.
657   ///
658   /// Side effect:
659   ///   Also sends initial metadata if not already sent (using the
660   ///   \a ClientContext associated with this call to fill in values).
661   using internal::WriterInterface<W>::Write;
Write(const W & msg,grpc::WriteOptions options)662   bool Write(const W& msg, grpc::WriteOptions options) override {
663     if (options.is_last_message()) {
664       options.set_buffer_hint();
665     }
666 
667     if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
668       return false;
669     }
670     if (!ctx_->sent_initial_metadata_) {
671       ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
672                                              ctx_->initial_metadata_flags());
673       if (ctx_->compression_level_set()) {
674         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
675       }
676       ctx_->sent_initial_metadata_ = true;
677     }
678     call_->PerformOps(&ctx_->pending_ops_);
679     // if this is the last message we defer the pluck until AFTER we start
680     // the trailing md op. This prevents hangs. See
681     // https://github.com/grpc/grpc/issues/11546
682     if (options.is_last_message()) {
683       ctx_->has_pending_ops_ = true;
684       return true;
685     }
686     ctx_->has_pending_ops_ = false;
687     return call_->cq()->Pluck(&ctx_->pending_ops_);
688   }
689 
690  private:
691   grpc::internal::Call* const call_;
692   grpc::ServerContext* const ctx_;
693 
694   template <class ServiceType, class RequestType, class ResponseType>
695   friend class internal::ServerStreamingHandler;
696 
ServerWriter(grpc::internal::Call * call,grpc::ServerContext * ctx)697   ServerWriter(grpc::internal::Call* call, grpc::ServerContext* ctx)
698       : call_(call), ctx_(ctx) {}
699 };
700 
701 /// Server-side interface for bi-directional streaming.
702 template <class W, class R>
703 class ServerReaderWriterInterface : public internal::ServerStreamingInterface,
704                                     public internal::WriterInterface<W>,
705                                     public internal::ReaderInterface<R> {};
706 
707 /// Actual implementation of bi-directional streaming
708 namespace internal {
709 template <class W, class R>
710 class ServerReaderWriterBody final {
711  public:
ServerReaderWriterBody(grpc::internal::Call * call,grpc::ServerContext * ctx)712   ServerReaderWriterBody(grpc::internal::Call* call, grpc::ServerContext* ctx)
713       : call_(call), ctx_(ctx) {}
714 
SendInitialMetadata()715   void SendInitialMetadata() {
716     GPR_ASSERT(!ctx_->sent_initial_metadata_);
717 
718     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
719     ops.SendInitialMetadata(&ctx_->initial_metadata_,
720                             ctx_->initial_metadata_flags());
721     if (ctx_->compression_level_set()) {
722       ops.set_compression_level(ctx_->compression_level());
723     }
724     ctx_->sent_initial_metadata_ = true;
725     call_->PerformOps(&ops);
726     call_->cq()->Pluck(&ops);
727   }
728 
NextMessageSize(uint32_t * sz)729   bool NextMessageSize(uint32_t* sz) {
730     int result = call_->max_receive_message_size();
731     *sz = (result > 0) ? result : UINT32_MAX;
732     return true;
733   }
734 
Read(R * msg)735   bool Read(R* msg) {
736     grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops;
737     ops.RecvMessage(msg);
738     call_->PerformOps(&ops);
739     bool ok = call_->cq()->Pluck(&ops) && ops.got_message;
740     if (!ok) {
741       ctx_->MaybeMarkCancelledOnRead();
742     }
743     return ok;
744   }
745 
Write(const W & msg,grpc::WriteOptions options)746   bool Write(const W& msg, grpc::WriteOptions options) {
747     if (options.is_last_message()) {
748       options.set_buffer_hint();
749     }
750     if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
751       return false;
752     }
753     if (!ctx_->sent_initial_metadata_) {
754       ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
755                                              ctx_->initial_metadata_flags());
756       if (ctx_->compression_level_set()) {
757         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
758       }
759       ctx_->sent_initial_metadata_ = true;
760     }
761     call_->PerformOps(&ctx_->pending_ops_);
762     // if this is the last message we defer the pluck until AFTER we start
763     // the trailing md op. This prevents hangs. See
764     // https://github.com/grpc/grpc/issues/11546
765     if (options.is_last_message()) {
766       ctx_->has_pending_ops_ = true;
767       return true;
768     }
769     ctx_->has_pending_ops_ = false;
770     return call_->cq()->Pluck(&ctx_->pending_ops_);
771   }
772 
773  private:
774   grpc::internal::Call* const call_;
775   grpc::ServerContext* const ctx_;
776 };
777 
778 }  // namespace internal
779 
780 /// Synchronous (blocking) server-side API for a bidirectional
781 /// streaming call, where the incoming message stream coming from the client has
782 /// messages of type \a R, and the outgoing message streaming coming from
783 /// the server has messages of type \a W.
784 template <class W, class R>
785 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
786  public:
787   /// See the \a ServerStreamingInterface.SendInitialMetadata method
788   /// for semantics. Note that initial metadata will be affected by the
789   /// \a ServerContext associated with this call.
SendInitialMetadata()790   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
791 
NextMessageSize(uint32_t * sz)792   bool NextMessageSize(uint32_t* sz) override {
793     return body_.NextMessageSize(sz);
794   }
795 
Read(R * msg)796   bool Read(R* msg) override { return body_.Read(msg); }
797 
798   /// See the \a WriterInterface.Write(const W& msg, WriteOptions options)
799   /// method for semantics.
800   /// Side effect:
801   ///   Also sends initial metadata if not already sent (using the \a
802   ///   ServerContext associated with this call).
803   using internal::WriterInterface<W>::Write;
Write(const W & msg,grpc::WriteOptions options)804   bool Write(const W& msg, grpc::WriteOptions options) override {
805     return body_.Write(msg, options);
806   }
807 
808  private:
809   internal::ServerReaderWriterBody<W, R> body_;
810 
811   friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>,
812                                                        false>;
ServerReaderWriter(grpc::internal::Call * call,grpc::ServerContext * ctx)813   ServerReaderWriter(grpc::internal::Call* call, grpc::ServerContext* ctx)
814       : body_(call, ctx) {}
815 };
816 
817 /// A class to represent a flow-controlled unary call. This is something
818 /// of a hybrid between conventional unary and streaming. This is invoked
819 /// through a unary call on the client side, but the server responds to it
820 /// as though it were a single-ping-pong streaming call. The server can use
821 /// the \a NextMessageSize method to determine an upper-bound on the size of
822 /// the message. A key difference relative to streaming: ServerUnaryStreamer
823 /// must have exactly 1 Read and exactly 1 Write, in that order, to function
824 /// correctly. Otherwise, the RPC is in error.
825 template <class RequestType, class ResponseType>
826 class ServerUnaryStreamer final
827     : public ServerReaderWriterInterface<ResponseType, RequestType> {
828  public:
829   /// Block to send initial metadata to client.
830   /// Implicit input parameter:
831   ///    - the \a ServerContext associated with this call will be used for
832   ///      sending initial metadata.
SendInitialMetadata()833   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
834 
835   /// Get an upper bound on the request message size from the client.
NextMessageSize(uint32_t * sz)836   bool NextMessageSize(uint32_t* sz) override {
837     return body_.NextMessageSize(sz);
838   }
839 
840   /// Read a message of type \a R into \a msg. Completion will be notified by \a
841   /// tag on the associated completion queue.
842   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
843   /// should not be called concurrently with other streaming APIs
844   /// on the same stream. It is not meaningful to call it concurrently
845   /// with another \a ReaderInterface::Read on the same stream since reads on
846   /// the same stream are delivered in order.
847   ///
848   /// \param[out] msg Where to eventually store the read message.
849   /// \param[in] tag The tag identifying the operation.
Read(RequestType * request)850   bool Read(RequestType* request) override {
851     if (read_done_) {
852       return false;
853     }
854     read_done_ = true;
855     return body_.Read(request);
856   }
857 
858   /// Block to write \a msg to the stream with WriteOptions \a options.
859   /// This is thread-safe with respect to \a ReaderInterface::Read
860   ///
861   /// \param msg The message to be written to the stream.
862   /// \param options The WriteOptions affecting the write operation.
863   ///
864   /// \return \a true on success, \a false when the stream has been closed.
865   using internal::WriterInterface<ResponseType>::Write;
Write(const ResponseType & response,grpc::WriteOptions options)866   bool Write(const ResponseType& response,
867              grpc::WriteOptions options) override {
868     if (write_done_ || !read_done_) {
869       return false;
870     }
871     write_done_ = true;
872     return body_.Write(response, options);
873   }
874 
875  private:
876   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
877   bool read_done_;
878   bool write_done_;
879 
880   friend class internal::TemplatedBidiStreamingHandler<
881       ServerUnaryStreamer<RequestType, ResponseType>, true>;
ServerUnaryStreamer(grpc::internal::Call * call,grpc::ServerContext * ctx)882   ServerUnaryStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx)
883       : body_(call, ctx), read_done_(false), write_done_(false) {}
884 };
885 
886 /// A class to represent a flow-controlled server-side streaming call.
887 /// This is something of a hybrid between server-side and bidi streaming.
888 /// This is invoked through a server-side streaming call on the client side,
889 /// but the server responds to it as though it were a bidi streaming call that
890 /// must first have exactly 1 Read and then any number of Writes.
891 template <class RequestType, class ResponseType>
892 class ServerSplitStreamer final
893     : public ServerReaderWriterInterface<ResponseType, RequestType> {
894  public:
895   /// Block to send initial metadata to client.
896   /// Implicit input parameter:
897   ///    - the \a ServerContext associated with this call will be used for
898   ///      sending initial metadata.
SendInitialMetadata()899   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
900 
901   /// Get an upper bound on the request message size from the client.
NextMessageSize(uint32_t * sz)902   bool NextMessageSize(uint32_t* sz) override {
903     return body_.NextMessageSize(sz);
904   }
905 
906   /// Read a message of type \a R into \a msg. Completion will be notified by \a
907   /// tag on the associated completion queue.
908   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
909   /// should not be called concurrently with other streaming APIs
910   /// on the same stream. It is not meaningful to call it concurrently
911   /// with another \a ReaderInterface::Read on the same stream since reads on
912   /// the same stream are delivered in order.
913   ///
914   /// \param[out] msg Where to eventually store the read message.
915   /// \param[in] tag The tag identifying the operation.
Read(RequestType * request)916   bool Read(RequestType* request) override {
917     if (read_done_) {
918       return false;
919     }
920     read_done_ = true;
921     return body_.Read(request);
922   }
923 
924   /// Block to write \a msg to the stream with WriteOptions \a options.
925   /// This is thread-safe with respect to \a ReaderInterface::Read
926   ///
927   /// \param msg The message to be written to the stream.
928   /// \param options The WriteOptions affecting the write operation.
929   ///
930   /// \return \a true on success, \a false when the stream has been closed.
931   using internal::WriterInterface<ResponseType>::Write;
Write(const ResponseType & response,grpc::WriteOptions options)932   bool Write(const ResponseType& response,
933              grpc::WriteOptions options) override {
934     return read_done_ && body_.Write(response, options);
935   }
936 
937  private:
938   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
939   bool read_done_;
940 
941   friend class internal::TemplatedBidiStreamingHandler<
942       ServerSplitStreamer<RequestType, ResponseType>, false>;
ServerSplitStreamer(grpc::internal::Call * call,grpc::ServerContext * ctx)943   ServerSplitStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx)
944       : body_(call, ctx), read_done_(false) {}
945 };
946 
947 }  // namespace grpc
948 
949 #endif  // GRPCPP_SUPPORT_SYNC_STREAM_H
950