1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H
20 #define GRPCPP_SUPPORT_CLIENT_CALLBACK_H
21
#include <atomic>
#include <functional>
#include <utility>

#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
#include <grpcpp/impl/sync.h>
#include <grpcpp/support/callback_common.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/status.h>
33
34 namespace grpc {
35 class Channel;
36 class ClientContext;
37
38 namespace internal {
39 class RpcMethod;
40
41 /// Perform a callback-based unary call. May optionally specify the base
42 /// class of the Request and Response so that the internal calls and structures
43 /// below this may be based on those base classes and thus achieve code reuse
44 /// across different RPCs (e.g., for protobuf, MessageLite would be a base
45 /// class).
46 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
47 template <class InputMessage, class OutputMessage,
48 class BaseInputMessage = InputMessage,
49 class BaseOutputMessage = OutputMessage>
CallbackUnaryCall(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (grpc::Status)> on_completion)50 void CallbackUnaryCall(grpc::ChannelInterface* channel,
51 const grpc::internal::RpcMethod& method,
52 grpc::ClientContext* context,
53 const InputMessage* request, OutputMessage* result,
54 std::function<void(grpc::Status)> on_completion) {
55 static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
56 "Invalid input message specification");
57 static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
58 "Invalid output message specification");
59 CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> x(
60 channel, method, context, request, result, on_completion);
61 }
62
63 template <class InputMessage, class OutputMessage>
64 class CallbackUnaryCallImpl {
65 public:
CallbackUnaryCallImpl(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (grpc::Status)> on_completion)66 CallbackUnaryCallImpl(grpc::ChannelInterface* channel,
67 const grpc::internal::RpcMethod& method,
68 grpc::ClientContext* context,
69 const InputMessage* request, OutputMessage* result,
70 std::function<void(grpc::Status)> on_completion) {
71 grpc::CompletionQueue* cq = channel->CallbackCQ();
72 GPR_ASSERT(cq != nullptr);
73 grpc::internal::Call call(channel->CreateCall(method, context, cq));
74
75 using FullCallOpSet = grpc::internal::CallOpSet<
76 grpc::internal::CallOpSendInitialMetadata,
77 grpc::internal::CallOpSendMessage,
78 grpc::internal::CallOpRecvInitialMetadata,
79 grpc::internal::CallOpRecvMessage<OutputMessage>,
80 grpc::internal::CallOpClientSendClose,
81 grpc::internal::CallOpClientRecvStatus>;
82
83 struct OpSetAndTag {
84 FullCallOpSet opset;
85 grpc::internal::CallbackWithStatusTag tag;
86 };
87 const size_t alloc_sz = sizeof(OpSetAndTag);
88 auto* const alloced =
89 static_cast<OpSetAndTag*>(grpc_call_arena_alloc(call.call(), alloc_sz));
90 auto* ops = new (&alloced->opset) FullCallOpSet;
91 auto* tag = new (&alloced->tag)
92 grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
93
94 // TODO(vjpai): Unify code with sync API as much as possible
95 grpc::Status s = ops->SendMessagePtr(request);
96 if (!s.ok()) {
97 tag->force_run(s);
98 return;
99 }
100 ops->SendInitialMetadata(&context->send_initial_metadata_,
101 context->initial_metadata_flags());
102 ops->RecvInitialMetadata(context);
103 ops->RecvMessage(result);
104 ops->AllowNoMessage();
105 ops->ClientSendClose();
106 ops->ClientRecvStatus(context, tag->status_ptr());
107 ops->set_core_cq_tag(tag);
108 call.PerformOps(ops);
109 }
110 };
111
112 // Base class for public API classes.
113 class ClientReactor {
114 public:
115 virtual ~ClientReactor() = default;
116
117 /// Called by the library when all operations associated with this RPC have
118 /// completed and all Holds have been removed. OnDone provides the RPC status
119 /// outcome for both successful and failed RPCs. If it is never called on an
120 /// RPC, it indicates an application-level problem (like failure to remove a
121 /// hold).
122 ///
123 /// \param[in] s The status outcome of this RPC
124 virtual void OnDone(const grpc::Status& /*s*/) = 0;
125
126 /// InternalScheduleOnDone is not part of the API and is not meant to be
127 /// overridden. It is virtual to allow successful builds for certain bazel
128 /// build users that only want to depend on gRPC codegen headers and not the
129 /// full library (although this is not a generally-supported option). Although
130 /// the virtual call is slower than a direct call, this function is
131 /// heavyweight and the cost of the virtual call is not much in comparison.
132 /// This function may be removed or devirtualized in the future.
133 virtual void InternalScheduleOnDone(grpc::Status s);
134
135 /// InternalTrailersOnly is not part of the API and is not meant to be
136 /// overridden. It is virtual to allow successful builds for certain bazel
137 /// build users that only want to depend on gRPC codegen headers and not the
138 /// full library (although this is not a generally-supported option). Although
139 /// the virtual call is slower than a direct call, this function is
140 /// heavyweight and the cost of the virtual call is not much in comparison.
141 /// This function may be removed or devirtualized in the future.
142 virtual bool InternalTrailersOnly(const grpc_call* call) const;
143 };
144
145 } // namespace internal
146
147 // Forward declarations
148 template <class Request, class Response>
149 class ClientBidiReactor;
150 template <class Response>
151 class ClientReadReactor;
152 template <class Request>
153 class ClientWriteReactor;
154 class ClientUnaryReactor;
155
156 // NOTE: The streaming objects are not actually implemented in the public API.
157 // These interfaces are provided for mocking only. Typical applications
158 // will interact exclusively with the reactors that they define.
159 template <class Request, class Response>
160 class ClientCallbackReaderWriter {
161 public:
~ClientCallbackReaderWriter()162 virtual ~ClientCallbackReaderWriter() {}
163 virtual void StartCall() = 0;
164 virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
165 virtual void WritesDone() = 0;
166 virtual void Read(Response* resp) = 0;
167 virtual void AddHold(int holds) = 0;
168 virtual void RemoveHold() = 0;
169
170 protected:
BindReactor(ClientBidiReactor<Request,Response> * reactor)171 void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
172 reactor->BindStream(this);
173 }
174 };
175
176 template <class Response>
177 class ClientCallbackReader {
178 public:
~ClientCallbackReader()179 virtual ~ClientCallbackReader() {}
180 virtual void StartCall() = 0;
181 virtual void Read(Response* resp) = 0;
182 virtual void AddHold(int holds) = 0;
183 virtual void RemoveHold() = 0;
184
185 protected:
BindReactor(ClientReadReactor<Response> * reactor)186 void BindReactor(ClientReadReactor<Response>* reactor) {
187 reactor->BindReader(this);
188 }
189 };
190
191 template <class Request>
192 class ClientCallbackWriter {
193 public:
~ClientCallbackWriter()194 virtual ~ClientCallbackWriter() {}
195 virtual void StartCall() = 0;
Write(const Request * req)196 void Write(const Request* req) { Write(req, grpc::WriteOptions()); }
197 virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
WriteLast(const Request * req,grpc::WriteOptions options)198 void WriteLast(const Request* req, grpc::WriteOptions options) {
199 Write(req, options.set_last_message());
200 }
201 virtual void WritesDone() = 0;
202
203 virtual void AddHold(int holds) = 0;
204 virtual void RemoveHold() = 0;
205
206 protected:
BindReactor(ClientWriteReactor<Request> * reactor)207 void BindReactor(ClientWriteReactor<Request>* reactor) {
208 reactor->BindWriter(this);
209 }
210 };
211
212 class ClientCallbackUnary {
213 public:
~ClientCallbackUnary()214 virtual ~ClientCallbackUnary() {}
215 virtual void StartCall() = 0;
216
217 protected:
218 void BindReactor(ClientUnaryReactor* reactor);
219 };
220
221 // The following classes are the reactor interfaces that are to be implemented
222 // by the user. They are passed in to the library as an argument to a call on a
223 // stub (either a codegen-ed call or a generic call). The streaming RPC is
224 // activated by calling StartCall, possibly after initiating StartRead,
225 // StartWrite, or AddHold operations on the streaming object. Note that none of
226 // the classes are pure; all reactions have a default empty reaction so that the
227 // user class only needs to override those reactions that it cares about.
228 // The reactor must be passed to the stub invocation before any of the below
229 // operations can be called and its reactions will be invoked by the library in
230 // response to the completion of various operations. Reactions must not include
231 // blocking operations (such as blocking I/O, starting synchronous RPCs, or
232 // waiting on condition variables). Reactions may be invoked concurrently,
233 // except that OnDone is called after all others (assuming proper API usage).
234 // The reactor may not be deleted until OnDone is called.
235
236 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
237 template <class Request, class Response>
238 class ClientBidiReactor : public internal::ClientReactor {
239 public:
240 /// Activate the RPC and initiate any reads or writes that have been Start'ed
241 /// before this call. All streaming RPCs issued by the client MUST have
242 /// StartCall invoked on them (even if they are canceled) as this call is the
243 /// activation of their lifecycle.
StartCall()244 void StartCall() { stream_->StartCall(); }
245
246 /// Initiate a read operation (or post it for later initiation if StartCall
247 /// has not yet been invoked).
248 ///
249 /// \param[out] resp Where to eventually store the read message. Valid when
250 /// the library calls OnReadDone
StartRead(Response * resp)251 void StartRead(Response* resp) { stream_->Read(resp); }
252
253 /// Initiate a write operation (or post it for later initiation if StartCall
254 /// has not yet been invoked).
255 ///
256 /// \param[in] req The message to be written. The library does not take
257 /// ownership but the caller must ensure that the message is
258 /// not deleted or modified until OnWriteDone is called.
StartWrite(const Request * req)259 void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
260
261 /// Initiate/post a write operation with specified options.
262 ///
263 /// \param[in] req The message to be written. The library does not take
264 /// ownership but the caller must ensure that the message is
265 /// not deleted or modified until OnWriteDone is called.
266 /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Request * req,grpc::WriteOptions options)267 void StartWrite(const Request* req, grpc::WriteOptions options) {
268 stream_->Write(req, options);
269 }
270
271 /// Initiate/post a write operation with specified options and an indication
272 /// that this is the last write (like StartWrite and StartWritesDone, merged).
273 /// Note that calling this means that no more calls to StartWrite,
274 /// StartWriteLast, or StartWritesDone are allowed.
275 ///
276 /// \param[in] req The message to be written. The library does not take
277 /// ownership but the caller must ensure that the message is
278 /// not deleted or modified until OnWriteDone is called.
279 /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Request * req,grpc::WriteOptions options)280 void StartWriteLast(const Request* req, grpc::WriteOptions options) {
281 StartWrite(req, options.set_last_message());
282 }
283
284 /// Indicate that the RPC will have no more write operations. This can only be
285 /// issued once for a given RPC. This is not required or allowed if
286 /// StartWriteLast is used since that already has the same implication.
287 /// Note that calling this means that no more calls to StartWrite,
288 /// StartWriteLast, or StartWritesDone are allowed.
StartWritesDone()289 void StartWritesDone() { stream_->WritesDone(); }
290
291 /// Holds are needed if (and only if) this stream has operations that take
292 /// place on it after StartCall but from outside one of the reactions
293 /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
294 ///
295 /// Holds must be added before calling StartCall. If a stream still has a hold
296 /// in place, its resources will not be destroyed even if the status has
297 /// already come in from the wire and there are currently no active callbacks
298 /// outstanding. Similarly, the stream will not call OnDone if there are still
299 /// holds on it.
300 ///
301 /// For example, if a StartRead or StartWrite operation is going to be
302 /// initiated from elsewhere in the application, the application should call
303 /// AddHold or AddMultipleHolds before StartCall. If there is going to be,
304 /// for example, a read-flow and a write-flow taking place outside the
305 /// reactions, then call AddMultipleHolds(2) before StartCall. When the
306 /// application knows that it won't issue any more read operations (such as
307 /// when a read comes back as not ok), it should issue a RemoveHold(). It
308 /// should also call RemoveHold() again after it does StartWriteLast or
309 /// StartWritesDone that indicates that there will be no more write ops.
310 /// The number of RemoveHold calls must match the total number of AddHold
311 /// calls plus the number of holds added by AddMultipleHolds.
312 /// The argument to AddMultipleHolds must be positive.
AddHold()313 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)314 void AddMultipleHolds(int holds) {
315 GPR_DEBUG_ASSERT(holds > 0);
316 stream_->AddHold(holds);
317 }
RemoveHold()318 void RemoveHold() { stream_->RemoveHold(); }
319
320 /// Notifies the application that all operations associated with this RPC
321 /// have completed and all Holds have been removed. OnDone provides the RPC
322 /// status outcome for both successful and failed RPCs and will be called in
323 /// all cases. If it is not called, it indicates an application-level problem
324 /// (like failure to remove a hold).
325 ///
326 /// \param[in] s The status outcome of this RPC
OnDone(const grpc::Status &)327 void OnDone(const grpc::Status& /*s*/) override {}
328
329 /// Notifies the application that a read of initial metadata from the
330 /// server is done. If the application chooses not to implement this method,
331 /// it can assume that the initial metadata has been read before the first
332 /// call of OnReadDone or OnDone.
333 ///
334 /// \param[in] ok Was the initial metadata read successfully? If false, no
335 /// new read/write operation will succeed, and any further
336 /// Start* operations should not be called.
OnReadInitialMetadataDone(bool)337 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
338
339 /// Notifies the application that a StartRead operation completed.
340 ///
341 /// \param[in] ok Was it successful? If false, no new read/write operation
342 /// will succeed, and any further Start* should not be called.
OnReadDone(bool)343 virtual void OnReadDone(bool /*ok*/) {}
344
345 /// Notifies the application that a StartWrite or StartWriteLast operation
346 /// completed.
347 ///
348 /// \param[in] ok Was it successful? If false, no new read/write operation
349 /// will succeed, and any further Start* should not be called.
OnWriteDone(bool)350 virtual void OnWriteDone(bool /*ok*/) {}
351
352 /// Notifies the application that a StartWritesDone operation completed. Note
353 /// that this is only used on explicit StartWritesDone operations and not for
354 /// those that are implicitly invoked as part of a StartWriteLast.
355 ///
356 /// \param[in] ok Was it successful? If false, the application will later see
357 /// the failure reflected as a bad status in OnDone and no
358 /// further Start* should be called.
OnWritesDoneDone(bool)359 virtual void OnWritesDoneDone(bool /*ok*/) {}
360
361 private:
362 friend class ClientCallbackReaderWriter<Request, Response>;
BindStream(ClientCallbackReaderWriter<Request,Response> * stream)363 void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
364 stream_ = stream;
365 }
366 ClientCallbackReaderWriter<Request, Response>* stream_;
367 };
368
369 /// \a ClientReadReactor is the interface for a server-streaming RPC.
370 /// All public methods behave as in ClientBidiReactor.
371 template <class Response>
372 class ClientReadReactor : public internal::ClientReactor {
373 public:
StartCall()374 void StartCall() { reader_->StartCall(); }
StartRead(Response * resp)375 void StartRead(Response* resp) { reader_->Read(resp); }
376
AddHold()377 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)378 void AddMultipleHolds(int holds) {
379 GPR_DEBUG_ASSERT(holds > 0);
380 reader_->AddHold(holds);
381 }
RemoveHold()382 void RemoveHold() { reader_->RemoveHold(); }
383
OnDone(const grpc::Status &)384 void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)385 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)386 virtual void OnReadDone(bool /*ok*/) {}
387
388 private:
389 friend class ClientCallbackReader<Response>;
BindReader(ClientCallbackReader<Response> * reader)390 void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
391 ClientCallbackReader<Response>* reader_;
392 };
393
394 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
395 /// All public methods behave as in ClientBidiReactor.
396 template <class Request>
397 class ClientWriteReactor : public internal::ClientReactor {
398 public:
StartCall()399 void StartCall() { writer_->StartCall(); }
StartWrite(const Request * req)400 void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
StartWrite(const Request * req,grpc::WriteOptions options)401 void StartWrite(const Request* req, grpc::WriteOptions options) {
402 writer_->Write(req, options);
403 }
StartWriteLast(const Request * req,grpc::WriteOptions options)404 void StartWriteLast(const Request* req, grpc::WriteOptions options) {
405 StartWrite(req, options.set_last_message());
406 }
StartWritesDone()407 void StartWritesDone() { writer_->WritesDone(); }
408
AddHold()409 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)410 void AddMultipleHolds(int holds) {
411 GPR_DEBUG_ASSERT(holds > 0);
412 writer_->AddHold(holds);
413 }
RemoveHold()414 void RemoveHold() { writer_->RemoveHold(); }
415
OnDone(const grpc::Status &)416 void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)417 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)418 virtual void OnWriteDone(bool /*ok*/) {}
OnWritesDoneDone(bool)419 virtual void OnWritesDoneDone(bool /*ok*/) {}
420
421 private:
422 friend class ClientCallbackWriter<Request>;
BindWriter(ClientCallbackWriter<Request> * writer)423 void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
424
425 ClientCallbackWriter<Request>* writer_;
426 };
427
428 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
429 /// This is _not_ a common way of invoking a unary RPC. In practice, this
430 /// option should be used only if the unary RPC wants to receive initial
431 /// metadata without waiting for the response to complete. Most deployments of
432 /// RPC systems do not use this option, but it is needed for generality.
433 /// All public methods behave as in ClientBidiReactor.
434 /// StartCall is included for consistency with the other reactor flavors: even
435 /// though there are no StartRead or StartWrite operations to queue before the
436 /// call (that is part of the unary call itself) and there is no reactor object
437 /// being created as a result of this call, we keep a consistent 2-phase
438 /// initiation API among all the reactor flavors.
439 class ClientUnaryReactor : public internal::ClientReactor {
440 public:
StartCall()441 void StartCall() { call_->StartCall(); }
OnDone(const grpc::Status &)442 void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)443 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
444
445 private:
446 friend class ClientCallbackUnary;
BindCall(ClientCallbackUnary * call)447 void BindCall(ClientCallbackUnary* call) { call_ = call; }
448 ClientCallbackUnary* call_;
449 };
450
451 // Define function out-of-line from class to avoid forward declaration issue
BindReactor(ClientUnaryReactor * reactor)452 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
453 reactor->BindCall(this);
454 }
455
456 namespace internal {
457
458 // Forward declare factory classes for friendship
459 template <class Request, class Response>
460 class ClientCallbackReaderWriterFactory;
461 template <class Response>
462 class ClientCallbackReaderFactory;
463 template <class Request>
464 class ClientCallbackWriterFactory;
465
466 template <class Request, class Response>
467 class ClientCallbackReaderWriterImpl
468 : public ClientCallbackReaderWriter<Request, Response> {
469 public:
470 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)471 static void operator delete(void* /*ptr*/, std::size_t size) {
472 GPR_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
473 }
474
475 // This operator should never be called as the memory should be freed as part
476 // of the arena destruction. It only exists to provide a matching operator
477 // delete to the operator new so that some compilers will not complain (see
478 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
479 // there are no tests catching the compiler warning.
delete(void *,void *)480 static void operator delete(void*, void*) { GPR_ASSERT(false); }
481
StartCall()482 void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
483 // This call initiates two batches, plus any backlog, each with a callback
484 // 1. Send initial metadata (unless corked) + recv initial metadata
485 // 2. Any read backlog
486 // 3. Any write backlog
487 // 4. Recv trailing metadata (unless corked)
488 if (!start_corked_) {
489 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
490 context_->initial_metadata_flags());
491 }
492
493 call_.PerformOps(&start_ops_);
494
495 {
496 grpc::internal::MutexLock lock(&start_mu_);
497
498 if (backlog_.read_ops) {
499 call_.PerformOps(&read_ops_);
500 }
501 if (backlog_.write_ops) {
502 call_.PerformOps(&write_ops_);
503 }
504 if (backlog_.writes_done_ops) {
505 call_.PerformOps(&writes_done_ops_);
506 }
507 call_.PerformOps(&finish_ops_);
508 // The last thing in this critical section is to set started_ so that it
509 // can be used lock-free as well.
510 started_.store(true, std::memory_order_release);
511 }
512 // MaybeFinish outside the lock to make sure that destruction of this object
513 // doesn't take place while holding the lock (which would cause the lock to
514 // be released after destruction)
515 this->MaybeFinish(/*from_reaction=*/false);
516 }
517
Read(Response * msg)518 void Read(Response* msg) override {
519 read_ops_.RecvMessage(msg);
520 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
521 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
522 grpc::internal::MutexLock lock(&start_mu_);
523 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
524 backlog_.read_ops = true;
525 return;
526 }
527 }
528 call_.PerformOps(&read_ops_);
529 }
530
Write(const Request * msg,grpc::WriteOptions options)531 void Write(const Request* msg, grpc::WriteOptions options)
532 ABSL_LOCKS_EXCLUDED(start_mu_) override {
533 if (options.is_last_message()) {
534 options.set_buffer_hint();
535 write_ops_.ClientSendClose();
536 }
537 // TODO(vjpai): don't assert
538 GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
539 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
540 if (GPR_UNLIKELY(corked_write_needed_)) {
541 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
542 context_->initial_metadata_flags());
543 corked_write_needed_ = false;
544 }
545
546 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
547 grpc::internal::MutexLock lock(&start_mu_);
548 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
549 backlog_.write_ops = true;
550 return;
551 }
552 }
553 call_.PerformOps(&write_ops_);
554 }
WritesDone()555 void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
556 writes_done_ops_.ClientSendClose();
557 writes_done_tag_.Set(
558 call_.call(),
559 [this](bool ok) {
560 reactor_->OnWritesDoneDone(ok);
561 MaybeFinish(/*from_reaction=*/true);
562 },
563 &writes_done_ops_, /*can_inline=*/false);
564 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
565 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
566 if (GPR_UNLIKELY(corked_write_needed_)) {
567 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
568 context_->initial_metadata_flags());
569 corked_write_needed_ = false;
570 }
571 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
572 grpc::internal::MutexLock lock(&start_mu_);
573 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
574 backlog_.writes_done_ops = true;
575 return;
576 }
577 }
578 call_.PerformOps(&writes_done_ops_);
579 }
580
AddHold(int holds)581 void AddHold(int holds) override {
582 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
583 }
RemoveHold()584 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
585
586 private:
587 friend class ClientCallbackReaderWriterFactory<Request, Response>;
588
ClientCallbackReaderWriterImpl(grpc::internal::Call call,grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)589 ClientCallbackReaderWriterImpl(grpc::internal::Call call,
590 grpc::ClientContext* context,
591 ClientBidiReactor<Request, Response>* reactor)
592 : context_(context),
593 call_(call),
594 reactor_(reactor),
595 start_corked_(context_->initial_metadata_corked_),
596 corked_write_needed_(start_corked_) {
597 this->BindReactor(reactor);
598
599 // Set up the unchanging parts of the start, read, and write tags and ops.
600 start_tag_.Set(
601 call_.call(),
602 [this](bool ok) {
603 reactor_->OnReadInitialMetadataDone(
604 ok && !reactor_->InternalTrailersOnly(call_.call()));
605 MaybeFinish(/*from_reaction=*/true);
606 },
607 &start_ops_, /*can_inline=*/false);
608 start_ops_.RecvInitialMetadata(context_);
609 start_ops_.set_core_cq_tag(&start_tag_);
610
611 write_tag_.Set(
612 call_.call(),
613 [this](bool ok) {
614 reactor_->OnWriteDone(ok);
615 MaybeFinish(/*from_reaction=*/true);
616 },
617 &write_ops_, /*can_inline=*/false);
618 write_ops_.set_core_cq_tag(&write_tag_);
619
620 read_tag_.Set(
621 call_.call(),
622 [this](bool ok) {
623 reactor_->OnReadDone(ok);
624 MaybeFinish(/*from_reaction=*/true);
625 },
626 &read_ops_, /*can_inline=*/false);
627 read_ops_.set_core_cq_tag(&read_tag_);
628
629 // Also set up the Finish tag and op set.
630 finish_tag_.Set(
631 call_.call(),
632 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
633 &finish_ops_,
634 /*can_inline=*/false);
635 finish_ops_.ClientRecvStatus(context_, &finish_status_);
636 finish_ops_.set_core_cq_tag(&finish_tag_);
637 }
638
639 // MaybeFinish can be called from reactions or from user-initiated operations
640 // like StartCall or RemoveHold. If this is the last operation or hold on this
641 // object, it will invoke the OnDone reaction. If MaybeFinish was called from
642 // a reaction, it can call OnDone directly. If not, it would need to schedule
643 // OnDone onto an executor thread to avoid the possibility of deadlocking with
644 // any locks in the user code that invoked it.
MaybeFinish(bool from_reaction)645 void MaybeFinish(bool from_reaction) {
646 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
647 1, std::memory_order_acq_rel) == 1)) {
648 grpc::Status s = std::move(finish_status_);
649 auto* reactor = reactor_;
650 auto* call = call_.call();
651 this->~ClientCallbackReaderWriterImpl();
652 grpc_call_unref(call);
653 if (GPR_LIKELY(from_reaction)) {
654 reactor->OnDone(s);
655 } else {
656 reactor->InternalScheduleOnDone(std::move(s));
657 }
658 }
659 }
660
661 grpc::ClientContext* const context_;
662 grpc::internal::Call call_;
663 ClientBidiReactor<Request, Response>* const reactor_;
664
665 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
666 grpc::internal::CallOpRecvInitialMetadata>
667 start_ops_;
668 grpc::internal::CallbackWithSuccessTag start_tag_;
669 const bool start_corked_;
670 bool corked_write_needed_; // no lock needed since only accessed in
671 // Write/WritesDone which cannot be concurrent
672
673 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
674 grpc::internal::CallbackWithSuccessTag finish_tag_;
675 grpc::Status finish_status_;
676
677 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
678 grpc::internal::CallOpSendMessage,
679 grpc::internal::CallOpClientSendClose>
680 write_ops_;
681 grpc::internal::CallbackWithSuccessTag write_tag_;
682
683 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
684 grpc::internal::CallOpClientSendClose>
685 writes_done_ops_;
686 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
687
688 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
689 read_ops_;
690 grpc::internal::CallbackWithSuccessTag read_tag_;
691
692 struct StartCallBacklog {
693 bool write_ops = false;
694 bool writes_done_ops = false;
695 bool read_ops = false;
696 };
697 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
698
699 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
700 std::atomic<intptr_t> callbacks_outstanding_{3};
701 std::atomic_bool started_{false};
702 grpc::internal::Mutex start_mu_;
703 };
704
705 template <class Request, class Response>
706 class ClientCallbackReaderWriterFactory {
707 public:
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)708 static void Create(grpc::ChannelInterface* channel,
709 const grpc::internal::RpcMethod& method,
710 grpc::ClientContext* context,
711 ClientBidiReactor<Request, Response>* reactor) {
712 grpc::internal::Call call =
713 channel->CreateCall(method, context, channel->CallbackCQ());
714
715 grpc_call_ref(call.call());
716 new (grpc_call_arena_alloc(
717 call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
718 ClientCallbackReaderWriterImpl<Request, Response>(call, context,
719 reactor);
720 }
721 };
722
723 template <class Response>
724 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
725 public:
726 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)727 static void operator delete(void* /*ptr*/, std::size_t size) {
728 GPR_ASSERT(size == sizeof(ClientCallbackReaderImpl));
729 }
730
731 // This operator should never be called as the memory should be freed as part
732 // of the arena destruction. It only exists to provide a matching operator
733 // delete to the operator new so that some compilers will not complain (see
734 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
735 // there are no tests catching the compiler warning.
delete(void *,void *)736 static void operator delete(void*, void*) { GPR_ASSERT(false); }
737
StartCall()738 void StartCall() override {
739 // This call initiates two batches, plus any backlog, each with a callback
740 // 1. Send initial metadata (unless corked) + recv initial metadata
741 // 2. Any backlog
742 // 3. Recv trailing metadata
743
744 start_tag_.Set(
745 call_.call(),
746 [this](bool ok) {
747 reactor_->OnReadInitialMetadataDone(
748 ok && !reactor_->InternalTrailersOnly(call_.call()));
749 MaybeFinish(/*from_reaction=*/true);
750 },
751 &start_ops_, /*can_inline=*/false);
752 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
753 context_->initial_metadata_flags());
754 start_ops_.RecvInitialMetadata(context_);
755 start_ops_.set_core_cq_tag(&start_tag_);
756 call_.PerformOps(&start_ops_);
757
758 // Also set up the read tag so it doesn't have to be set up each time
759 read_tag_.Set(
760 call_.call(),
761 [this](bool ok) {
762 reactor_->OnReadDone(ok);
763 MaybeFinish(/*from_reaction=*/true);
764 },
765 &read_ops_, /*can_inline=*/false);
766 read_ops_.set_core_cq_tag(&read_tag_);
767
768 {
769 grpc::internal::MutexLock lock(&start_mu_);
770 if (backlog_.read_ops) {
771 call_.PerformOps(&read_ops_);
772 }
773 started_.store(true, std::memory_order_release);
774 }
775
776 finish_tag_.Set(
777 call_.call(),
778 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
779 &finish_ops_, /*can_inline=*/false);
780 finish_ops_.ClientRecvStatus(context_, &finish_status_);
781 finish_ops_.set_core_cq_tag(&finish_tag_);
782 call_.PerformOps(&finish_ops_);
783 }
784
Read(Response * msg)785 void Read(Response* msg) override {
786 read_ops_.RecvMessage(msg);
787 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
788 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
789 grpc::internal::MutexLock lock(&start_mu_);
790 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
791 backlog_.read_ops = true;
792 return;
793 }
794 }
795 call_.PerformOps(&read_ops_);
796 }
797
AddHold(int holds)798 void AddHold(int holds) override {
799 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
800 }
RemoveHold()801 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
802
803 private:
804 friend class ClientCallbackReaderFactory<Response>;
805
806 template <class Request>
ClientCallbackReaderImpl(grpc::internal::Call call,grpc::ClientContext * context,Request * request,ClientReadReactor<Response> * reactor)807 ClientCallbackReaderImpl(grpc::internal::Call call,
808 grpc::ClientContext* context, Request* request,
809 ClientReadReactor<Response>* reactor)
810 : context_(context), call_(call), reactor_(reactor) {
811 this->BindReactor(reactor);
812 // TODO(vjpai): don't assert
813 GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
814 start_ops_.ClientSendClose();
815 }
816
817 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)818 void MaybeFinish(bool from_reaction) {
819 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
820 1, std::memory_order_acq_rel) == 1)) {
821 grpc::Status s = std::move(finish_status_);
822 auto* reactor = reactor_;
823 auto* call = call_.call();
824 this->~ClientCallbackReaderImpl();
825 grpc_call_unref(call);
826 if (GPR_LIKELY(from_reaction)) {
827 reactor->OnDone(s);
828 } else {
829 reactor->InternalScheduleOnDone(std::move(s));
830 }
831 }
832 }
833
834 grpc::ClientContext* const context_;
835 grpc::internal::Call call_;
836 ClientReadReactor<Response>* const reactor_;
837
838 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
839 grpc::internal::CallOpSendMessage,
840 grpc::internal::CallOpClientSendClose,
841 grpc::internal::CallOpRecvInitialMetadata>
842 start_ops_;
843 grpc::internal::CallbackWithSuccessTag start_tag_;
844
845 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
846 grpc::internal::CallbackWithSuccessTag finish_tag_;
847 grpc::Status finish_status_;
848
849 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
850 read_ops_;
851 grpc::internal::CallbackWithSuccessTag read_tag_;
852
853 struct StartCallBacklog {
854 bool read_ops = false;
855 };
856 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
857
858 // Minimum of 2 callbacks to pre-register for start and finish
859 std::atomic<intptr_t> callbacks_outstanding_{2};
860 std::atomic_bool started_{false};
861 grpc::internal::Mutex start_mu_;
862 };
863
864 template <class Response>
865 class ClientCallbackReaderFactory {
866 public:
867 template <class Request>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const Request * request,ClientReadReactor<Response> * reactor)868 static void Create(grpc::ChannelInterface* channel,
869 const grpc::internal::RpcMethod& method,
870 grpc::ClientContext* context, const Request* request,
871 ClientReadReactor<Response>* reactor) {
872 grpc::internal::Call call =
873 channel->CreateCall(method, context, channel->CallbackCQ());
874
875 grpc_call_ref(call.call());
876 new (grpc_call_arena_alloc(call.call(),
877 sizeof(ClientCallbackReaderImpl<Response>)))
878 ClientCallbackReaderImpl<Response>(call, context, request, reactor);
879 }
880 };
881
882 template <class Request>
883 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
884 public:
885 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)886 static void operator delete(void* /*ptr*/, std::size_t size) {
887 GPR_ASSERT(size == sizeof(ClientCallbackWriterImpl));
888 }
889
890 // This operator should never be called as the memory should be freed as part
891 // of the arena destruction. It only exists to provide a matching operator
892 // delete to the operator new so that some compilers will not complain (see
893 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
894 // there are no tests catching the compiler warning.
delete(void *,void *)895 static void operator delete(void*, void*) { GPR_ASSERT(false); }
896
StartCall()897 void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
898 // This call initiates two batches, plus any backlog, each with a callback
899 // 1. Send initial metadata (unless corked) + recv initial metadata
900 // 2. Any backlog
901 // 3. Recv trailing metadata
902
903 if (!start_corked_) {
904 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
905 context_->initial_metadata_flags());
906 }
907 call_.PerformOps(&start_ops_);
908
909 {
910 grpc::internal::MutexLock lock(&start_mu_);
911
912 if (backlog_.write_ops) {
913 call_.PerformOps(&write_ops_);
914 }
915 if (backlog_.writes_done_ops) {
916 call_.PerformOps(&writes_done_ops_);
917 }
918 call_.PerformOps(&finish_ops_);
919 // The last thing in this critical section is to set started_ so that it
920 // can be used lock-free as well.
921 started_.store(true, std::memory_order_release);
922 }
923 // MaybeFinish outside the lock to make sure that destruction of this object
924 // doesn't take place while holding the lock (which would cause the lock to
925 // be released after destruction)
926 this->MaybeFinish(/*from_reaction=*/false);
927 }
928
Write(const Request * msg,grpc::WriteOptions options)929 void Write(const Request* msg, grpc::WriteOptions options)
930 ABSL_LOCKS_EXCLUDED(start_mu_) override {
931 if (GPR_UNLIKELY(options.is_last_message())) {
932 options.set_buffer_hint();
933 write_ops_.ClientSendClose();
934 }
935 // TODO(vjpai): don't assert
936 GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
937 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
938
939 if (GPR_UNLIKELY(corked_write_needed_)) {
940 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
941 context_->initial_metadata_flags());
942 corked_write_needed_ = false;
943 }
944
945 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
946 grpc::internal::MutexLock lock(&start_mu_);
947 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
948 backlog_.write_ops = true;
949 return;
950 }
951 }
952 call_.PerformOps(&write_ops_);
953 }
954
WritesDone()955 void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
956 writes_done_ops_.ClientSendClose();
957 writes_done_tag_.Set(
958 call_.call(),
959 [this](bool ok) {
960 reactor_->OnWritesDoneDone(ok);
961 MaybeFinish(/*from_reaction=*/true);
962 },
963 &writes_done_ops_, /*can_inline=*/false);
964 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
965 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
966
967 if (GPR_UNLIKELY(corked_write_needed_)) {
968 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
969 context_->initial_metadata_flags());
970 corked_write_needed_ = false;
971 }
972
973 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
974 grpc::internal::MutexLock lock(&start_mu_);
975 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
976 backlog_.writes_done_ops = true;
977 return;
978 }
979 }
980 call_.PerformOps(&writes_done_ops_);
981 }
982
AddHold(int holds)983 void AddHold(int holds) override {
984 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
985 }
RemoveHold()986 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
987
988 private:
989 friend class ClientCallbackWriterFactory<Request>;
990
991 template <class Response>
ClientCallbackWriterImpl(grpc::internal::Call call,grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)992 ClientCallbackWriterImpl(grpc::internal::Call call,
993 grpc::ClientContext* context, Response* response,
994 ClientWriteReactor<Request>* reactor)
995 : context_(context),
996 call_(call),
997 reactor_(reactor),
998 start_corked_(context_->initial_metadata_corked_),
999 corked_write_needed_(start_corked_) {
1000 this->BindReactor(reactor);
1001
1002 // Set up the unchanging parts of the start and write tags and ops.
1003 start_tag_.Set(
1004 call_.call(),
1005 [this](bool ok) {
1006 reactor_->OnReadInitialMetadataDone(
1007 ok && !reactor_->InternalTrailersOnly(call_.call()));
1008 MaybeFinish(/*from_reaction=*/true);
1009 },
1010 &start_ops_, /*can_inline=*/false);
1011 start_ops_.RecvInitialMetadata(context_);
1012 start_ops_.set_core_cq_tag(&start_tag_);
1013
1014 write_tag_.Set(
1015 call_.call(),
1016 [this](bool ok) {
1017 reactor_->OnWriteDone(ok);
1018 MaybeFinish(/*from_reaction=*/true);
1019 },
1020 &write_ops_, /*can_inline=*/false);
1021 write_ops_.set_core_cq_tag(&write_tag_);
1022
1023 // Also set up the Finish tag and op set.
1024 finish_ops_.RecvMessage(response);
1025 finish_ops_.AllowNoMessage();
1026 finish_tag_.Set(
1027 call_.call(),
1028 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1029 &finish_ops_,
1030 /*can_inline=*/false);
1031 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1032 finish_ops_.set_core_cq_tag(&finish_tag_);
1033 }
1034
1035 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)1036 void MaybeFinish(bool from_reaction) {
1037 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1038 1, std::memory_order_acq_rel) == 1)) {
1039 grpc::Status s = std::move(finish_status_);
1040 auto* reactor = reactor_;
1041 auto* call = call_.call();
1042 this->~ClientCallbackWriterImpl();
1043 grpc_call_unref(call);
1044 if (GPR_LIKELY(from_reaction)) {
1045 reactor->OnDone(s);
1046 } else {
1047 reactor->InternalScheduleOnDone(std::move(s));
1048 }
1049 }
1050 }
1051
1052 grpc::ClientContext* const context_;
1053 grpc::internal::Call call_;
1054 ClientWriteReactor<Request>* const reactor_;
1055
1056 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1057 grpc::internal::CallOpRecvInitialMetadata>
1058 start_ops_;
1059 grpc::internal::CallbackWithSuccessTag start_tag_;
1060 const bool start_corked_;
1061 bool corked_write_needed_; // no lock needed since only accessed in
1062 // Write/WritesDone which cannot be concurrent
1063
1064 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1065 grpc::internal::CallOpClientRecvStatus>
1066 finish_ops_;
1067 grpc::internal::CallbackWithSuccessTag finish_tag_;
1068 grpc::Status finish_status_;
1069
1070 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1071 grpc::internal::CallOpSendMessage,
1072 grpc::internal::CallOpClientSendClose>
1073 write_ops_;
1074 grpc::internal::CallbackWithSuccessTag write_tag_;
1075
1076 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1077 grpc::internal::CallOpClientSendClose>
1078 writes_done_ops_;
1079 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1080
1081 struct StartCallBacklog {
1082 bool write_ops = false;
1083 bool writes_done_ops = false;
1084 };
1085 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
1086
1087 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1088 std::atomic<intptr_t> callbacks_outstanding_{3};
1089 std::atomic_bool started_{false};
1090 grpc::internal::Mutex start_mu_;
1091 };
1092
1093 template <class Request>
1094 class ClientCallbackWriterFactory {
1095 public:
1096 template <class Response>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)1097 static void Create(grpc::ChannelInterface* channel,
1098 const grpc::internal::RpcMethod& method,
1099 grpc::ClientContext* context, Response* response,
1100 ClientWriteReactor<Request>* reactor) {
1101 grpc::internal::Call call =
1102 channel->CreateCall(method, context, channel->CallbackCQ());
1103
1104 grpc_call_ref(call.call());
1105 new (grpc_call_arena_alloc(call.call(),
1106 sizeof(ClientCallbackWriterImpl<Request>)))
1107 ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1108 }
1109 };
1110
1111 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1112 public:
1113 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)1114 static void operator delete(void* /*ptr*/, std::size_t size) {
1115 GPR_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1116 }
1117
1118 // This operator should never be called as the memory should be freed as part
1119 // of the arena destruction. It only exists to provide a matching operator
1120 // delete to the operator new so that some compilers will not complain (see
1121 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1122 // there are no tests catching the compiler warning.
delete(void *,void *)1123 static void operator delete(void*, void*) { GPR_ASSERT(false); }
1124
StartCall()1125 void StartCall() override {
1126 // This call initiates two batches, each with a callback
1127 // 1. Send initial metadata + write + writes done + recv initial metadata
1128 // 2. Read message, recv trailing metadata
1129
1130 start_tag_.Set(
1131 call_.call(),
1132 [this](bool ok) {
1133 reactor_->OnReadInitialMetadataDone(
1134 ok && !reactor_->InternalTrailersOnly(call_.call()));
1135 MaybeFinish();
1136 },
1137 &start_ops_, /*can_inline=*/false);
1138 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1139 context_->initial_metadata_flags());
1140 start_ops_.RecvInitialMetadata(context_);
1141 start_ops_.set_core_cq_tag(&start_tag_);
1142 call_.PerformOps(&start_ops_);
1143
1144 finish_tag_.Set(
1145 call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1146 /*can_inline=*/false);
1147 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1148 finish_ops_.set_core_cq_tag(&finish_tag_);
1149 call_.PerformOps(&finish_ops_);
1150 }
1151
1152 private:
1153 friend class ClientCallbackUnaryFactory;
1154
1155 template <class Request, class Response>
ClientCallbackUnaryImpl(grpc::internal::Call call,grpc::ClientContext * context,Request * request,Response * response,ClientUnaryReactor * reactor)1156 ClientCallbackUnaryImpl(grpc::internal::Call call,
1157 grpc::ClientContext* context, Request* request,
1158 Response* response, ClientUnaryReactor* reactor)
1159 : context_(context), call_(call), reactor_(reactor) {
1160 this->BindReactor(reactor);
1161 // TODO(vjpai): don't assert
1162 GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
1163 start_ops_.ClientSendClose();
1164 finish_ops_.RecvMessage(response);
1165 finish_ops_.AllowNoMessage();
1166 }
1167
1168 // In the unary case, MaybeFinish is only ever invoked from a
1169 // library-initiated reaction, so it will just directly call OnDone if this is
1170 // the last reaction for this RPC.
MaybeFinish()1171 void MaybeFinish() {
1172 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1173 1, std::memory_order_acq_rel) == 1)) {
1174 grpc::Status s = std::move(finish_status_);
1175 auto* reactor = reactor_;
1176 auto* call = call_.call();
1177 this->~ClientCallbackUnaryImpl();
1178 grpc_call_unref(call);
1179 reactor->OnDone(s);
1180 }
1181 }
1182
1183 grpc::ClientContext* const context_;
1184 grpc::internal::Call call_;
1185 ClientUnaryReactor* const reactor_;
1186
1187 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1188 grpc::internal::CallOpSendMessage,
1189 grpc::internal::CallOpClientSendClose,
1190 grpc::internal::CallOpRecvInitialMetadata>
1191 start_ops_;
1192 grpc::internal::CallbackWithSuccessTag start_tag_;
1193
1194 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1195 grpc::internal::CallOpClientRecvStatus>
1196 finish_ops_;
1197 grpc::internal::CallbackWithSuccessTag finish_tag_;
1198 grpc::Status finish_status_;
1199
1200 // This call will have 2 callbacks: start and finish
1201 std::atomic<intptr_t> callbacks_outstanding_{2};
1202 };
1203
1204 class ClientCallbackUnaryFactory {
1205 public:
1206 template <class Request, class Response, class BaseRequest = Request,
1207 class BaseResponse = Response>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const Request * request,Response * response,ClientUnaryReactor * reactor)1208 static void Create(grpc::ChannelInterface* channel,
1209 const grpc::internal::RpcMethod& method,
1210 grpc::ClientContext* context, const Request* request,
1211 Response* response, ClientUnaryReactor* reactor) {
1212 grpc::internal::Call call =
1213 channel->CreateCall(method, context, channel->CallbackCQ());
1214
1215 grpc_call_ref(call.call());
1216
1217 new (grpc_call_arena_alloc(call.call(), sizeof(ClientCallbackUnaryImpl)))
1218 ClientCallbackUnaryImpl(call, context,
1219 static_cast<const BaseRequest*>(request),
1220 static_cast<BaseResponse*>(response), reactor);
1221 }
1222 };
1223
1224 } // namespace internal
1225 } // namespace grpc
1226
1227 #endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H
1228