1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H
20 #define GRPCPP_SUPPORT_CLIENT_CALLBACK_H
21 
#include <atomic>
#include <functional>
#include <type_traits>
#include <utility>

#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
#include <grpcpp/impl/sync.h>
#include <grpcpp/support/callback_common.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/status.h>
33 
34 namespace grpc {
35 class Channel;
36 class ClientContext;
37 
38 namespace internal {
39 class RpcMethod;
40 
41 /// Perform a callback-based unary call.  May optionally specify the base
42 /// class of the Request and Response so that the internal calls and structures
43 /// below this may be based on those base classes and thus achieve code reuse
44 /// across different RPCs (e.g., for protobuf, MessageLite would be a base
45 /// class).
46 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
47 template <class InputMessage, class OutputMessage,
48           class BaseInputMessage = InputMessage,
49           class BaseOutputMessage = OutputMessage>
CallbackUnaryCall(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (grpc::Status)> on_completion)50 void CallbackUnaryCall(grpc::ChannelInterface* channel,
51                        const grpc::internal::RpcMethod& method,
52                        grpc::ClientContext* context,
53                        const InputMessage* request, OutputMessage* result,
54                        std::function<void(grpc::Status)> on_completion) {
55   static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
56                 "Invalid input message specification");
57   static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
58                 "Invalid output message specification");
59   CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> x(
60       channel, method, context, request, result, on_completion);
61 }
62 
63 template <class InputMessage, class OutputMessage>
64 class CallbackUnaryCallImpl {
65  public:
CallbackUnaryCallImpl(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (grpc::Status)> on_completion)66   CallbackUnaryCallImpl(grpc::ChannelInterface* channel,
67                         const grpc::internal::RpcMethod& method,
68                         grpc::ClientContext* context,
69                         const InputMessage* request, OutputMessage* result,
70                         std::function<void(grpc::Status)> on_completion) {
71     grpc::CompletionQueue* cq = channel->CallbackCQ();
72     GPR_ASSERT(cq != nullptr);
73     grpc::internal::Call call(channel->CreateCall(method, context, cq));
74 
75     using FullCallOpSet = grpc::internal::CallOpSet<
76         grpc::internal::CallOpSendInitialMetadata,
77         grpc::internal::CallOpSendMessage,
78         grpc::internal::CallOpRecvInitialMetadata,
79         grpc::internal::CallOpRecvMessage<OutputMessage>,
80         grpc::internal::CallOpClientSendClose,
81         grpc::internal::CallOpClientRecvStatus>;
82 
83     struct OpSetAndTag {
84       FullCallOpSet opset;
85       grpc::internal::CallbackWithStatusTag tag;
86     };
87     const size_t alloc_sz = sizeof(OpSetAndTag);
88     auto* const alloced =
89         static_cast<OpSetAndTag*>(grpc_call_arena_alloc(call.call(), alloc_sz));
90     auto* ops = new (&alloced->opset) FullCallOpSet;
91     auto* tag = new (&alloced->tag)
92         grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
93 
94     // TODO(vjpai): Unify code with sync API as much as possible
95     grpc::Status s = ops->SendMessagePtr(request);
96     if (!s.ok()) {
97       tag->force_run(s);
98       return;
99     }
100     ops->SendInitialMetadata(&context->send_initial_metadata_,
101                              context->initial_metadata_flags());
102     ops->RecvInitialMetadata(context);
103     ops->RecvMessage(result);
104     ops->AllowNoMessage();
105     ops->ClientSendClose();
106     ops->ClientRecvStatus(context, tag->status_ptr());
107     ops->set_core_cq_tag(tag);
108     call.PerformOps(ops);
109   }
110 };
111 
112 // Base class for public API classes.
113 class ClientReactor {
114  public:
115   virtual ~ClientReactor() = default;
116 
117   /// Called by the library when all operations associated with this RPC have
118   /// completed and all Holds have been removed. OnDone provides the RPC status
119   /// outcome for both successful and failed RPCs. If it is never called on an
120   /// RPC, it indicates an application-level problem (like failure to remove a
121   /// hold).
122   ///
123   /// \param[in] s The status outcome of this RPC
124   virtual void OnDone(const grpc::Status& /*s*/) = 0;
125 
126   /// InternalScheduleOnDone is not part of the API and is not meant to be
127   /// overridden. It is virtual to allow successful builds for certain bazel
128   /// build users that only want to depend on gRPC codegen headers and not the
129   /// full library (although this is not a generally-supported option). Although
130   /// the virtual call is slower than a direct call, this function is
131   /// heavyweight and the cost of the virtual call is not much in comparison.
132   /// This function may be removed or devirtualized in the future.
133   virtual void InternalScheduleOnDone(grpc::Status s);
134 
135   /// InternalTrailersOnly is not part of the API and is not meant to be
136   /// overridden. It is virtual to allow successful builds for certain bazel
137   /// build users that only want to depend on gRPC codegen headers and not the
138   /// full library (although this is not a generally-supported option). Although
139   /// the virtual call is slower than a direct call, this function is
140   /// heavyweight and the cost of the virtual call is not much in comparison.
141   /// This function may be removed or devirtualized in the future.
142   virtual bool InternalTrailersOnly(const grpc_call* call) const;
143 };
144 
145 }  // namespace internal
146 
// The reactor types are named by the streaming interfaces below (in their
// BindReactor signatures) before the reactors themselves are defined.
template <typename Request, typename Response>
class ClientBidiReactor;
template <typename Response>
class ClientReadReactor;
template <typename Request>
class ClientWriteReactor;
class ClientUnaryReactor;
155 
156 // NOTE: The streaming objects are not actually implemented in the public API.
157 //       These interfaces are provided for mocking only. Typical applications
158 //       will interact exclusively with the reactors that they define.
159 template <class Request, class Response>
160 class ClientCallbackReaderWriter {
161  public:
~ClientCallbackReaderWriter()162   virtual ~ClientCallbackReaderWriter() {}
163   virtual void StartCall() = 0;
164   virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
165   virtual void WritesDone() = 0;
166   virtual void Read(Response* resp) = 0;
167   virtual void AddHold(int holds) = 0;
168   virtual void RemoveHold() = 0;
169 
170  protected:
BindReactor(ClientBidiReactor<Request,Response> * reactor)171   void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
172     reactor->BindStream(this);
173   }
174 };
175 
176 template <class Response>
177 class ClientCallbackReader {
178  public:
~ClientCallbackReader()179   virtual ~ClientCallbackReader() {}
180   virtual void StartCall() = 0;
181   virtual void Read(Response* resp) = 0;
182   virtual void AddHold(int holds) = 0;
183   virtual void RemoveHold() = 0;
184 
185  protected:
BindReactor(ClientReadReactor<Response> * reactor)186   void BindReactor(ClientReadReactor<Response>* reactor) {
187     reactor->BindReader(this);
188   }
189 };
190 
191 template <class Request>
192 class ClientCallbackWriter {
193  public:
~ClientCallbackWriter()194   virtual ~ClientCallbackWriter() {}
195   virtual void StartCall() = 0;
Write(const Request * req)196   void Write(const Request* req) { Write(req, grpc::WriteOptions()); }
197   virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
WriteLast(const Request * req,grpc::WriteOptions options)198   void WriteLast(const Request* req, grpc::WriteOptions options) {
199     Write(req, options.set_last_message());
200   }
201   virtual void WritesDone() = 0;
202 
203   virtual void AddHold(int holds) = 0;
204   virtual void RemoveHold() = 0;
205 
206  protected:
BindReactor(ClientWriteReactor<Request> * reactor)207   void BindReactor(ClientWriteReactor<Request>* reactor) {
208     reactor->BindWriter(this);
209   }
210 };
211 
212 class ClientCallbackUnary {
213  public:
~ClientCallbackUnary()214   virtual ~ClientCallbackUnary() {}
215   virtual void StartCall() = 0;
216 
217  protected:
218   void BindReactor(ClientUnaryReactor* reactor);
219 };
220 
221 // The following classes are the reactor interfaces that are to be implemented
222 // by the user. They are passed in to the library as an argument to a call on a
223 // stub (either a codegen-ed call or a generic call). The streaming RPC is
224 // activated by calling StartCall, possibly after initiating StartRead,
225 // StartWrite, or AddHold operations on the streaming object. Note that none of
226 // the classes are pure; all reactions have a default empty reaction so that the
227 // user class only needs to override those reactions that it cares about.
228 // The reactor must be passed to the stub invocation before any of the below
229 // operations can be called and its reactions will be invoked by the library in
230 // response to the completion of various operations. Reactions must not include
231 // blocking operations (such as blocking I/O, starting synchronous RPCs, or
232 // waiting on condition variables). Reactions may be invoked concurrently,
233 // except that OnDone is called after all others (assuming proper API usage).
234 // The reactor may not be deleted until OnDone is called.
235 
236 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
237 template <class Request, class Response>
238 class ClientBidiReactor : public internal::ClientReactor {
239  public:
240   /// Activate the RPC and initiate any reads or writes that have been Start'ed
241   /// before this call. All streaming RPCs issued by the client MUST have
242   /// StartCall invoked on them (even if they are canceled) as this call is the
243   /// activation of their lifecycle.
StartCall()244   void StartCall() { stream_->StartCall(); }
245 
246   /// Initiate a read operation (or post it for later initiation if StartCall
247   /// has not yet been invoked).
248   ///
249   /// \param[out] resp Where to eventually store the read message. Valid when
250   ///                  the library calls OnReadDone
StartRead(Response * resp)251   void StartRead(Response* resp) { stream_->Read(resp); }
252 
253   /// Initiate a write operation (or post it for later initiation if StartCall
254   /// has not yet been invoked).
255   ///
256   /// \param[in] req The message to be written. The library does not take
257   ///                ownership but the caller must ensure that the message is
258   ///                not deleted or modified until OnWriteDone is called.
StartWrite(const Request * req)259   void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
260 
261   /// Initiate/post a write operation with specified options.
262   ///
263   /// \param[in] req The message to be written. The library does not take
264   ///                ownership but the caller must ensure that the message is
265   ///                not deleted or modified until OnWriteDone is called.
266   /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Request * req,grpc::WriteOptions options)267   void StartWrite(const Request* req, grpc::WriteOptions options) {
268     stream_->Write(req, options);
269   }
270 
271   /// Initiate/post a write operation with specified options and an indication
272   /// that this is the last write (like StartWrite and StartWritesDone, merged).
273   /// Note that calling this means that no more calls to StartWrite,
274   /// StartWriteLast, or StartWritesDone are allowed.
275   ///
276   /// \param[in] req The message to be written. The library does not take
277   ///                ownership but the caller must ensure that the message is
278   ///                not deleted or modified until OnWriteDone is called.
279   /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Request * req,grpc::WriteOptions options)280   void StartWriteLast(const Request* req, grpc::WriteOptions options) {
281     StartWrite(req, options.set_last_message());
282   }
283 
284   /// Indicate that the RPC will have no more write operations. This can only be
285   /// issued once for a given RPC. This is not required or allowed if
286   /// StartWriteLast is used since that already has the same implication.
287   /// Note that calling this means that no more calls to StartWrite,
288   /// StartWriteLast, or StartWritesDone are allowed.
StartWritesDone()289   void StartWritesDone() { stream_->WritesDone(); }
290 
291   /// Holds are needed if (and only if) this stream has operations that take
292   /// place on it after StartCall but from outside one of the reactions
293   /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
294   ///
295   /// Holds must be added before calling StartCall. If a stream still has a hold
296   /// in place, its resources will not be destroyed even if the status has
297   /// already come in from the wire and there are currently no active callbacks
298   /// outstanding. Similarly, the stream will not call OnDone if there are still
299   /// holds on it.
300   ///
301   /// For example, if a StartRead or StartWrite operation is going to be
302   /// initiated from elsewhere in the application, the application should call
303   /// AddHold or AddMultipleHolds before StartCall.  If there is going to be,
304   /// for example, a read-flow and a write-flow taking place outside the
305   /// reactions, then call AddMultipleHolds(2) before StartCall. When the
306   /// application knows that it won't issue any more read operations (such as
307   /// when a read comes back as not ok), it should issue a RemoveHold(). It
308   /// should also call RemoveHold() again after it does StartWriteLast or
309   /// StartWritesDone that indicates that there will be no more write ops.
310   /// The number of RemoveHold calls must match the total number of AddHold
311   /// calls plus the number of holds added by AddMultipleHolds.
312   /// The argument to AddMultipleHolds must be positive.
AddHold()313   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)314   void AddMultipleHolds(int holds) {
315     GPR_DEBUG_ASSERT(holds > 0);
316     stream_->AddHold(holds);
317   }
RemoveHold()318   void RemoveHold() { stream_->RemoveHold(); }
319 
320   /// Notifies the application that all operations associated with this RPC
321   /// have completed and all Holds have been removed. OnDone provides the RPC
322   /// status outcome for both successful and failed RPCs and will be called in
323   /// all cases. If it is not called, it indicates an application-level problem
324   /// (like failure to remove a hold).
325   ///
326   /// \param[in] s The status outcome of this RPC
OnDone(const grpc::Status &)327   void OnDone(const grpc::Status& /*s*/) override {}
328 
329   /// Notifies the application that a read of initial metadata from the
330   /// server is done. If the application chooses not to implement this method,
331   /// it can assume that the initial metadata has been read before the first
332   /// call of OnReadDone or OnDone.
333   ///
334   /// \param[in] ok Was the initial metadata read successfully? If false, no
335   ///               new read/write operation will succeed, and any further
336   ///               Start* operations should not be called.
OnReadInitialMetadataDone(bool)337   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
338 
339   /// Notifies the application that a StartRead operation completed.
340   ///
341   /// \param[in] ok Was it successful? If false, no new read/write operation
342   ///               will succeed, and any further Start* should not be called.
OnReadDone(bool)343   virtual void OnReadDone(bool /*ok*/) {}
344 
345   /// Notifies the application that a StartWrite or StartWriteLast operation
346   /// completed.
347   ///
348   /// \param[in] ok Was it successful? If false, no new read/write operation
349   ///               will succeed, and any further Start* should not be called.
OnWriteDone(bool)350   virtual void OnWriteDone(bool /*ok*/) {}
351 
352   /// Notifies the application that a StartWritesDone operation completed. Note
353   /// that this is only used on explicit StartWritesDone operations and not for
354   /// those that are implicitly invoked as part of a StartWriteLast.
355   ///
356   /// \param[in] ok Was it successful? If false, the application will later see
357   ///               the failure reflected as a bad status in OnDone and no
358   ///               further Start* should be called.
OnWritesDoneDone(bool)359   virtual void OnWritesDoneDone(bool /*ok*/) {}
360 
361  private:
362   friend class ClientCallbackReaderWriter<Request, Response>;
BindStream(ClientCallbackReaderWriter<Request,Response> * stream)363   void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
364     stream_ = stream;
365   }
366   ClientCallbackReaderWriter<Request, Response>* stream_;
367 };
368 
369 /// \a ClientReadReactor is the interface for a server-streaming RPC.
370 /// All public methods behave as in ClientBidiReactor.
371 template <class Response>
372 class ClientReadReactor : public internal::ClientReactor {
373  public:
StartCall()374   void StartCall() { reader_->StartCall(); }
StartRead(Response * resp)375   void StartRead(Response* resp) { reader_->Read(resp); }
376 
AddHold()377   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)378   void AddMultipleHolds(int holds) {
379     GPR_DEBUG_ASSERT(holds > 0);
380     reader_->AddHold(holds);
381   }
RemoveHold()382   void RemoveHold() { reader_->RemoveHold(); }
383 
OnDone(const grpc::Status &)384   void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)385   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)386   virtual void OnReadDone(bool /*ok*/) {}
387 
388  private:
389   friend class ClientCallbackReader<Response>;
BindReader(ClientCallbackReader<Response> * reader)390   void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
391   ClientCallbackReader<Response>* reader_;
392 };
393 
394 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
395 /// All public methods behave as in ClientBidiReactor.
396 template <class Request>
397 class ClientWriteReactor : public internal::ClientReactor {
398  public:
StartCall()399   void StartCall() { writer_->StartCall(); }
StartWrite(const Request * req)400   void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
StartWrite(const Request * req,grpc::WriteOptions options)401   void StartWrite(const Request* req, grpc::WriteOptions options) {
402     writer_->Write(req, options);
403   }
StartWriteLast(const Request * req,grpc::WriteOptions options)404   void StartWriteLast(const Request* req, grpc::WriteOptions options) {
405     StartWrite(req, options.set_last_message());
406   }
StartWritesDone()407   void StartWritesDone() { writer_->WritesDone(); }
408 
AddHold()409   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)410   void AddMultipleHolds(int holds) {
411     GPR_DEBUG_ASSERT(holds > 0);
412     writer_->AddHold(holds);
413   }
RemoveHold()414   void RemoveHold() { writer_->RemoveHold(); }
415 
OnDone(const grpc::Status &)416   void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)417   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)418   virtual void OnWriteDone(bool /*ok*/) {}
OnWritesDoneDone(bool)419   virtual void OnWritesDoneDone(bool /*ok*/) {}
420 
421  private:
422   friend class ClientCallbackWriter<Request>;
BindWriter(ClientCallbackWriter<Request> * writer)423   void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
424 
425   ClientCallbackWriter<Request>* writer_;
426 };
427 
428 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
429 /// This is _not_ a common way of invoking a unary RPC. In practice, this
430 /// option should be used only if the unary RPC wants to receive initial
431 /// metadata without waiting for the response to complete. Most deployments of
432 /// RPC systems do not use this option, but it is needed for generality.
433 /// All public methods behave as in ClientBidiReactor.
434 /// StartCall is included for consistency with the other reactor flavors: even
435 /// though there are no StartRead or StartWrite operations to queue before the
436 /// call (that is part of the unary call itself) and there is no reactor object
437 /// being created as a result of this call, we keep a consistent 2-phase
438 /// initiation API among all the reactor flavors.
439 class ClientUnaryReactor : public internal::ClientReactor {
440  public:
StartCall()441   void StartCall() { call_->StartCall(); }
OnDone(const grpc::Status &)442   void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)443   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
444 
445  private:
446   friend class ClientCallbackUnary;
BindCall(ClientCallbackUnary * call)447   void BindCall(ClientCallbackUnary* call) { call_ = call; }
448   ClientCallbackUnary* call_;
449 };
450 
451 // Define function out-of-line from class to avoid forward declaration issue
BindReactor(ClientUnaryReactor * reactor)452 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
453   reactor->BindCall(this);
454 }
455 
456 namespace internal {
457 
// The factory classes are declared up front so that the implementation
// classes can name them in friend declarations.
template <typename Request, typename Response>
class ClientCallbackReaderWriterFactory;
template <typename Response>
class ClientCallbackReaderFactory;
template <typename Request>
class ClientCallbackWriterFactory;
465 
466 template <class Request, class Response>
467 class ClientCallbackReaderWriterImpl
468     : public ClientCallbackReaderWriter<Request, Response> {
469  public:
470   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)471   static void operator delete(void* /*ptr*/, std::size_t size) {
472     GPR_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
473   }
474 
475   // This operator should never be called as the memory should be freed as part
476   // of the arena destruction. It only exists to provide a matching operator
477   // delete to the operator new so that some compilers will not complain (see
478   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
479   // there are no tests catching the compiler warning.
delete(void *,void *)480   static void operator delete(void*, void*) { GPR_ASSERT(false); }
481 
StartCall()482   void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
483     // This call initiates two batches, plus any backlog, each with a callback
484     // 1. Send initial metadata (unless corked) + recv initial metadata
485     // 2. Any read backlog
486     // 3. Any write backlog
487     // 4. Recv trailing metadata (unless corked)
488     if (!start_corked_) {
489       start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
490                                      context_->initial_metadata_flags());
491     }
492 
493     call_.PerformOps(&start_ops_);
494 
495     {
496       grpc::internal::MutexLock lock(&start_mu_);
497 
498       if (backlog_.read_ops) {
499         call_.PerformOps(&read_ops_);
500       }
501       if (backlog_.write_ops) {
502         call_.PerformOps(&write_ops_);
503       }
504       if (backlog_.writes_done_ops) {
505         call_.PerformOps(&writes_done_ops_);
506       }
507       call_.PerformOps(&finish_ops_);
508       // The last thing in this critical section is to set started_ so that it
509       // can be used lock-free as well.
510       started_.store(true, std::memory_order_release);
511     }
512     // MaybeFinish outside the lock to make sure that destruction of this object
513     // doesn't take place while holding the lock (which would cause the lock to
514     // be released after destruction)
515     this->MaybeFinish(/*from_reaction=*/false);
516   }
517 
Read(Response * msg)518   void Read(Response* msg) override {
519     read_ops_.RecvMessage(msg);
520     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
521     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
522       grpc::internal::MutexLock lock(&start_mu_);
523       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
524         backlog_.read_ops = true;
525         return;
526       }
527     }
528     call_.PerformOps(&read_ops_);
529   }
530 
Write(const Request * msg,grpc::WriteOptions options)531   void Write(const Request* msg, grpc::WriteOptions options)
532       ABSL_LOCKS_EXCLUDED(start_mu_) override {
533     if (options.is_last_message()) {
534       options.set_buffer_hint();
535       write_ops_.ClientSendClose();
536     }
537     // TODO(vjpai): don't assert
538     GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
539     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
540     if (GPR_UNLIKELY(corked_write_needed_)) {
541       write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
542                                      context_->initial_metadata_flags());
543       corked_write_needed_ = false;
544     }
545 
546     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
547       grpc::internal::MutexLock lock(&start_mu_);
548       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
549         backlog_.write_ops = true;
550         return;
551       }
552     }
553     call_.PerformOps(&write_ops_);
554   }
WritesDone()555   void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
556     writes_done_ops_.ClientSendClose();
557     writes_done_tag_.Set(
558         call_.call(),
559         [this](bool ok) {
560           reactor_->OnWritesDoneDone(ok);
561           MaybeFinish(/*from_reaction=*/true);
562         },
563         &writes_done_ops_, /*can_inline=*/false);
564     writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
565     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
566     if (GPR_UNLIKELY(corked_write_needed_)) {
567       writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
568                                            context_->initial_metadata_flags());
569       corked_write_needed_ = false;
570     }
571     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
572       grpc::internal::MutexLock lock(&start_mu_);
573       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
574         backlog_.writes_done_ops = true;
575         return;
576       }
577     }
578     call_.PerformOps(&writes_done_ops_);
579   }
580 
AddHold(int holds)581   void AddHold(int holds) override {
582     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
583   }
RemoveHold()584   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
585 
586  private:
587   friend class ClientCallbackReaderWriterFactory<Request, Response>;
588 
ClientCallbackReaderWriterImpl(grpc::internal::Call call,grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)589   ClientCallbackReaderWriterImpl(grpc::internal::Call call,
590                                  grpc::ClientContext* context,
591                                  ClientBidiReactor<Request, Response>* reactor)
592       : context_(context),
593         call_(call),
594         reactor_(reactor),
595         start_corked_(context_->initial_metadata_corked_),
596         corked_write_needed_(start_corked_) {
597     this->BindReactor(reactor);
598 
599     // Set up the unchanging parts of the start, read, and write tags and ops.
600     start_tag_.Set(
601         call_.call(),
602         [this](bool ok) {
603           reactor_->OnReadInitialMetadataDone(
604               ok && !reactor_->InternalTrailersOnly(call_.call()));
605           MaybeFinish(/*from_reaction=*/true);
606         },
607         &start_ops_, /*can_inline=*/false);
608     start_ops_.RecvInitialMetadata(context_);
609     start_ops_.set_core_cq_tag(&start_tag_);
610 
611     write_tag_.Set(
612         call_.call(),
613         [this](bool ok) {
614           reactor_->OnWriteDone(ok);
615           MaybeFinish(/*from_reaction=*/true);
616         },
617         &write_ops_, /*can_inline=*/false);
618     write_ops_.set_core_cq_tag(&write_tag_);
619 
620     read_tag_.Set(
621         call_.call(),
622         [this](bool ok) {
623           reactor_->OnReadDone(ok);
624           MaybeFinish(/*from_reaction=*/true);
625         },
626         &read_ops_, /*can_inline=*/false);
627     read_ops_.set_core_cq_tag(&read_tag_);
628 
629     // Also set up the Finish tag and op set.
630     finish_tag_.Set(
631         call_.call(),
632         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
633         &finish_ops_,
634         /*can_inline=*/false);
635     finish_ops_.ClientRecvStatus(context_, &finish_status_);
636     finish_ops_.set_core_cq_tag(&finish_tag_);
637   }
638 
639   // MaybeFinish can be called from reactions or from user-initiated operations
640   // like StartCall or RemoveHold. If this is the last operation or hold on this
641   // object, it will invoke the OnDone reaction. If MaybeFinish was called from
642   // a reaction, it can call OnDone directly. If not, it would need to schedule
643   // OnDone onto an executor thread to avoid the possibility of deadlocking with
644   // any locks in the user code that invoked it.
MaybeFinish(bool from_reaction)645   void MaybeFinish(bool from_reaction) {
646     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
647                          1, std::memory_order_acq_rel) == 1)) {
648       grpc::Status s = std::move(finish_status_);
649       auto* reactor = reactor_;
650       auto* call = call_.call();
651       this->~ClientCallbackReaderWriterImpl();
652       grpc_call_unref(call);
653       if (GPR_LIKELY(from_reaction)) {
654         reactor->OnDone(s);
655       } else {
656         reactor->InternalScheduleOnDone(std::move(s));
657       }
658     }
659   }
660 
  // Client context for this RPC; handed to the finish op set so the received
  // status can be recorded against it.
  grpc::ClientContext* const context_;
  // Wrapper around the core call that every batch is issued on.
  grpc::internal::Call call_;
  // Reactor that receives the reactions (OnReadDone, OnDone, ...).
  ClientBidiReactor<Request, Response>* const reactor_;

  // Start batch: send initial metadata + receive initial metadata.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpRecvInitialMetadata>
      start_ops_;
  grpc::internal::CallbackWithSuccessTag start_tag_;
  const bool start_corked_;
  bool corked_write_needed_;  // no lock needed since only accessed in
                              // Write/WritesDone which cannot be concurrent

  // Finish batch: receives the final status into finish_status_, which
  // MaybeFinish moves out and passes to OnDone.
  grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
  grpc::internal::CallbackWithSuccessTag finish_tag_;
  grpc::Status finish_status_;

  // Write batch: may also carry initial metadata and the client send-close.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpSendMessage,
                            grpc::internal::CallOpClientSendClose>
      write_ops_;
  grpc::internal::CallbackWithSuccessTag write_tag_;

  // Writes-done batch: client send-close, optionally with initial metadata.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpClientSendClose>
      writes_done_ops_;
  grpc::internal::CallbackWithSuccessTag writes_done_tag_;

  // Read batch: receives one Response message per read.
  grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
      read_ops_;
  grpc::internal::CallbackWithSuccessTag read_tag_;

  // One flag per kind of operation.
  // NOTE(review): presumably set when the operation is requested before
  // StartCall has run, as in the sibling reader/writer implementations --
  // confirm against this class's StartCall/Read/Write.
  struct StartCallBacklog {
    bool write_ops = false;
    bool writes_done_ops = false;
    bool read_ops = false;
  };
  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);

  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
  // MaybeFinish decrements this; the decrement that takes it from 1 to 0 tears
  // the object down and delivers OnDone.
  std::atomic<intptr_t> callbacks_outstanding_{3};
  // NOTE(review): presumably flipped to true by StartCall while holding
  // start_mu_, as in the sibling implementations -- confirm.
  std::atomic_bool started_{false};
  grpc::internal::Mutex start_mu_;
703 };
704 
705 template <class Request, class Response>
706 class ClientCallbackReaderWriterFactory {
707  public:
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)708   static void Create(grpc::ChannelInterface* channel,
709                      const grpc::internal::RpcMethod& method,
710                      grpc::ClientContext* context,
711                      ClientBidiReactor<Request, Response>* reactor) {
712     grpc::internal::Call call =
713         channel->CreateCall(method, context, channel->CallbackCQ());
714 
715     grpc_call_ref(call.call());
716     new (grpc_call_arena_alloc(
717         call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
718         ClientCallbackReaderWriterImpl<Request, Response>(call, context,
719                                                           reactor);
720   }
721 };
722 
/// Client-side implementation of a callback-based RPC with a single request
/// and a stream of responses. The request is attached to the start batch in
/// the constructor; responses are then received one Read() at a time and every
/// completion is reported to the bound ClientReadReactor. Instances are
/// placement-constructed in the call arena by ClientCallbackReaderFactory and
/// destroy themselves in MaybeFinish() once no callbacks or holds remain.
template <class Response>
class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
 public:
  // always allocated against a call arena, no memory free required
  static void operator delete(void* /*ptr*/, std::size_t size) {
    GPR_ASSERT(size == sizeof(ClientCallbackReaderImpl));
  }

  // This operator should never be called as the memory should be freed as part
  // of the arena destruction. It only exists to provide a matching operator
  // delete to the operator new so that some compilers will not complain (see
  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
  // there are no tests catching the compiler warning.
  static void operator delete(void*, void*) { GPR_ASSERT(false); }

  void StartCall() override {
    // This call initiates two batches, plus any backlog, each with a callback
    // 1. Send initial metadata + the request message + client send-close, and
    //    recv initial metadata (this class has no corked mode; the metadata is
    //    always sent here)
    // 2. Any backlog (a Read() requested before StartCall)
    // 3. Recv trailing metadata

    start_tag_.Set(
        call_.call(),
        [this](bool ok) {
          // Report success only if the batch succeeded and the reactor does
          // not classify the response as trailers-only.
          reactor_->OnReadInitialMetadataDone(
              ok && !reactor_->InternalTrailersOnly(call_.call()));
          MaybeFinish(/*from_reaction=*/true);
        },
        &start_ops_, /*can_inline=*/false);
    start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                   context_->initial_metadata_flags());
    start_ops_.RecvInitialMetadata(context_);
    start_ops_.set_core_cq_tag(&start_tag_);
    call_.PerformOps(&start_ops_);

    // Also set up the read tag so it doesn't have to be set up each time
    read_tag_.Set(
        call_.call(),
        [this](bool ok) {
          reactor_->OnReadDone(ok);
          MaybeFinish(/*from_reaction=*/true);
        },
        &read_ops_, /*can_inline=*/false);
    read_ops_.set_core_cq_tag(&read_tag_);

    {
      // Issue a read that was requested before StartCall, then publish
      // started_ while still holding the lock so that Read() either sees the
      // backlog path or the started path, never neither.
      grpc::internal::MutexLock lock(&start_mu_);
      if (backlog_.read_ops) {
        call_.PerformOps(&read_ops_);
      }
      started_.store(true, std::memory_order_release);
    }

    finish_tag_.Set(
        call_.call(),
        [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
        &finish_ops_, /*can_inline=*/false);
    finish_ops_.ClientRecvStatus(context_, &finish_status_);
    finish_ops_.set_core_cq_tag(&finish_tag_);
    call_.PerformOps(&finish_ops_);
  }

  // Requests that the next response be received into `msg`. If StartCall has
  // not yet marked the call as started, the read is only recorded as backlog
  // and StartCall issues it.
  void Read(Response* msg) override {
    read_ops_.RecvMessage(msg);
    // Account for the read callback before the batch can possibly complete.
    callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
    if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
      grpc::internal::MutexLock lock(&start_mu_);
      // Re-check under the lock: StartCall sets started_ while holding
      // start_mu_, so it may have flipped since the lock-free load above.
      if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
        backlog_.read_ops = true;
        return;
      }
    }
    call_.PerformOps(&read_ops_);
  }

  // Each hold is counted like an outstanding callback, so OnDone is not
  // delivered until every hold has been removed.
  void AddHold(int holds) override {
    callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
  }
  // Releases one hold. This is not a reaction, so if it turns out to be the
  // last outstanding item OnDone is scheduled rather than invoked directly.
  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }

 private:
  friend class ClientCallbackReaderFactory<Response>;

  // Binds the reactor and pre-loads the start batch with the serialized
  // request followed by the client send-close. The rest of the start batch is
  // filled in by StartCall.
  template <class Request>
  ClientCallbackReaderImpl(grpc::internal::Call call,
                           grpc::ClientContext* context, Request* request,
                           ClientReadReactor<Response>* reactor)
      : context_(context), call_(call), reactor_(reactor) {
    this->BindReactor(reactor);
    // TODO(vjpai): don't assert
    GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
    start_ops_.ClientSendClose();
  }

  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
  void MaybeFinish(bool from_reaction) {
    // fetch_sub returns the previous value, so == 1 means this call released
    // the last outstanding callback or hold.
    if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
                         1, std::memory_order_acq_rel) == 1)) {
      // Copy out everything needed below; members must not be touched after
      // the destructor has run.
      grpc::Status s = std::move(finish_status_);
      auto* reactor = reactor_;
      auto* call = call_.call();
      this->~ClientCallbackReaderImpl();
      // NOTE(review): presumably balances the grpc_call_ref taken in
      // ClientCallbackReaderFactory::Create -- confirm.
      grpc_call_unref(call);
      if (GPR_LIKELY(from_reaction)) {
        reactor->OnDone(s);
      } else {
        reactor->InternalScheduleOnDone(std::move(s));
      }
    }
  }

  grpc::ClientContext* const context_;
  grpc::internal::Call call_;
  ClientReadReactor<Response>* const reactor_;

  // Start batch: send initial metadata, the request message and the client
  // send-close, and receive initial metadata.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpSendMessage,
                            grpc::internal::CallOpClientSendClose,
                            grpc::internal::CallOpRecvInitialMetadata>
      start_ops_;
  grpc::internal::CallbackWithSuccessTag start_tag_;

  // Finish batch: receives the final status into finish_status_, which
  // MaybeFinish moves out and hands to OnDone.
  grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
  grpc::internal::CallbackWithSuccessTag finish_tag_;
  grpc::Status finish_status_;

  // Read batch: receives one Response message per Read().
  grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
      read_ops_;
  grpc::internal::CallbackWithSuccessTag read_tag_;

  // Records a Read() that arrived before StartCall marked the call started.
  struct StartCallBacklog {
    bool read_ops = false;
  };
  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);

  // Minimum of 2 callbacks to pre-register for start and finish
  std::atomic<intptr_t> callbacks_outstanding_{2};
  // Set to true by StartCall (under start_mu_) after any backlog is issued.
  std::atomic_bool started_{false};
  grpc::internal::Mutex start_mu_;
};
863 
864 template <class Response>
865 class ClientCallbackReaderFactory {
866  public:
867   template <class Request>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const Request * request,ClientReadReactor<Response> * reactor)868   static void Create(grpc::ChannelInterface* channel,
869                      const grpc::internal::RpcMethod& method,
870                      grpc::ClientContext* context, const Request* request,
871                      ClientReadReactor<Response>* reactor) {
872     grpc::internal::Call call =
873         channel->CreateCall(method, context, channel->CallbackCQ());
874 
875     grpc_call_ref(call.call());
876     new (grpc_call_arena_alloc(call.call(),
877                                sizeof(ClientCallbackReaderImpl<Response>)))
878         ClientCallbackReaderImpl<Response>(call, context, request, reactor);
879   }
880 };
881 
/// Client-side implementation of a callback-based RPC with a stream of
/// requests and a single response. Requests are sent one Write() at a time;
/// the response message is received together with the final status in the
/// finish batch. Every completion is reported to the bound ClientWriteReactor.
/// Instances are placement-constructed in the call arena by
/// ClientCallbackWriterFactory and destroy themselves in MaybeFinish() once no
/// callbacks or holds remain.
template <class Request>
class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
 public:
  // always allocated against a call arena, no memory free required
  static void operator delete(void* /*ptr*/, std::size_t size) {
    GPR_ASSERT(size == sizeof(ClientCallbackWriterImpl));
  }

  // This operator should never be called as the memory should be freed as part
  // of the arena destruction. It only exists to provide a matching operator
  // delete to the operator new so that some compilers will not complain (see
  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
  // there are no tests catching the compiler warning.
  static void operator delete(void*, void*) { GPR_ASSERT(false); }

  void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
    // This call initiates two batches, plus any backlog, each with a callback
    // 1. Send initial metadata (unless corked) + recv initial metadata
    // 2. Any backlog
    // 3. Recv trailing metadata

    // When corked, the initial metadata is left out of the start batch and is
    // attached to the first Write/WritesDone batch instead.
    if (!start_corked_) {
      start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                     context_->initial_metadata_flags());
    }
    call_.PerformOps(&start_ops_);

    {
      grpc::internal::MutexLock lock(&start_mu_);

      // Issue whatever Write/WritesDone was requested before StartCall.
      if (backlog_.write_ops) {
        call_.PerformOps(&write_ops_);
      }
      if (backlog_.writes_done_ops) {
        call_.PerformOps(&writes_done_ops_);
      }
      call_.PerformOps(&finish_ops_);
      // The last thing in this critical section is to set started_ so that it
      // can be used lock-free as well.
      started_.store(true, std::memory_order_release);
    }
    // MaybeFinish outside the lock to make sure that destruction of this object
    // doesn't take place while holding the lock (which would cause the lock to
    // be released after destruction)
    // This releases the callback slot pre-registered for StartCall itself.
    this->MaybeFinish(/*from_reaction=*/false);
  }

  // Queues `msg` for sending with `options`. If StartCall has not yet marked
  // the call as started, the write is only recorded as backlog and StartCall
  // issues it.
  void Write(const Request* msg, grpc::WriteOptions options)
      ABSL_LOCKS_EXCLUDED(start_mu_) override {
    // A last message carries the client send-close in the same batch.
    if (GPR_UNLIKELY(options.is_last_message())) {
      options.set_buffer_hint();
      write_ops_.ClientSendClose();
    }
    // TODO(vjpai): don't assert
    GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
    // Account for the write callback before the batch can possibly complete.
    callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);

    // Corked start: the initial metadata rides along with the first write.
    if (GPR_UNLIKELY(corked_write_needed_)) {
      write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                     context_->initial_metadata_flags());
      corked_write_needed_ = false;
    }

    if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
      grpc::internal::MutexLock lock(&start_mu_);
      // Re-check under the lock: StartCall sets started_ while holding
      // start_mu_, so it may have flipped since the lock-free load above.
      if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
        backlog_.write_ops = true;
        return;
      }
    }
    call_.PerformOps(&write_ops_);
  }

  // Sends the client send-close as its own batch and reports completion via
  // OnWritesDoneDone. Deferred to StartCall as backlog if the call has not
  // been started yet.
  void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
    writes_done_ops_.ClientSendClose();
    writes_done_tag_.Set(
        call_.call(),
        [this](bool ok) {
          reactor_->OnWritesDoneDone(ok);
          MaybeFinish(/*from_reaction=*/true);
        },
        &writes_done_ops_, /*can_inline=*/false);
    writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
    callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);

    // Corked start with no prior write: the initial metadata rides along with
    // the writes-done batch.
    if (GPR_UNLIKELY(corked_write_needed_)) {
      writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
                                           context_->initial_metadata_flags());
      corked_write_needed_ = false;
    }

    if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
      grpc::internal::MutexLock lock(&start_mu_);
      // Same double check as in Write().
      if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
        backlog_.writes_done_ops = true;
        return;
      }
    }
    call_.PerformOps(&writes_done_ops_);
  }

  // Each hold is counted like an outstanding callback, so OnDone is not
  // delivered until every hold has been removed.
  void AddHold(int holds) override {
    callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
  }
  // Releases one hold. This is not a reaction, so if it turns out to be the
  // last outstanding item OnDone is scheduled rather than invoked directly.
  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }

 private:
  friend class ClientCallbackWriterFactory<Request>;

  // Binds the reactor, captures whether initial metadata is corked, and
  // prepares the start, write and finish tags/op sets. No batch is issued
  // here; that happens in StartCall, Write and WritesDone.
  template <class Response>
  ClientCallbackWriterImpl(grpc::internal::Call call,
                           grpc::ClientContext* context, Response* response,
                           ClientWriteReactor<Request>* reactor)
      : context_(context),
        call_(call),
        reactor_(reactor),
        start_corked_(context_->initial_metadata_corked_),
        corked_write_needed_(start_corked_) {
    this->BindReactor(reactor);

    // Set up the unchanging parts of the start and write tags and ops.
    start_tag_.Set(
        call_.call(),
        [this](bool ok) {
          // Report success only if the batch succeeded and the reactor does
          // not classify the response as trailers-only.
          reactor_->OnReadInitialMetadataDone(
              ok && !reactor_->InternalTrailersOnly(call_.call()));
          MaybeFinish(/*from_reaction=*/true);
        },
        &start_ops_, /*can_inline=*/false);
    start_ops_.RecvInitialMetadata(context_);
    start_ops_.set_core_cq_tag(&start_tag_);

    write_tag_.Set(
        call_.call(),
        [this](bool ok) {
          reactor_->OnWriteDone(ok);
          MaybeFinish(/*from_reaction=*/true);
        },
        &write_ops_, /*can_inline=*/false);
    write_ops_.set_core_cq_tag(&write_tag_);

    // Also set up the Finish tag and op set.
    // The single response is received into `response` by the finish batch.
    // NOTE(review): AllowNoMessage presumably lets the batch succeed when no
    // response message arrives -- confirm against CallOpGenericRecvMessage.
    finish_ops_.RecvMessage(response);
    finish_ops_.AllowNoMessage();
    finish_tag_.Set(
        call_.call(),
        [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
        &finish_ops_,
        /*can_inline=*/false);
    finish_ops_.ClientRecvStatus(context_, &finish_status_);
    finish_ops_.set_core_cq_tag(&finish_tag_);
  }

  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
  void MaybeFinish(bool from_reaction) {
    // fetch_sub returns the previous value, so == 1 means this call released
    // the last outstanding callback or hold.
    if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
                         1, std::memory_order_acq_rel) == 1)) {
      // Copy out everything needed below; members must not be touched after
      // the destructor has run.
      grpc::Status s = std::move(finish_status_);
      auto* reactor = reactor_;
      auto* call = call_.call();
      this->~ClientCallbackWriterImpl();
      // NOTE(review): presumably balances the grpc_call_ref taken in
      // ClientCallbackWriterFactory::Create -- confirm.
      grpc_call_unref(call);
      if (GPR_LIKELY(from_reaction)) {
        reactor->OnDone(s);
      } else {
        reactor->InternalScheduleOnDone(std::move(s));
      }
    }
  }

  grpc::ClientContext* const context_;
  grpc::internal::Call call_;
  ClientWriteReactor<Request>* const reactor_;

  // Start batch: send initial metadata (skipped when corked) + receive
  // initial metadata.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpRecvInitialMetadata>
      start_ops_;
  grpc::internal::CallbackWithSuccessTag start_tag_;
  // Snapshot of context_->initial_metadata_corked_ taken at construction.
  const bool start_corked_;
  bool corked_write_needed_;  // no lock needed since only accessed in
                              // Write/WritesDone which cannot be concurrent

  // Finish batch: receives the response message and the final status.
  grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
                            grpc::internal::CallOpClientRecvStatus>
      finish_ops_;
  grpc::internal::CallbackWithSuccessTag finish_tag_;
  grpc::Status finish_status_;

  // Write batch: one message, optionally with corked initial metadata and/or
  // the client send-close for a last message.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpSendMessage,
                            grpc::internal::CallOpClientSendClose>
      write_ops_;
  grpc::internal::CallbackWithSuccessTag write_tag_;

  // Writes-done batch: client send-close, optionally with corked initial
  // metadata.
  grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                            grpc::internal::CallOpClientSendClose>
      writes_done_ops_;
  grpc::internal::CallbackWithSuccessTag writes_done_tag_;

  // Records operations requested before StartCall marked the call started.
  struct StartCallBacklog {
    bool write_ops = false;
    bool writes_done_ops = false;
  };
  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);

  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
  std::atomic<intptr_t> callbacks_outstanding_{3};
  // Set to true by StartCall (under start_mu_) after the backlog and finish
  // batches have been issued.
  std::atomic_bool started_{false};
  grpc::internal::Mutex start_mu_;
};
1092 
1093 template <class Request>
1094 class ClientCallbackWriterFactory {
1095  public:
1096   template <class Response>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)1097   static void Create(grpc::ChannelInterface* channel,
1098                      const grpc::internal::RpcMethod& method,
1099                      grpc::ClientContext* context, Response* response,
1100                      ClientWriteReactor<Request>* reactor) {
1101     grpc::internal::Call call =
1102         channel->CreateCall(method, context, channel->CallbackCQ());
1103 
1104     grpc_call_ref(call.call());
1105     new (grpc_call_arena_alloc(call.call(),
1106                                sizeof(ClientCallbackWriterImpl<Request>)))
1107         ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1108   }
1109 };
1110 
1111 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1112  public:
1113   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)1114   static void operator delete(void* /*ptr*/, std::size_t size) {
1115     GPR_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1116   }
1117 
1118   // This operator should never be called as the memory should be freed as part
1119   // of the arena destruction. It only exists to provide a matching operator
1120   // delete to the operator new so that some compilers will not complain (see
1121   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1122   // there are no tests catching the compiler warning.
delete(void *,void *)1123   static void operator delete(void*, void*) { GPR_ASSERT(false); }
1124 
StartCall()1125   void StartCall() override {
1126     // This call initiates two batches, each with a callback
1127     // 1. Send initial metadata + write + writes done + recv initial metadata
1128     // 2. Read message, recv trailing metadata
1129 
1130     start_tag_.Set(
1131         call_.call(),
1132         [this](bool ok) {
1133           reactor_->OnReadInitialMetadataDone(
1134               ok && !reactor_->InternalTrailersOnly(call_.call()));
1135           MaybeFinish();
1136         },
1137         &start_ops_, /*can_inline=*/false);
1138     start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1139                                    context_->initial_metadata_flags());
1140     start_ops_.RecvInitialMetadata(context_);
1141     start_ops_.set_core_cq_tag(&start_tag_);
1142     call_.PerformOps(&start_ops_);
1143 
1144     finish_tag_.Set(
1145         call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1146         /*can_inline=*/false);
1147     finish_ops_.ClientRecvStatus(context_, &finish_status_);
1148     finish_ops_.set_core_cq_tag(&finish_tag_);
1149     call_.PerformOps(&finish_ops_);
1150   }
1151 
1152  private:
1153   friend class ClientCallbackUnaryFactory;
1154 
1155   template <class Request, class Response>
ClientCallbackUnaryImpl(grpc::internal::Call call,grpc::ClientContext * context,Request * request,Response * response,ClientUnaryReactor * reactor)1156   ClientCallbackUnaryImpl(grpc::internal::Call call,
1157                           grpc::ClientContext* context, Request* request,
1158                           Response* response, ClientUnaryReactor* reactor)
1159       : context_(context), call_(call), reactor_(reactor) {
1160     this->BindReactor(reactor);
1161     // TODO(vjpai): don't assert
1162     GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
1163     start_ops_.ClientSendClose();
1164     finish_ops_.RecvMessage(response);
1165     finish_ops_.AllowNoMessage();
1166   }
1167 
1168   // In the unary case, MaybeFinish is only ever invoked from a
1169   // library-initiated reaction, so it will just directly call OnDone if this is
1170   // the last reaction for this RPC.
MaybeFinish()1171   void MaybeFinish() {
1172     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1173                          1, std::memory_order_acq_rel) == 1)) {
1174       grpc::Status s = std::move(finish_status_);
1175       auto* reactor = reactor_;
1176       auto* call = call_.call();
1177       this->~ClientCallbackUnaryImpl();
1178       grpc_call_unref(call);
1179       reactor->OnDone(s);
1180     }
1181   }
1182 
1183   grpc::ClientContext* const context_;
1184   grpc::internal::Call call_;
1185   ClientUnaryReactor* const reactor_;
1186 
1187   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1188                             grpc::internal::CallOpSendMessage,
1189                             grpc::internal::CallOpClientSendClose,
1190                             grpc::internal::CallOpRecvInitialMetadata>
1191       start_ops_;
1192   grpc::internal::CallbackWithSuccessTag start_tag_;
1193 
1194   grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1195                             grpc::internal::CallOpClientRecvStatus>
1196       finish_ops_;
1197   grpc::internal::CallbackWithSuccessTag finish_tag_;
1198   grpc::Status finish_status_;
1199 
1200   // This call will have 2 callbacks: start and finish
1201   std::atomic<intptr_t> callbacks_outstanding_{2};
1202 };
1203 
1204 class ClientCallbackUnaryFactory {
1205  public:
1206   template <class Request, class Response, class BaseRequest = Request,
1207             class BaseResponse = Response>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const Request * request,Response * response,ClientUnaryReactor * reactor)1208   static void Create(grpc::ChannelInterface* channel,
1209                      const grpc::internal::RpcMethod& method,
1210                      grpc::ClientContext* context, const Request* request,
1211                      Response* response, ClientUnaryReactor* reactor) {
1212     grpc::internal::Call call =
1213         channel->CreateCall(method, context, channel->CallbackCQ());
1214 
1215     grpc_call_ref(call.call());
1216 
1217     new (grpc_call_arena_alloc(call.call(), sizeof(ClientCallbackUnaryImpl)))
1218         ClientCallbackUnaryImpl(call, context,
1219                                 static_cast<const BaseRequest*>(request),
1220                                 static_cast<BaseResponse*>(response), reactor);
1221   }
1222 };
1223 
1224 }  // namespace internal
1225 }  // namespace grpc
1226 
1227 #endif  // GRPCPP_SUPPORT_CLIENT_CALLBACK_H
1228