xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/surface/call.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/surface/call.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdlib.h>
26 #include <string.h>
27 
28 #include <algorithm>
29 #include <atomic>
30 #include <memory>
31 #include <new>
32 #include <string>
33 #include <type_traits>
34 #include <utility>
35 #include <vector>
36 
37 #include "absl/base/thread_annotations.h"
38 #include "absl/status/status.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/str_format.h"
41 #include "absl/strings/str_join.h"
42 #include "absl/strings/string_view.h"
43 
44 #include <grpc/byte_buffer.h>
45 #include <grpc/compression.h>
46 #include <grpc/event_engine/event_engine.h>
47 #include <grpc/grpc.h>
48 #include <grpc/impl/call.h>
49 #include <grpc/impl/propagation_bits.h>
50 #include <grpc/slice.h>
51 #include <grpc/slice_buffer.h>
52 #include <grpc/status.h>
53 #include <grpc/support/alloc.h>
54 #include <grpc/support/atm.h>
55 #include <grpc/support/log.h>
56 #include <grpc/support/string_util.h>
57 
58 #include "src/core/lib/channel/call_finalization.h"
59 #include "src/core/lib/channel/call_tracer.h"
60 #include "src/core/lib/channel/channel_stack.h"
61 #include "src/core/lib/channel/channelz.h"
62 #include "src/core/lib/channel/context.h"
63 #include "src/core/lib/channel/status_util.h"
64 #include "src/core/lib/compression/compression_internal.h"
65 #include "src/core/lib/debug/stats.h"
66 #include "src/core/lib/debug/stats_data.h"
67 #include "src/core/lib/experiments/experiments.h"
68 #include "src/core/lib/gpr/alloc.h"
69 #include "src/core/lib/gpr/time_precise.h"
70 #include "src/core/lib/gpr/useful.h"
71 #include "src/core/lib/gprpp/bitset.h"
72 #include "src/core/lib/gprpp/cpp_impl_of.h"
73 #include "src/core/lib/gprpp/crash.h"
74 #include "src/core/lib/gprpp/debug_location.h"
75 #include "src/core/lib/gprpp/ref_counted.h"
76 #include "src/core/lib/gprpp/ref_counted_ptr.h"
77 #include "src/core/lib/gprpp/status_helper.h"
78 #include "src/core/lib/gprpp/sync.h"
79 #include "src/core/lib/iomgr/call_combiner.h"
80 #include "src/core/lib/iomgr/exec_ctx.h"
81 #include "src/core/lib/iomgr/polling_entity.h"
82 #include "src/core/lib/promise/activity.h"
83 #include "src/core/lib/promise/all_ok.h"
84 #include "src/core/lib/promise/arena_promise.h"
85 #include "src/core/lib/promise/context.h"
86 #include "src/core/lib/promise/latch.h"
87 #include "src/core/lib/promise/map.h"
88 #include "src/core/lib/promise/party.h"
89 #include "src/core/lib/promise/pipe.h"
90 #include "src/core/lib/promise/poll.h"
91 #include "src/core/lib/promise/race.h"
92 #include "src/core/lib/promise/seq.h"
93 #include "src/core/lib/promise/status_flag.h"
94 #include "src/core/lib/resource_quota/arena.h"
95 #include "src/core/lib/slice/slice_buffer.h"
96 #include "src/core/lib/slice/slice_internal.h"
97 #include "src/core/lib/surface/api_trace.h"
98 #include "src/core/lib/surface/call_test_only.h"
99 #include "src/core/lib/surface/channel.h"
100 #include "src/core/lib/surface/completion_queue.h"
101 #include "src/core/lib/surface/server_interface.h"
102 #include "src/core/lib/surface/validate_metadata.h"
103 #include "src/core/lib/surface/wait_for_cq_end_op.h"
104 #include "src/core/lib/transport/batch_builder.h"
105 #include "src/core/lib/transport/error_utils.h"
106 #include "src/core/lib/transport/metadata_batch.h"
107 #include "src/core/lib/transport/transport.h"
108 
// Trace flags defined by this translation unit. Each one is constructed with
// `false` as its first argument and a short string name as its second
// (presumably the initial enabled state and the name used to switch the flag
// on at runtime -- confirm against TraceFlag's declaration).
//   - grpc_call_error_trace:    named "call_error".
//   - grpc_compression_trace:   named "compression"; gates the "not accepted"
//                               log in Call::ProcessIncomingInitialMetadata.
//   - grpc_call_trace:          named "call"; gates per-batch-step logging in
//                               FilterStackCall::BatchControl.
//   - grpc_call_refcount_trace: named "call_refcount"; a DebugOnlyTraceFlag
//                               (presumably inert outside debug builds).
109 grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
110 grpc_core::TraceFlag grpc_compression_trace(false, "compression");
111 grpc_core::TraceFlag grpc_call_trace(false, "call");
112 grpc_core::DebugOnlyTraceFlag grpc_call_refcount_trace(false, "call_refcount");
113 
114 namespace grpc_core {
115 
116 ///////////////////////////////////////////////////////////////////////////////
117 // Call
118 
// Abstract base class for the call implementations in this file
// (FilterStackCall below derives from it). It owns the state that every call
// needs regardless of the underlying stack: a ref to the channel, the arena,
// the send deadline, the parent/child linkage used for propagation, the peer
// name, and the compression state learned from incoming initial metadata.
// CppImplOf<Call, grpc_call> presumably ties this type to the opaque C
// `grpc_call` handle -- confirm against cpp_impl_of.h.
119 class Call : public CppImplOf<Call, grpc_call> {
120  public:
  // Arena supplied at construction; never null (asserted in the constructor
  // in debug builds).
arena()121   Arena* arena() { return arena_; }
  // True when this call was constructed as a client-side call.
is_client() const122   bool is_client() const { return is_client_; }
123
  // Per-call context slots, indexed by grpc_context_index. `destroy` is the
  // optional cleanup callback associated with `value`.
124   virtual void ContextSet(grpc_context_index elem, void* value,
125                           void (*destroy)(void* value)) = 0;
126   virtual void* ContextGet(grpc_context_index elem) const = 0;
  // Used by PublishToParent(): a child published to a parent for which this
  // returns true is cancelled immediately.
127   virtual bool Completed() = 0;
  // Convenience wrapper: builds an error from `status` and `description` and
  // forwards it to CancelWithError().
128   void CancelWithStatus(grpc_status_code status, const char* description);
129   virtual void CancelWithError(grpc_error_handle error) = 0;
130   virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0;
  // Returns a heap-allocated, NUL-terminated peer string: the recorded peer
  // name if non-empty, else the channel target, else "unknown".
131   char* GetPeer();
132   virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops,
133                                      void* notify_tag,
134                                      bool is_notify_tag_closure) = 0;
135   virtual bool failed_before_recv_message() const = 0;
136   virtual bool is_trailers_only() const = 0;
137   virtual absl::string_view GetServerAuthority() const = 0;
  // Reference counting hooks. The base class itself uses InternalRef/Unref
  // with reason "child" (a child holds its parent alive) and
  // "propagate_cancel" (held across a propagated cancellation).
138   virtual void ExternalRef() = 0;
139   virtual void ExternalUnref() = 0;
140   virtual void InternalRef(const char* reason) = 0;
141   virtual void InternalUnref(const char* reason) = 0;
142
  // Test-only accessors for state recorded while processing incoming
  // metadata/messages.
test_only_compression_algorithm()143   grpc_compression_algorithm test_only_compression_algorithm() {
144     return incoming_compression_algorithm_;
145   }
test_only_message_flags()146   uint32_t test_only_message_flags() { return test_only_last_message_flags_; }
  // Set of compression algorithms taken from the peer's accept-encoding
  // metadata (defaults to {GRPC_COMPRESS_NONE}).
encodings_accepted_by_peer()147   CompressionAlgorithmSet encodings_accepted_by_peer() {
148     return encodings_accepted_by_peer_;
149   }
150
151   // This should return nullptr for the promise stack (and alternative means
152   // for that functionality be invented)
153   virtual grpc_call_stack* call_stack() = 0;
154
155   // Return the EventEngine used for this call's async execution.
156   virtual grpc_event_engine::experimental::EventEngine* event_engine()
157       const = 0;
158
159  protected:
160   // The maximum number of concurrent batches possible.
161   // Based upon the maximum number of individually queueable ops in the batch
162   // api:
163   //    - initial metadata send
164   //    - message send
165   //    - status/close send (depending on client/server)
166   //    - initial metadata recv
167   //    - message recv
168   //    - status/close recv (depending on client/server)
169   static constexpr size_t kMaxConcurrentBatches = 6;
170
  // State a call carries once it has (or may have) children. Created lazily
  // by GetOrCreateParentCall().
171   struct ParentCall {
172     Mutex child_list_mu;
    // Head of the circular sibling list, or nullptr when there are no
    // children.
173     Call* first_child ABSL_GUARDED_BY(child_list_mu) = nullptr;
174   };
175
  // State a call carries when it was created with a parent (see InitParent()).
176   struct ChildCall {
ChildCallgrpc_core::Call::ChildCall177     explicit ChildCall(Call* parent) : parent(parent) {}
178     Call* parent;
179     /// siblings: children of the same parent form a circular doubly-linked
180     /// list, and this list is protected by the parent's
181     /// ParentCall::child_list_mu
182     Call* sibling_next = nullptr;
183     Call* sibling_prev = nullptr;
184   };
185
  // `arena` and `channel` must be non-null (checked with GPR_DEBUG_ASSERT).
  // Takes ownership of the passed channel ref.
Call(Arena * arena,bool is_client,Timestamp send_deadline,RefCountedPtr<Channel> channel)186   Call(Arena* arena, bool is_client, Timestamp send_deadline,
187        RefCountedPtr<Channel> channel)
188       : channel_(std::move(channel)),
189         arena_(arena),
190         send_deadline_(send_deadline),
191         is_client_(is_client) {
192     GPR_DEBUG_ASSERT(arena_ != nullptr);
193     GPR_DEBUG_ASSERT(channel_ != nullptr);
194   }
195   virtual ~Call() = default;
196
  // Runs the destructor in place and then hands the arena back to the channel
  // via Channel::DestroyArena(). The object must not be touched afterwards.
197   void DeleteThis();
198
  // GetOrCreateParentCall() allocates the ParentCall in the arena on first
  // use; parent_call() only reads the pointer and may return nullptr.
199   ParentCall* GetOrCreateParentCall();
200   ParentCall* parent_call();
channel() const201   Channel* channel() const {
202     GPR_DEBUG_ASSERT(channel_ != nullptr);
203     return channel_.get();
204   }
205
  // Parent/child plumbing:
  //  - InitParent: records `parent`, refs it, and applies `propagation_mask`.
  //  - PublishToParent: inserts this call into the parent's sibling list.
  //  - MaybeUnpublishFromParent: removes it again and drops the parent ref
  //    (no-op if this call has no parent).
  //  - PropagateCancellationToChildren: cancels every child that inherited
  //    cancellation.
206   absl::Status InitParent(Call* parent, uint32_t propagation_mask);
207   void PublishToParent(Call* parent);
208   void MaybeUnpublishFromParent();
209   void PropagateCancellationToChildren();
210
send_deadline() const211   Timestamp send_deadline() const { return send_deadline_; }
set_send_deadline(Timestamp send_deadline)212   void set_send_deadline(Timestamp send_deadline) {
213     send_deadline_ = send_deadline;
214   }
215
  // Peer string accessors; both take peer_mu_. GetPeerString() returns a new
  // ref to the stored slice.
GetPeerString() const216   Slice GetPeerString() const {
217     MutexLock lock(&peer_mu_);
218     return peer_string_.Ref();
219   }
220
SetPeerString(Slice peer_string)221   void SetPeerString(Slice peer_string) {
222     MutexLock lock(&peer_mu_);
223     peer_string_ = std::move(peer_string);
224   }
225
ClearPeerString()226   void ClearPeerString() { SetPeerString(Slice(grpc_empty_slice())); }
227
228   // TODO(ctiller): cancel_func is for cancellation of the call - filter stack
229   // holds no mutexes here, promise stack does, and so locking is different.
230   // Remove this and cancel directly once promise conversion is done.
  // NOTE(review): the declaration below takes no cancel_func parameter, so the
  // TODO above looks stale -- confirm and remove.
  // Records the peer string and compression settings found in `md`, and
  // cancels the call if the incoming compression algorithm is disabled on
  // this channel.
231   void ProcessIncomingInitialMetadata(grpc_metadata_batch& md);
232   // Fixup outgoing metadata before sending - adds compression, protects
233   // internal headers against external modification.
234   void PrepareOutgoingInitialMetadata(const grpc_op& op,
235                                       grpc_metadata_batch& md);
NoteLastMessageFlags(uint32_t flags)236   void NoteLastMessageFlags(uint32_t flags) {
237     test_only_last_message_flags_ = flags;
238   }
incoming_compression_algorithm() const239   grpc_compression_algorithm incoming_compression_algorithm() const {
240     return incoming_compression_algorithm_;
241   }
242
  // Out-of-line (NOINLINE) handlers for the compression checks made in
  // ProcessIncomingInitialMetadata(). "Disabled" logs and cancels the call;
  // "NotAccepted" only logs.
243   void HandleCompressionAlgorithmDisabled(
244       grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
245   void HandleCompressionAlgorithmNotAccepted(
246       grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
247
  // Cycle counter sampled when this object was constructed.
start_time() const248   gpr_cycle_counter start_time() const { return start_time_; }
249
250  private:
251   RefCountedPtr<Channel> channel_;
252   Arena* const arena_;
  // Lazily created; published with release/acquire atomics.
253   std::atomic<ParentCall*> parent_call_{nullptr};
  // Non-null only after InitParent().
254   ChildCall* child_ = nullptr;
255   Timestamp send_deadline_;
256   const bool is_client_;
257   // flag indicating that cancellation is inherited
258   bool cancellation_is_inherited_ = false;
259   // Compression algorithm for *incoming* data
260   grpc_compression_algorithm incoming_compression_algorithm_ =
261       GRPC_COMPRESS_NONE;
262   // Supported encodings (compression algorithms), a bitset.
263   // Always support no compression.
264   CompressionAlgorithmSet encodings_accepted_by_peer_{GRPC_COMPRESS_NONE};
265   uint32_t test_only_last_message_flags_ = 0;
266   // Peer name is protected by a mutex because it can be accessed by the
267   // application at the same moment as it is being set by the completion
268   // of the recv_initial_metadata op.  The mutex should be mostly uncontended.
269   mutable Mutex peer_mu_;
270   Slice peer_string_;
271   gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
272 };
273 
GetOrCreateParentCall()274 Call::ParentCall* Call::GetOrCreateParentCall() {
275   ParentCall* p = parent_call_.load(std::memory_order_acquire);
276   if (p == nullptr) {
277     p = arena_->New<ParentCall>();
278     ParentCall* expected = nullptr;
279     if (!parent_call_.compare_exchange_strong(expected, p,
280                                               std::memory_order_release,
281                                               std::memory_order_relaxed)) {
282       p->~ParentCall();
283       p = expected;
284     }
285   }
286   return p;
287 }
288 
parent_call()289 Call::ParentCall* Call::parent_call() {
290   return parent_call_.load(std::memory_order_acquire);
291 }
292 
InitParent(Call * parent,uint32_t propagation_mask)293 absl::Status Call::InitParent(Call* parent, uint32_t propagation_mask) {
294   child_ = arena()->New<ChildCall>(parent);
295 
296   parent->InternalRef("child");
297   GPR_ASSERT(is_client_);
298   GPR_ASSERT(!parent->is_client_);
299 
300   if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
301     send_deadline_ = std::min(send_deadline_, parent->send_deadline_);
302   }
303   // for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
304   // GRPC_PROPAGATE_STATS_CONTEXT
305   // TODO(ctiller): This should change to use the appropriate census start_op
306   // call.
307   if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
308     if (0 == (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
309       return absl::UnknownError(
310           "Census tracing propagation requested without Census context "
311           "propagation");
312     }
313     ContextSet(GRPC_CONTEXT_TRACING, parent->ContextGet(GRPC_CONTEXT_TRACING),
314                nullptr);
315   } else if (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
316     return absl::UnknownError(
317         "Census context propagation requested without Census tracing "
318         "propagation");
319   }
320   if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
321     cancellation_is_inherited_ = true;
322   }
323   return absl::OkStatus();
324 }
325 
PublishToParent(Call * parent)326 void Call::PublishToParent(Call* parent) {
327   ChildCall* cc = child_;
328   ParentCall* pc = parent->GetOrCreateParentCall();
329   MutexLock lock(&pc->child_list_mu);
330   if (pc->first_child == nullptr) {
331     pc->first_child = this;
332     cc->sibling_next = cc->sibling_prev = this;
333   } else {
334     cc->sibling_next = pc->first_child;
335     cc->sibling_prev = pc->first_child->child_->sibling_prev;
336     cc->sibling_next->child_->sibling_prev =
337         cc->sibling_prev->child_->sibling_next = this;
338   }
339   if (parent->Completed()) {
340     CancelWithError(absl::CancelledError());
341   }
342 }
343 
MaybeUnpublishFromParent()344 void Call::MaybeUnpublishFromParent() {
345   ChildCall* cc = child_;
346   if (cc == nullptr) return;
347 
348   ParentCall* pc = cc->parent->parent_call();
349   {
350     MutexLock lock(&pc->child_list_mu);
351     if (this == pc->first_child) {
352       pc->first_child = cc->sibling_next;
353       if (this == pc->first_child) {
354         pc->first_child = nullptr;
355       }
356     }
357     cc->sibling_prev->child_->sibling_next = cc->sibling_next;
358     cc->sibling_next->child_->sibling_prev = cc->sibling_prev;
359   }
360   cc->parent->InternalUnref("child");
361 }
362 
CancelWithStatus(grpc_status_code status,const char * description)363 void Call::CancelWithStatus(grpc_status_code status, const char* description) {
364   // copying 'description' is needed to ensure the grpc_call_cancel_with_status
365   // guarantee that can be short-lived.
366   // TODO(ctiller): change to
367   // absl::Status(static_cast<absl::StatusCode>(status), description)
368   // (ie remove the set_int, set_str).
369   CancelWithError(grpc_error_set_int(
370       grpc_error_set_str(
371           absl::Status(static_cast<absl::StatusCode>(status), description),
372           StatusStrProperty::kGrpcMessage, description),
373       StatusIntProperty::kRpcStatus, status));
374 }
375 
PropagateCancellationToChildren()376 void Call::PropagateCancellationToChildren() {
377   ParentCall* pc = parent_call();
378   if (pc != nullptr) {
379     Call* child;
380     MutexLock lock(&pc->child_list_mu);
381     child = pc->first_child;
382     if (child != nullptr) {
383       do {
384         Call* next_child_call = child->child_->sibling_next;
385         if (child->cancellation_is_inherited_) {
386           child->InternalRef("propagate_cancel");
387           child->CancelWithError(absl::CancelledError());
388           child->InternalUnref("propagate_cancel");
389         }
390         child = next_child_call;
391       } while (child != pc->first_child);
392     }
393   }
394 }
395 
GetPeer()396 char* Call::GetPeer() {
397   Slice peer_slice = GetPeerString();
398   if (!peer_slice.empty()) {
399     absl::string_view peer_string_view = peer_slice.as_string_view();
400     char* peer_string =
401         static_cast<char*>(gpr_malloc(peer_string_view.size() + 1));
402     memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
403     peer_string[peer_string_view.size()] = '\0';
404     return peer_string;
405   }
406   char* peer_string = grpc_channel_get_target(channel_->c_ptr());
407   if (peer_string != nullptr) return peer_string;
408   return gpr_strdup("unknown");
409 }
410 
DeleteThis()411 void Call::DeleteThis() {
412   RefCountedPtr<Channel> channel = std::move(channel_);
413   Arena* arena = arena_;
414   this->~Call();
415   channel->DestroyArena(arena);
416 }
417 
PrepareOutgoingInitialMetadata(const grpc_op & op,grpc_metadata_batch & md)418 void Call::PrepareOutgoingInitialMetadata(const grpc_op& op,
419                                           grpc_metadata_batch& md) {
420   // TODO(juanlishen): If the user has already specified a compression
421   // algorithm by setting the initial metadata with key of
422   // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
423   // with the compression algorithm mapped from compression level.
424   // process compression level
425   grpc_compression_level effective_compression_level = GRPC_COMPRESS_LEVEL_NONE;
426   bool level_set = false;
427   if (op.data.send_initial_metadata.maybe_compression_level.is_set) {
428     effective_compression_level =
429         op.data.send_initial_metadata.maybe_compression_level.level;
430     level_set = true;
431   } else {
432     const grpc_compression_options copts = channel()->compression_options();
433     if (copts.default_level.is_set) {
434       level_set = true;
435       effective_compression_level = copts.default_level.level;
436     }
437   }
438   // Currently, only server side supports compression level setting.
439   if (level_set && !is_client()) {
440     const grpc_compression_algorithm calgo =
441         encodings_accepted_by_peer().CompressionAlgorithmForLevel(
442             effective_compression_level);
443     // The following metadata will be checked and removed by the message
444     // compression filter. It will be used as the call's compression
445     // algorithm.
446     md.Set(GrpcInternalEncodingRequest(), calgo);
447   }
448   // Ignore any te metadata key value pairs specified.
449   md.Remove(TeMetadata());
450   // Should never come from applications
451   md.Remove(GrpcLbClientStatsMetadata());
452 }
453 
ProcessIncomingInitialMetadata(grpc_metadata_batch & md)454 void Call::ProcessIncomingInitialMetadata(grpc_metadata_batch& md) {
455   Slice* peer_string = md.get_pointer(PeerString());
456   if (peer_string != nullptr) SetPeerString(peer_string->Ref());
457 
458   incoming_compression_algorithm_ =
459       md.Take(GrpcEncodingMetadata()).value_or(GRPC_COMPRESS_NONE);
460   encodings_accepted_by_peer_ =
461       md.Take(GrpcAcceptEncodingMetadata())
462           .value_or(CompressionAlgorithmSet{GRPC_COMPRESS_NONE});
463 
464   const grpc_compression_options compression_options =
465       channel_->compression_options();
466   const grpc_compression_algorithm compression_algorithm =
467       incoming_compression_algorithm_;
468   if (GPR_UNLIKELY(!CompressionAlgorithmSet::FromUint32(
469                         compression_options.enabled_algorithms_bitset)
470                         .IsSet(compression_algorithm))) {
471     // check if algorithm is supported by current channel config
472     HandleCompressionAlgorithmDisabled(compression_algorithm);
473   }
474   // GRPC_COMPRESS_NONE is always set.
475   GPR_DEBUG_ASSERT(encodings_accepted_by_peer_.IsSet(GRPC_COMPRESS_NONE));
476   if (GPR_UNLIKELY(!encodings_accepted_by_peer_.IsSet(compression_algorithm))) {
477     if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
478       HandleCompressionAlgorithmNotAccepted(compression_algorithm);
479     }
480   }
481 }
482 
HandleCompressionAlgorithmNotAccepted(grpc_compression_algorithm compression_algorithm)483 void Call::HandleCompressionAlgorithmNotAccepted(
484     grpc_compression_algorithm compression_algorithm) {
485   const char* algo_name = nullptr;
486   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
487   gpr_log(GPR_ERROR,
488           "Compression algorithm ('%s') not present in the "
489           "accepted encodings (%s)",
490           algo_name,
491           std::string(encodings_accepted_by_peer_.ToString()).c_str());
492 }
493 
HandleCompressionAlgorithmDisabled(grpc_compression_algorithm compression_algorithm)494 void Call::HandleCompressionAlgorithmDisabled(
495     grpc_compression_algorithm compression_algorithm) {
496   const char* algo_name = nullptr;
497   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
498   std::string error_msg =
499       absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
500   gpr_log(GPR_ERROR, "%s", error_msg.c_str());
501   CancelWithError(grpc_error_set_int(absl::UnimplementedError(error_msg),
502                                      StatusIntProperty::kRpcStatus,
503                                      GRPC_STATUS_UNIMPLEMENTED));
504 }
505 
506 ///////////////////////////////////////////////////////////////////////////////
507 // FilterStackCall
508 // To be removed once promise conversion is complete
509 
// Call implementation backed by a filter-based grpc_call_stack. The call
// stack is not a member: it lives in the same allocation, immediately after
// this object (see call_stack() / FromCallStack()), so the size and layout of
// this class are load-bearing.
510 class FilterStackCall final : public Call {
511  public:
  // Runs the destroy callback of every context slot that has one, then frees
  // the error string held in final_info_.
~FilterStackCall()512   ~FilterStackCall() override {
513     for (int i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
514       if (context_[i].destroy) {
515         context_[i].destroy(context_[i].value);
516       }
517     }
518     gpr_free(static_cast<void*>(const_cast<char*>(final_info_.error_string)));
519   }
520
  // True once received_final_op_atm_ has been set to a non-zero value
  // (acquire load).
Completed()521   bool Completed() override {
522     return gpr_atm_acq_load(&received_final_op_atm_) != 0;
523   }
524
525   // TODO(ctiller): return absl::StatusOr<SomeSmartPointer<Call>>?
526   static grpc_error_handle Create(grpc_call_create_args* args,
527                                   grpc_call** out_call);
528
  // Recovers the owning call from the top element of its call stack.
FromTopElem(grpc_call_element * elem)529   static Call* FromTopElem(grpc_call_element* elem) {
530     return FromCallStack(grpc_call_stack_from_top_element(elem));
531   }
532
  // The call stack is located at this object's address plus sizeof(*this)
  // rounded up to the allocation alignment. FromCallStack() is the inverse.
call_stack()533   grpc_call_stack* call_stack() override {
534     return reinterpret_cast<grpc_call_stack*>(
535         reinterpret_cast<char*>(this) +
536         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this)));
537   }
538
  // Uses the channel's EventEngine.
event_engine() const539   grpc_event_engine::experimental::EventEngine* event_engine() const override {
540     return channel()->event_engine();
541   }
542
  // Returns element `idx` of this call's stack.
call_elem(size_t idx)543   grpc_call_element* call_elem(size_t idx) {
544     return grpc_call_stack_element(call_stack(), idx);
545   }
546
call_combiner()547   CallCombiner* call_combiner() { return &call_combiner_; }
548
549   void CancelWithError(grpc_error_handle error) override;
550   void SetCompletionQueue(grpc_completion_queue* cq) override;
551   grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
552                              bool is_notify_tag_closure) override;
  // External refs are counted in ext_ref_; internal refs are delegated to the
  // call stack's refcount via GRPC_CALL_STACK_REF / GRPC_CALL_STACK_UNREF.
ExternalRef()553   void ExternalRef() override { ext_ref_.Ref(); }
554   void ExternalUnref() override;
InternalRef(const char * reason)555   void InternalRef(const char* reason) override {
556     GRPC_CALL_STACK_REF(call_stack(), reason);
557   }
InternalUnref(const char * reason)558   void InternalUnref(const char* reason) override {
559     GRPC_CALL_STACK_UNREF(call_stack(), reason);
560   }
561
562   void ContextSet(grpc_context_index elem, void* value,
563                   void (*destroy)(void* value)) override;
  // Returns the raw value stored in context slot `elem` (no bounds check).
ContextGet(grpc_context_index elem) const564   void* ContextGet(grpc_context_index elem) const override {
565     return context_[elem].value;
566   }
567
  // In debug builds, additionally asserts that a trailers-only response came
  // with no received initial metadata (TransportSize() == 0).
is_trailers_only() const568   bool is_trailers_only() const override {
569     bool result = is_trailers_only_;
570     GPR_DEBUG_ASSERT(!result || recv_initial_metadata_.TransportSize() == 0);
571     return result;
572   }
573
failed_before_recv_message() const574   bool failed_before_recv_message() const override {
575     return call_failed_before_recv_message_;
576   }
577
  // Returns the authority from received initial metadata, or "" when absent.
  // The view points into recv_initial_metadata_.
GetServerAuthority() const578   absl::string_view GetServerAuthority() const override {
579     const Slice* authority_metadata =
580         recv_initial_metadata_.get_pointer(HttpAuthorityMetadata());
581     if (authority_metadata == nullptr) return "";
582     return authority_metadata->as_string_view();
583   }
584
  // Size of the call object plus room for kMaxConcurrentBatches BatchControl
  // records. Does not include the call stack itself.
InitialSizeEstimate()585   static size_t InitialSizeEstimate() {
586     return sizeof(FilterStackCall) +
587            sizeof(BatchControl) * kMaxConcurrentBatches;
588   }
589
590  private:
  // Scoped promise context that installs this call's arena.
591   class ScopedContext : public promise_detail::Context<Arena> {
592    public:
ScopedContext(FilterStackCall * call)593     explicit ScopedContext(FilterStackCall* call)
594         : promise_detail::Context<Arena>(call->arena()) {}
595   };
596
  // Sentinel values for recv_state_ (see the state diagram at the end of the
  // class).
597   static constexpr gpr_atm kRecvNone = 0;
598   static constexpr gpr_atm kRecvInitialMetadataFirst = 1;
599
  // Steps a batch may be waiting on. Each enumerator is a bit position in
  // BatchControl::ops_pending_ (see PendingOpMask()).
600   enum class PendingOp {
601     kRecvMessage,
602     kRecvInitialMetadata,
603     kRecvTrailingMetadata,
604     kSends
605   };
  // Bit mask for a single pending op: 1 << enumerator value.
PendingOpMask(PendingOp op)606   static intptr_t PendingOpMask(PendingOp op) {
607     return static_cast<intptr_t>(1) << static_cast<intptr_t>(op);
608   }
  // Renders a pending-op bit set as "{name,name,...}" for trace logging.
PendingOpString(intptr_t pending_ops)609   static std::string PendingOpString(intptr_t pending_ops) {
610     std::vector<absl::string_view> pending_op_strings;
611     if (pending_ops & PendingOpMask(PendingOp::kRecvMessage)) {
612       pending_op_strings.push_back("kRecvMessage");
613     }
614     if (pending_ops & PendingOpMask(PendingOp::kRecvInitialMetadata)) {
615       pending_op_strings.push_back("kRecvInitialMetadata");
616     }
617     if (pending_ops & PendingOpMask(PendingOp::kRecvTrailingMetadata)) {
618       pending_op_strings.push_back("kRecvTrailingMetadata");
619     }
620     if (pending_ops & PendingOpMask(PendingOp::kSends)) {
621       pending_op_strings.push_back("kSends");
622     }
623     return absl::StrCat("{", absl::StrJoin(pending_op_strings, ","), "}");
624   }
  // Bookkeeping for one in-flight batch: the transport batch itself, how the
  // application is to be notified, and which steps are still outstanding.
625   struct BatchControl {
626     FilterStackCall* call_ = nullptr;
627     CallTracerAnnotationInterface* call_tracer_ = nullptr;
628     grpc_transport_stream_op_batch op_;
629     // Share memory for cq_completion and notify_tag as they are never needed
630     // simultaneously. Each byte used in this data structure counts as six
631     // bytes per call, so any savings we can make are worthwhile.
632
633     // We use notify_tag to determine whether or not to send notification to the
634     // completion queue. Once we've made that determination, we can reuse the
635     // memory for cq_completion.
636     union {
637       grpc_cq_completion cq_completion;
638       struct {
639         // Any given op indicates completion by either (a) calling a closure or
640         // (b) sending a notification on the call's completion queue.  If
641         // \a is_closure is true, \a tag indicates a closure to be invoked;
642         // otherwise, \a tag indicates the tag to be used in the notification to
643         // be sent to the completion queue.
644         void* tag;
645         bool is_closure;
646       } notify_tag;
647     } completion_data_;
648     grpc_closure start_batch_;
649     grpc_closure finish_batch_;
    // Bit set of PendingOpMask() values still outstanding for this batch.
650     std::atomic<intptr_t> ops_pending_{0};
651     AtomicError batch_error_;
    // Publishes the initial set of outstanding steps (release store).
    // NOTE(review): the parameter is uintptr_t while ops_pending_ is
    // intptr_t, so the value is implicitly converted.
set_pending_opsgrpc_core::FilterStackCall::BatchControl652     void set_pending_ops(uintptr_t ops) {
653       ops_pending_.store(ops, std::memory_order_release);
654     }
    // Marks step `op` as finished by subtracting its bit from ops_pending_.
    // Asserts that the bit was set. Returns true when it was the only bit
    // remaining, i.e. the whole batch is now complete.
completed_batch_stepgrpc_core::FilterStackCall::BatchControl655     bool completed_batch_step(PendingOp op) {
656       auto mask = PendingOpMask(op);
657       auto r = ops_pending_.fetch_sub(mask, std::memory_order_acq_rel);
658       if (grpc_call_trace.enabled()) {
659         gpr_log(GPR_DEBUG, "BATCH:%p COMPLETE:%s REMAINING:%s (tag:%p)", this,
660                 PendingOpString(mask).c_str(),
661                 PendingOpString(r & ~mask).c_str(),
662                 completion_data_.notify_tag.tag);
663       }
664       GPR_ASSERT((r & mask) != 0);
665       return r == mask;
666     }
667
668     void PostCompletion();
669     void FinishStep(PendingOp op);
670     void ProcessDataAfterMetadata();
671     void ReceivingStreamReady(grpc_error_handle error);
672     void ReceivingInitialMetadataReady(grpc_error_handle error);
673     void ReceivingTrailingMetadataReady(grpc_error_handle error);
674     void FinishBatch(grpc_error_handle error);
675   };
676
  // The call is client-side exactly when args.server_transport_data is null.
  // Takes a new ref on args.channel.
FilterStackCall(Arena * arena,const grpc_call_create_args & args)677   FilterStackCall(Arena* arena, const grpc_call_create_args& args)
678       : Call(arena, args.server_transport_data == nullptr, args.send_deadline,
679              args.channel->Ref()),
680         cq_(args.cq),
681         stream_op_payload_(context_) {}
682
683   static void ReleaseCall(void* call, grpc_error_handle);
684   static void DestroyCall(void* call, grpc_error_handle);
685
  // Inverse of call_stack(): steps back from the call stack address by the
  // alignment-rounded size of FilterStackCall.
FromCallStack(grpc_call_stack * call_stack)686   static FilterStackCall* FromCallStack(grpc_call_stack* call_stack) {
687     return reinterpret_cast<FilterStackCall*>(
688         reinterpret_cast<char*>(call_stack) -
689         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)));
690   }
691
692   void ExecuteBatch(grpc_transport_stream_op_batch* batch,
693                     grpc_closure* start_batch_closure);
694   void SetFinalStatus(grpc_error_handle error);
695   BatchControl* ReuseOrAllocateBatchControl(const grpc_op* ops);
696   bool PrepareApplicationMetadata(size_t count, grpc_metadata* metadata,
697                                   bool is_trailing);
698   void PublishAppMetadata(grpc_metadata_batch* b, bool is_trailing);
699   void RecvInitialFilter(grpc_metadata_batch* b);
700   void RecvTrailingFilter(grpc_metadata_batch* b,
701                           grpc_error_handle batch_error);
702
703   RefCount ext_ref_;
704   CallCombiner call_combiner_;
705   grpc_completion_queue* cq_;
706   grpc_polling_entity pollent_;
707
708   /// has grpc_call_unref been called
709   bool destroy_called_ = false;
710   // Trailers-only response status
711   bool is_trailers_only_ = false;
712   /// which ops are in-flight
713   bool sent_initial_metadata_ = false;
714   bool sending_message_ = false;
715   bool sent_final_op_ = false;
716   bool received_initial_metadata_ = false;
717   bool receiving_message_ = false;
718   bool requested_final_op_ = false;
719   gpr_atm received_final_op_atm_ = 0;
720
  // One slot per possible concurrent batch; empty slots are nullptr.
721   BatchControl* active_batches_[kMaxConcurrentBatches] = {};
722   grpc_transport_stream_op_batch_payload stream_op_payload_;
723
724   // Metadata batches, one per direction (send/recv) and phase
  // (initial/trailing).
725   grpc_metadata_batch send_initial_metadata_;
726   grpc_metadata_batch send_trailing_metadata_;
727   grpc_metadata_batch recv_initial_metadata_;
728   grpc_metadata_batch recv_trailing_metadata_;
729
730   // Buffered read metadata waiting to be returned to the application.
731   // Element 0 is initial metadata, element 1 is trailing metadata.
732   grpc_metadata_array* buffered_metadata_[2] = {};
733
734   // Call data used for reporting. Only valid after the call has
735   // completed
736   grpc_call_final_info final_info_;
737
738   // Contexts for various subsystems (security, tracing, ...).
739   grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
740
741   SliceBuffer send_slice_buffer_;
742   absl::optional<SliceBuffer> receiving_slice_buffer_;
  // NOTE(review): no default initializer here; presumably written before it
  // is read on the receive path -- confirm.
743   uint32_t receiving_stream_flags_;
744
745   bool call_failed_before_recv_message_ = false;
746   grpc_byte_buffer** receiving_buffer_ = nullptr;
747   grpc_slice receiving_slice_ = grpc_empty_slice();
748   grpc_closure receiving_stream_ready_;
749   grpc_closure receiving_initial_metadata_ready_;
750   grpc_closure receiving_trailing_metadata_ready_;
751   // Status about operation of call
752   bool sent_server_trailing_metadata_ = false;
753   gpr_atm cancelled_with_error_ = 0;
754
755   grpc_closure release_call_;
756
  // Output locations for the final op. Which arm is meaningful depends on
  // whether this is a client-side or server-side call.
757   union {
758     struct {
759       grpc_status_code* status;
760       grpc_slice* status_details;
761       const char** error_string;
762     } client;
763     struct {
764       int* cancelled;
765       // backpointer to owning server if this is a server side call.
766       ServerInterface* core_server;
767     } server;
768   } final_op_;
769   AtomicError status_error_;
770
771   // recv_state can contain one of the following values:
772   // RECV_NONE :                 :  no initial metadata and messages received
773   // RECV_INITIAL_METADATA_FIRST :  received initial metadata first
774   // a batch_control*            :  received messages first
775
776   //             +------1------RECV_NONE------3-----+
777   //             |                                  |
778   //             |                                  |
779   //             v                                  v
780   // RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
781   //       |           ^                      |           ^
782   //       |           |                      |           |
783   //       +-----2-----+                      +-----4-----+
784
785   // For 1, 4: See receiving_initial_metadata_ready() function
786   // For 2, 3: See receiving_stream_ready() function
787   gpr_atm recv_state_ = 0;
788 };
789 
Create(grpc_call_create_args * args,grpc_call ** out_call)790 grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
791                                           grpc_call** out_call) {
792   Channel* channel = args->channel.get();
793 
794   auto add_init_error = [](grpc_error_handle* composite,
795                            grpc_error_handle new_err) {
796     if (new_err.ok()) return;
797     if (composite->ok()) {
798       *composite = GRPC_ERROR_CREATE("Call creation failed");
799     }
800     *composite = grpc_error_add_child(*composite, new_err);
801   };
802 
803   FilterStackCall* call;
804   grpc_error_handle error;
805   grpc_channel_stack* channel_stack = channel->channel_stack();
806   size_t call_alloc_size =
807       GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)) +
808       channel_stack->call_stack_size;
809 
810   Arena* arena = channel->CreateArena();
811   call = new (arena->Alloc(call_alloc_size)) FilterStackCall(arena, *args);
812   GPR_DEBUG_ASSERT(FromC(call->c_ptr()) == call);
813   GPR_DEBUG_ASSERT(FromCallStack(call->call_stack()) == call);
814   *out_call = call->c_ptr();
815   grpc_slice path = grpc_empty_slice();
816   ScopedContext ctx(call);
817   if (call->is_client()) {
818     call->final_op_.client.status_details = nullptr;
819     call->final_op_.client.status = nullptr;
820     call->final_op_.client.error_string = nullptr;
821     global_stats().IncrementClientCallsCreated();
822     path = CSliceRef(args->path->c_slice());
823     call->send_initial_metadata_.Set(HttpPathMetadata(),
824                                      std::move(*args->path));
825     if (args->authority.has_value()) {
826       call->send_initial_metadata_.Set(HttpAuthorityMetadata(),
827                                        std::move(*args->authority));
828     }
829     call->send_initial_metadata_.Set(
830         GrpcRegisteredMethod(), reinterpret_cast<void*>(static_cast<uintptr_t>(
831                                     args->registered_method)));
832     channel_stack->stats_plugin_group->AddClientCallTracers(
833         Slice(CSliceRef(path)), args->registered_method, call->context_);
834   } else {
835     global_stats().IncrementServerCallsCreated();
836     call->final_op_.server.cancelled = nullptr;
837     call->final_op_.server.core_server = args->server;
838     // TODO(yashykt): In the future, we want to also enable stats and trace
839     // collecting from when the call is created at the transport. The idea is
840     // that the transport would create the call tracer and pass it in as part of
841     // the metadata.
842     // TODO(yijiem): OpenCensus and internal Census is still using this way to
843     // set server call tracer. We need to refactor them to stats plugins
844     // (including removing the client channel filters).
845     if (args->server != nullptr &&
846         args->server->server_call_tracer_factory() != nullptr) {
847       auto* server_call_tracer =
848           args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
849               arena, args->server->channel_args());
850       if (server_call_tracer != nullptr) {
851         // Note that we are setting both
852         // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
853         // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future
854         // promise-based world, we would just a single tracer object for each
855         // stack (call, subchannel_call, server_call.)
856         call->ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE,
857                          server_call_tracer, nullptr);
858         call->ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr);
859       }
860     }
861     channel_stack->stats_plugin_group->AddServerCallTracers(call->context_);
862   }
863 
864   Call* parent = Call::FromC(args->parent);
865   if (parent != nullptr) {
866     add_init_error(&error, absl_status_to_grpc_error(call->InitParent(
867                                parent, args->propagation_mask)));
868   }
869   // initial refcount dropped by grpc_call_unref
870   grpc_call_element_args call_args = {
871       call->call_stack(), args->server_transport_data,
872       call->context_,     path,
873       call->start_time(), call->send_deadline(),
874       call->arena(),      &call->call_combiner_};
875   add_init_error(&error, grpc_call_stack_init(channel_stack, 1, DestroyCall,
876                                               call, &call_args));
877   // Publish this call to parent only after the call stack has been initialized.
878   if (parent != nullptr) {
879     call->PublishToParent(parent);
880   }
881 
882   if (!error.ok()) {
883     call->CancelWithError(error);
884   }
885   if (args->cq != nullptr) {
886     GPR_ASSERT(args->pollset_set_alternative == nullptr &&
887                "Only one of 'cq' and 'pollset_set_alternative' should be "
888                "non-nullptr.");
889     GRPC_CQ_INTERNAL_REF(args->cq, "bind");
890     call->pollent_ =
891         grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
892   }
893   if (args->pollset_set_alternative != nullptr) {
894     call->pollent_ = grpc_polling_entity_create_from_pollset_set(
895         args->pollset_set_alternative);
896   }
897   if (!grpc_polling_entity_is_empty(&call->pollent_)) {
898     grpc_call_stack_set_pollset_or_pollset_set(call->call_stack(),
899                                                &call->pollent_);
900   }
901 
902   if (call->is_client()) {
903     channelz::ChannelNode* channelz_channel = channel->channelz_node();
904     if (channelz_channel != nullptr) {
905       channelz_channel->RecordCallStarted();
906     }
907   } else if (call->final_op_.server.core_server != nullptr) {
908     channelz::ServerNode* channelz_node =
909         call->final_op_.server.core_server->channelz_node();
910     if (channelz_node != nullptr) {
911       channelz_node->RecordCallStarted();
912     }
913   }
914 
915   CSliceUnref(path);
916 
917   return error;
918 }
919 
SetCompletionQueue(grpc_completion_queue * cq)920 void FilterStackCall::SetCompletionQueue(grpc_completion_queue* cq) {
921   GPR_ASSERT(cq);
922 
923   if (grpc_polling_entity_pollset_set(&pollent_) != nullptr) {
924     Crash("A pollset_set is already registered for this call.");
925   }
926   cq_ = cq;
927   GRPC_CQ_INTERNAL_REF(cq, "bind");
928   pollent_ = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
929   grpc_call_stack_set_pollset_or_pollset_set(call_stack(), &pollent_);
930 }
931 
ReleaseCall(void * call,grpc_error_handle)932 void FilterStackCall::ReleaseCall(void* call, grpc_error_handle /*error*/) {
933   static_cast<FilterStackCall*>(call)->DeleteThis();
934 }
935 
DestroyCall(void * call,grpc_error_handle)936 void FilterStackCall::DestroyCall(void* call, grpc_error_handle /*error*/) {
937   auto* c = static_cast<FilterStackCall*>(call);
938   c->recv_initial_metadata_.Clear();
939   c->recv_trailing_metadata_.Clear();
940   c->receiving_slice_buffer_.reset();
941   ParentCall* pc = c->parent_call();
942   if (pc != nullptr) {
943     pc->~ParentCall();
944   }
945   if (c->cq_) {
946     GRPC_CQ_INTERNAL_UNREF(c->cq_, "bind");
947   }
948 
949   grpc_error_handle status_error = c->status_error_.get();
950   grpc_error_get_status(status_error, c->send_deadline(),
951                         &c->final_info_.final_status, nullptr, nullptr,
952                         &(c->final_info_.error_string));
953   c->status_error_.set(absl::OkStatus());
954   c->final_info_.stats.latency =
955       gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time());
956   grpc_call_stack_destroy(c->call_stack(), &c->final_info_,
957                           GRPC_CLOSURE_INIT(&c->release_call_, ReleaseCall, c,
958                                             grpc_schedule_on_exec_ctx));
959 }
960 
ExternalUnref()961 void FilterStackCall::ExternalUnref() {
962   if (GPR_LIKELY(!ext_ref_.Unref())) return;
963 
964   ApplicationCallbackExecCtx callback_exec_ctx;
965   ExecCtx exec_ctx;
966 
967   GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (this));
968 
969   MaybeUnpublishFromParent();
970 
971   GPR_ASSERT(!destroy_called_);
972   destroy_called_ = true;
973   bool cancel = gpr_atm_acq_load(&received_final_op_atm_) == 0;
974   if (cancel) {
975     CancelWithError(absl::CancelledError());
976   } else {
977     // Unset the call combiner cancellation closure.  This has the
978     // effect of scheduling the previously set cancellation closure, if
979     // any, so that it can release any internal references it may be
980     // holding to the call stack.
981     call_combiner_.SetNotifyOnCancel(nullptr);
982   }
983   InternalUnref("destroy");
984 }
985 
986 // start_batch_closure points to a caller-allocated closure to be used
987 // for entering the call combiner.
ExecuteBatch(grpc_transport_stream_op_batch * batch,grpc_closure * start_batch_closure)988 void FilterStackCall::ExecuteBatch(grpc_transport_stream_op_batch* batch,
989                                    grpc_closure* start_batch_closure) {
990   // This is called via the call combiner to start sending a batch down
991   // the filter stack.
992   auto execute_batch_in_call_combiner = [](void* arg, grpc_error_handle) {
993     grpc_transport_stream_op_batch* batch =
994         static_cast<grpc_transport_stream_op_batch*>(arg);
995     auto* call =
996         static_cast<FilterStackCall*>(batch->handler_private.extra_arg);
997     grpc_call_element* elem = call->call_elem(0);
998     GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
999     elem->filter->start_transport_stream_op_batch(elem, batch);
1000   };
1001   batch->handler_private.extra_arg = this;
1002   GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
1003                     grpc_schedule_on_exec_ctx);
1004   GRPC_CALL_COMBINER_START(call_combiner(), start_batch_closure,
1005                            absl::OkStatus(), "executing batch");
1006 }
1007 
1008 namespace {
1009 struct CancelState {
1010   FilterStackCall* call;
1011   grpc_closure start_batch;
1012   grpc_closure finish_batch;
1013 };
1014 }  // namespace
1015 
1016 // The on_complete callback used when sending a cancel_stream batch down
1017 // the filter stack.  Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error_handle)1018 static void done_termination(void* arg, grpc_error_handle /*error*/) {
1019   CancelState* state = static_cast<CancelState*>(arg);
1020   GRPC_CALL_COMBINER_STOP(state->call->call_combiner(),
1021                           "on_complete for cancel_stream op");
1022   state->call->InternalUnref("termination");
1023   delete state;
1024 }
1025 
CancelWithError(grpc_error_handle error)1026 void FilterStackCall::CancelWithError(grpc_error_handle error) {
1027   if (!gpr_atm_rel_cas(&cancelled_with_error_, 0, 1)) {
1028     return;
1029   }
1030   ClearPeerString();
1031   InternalRef("termination");
1032   // Inform the call combiner of the cancellation, so that it can cancel
1033   // any in-flight asynchronous actions that may be holding the call
1034   // combiner.  This ensures that the cancel_stream batch can be sent
1035   // down the filter stack in a timely manner.
1036   call_combiner_.Cancel(error);
1037   CancelState* state = new CancelState;
1038   state->call = this;
1039   GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
1040                     grpc_schedule_on_exec_ctx);
1041   grpc_transport_stream_op_batch* op =
1042       grpc_make_transport_stream_op(&state->finish_batch);
1043   op->cancel_stream = true;
1044   op->payload->cancel_stream.cancel_error = error;
1045   ExecuteBatch(op, &state->start_batch);
1046 }
1047 
SetFinalStatus(grpc_error_handle error)1048 void FilterStackCall::SetFinalStatus(grpc_error_handle error) {
1049   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
1050     gpr_log(GPR_DEBUG, "set_final_status %s %s", is_client() ? "CLI" : "SVR",
1051             StatusToString(error).c_str());
1052   }
1053   if (is_client()) {
1054     std::string status_details;
1055     grpc_error_get_status(error, send_deadline(), final_op_.client.status,
1056                           &status_details, nullptr,
1057                           final_op_.client.error_string);
1058     *final_op_.client.status_details =
1059         grpc_slice_from_cpp_string(std::move(status_details));
1060     status_error_.set(error);
1061     channelz::ChannelNode* channelz_channel = channel()->channelz_node();
1062     if (channelz_channel != nullptr) {
1063       if (*final_op_.client.status != GRPC_STATUS_OK) {
1064         channelz_channel->RecordCallFailed();
1065       } else {
1066         channelz_channel->RecordCallSucceeded();
1067       }
1068     }
1069   } else {
1070     *final_op_.server.cancelled =
1071         !error.ok() || !sent_server_trailing_metadata_;
1072     channelz::ServerNode* channelz_node =
1073         final_op_.server.core_server->channelz_node();
1074     if (channelz_node != nullptr) {
1075       if (*final_op_.server.cancelled || !status_error_.ok()) {
1076         channelz_node->RecordCallFailed();
1077       } else {
1078         channelz_node->RecordCallSucceeded();
1079       }
1080     }
1081   }
1082 }
1083 
PrepareApplicationMetadata(size_t count,grpc_metadata * metadata,bool is_trailing)1084 bool FilterStackCall::PrepareApplicationMetadata(size_t count,
1085                                                  grpc_metadata* metadata,
1086                                                  bool is_trailing) {
1087   grpc_metadata_batch* batch =
1088       is_trailing ? &send_trailing_metadata_ : &send_initial_metadata_;
1089   for (size_t i = 0; i < count; i++) {
1090     grpc_metadata* md = &metadata[i];
1091     if (!GRPC_LOG_IF_ERROR("validate_metadata",
1092                            grpc_validate_header_key_is_legal(md->key))) {
1093       return false;
1094     } else if (!grpc_is_binary_header_internal(md->key) &&
1095                !GRPC_LOG_IF_ERROR(
1096                    "validate_metadata",
1097                    grpc_validate_header_nonbin_value_is_legal(md->value))) {
1098       return false;
1099     } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
1100       // HTTP2 hpack encoding has a maximum limit.
1101       return false;
1102     } else if (grpc_slice_str_cmp(md->key, "content-length") == 0) {
1103       // Filter "content-length metadata"
1104       continue;
1105     }
1106     batch->Append(StringViewFromSlice(md->key), Slice(CSliceRef(md->value)),
1107                   [md](absl::string_view error, const Slice& value) {
1108                     gpr_log(GPR_DEBUG, "Append error: %s",
1109                             absl::StrCat("key=", StringViewFromSlice(md->key),
1110                                          " error=", error,
1111                                          " value=", value.as_string_view())
1112                                 .c_str());
1113                   });
1114   }
1115 
1116   return true;
1117 }
1118 
1119 namespace {
1120 class PublishToAppEncoder {
1121  public:
PublishToAppEncoder(grpc_metadata_array * dest,const grpc_metadata_batch * encoding,bool is_client)1122   explicit PublishToAppEncoder(grpc_metadata_array* dest,
1123                                const grpc_metadata_batch* encoding,
1124                                bool is_client)
1125       : dest_(dest), encoding_(encoding), is_client_(is_client) {}
1126 
Encode(const Slice & key,const Slice & value)1127   void Encode(const Slice& key, const Slice& value) {
1128     Append(key.c_slice(), value.c_slice());
1129   }
1130 
1131   // Catch anything that is not explicitly handled, and do not publish it to the
1132   // application. If new metadata is added to a batch that needs to be
1133   // published, it should be called out here.
1134   template <typename Which>
Encode(Which,const typename Which::ValueType &)1135   void Encode(Which, const typename Which::ValueType&) {}
1136 
Encode(UserAgentMetadata,const Slice & slice)1137   void Encode(UserAgentMetadata, const Slice& slice) {
1138     Append(UserAgentMetadata::key(), slice);
1139   }
1140 
Encode(HostMetadata,const Slice & slice)1141   void Encode(HostMetadata, const Slice& slice) {
1142     Append(HostMetadata::key(), slice);
1143   }
1144 
Encode(GrpcPreviousRpcAttemptsMetadata,uint32_t count)1145   void Encode(GrpcPreviousRpcAttemptsMetadata, uint32_t count) {
1146     Append(GrpcPreviousRpcAttemptsMetadata::key(), count);
1147   }
1148 
Encode(GrpcRetryPushbackMsMetadata,Duration count)1149   void Encode(GrpcRetryPushbackMsMetadata, Duration count) {
1150     Append(GrpcRetryPushbackMsMetadata::key(), count.millis());
1151   }
1152 
Encode(LbTokenMetadata,const Slice & slice)1153   void Encode(LbTokenMetadata, const Slice& slice) {
1154     Append(LbTokenMetadata::key(), slice);
1155   }
1156 
1157  private:
Append(absl::string_view key,int64_t value)1158   void Append(absl::string_view key, int64_t value) {
1159     Append(StaticSlice::FromStaticString(key).c_slice(),
1160            Slice::FromInt64(value).c_slice());
1161   }
1162 
Append(absl::string_view key,const Slice & value)1163   void Append(absl::string_view key, const Slice& value) {
1164     Append(StaticSlice::FromStaticString(key).c_slice(), value.c_slice());
1165   }
1166 
Append(grpc_slice key,grpc_slice value)1167   void Append(grpc_slice key, grpc_slice value) {
1168     if (dest_->count == dest_->capacity) {
1169       Crash(absl::StrCat(
1170           "Too many metadata entries: capacity=", dest_->capacity, " on ",
1171           is_client_ ? "client" : "server", " encoding ", encoding_->count(),
1172           " elements: ", encoding_->DebugString().c_str()));
1173     }
1174     auto* mdusr = &dest_->metadata[dest_->count++];
1175     mdusr->key = key;
1176     mdusr->value = value;
1177   }
1178 
1179   grpc_metadata_array* const dest_;
1180   const grpc_metadata_batch* const encoding_;
1181   const bool is_client_;
1182 };
1183 }  // namespace
1184 
PublishAppMetadata(grpc_metadata_batch * b,bool is_trailing)1185 void FilterStackCall::PublishAppMetadata(grpc_metadata_batch* b,
1186                                          bool is_trailing) {
1187   if (b->count() == 0) return;
1188   if (!is_client() && is_trailing) return;
1189   if (is_trailing && buffered_metadata_[1] == nullptr) return;
1190   grpc_metadata_array* dest;
1191   dest = buffered_metadata_[is_trailing];
1192   if (dest->count + b->count() > dest->capacity) {
1193     dest->capacity =
1194         std::max(dest->capacity + b->count(), dest->capacity * 3 / 2);
1195     dest->metadata = static_cast<grpc_metadata*>(
1196         gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
1197   }
1198   PublishToAppEncoder encoder(dest, b, is_client());
1199   b->Encode(&encoder);
1200 }
1201 
RecvInitialFilter(grpc_metadata_batch * b)1202 void FilterStackCall::RecvInitialFilter(grpc_metadata_batch* b) {
1203   ProcessIncomingInitialMetadata(*b);
1204   PublishAppMetadata(b, false);
1205 }
1206 
RecvTrailingFilter(grpc_metadata_batch * b,grpc_error_handle batch_error)1207 void FilterStackCall::RecvTrailingFilter(grpc_metadata_batch* b,
1208                                          grpc_error_handle batch_error) {
1209   if (!batch_error.ok()) {
1210     SetFinalStatus(batch_error);
1211   } else {
1212     absl::optional<grpc_status_code> grpc_status =
1213         b->Take(GrpcStatusMetadata());
1214     if (grpc_status.has_value()) {
1215       grpc_status_code status_code = *grpc_status;
1216       grpc_error_handle error;
1217       if (status_code != GRPC_STATUS_OK) {
1218         Slice peer = GetPeerString();
1219         error = grpc_error_set_int(
1220             GRPC_ERROR_CREATE(absl::StrCat("Error received from peer ",
1221                                            peer.as_string_view())),
1222             StatusIntProperty::kRpcStatus, static_cast<intptr_t>(status_code));
1223       }
1224       auto grpc_message = b->Take(GrpcMessageMetadata());
1225       if (grpc_message.has_value()) {
1226         error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage,
1227                                    grpc_message->as_string_view());
1228       } else if (!error.ok()) {
1229         error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage, "");
1230       }
1231       SetFinalStatus(error);
1232     } else if (!is_client()) {
1233       SetFinalStatus(absl::OkStatus());
1234     } else {
1235       gpr_log(GPR_DEBUG,
1236               "Received trailing metadata with no error and no status");
1237       SetFinalStatus(grpc_error_set_int(GRPC_ERROR_CREATE("No status received"),
1238                                         StatusIntProperty::kRpcStatus,
1239                                         GRPC_STATUS_UNKNOWN));
1240     }
1241   }
1242   PublishAppMetadata(b, true);
1243 }
1244 
1245 namespace {
AreWriteFlagsValid(uint32_t flags)1246 bool AreWriteFlagsValid(uint32_t flags) {
1247   // check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set
1248   const uint32_t allowed_write_positions =
1249       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1250   const uint32_t invalid_positions = ~allowed_write_positions;
1251   return !(flags & invalid_positions);
1252 }
1253 
AreInitialMetadataFlagsValid(uint32_t flags)1254 bool AreInitialMetadataFlagsValid(uint32_t flags) {
1255   // check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set
1256   uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1257   return !(flags & invalid_positions);
1258 }
1259 
BatchSlotForOp(grpc_op_type type)1260 size_t BatchSlotForOp(grpc_op_type type) {
1261   switch (type) {
1262     case GRPC_OP_SEND_INITIAL_METADATA:
1263       return 0;
1264     case GRPC_OP_SEND_MESSAGE:
1265       return 1;
1266     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1267     case GRPC_OP_SEND_STATUS_FROM_SERVER:
1268       return 2;
1269     case GRPC_OP_RECV_INITIAL_METADATA:
1270       return 3;
1271     case GRPC_OP_RECV_MESSAGE:
1272       return 4;
1273     case GRPC_OP_RECV_CLOSE_ON_SERVER:
1274     case GRPC_OP_RECV_STATUS_ON_CLIENT:
1275       return 5;
1276   }
1277   GPR_UNREACHABLE_CODE(return 123456789);
1278 }
1279 }  // namespace
1280 
ReuseOrAllocateBatchControl(const grpc_op * ops)1281 FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl(
1282     const grpc_op* ops) {
1283   size_t slot_idx = BatchSlotForOp(ops[0].op);
1284   BatchControl** pslot = &active_batches_[slot_idx];
1285   BatchControl* bctl;
1286   if (*pslot != nullptr) {
1287     bctl = *pslot;
1288     if (bctl->call_ != nullptr) {
1289       return nullptr;
1290     }
1291     bctl->~BatchControl();
1292     bctl->op_ = {};
1293     new (&bctl->batch_error_) AtomicError();
1294   } else {
1295     bctl = arena()->New<BatchControl>();
1296     *pslot = bctl;
1297   }
1298   bctl->call_ = this;
1299   bctl->call_tracer_ = static_cast<CallTracerAnnotationInterface*>(
1300       ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE));
1301   bctl->op_.payload = &stream_op_payload_;
1302   return bctl;
1303 }
1304 
PostCompletion()1305 void FilterStackCall::BatchControl::PostCompletion() {
1306   FilterStackCall* call = call_;
1307   grpc_error_handle error = batch_error_.get();
1308 
1309   if (IsCallStatusOverrideOnCancellationEnabled()) {
1310     // On the client side, if final call status is already known (i.e if this op
1311     // includes recv_trailing_metadata) and if the call status is known to be
1312     // OK, then disregard the batch error to ensure call->receiving_buffer_ is
1313     // not cleared.
1314     if (op_.recv_trailing_metadata && call->is_client() &&
1315         call->status_error_.ok()) {
1316       error = absl::OkStatus();
1317     }
1318   }
1319 
1320   if (grpc_call_trace.enabled()) {
1321     gpr_log(GPR_DEBUG, "tag:%p batch_error=%s op:%s",
1322             completion_data_.notify_tag.tag, error.ToString().c_str(),
1323             grpc_transport_stream_op_batch_string(&op_, false).c_str());
1324   }
1325 
1326   if (op_.send_initial_metadata) {
1327     call->send_initial_metadata_.Clear();
1328   }
1329   if (op_.send_message) {
1330     if (op_.payload->send_message.stream_write_closed && error.ok()) {
1331       error = grpc_error_add_child(
1332           error, GRPC_ERROR_CREATE(
1333                      "Attempt to send message after stream was closed."));
1334     }
1335     call->sending_message_ = false;
1336     call->send_slice_buffer_.Clear();
1337   }
1338   if (op_.send_trailing_metadata) {
1339     call->send_trailing_metadata_.Clear();
1340   }
1341 
1342   if (!error.ok() && op_.recv_message && *call->receiving_buffer_ != nullptr) {
1343     grpc_byte_buffer_destroy(*call->receiving_buffer_);
1344     *call->receiving_buffer_ = nullptr;
1345   }
1346   if (op_.recv_trailing_metadata) {
1347     // propagate cancellation to any interested children
1348     gpr_atm_rel_store(&call->received_final_op_atm_, 1);
1349     call->PropagateCancellationToChildren();
1350     error = absl::OkStatus();
1351   }
1352   batch_error_.set(absl::OkStatus());
1353 
1354   if (completion_data_.notify_tag.is_closure) {
1355     call_ = nullptr;
1356     Closure::Run(DEBUG_LOCATION,
1357                  static_cast<grpc_closure*>(completion_data_.notify_tag.tag),
1358                  error);
1359     call->InternalUnref("completion");
1360   } else {
1361     grpc_cq_end_op(
1362         call->cq_, completion_data_.notify_tag.tag, error,
1363         [](void* user_data, grpc_cq_completion* /*storage*/) {
1364           BatchControl* bctl = static_cast<BatchControl*>(user_data);
1365           Call* call = bctl->call_;
1366           bctl->call_ = nullptr;
1367           call->InternalUnref("completion");
1368         },
1369         this, &completion_data_.cq_completion);
1370   }
1371 }
1372 
FinishStep(PendingOp op)1373 void FilterStackCall::BatchControl::FinishStep(PendingOp op) {
1374   if (GPR_UNLIKELY(completed_batch_step(op))) {
1375     PostCompletion();
1376   }
1377 }
1378 
ProcessDataAfterMetadata()1379 void FilterStackCall::BatchControl::ProcessDataAfterMetadata() {
1380   FilterStackCall* call = call_;
1381   if (!call->receiving_slice_buffer_.has_value()) {
1382     *call->receiving_buffer_ = nullptr;
1383     call->receiving_message_ = false;
1384     FinishStep(PendingOp::kRecvMessage);
1385   } else {
1386     call->NoteLastMessageFlags(call->receiving_stream_flags_);
1387     if ((call->receiving_stream_flags_ & GRPC_WRITE_INTERNAL_COMPRESS) &&
1388         (call->incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
1389       *call->receiving_buffer_ = grpc_raw_compressed_byte_buffer_create(
1390           nullptr, 0, call->incoming_compression_algorithm());
1391     } else {
1392       *call->receiving_buffer_ = grpc_raw_byte_buffer_create(nullptr, 0);
1393     }
1394     grpc_slice_buffer_move_into(
1395         call->receiving_slice_buffer_->c_slice_buffer(),
1396         &(*call->receiving_buffer_)->data.raw.slice_buffer);
1397     call->receiving_message_ = false;
1398     call->receiving_slice_buffer_.reset();
1399     FinishStep(PendingOp::kRecvMessage);
1400   }
1401 }
1402 
ReceivingStreamReady(grpc_error_handle error)1403 void FilterStackCall::BatchControl::ReceivingStreamReady(
1404     grpc_error_handle error) {
1405   if (grpc_call_trace.enabled()) {
1406     gpr_log(GPR_DEBUG,
1407             "tag:%p ReceivingStreamReady error=%s "
1408             "receiving_slice_buffer.has_value=%d recv_state=%" PRIdPTR,
1409             completion_data_.notify_tag.tag, error.ToString().c_str(),
1410             call_->receiving_slice_buffer_.has_value(),
1411             gpr_atm_no_barrier_load(&call_->recv_state_));
1412   }
1413   FilterStackCall* call = call_;
1414   if (!error.ok()) {
1415     call->receiving_slice_buffer_.reset();
1416     if (batch_error_.ok()) {
1417       batch_error_.set(error);
1418     }
1419     call->CancelWithError(error);
1420   }
1421   // If recv_state is kRecvNone, we will save the batch_control
1422   // object with rel_cas, and will not use it after the cas. Its corresponding
1423   // acq_load is in receiving_initial_metadata_ready()
1424   if (!error.ok() || !call->receiving_slice_buffer_.has_value() ||
1425       !gpr_atm_rel_cas(&call->recv_state_, kRecvNone,
1426                        reinterpret_cast<gpr_atm>(this))) {
1427     ProcessDataAfterMetadata();
1428   }
1429 }
1430 
ReceivingInitialMetadataReady(grpc_error_handle error)1431 void FilterStackCall::BatchControl::ReceivingInitialMetadataReady(
1432     grpc_error_handle error) {
1433   FilterStackCall* call = call_;
1434 
1435   GRPC_CALL_COMBINER_STOP(call->call_combiner(), "recv_initial_metadata_ready");
1436 
1437   if (error.ok()) {
1438     grpc_metadata_batch* md = &call->recv_initial_metadata_;
1439     call->RecvInitialFilter(md);
1440 
1441     absl::optional<Timestamp> deadline = md->get(GrpcTimeoutMetadata());
1442     if (deadline.has_value() && !call->is_client()) {
1443       call_->set_send_deadline(*deadline);
1444     }
1445   } else {
1446     if (batch_error_.ok()) {
1447       batch_error_.set(error);
1448     }
1449     call->CancelWithError(error);
1450   }
1451 
1452   grpc_closure* saved_rsr_closure = nullptr;
1453   while (true) {
1454     gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state_);
1455     // Should only receive initial metadata once
1456     GPR_ASSERT(rsr_bctlp != 1);
1457     if (rsr_bctlp == 0) {
1458       // We haven't seen initial metadata and messages before, thus initial
1459       // metadata is received first.
1460       // no_barrier_cas is used, as this function won't access the batch_control
1461       // object saved by receiving_stream_ready() if the initial metadata is
1462       // received first.
1463       if (gpr_atm_no_barrier_cas(&call->recv_state_, kRecvNone,
1464                                  kRecvInitialMetadataFirst)) {
1465         break;
1466       }
1467     } else {
1468       // Already received messages
1469       saved_rsr_closure = GRPC_CLOSURE_CREATE(
1470           [](void* bctl, grpc_error_handle error) {
1471             static_cast<BatchControl*>(bctl)->ReceivingStreamReady(error);
1472           },
1473           reinterpret_cast<BatchControl*>(rsr_bctlp),
1474           grpc_schedule_on_exec_ctx);
1475       // No need to modify recv_state
1476       break;
1477     }
1478   }
1479   if (saved_rsr_closure != nullptr) {
1480     Closure::Run(DEBUG_LOCATION, saved_rsr_closure, error);
1481   }
1482 
1483   FinishStep(PendingOp::kRecvInitialMetadata);
1484 }
1485 
ReceivingTrailingMetadataReady(grpc_error_handle error)1486 void FilterStackCall::BatchControl::ReceivingTrailingMetadataReady(
1487     grpc_error_handle error) {
1488   GRPC_CALL_COMBINER_STOP(call_->call_combiner(),
1489                           "recv_trailing_metadata_ready");
1490   grpc_metadata_batch* md = &call_->recv_trailing_metadata_;
1491   call_->RecvTrailingFilter(md, error);
1492   FinishStep(PendingOp::kRecvTrailingMetadata);
1493 }
1494 
FinishBatch(grpc_error_handle error)1495 void FilterStackCall::BatchControl::FinishBatch(grpc_error_handle error) {
1496   GRPC_CALL_COMBINER_STOP(call_->call_combiner(), "on_complete");
1497   if (batch_error_.ok()) {
1498     batch_error_.set(error);
1499   }
1500   if (!error.ok()) {
1501     call_->CancelWithError(error);
1502   }
1503   FinishStep(PendingOp::kSends);
1504 }
1505 
1506 namespace {
EndOpImmediately(grpc_completion_queue * cq,void * notify_tag,bool is_notify_tag_closure)1507 void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag,
1508                       bool is_notify_tag_closure) {
1509   if (!is_notify_tag_closure) {
1510     GPR_ASSERT(grpc_cq_begin_op(cq, notify_tag));
1511     grpc_cq_end_op(
1512         cq, notify_tag, absl::OkStatus(),
1513         [](void*, grpc_cq_completion* completion) { gpr_free(completion); },
1514         nullptr,
1515         static_cast<grpc_cq_completion*>(
1516             gpr_malloc(sizeof(grpc_cq_completion))));
1517   } else {
1518     Closure::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(notify_tag),
1519                  absl::OkStatus());
1520   }
1521 }
1522 }  // namespace
1523 
// Validates a batch of application-level ops and rewrites it into a single
// transport stream op batch, which is then started on the call.
//
// ops/nops:   the operations making up the batch; each op type may appear at
//             most once.
// notify_tag: completion-queue tag, or a grpc_closure* when
//             is_notify_tag_closure is true.
//
// Returns GRPC_CALL_OK when the batch was started (or completed immediately
// because it was empty), otherwise the grpc_call_error describing why the
// batch was rejected. When an op is rejected part-way through the rewrite,
// the per-call flags set for ops already processed in this batch are reset
// at done_with_error before returning.
grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
                                            void* notify_tag,
                                            bool is_notify_tag_closure) {
  size_t i;
  const grpc_op* op;
  BatchControl* bctl;
  grpc_call_error error = GRPC_CALL_OK;
  grpc_transport_stream_op_batch* stream_op;
  grpc_transport_stream_op_batch_payload* stream_op_payload;
  // Bitmask of op types present in this batch (bit index == op type).
  uint32_t seen_ops = 0;
  // Bitmask of PendingOp steps that must finish before the batch completes.
  intptr_t pending_ops = 0;

  // Reject batches that contain the same op type more than once.
  for (i = 0; i < nops; i++) {
    if (seen_ops & (1u << ops[i].op)) {
      return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
    }
    seen_ops |= (1u << ops[i].op);
  }

  // A server may not combine SEND_STATUS_FROM_SERVER with RECV_MESSAGE in one
  // batch.
  if (!is_client() &&
      (seen_ops & (1u << GRPC_OP_SEND_STATUS_FROM_SERVER)) != 0 &&
      (seen_ops & (1u << GRPC_OP_RECV_MESSAGE)) != 0) {
    gpr_log(GPR_ERROR,
            "******************* SEND_STATUS WITH RECV_MESSAGE "
            "*******************");
    return GRPC_CALL_ERROR;
  }

  GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);

  // An empty batch has nothing to send to the transport: signal completion
  // right away.
  if (nops == 0) {
    EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
    error = GRPC_CALL_OK;
    goto done;
  }

  // No batch control object available for this batch: report it as too many
  // operations.
  bctl = ReuseOrAllocateBatchControl(ops);
  if (bctl == nullptr) {
    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
  }
  bctl->completion_data_.notify_tag.tag = notify_tag;
  bctl->completion_data_.notify_tag.is_closure =
      static_cast<uint8_t>(is_notify_tag_closure != 0);

  stream_op = &bctl->op_;
  stream_op_payload = &stream_op_payload_;

  // rewrite batch ops into a transport op
  for (i = 0; i < nops; i++) {
    op = &ops[i];
    // The reserved field must be left null by the caller.
    if (op->reserved != nullptr) {
      error = GRPC_CALL_ERROR;
      goto done_with_error;
    }
    switch (op->op) {
      case GRPC_OP_SEND_INITIAL_METADATA: {
        // Flag validation: only the flags accepted by
        // AreInitialMetadataFlagsValid() are allowed.
        if (!AreInitialMetadataFlagsValid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        // Initial metadata may be sent at most once per call.
        if (sent_initial_metadata_) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        if (op->data.send_initial_metadata.count > INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        // Flags are set before the remaining validation; done_with_error
        // undoes them if a later check fails.
        stream_op->send_initial_metadata = true;
        sent_initial_metadata_ = true;
        if (!PrepareApplicationMetadata(op->data.send_initial_metadata.count,
                                        op->data.send_initial_metadata.metadata,
                                        false)) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        PrepareOutgoingInitialMetadata(*op, send_initial_metadata_);
        // TODO(ctiller): just make these the same variable?
        // Clients with a finite deadline advertise it as timeout metadata.
        if (is_client() && send_deadline() != Timestamp::InfFuture()) {
          send_initial_metadata_.Set(GrpcTimeoutMetadata(), send_deadline());
        }
        // Clients also record the wait-for-ready flags from the op.
        if (is_client()) {
          send_initial_metadata_.Set(
              WaitForReady(),
              WaitForReady::ValueType{
                  (op->flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) != 0,
                  (op->flags &
                   GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) != 0});
        }
        stream_op_payload->send_initial_metadata.send_initial_metadata =
            &send_initial_metadata_;
        pending_ops |= PendingOpMask(PendingOp::kSends);
        break;
      }
      case GRPC_OP_SEND_MESSAGE: {
        if (!AreWriteFlagsValid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (op->data.send_message.send_message == nullptr) {
          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
          goto done_with_error;
        }
        // Only one message send may be in flight at a time.
        if (sending_message_) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        uint32_t flags = op->flags;
        // If the outgoing buffer is already compressed, mark it as so in the
        // flags. These will be picked up by the compression filter and further
        // (wasteful) attempts at compression skipped.
        if (op->data.send_message.send_message->data.raw.compression >
            GRPC_COMPRESS_NONE) {
          flags |= GRPC_WRITE_INTERNAL_COMPRESS;
        }
        stream_op->send_message = true;
        sending_message_ = true;
        // Move the slices out of the caller's byte buffer into the call-owned
        // send buffer.
        send_slice_buffer_.Clear();
        grpc_slice_buffer_move_into(
            &op->data.send_message.send_message->data.raw.slice_buffer,
            send_slice_buffer_.c_slice_buffer());
        stream_op_payload->send_message.flags = flags;
        stream_op_payload->send_message.send_message = &send_slice_buffer_;
        pending_ops |= PendingOpMask(PendingOp::kSends);
        break;
      }
      case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
        // Flag validation: currently allow no flags
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!is_client()) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        // The final send op may be issued at most once per call.
        if (sent_final_op_) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        // The client close is expressed as a send_trailing_metadata op.
        stream_op->send_trailing_metadata = true;
        sent_final_op_ = true;
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &send_trailing_metadata_;
        pending_ops |= PendingOpMask(PendingOp::kSends);
        break;
      }
      case GRPC_OP_SEND_STATUS_FROM_SERVER: {
        // Flag validation: currently allow no flags
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (is_client()) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        // The final send op may be issued at most once per call.
        if (sent_final_op_) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        if (op->data.send_status_from_server.trailing_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        sent_final_op_ = true;

        if (!PrepareApplicationMetadata(
                op->data.send_status_from_server.trailing_metadata_count,
                op->data.send_status_from_server.trailing_metadata, true)) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }

        // Build the error recorded in status_error_: OK for GRPC_STATUS_OK,
        // otherwise an error carrying the status code (and, below, the status
        // details as its message).
        grpc_error_handle status_error =
            op->data.send_status_from_server.status == GRPC_STATUS_OK
                ? absl::OkStatus()
                : grpc_error_set_int(
                      GRPC_ERROR_CREATE("Server returned error"),
                      StatusIntProperty::kRpcStatus,
                      static_cast<intptr_t>(
                          op->data.send_status_from_server.status));
        if (op->data.send_status_from_server.status_details != nullptr) {
          // The details are copied into the trailing metadata as the
          // grpc-message value.
          send_trailing_metadata_.Set(
              GrpcMessageMetadata(),
              Slice(grpc_slice_copy(
                  *op->data.send_status_from_server.status_details)));
          if (!status_error.ok()) {
            status_error = grpc_error_set_str(
                status_error, StatusStrProperty::kGrpcMessage,
                StringViewFromSlice(
                    *op->data.send_status_from_server.status_details));
          }
        }

        status_error_.set(status_error);

        send_trailing_metadata_.Set(GrpcStatusMetadata(),
                                    op->data.send_status_from_server.status);

        // Ignore any te metadata key value pairs specified.
        send_trailing_metadata_.Remove(TeMetadata());
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &send_trailing_metadata_;
        stream_op_payload->send_trailing_metadata.sent =
            &sent_server_trailing_metadata_;
        pending_ops |= PendingOpMask(PendingOp::kSends);
        break;
      }
      case GRPC_OP_RECV_INITIAL_METADATA: {
        // Flag validation: currently allow no flags
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        // Initial metadata may be requested at most once per call.
        if (received_initial_metadata_) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        received_initial_metadata_ = true;
        // Remember the application's output array for the initial metadata.
        buffered_metadata_[0] =
            op->data.recv_initial_metadata.recv_initial_metadata;
        GRPC_CLOSURE_INIT(
            &receiving_initial_metadata_ready_,
            [](void* bctl, grpc_error_handle error) {
              static_cast<BatchControl*>(bctl)->ReceivingInitialMetadataReady(
                  error);
            },
            bctl, grpc_schedule_on_exec_ctx);
        stream_op->recv_initial_metadata = true;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata =
            &recv_initial_metadata_;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
            &receiving_initial_metadata_ready_;
        if (is_client()) {
          stream_op_payload->recv_initial_metadata.trailing_metadata_available =
              &is_trailers_only_;
        }
        pending_ops |= PendingOpMask(PendingOp::kRecvInitialMetadata);
        break;
      }
      case GRPC_OP_RECV_MESSAGE: {
        // Flag validation: currently allow no flags
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        // Only one message receive may be outstanding at a time.
        if (receiving_message_) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        receiving_message_ = true;
        stream_op->recv_message = true;
        // Start from an empty receive buffer and remember where the
        // application wants the resulting byte buffer stored.
        receiving_slice_buffer_.reset();
        receiving_buffer_ = op->data.recv_message.recv_message;
        stream_op_payload->recv_message.recv_message = &receiving_slice_buffer_;
        receiving_stream_flags_ = 0;
        stream_op_payload->recv_message.flags = &receiving_stream_flags_;
        stream_op_payload->recv_message.call_failed_before_recv_message =
            &call_failed_before_recv_message_;
        GRPC_CLOSURE_INIT(
            &receiving_stream_ready_,
            [](void* bctlp, grpc_error_handle error) {
              auto* bctl = static_cast<BatchControl*>(bctlp);
              auto* call = bctl->call_;
              //  Yields the call combiner before processing the received
              //  message.
              GRPC_CALL_COMBINER_STOP(call->call_combiner(),
                                      "recv_message_ready");
              bctl->ReceivingStreamReady(error);
            },
            bctl, grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_message.recv_message_ready =
            &receiving_stream_ready_;
        pending_ops |= PendingOpMask(PendingOp::kRecvMessage);
        break;
      }
      case GRPC_OP_RECV_STATUS_ON_CLIENT: {
        // Flag validation: currently allow no flags
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!is_client()) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        // The final receive op may be requested at most once per call.
        if (requested_final_op_) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        requested_final_op_ = true;
        // Remember the application's output locations for the trailing
        // metadata, status, status details and error string.
        buffered_metadata_[1] =
            op->data.recv_status_on_client.trailing_metadata;
        final_op_.client.status = op->data.recv_status_on_client.status;
        final_op_.client.status_details =
            op->data.recv_status_on_client.status_details;
        final_op_.client.error_string =
            op->data.recv_status_on_client.error_string;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &recv_trailing_metadata_;
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &final_info_.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(
            &receiving_trailing_metadata_ready_,
            [](void* bctl, grpc_error_handle error) {
              static_cast<BatchControl*>(bctl)->ReceivingTrailingMetadataReady(
                  error);
            },
            bctl, grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &receiving_trailing_metadata_ready_;
        pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata);
        break;
      }
      case GRPC_OP_RECV_CLOSE_ON_SERVER: {
        // Flag validation: currently allow no flags
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (is_client()) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        // The final receive op may be requested at most once per call.
        if (requested_final_op_) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        requested_final_op_ = true;
        // Remember where the application wants the cancelled flag stored.
        final_op_.server.cancelled = op->data.recv_close_on_server.cancelled;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &recv_trailing_metadata_;
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &final_info_.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(
            &receiving_trailing_metadata_ready_,
            [](void* bctl, grpc_error_handle error) {
              static_cast<BatchControl*>(bctl)->ReceivingTrailingMetadataReady(
                  error);
            },
            bctl, grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &receiving_trailing_metadata_ready_;
        pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata);
        break;
      }
    }
  }

  // All ops validated: take an internal ref for the batch and, for
  // completion-queue tags, begin the completion on the queue.
  // NOTE(review): the matching unref is not in this function; presumably it
  // happens when the batch completes -- confirm.
  InternalRef("completion");
  if (!is_notify_tag_closure) {
    GPR_ASSERT(grpc_cq_begin_op(cq_, notify_tag));
  }
  bctl->set_pending_ops(pending_ops);

  // All send ops of the batch share a single on_complete callback.
  if (pending_ops & PendingOpMask(PendingOp::kSends)) {
    GRPC_CLOSURE_INIT(
        &bctl->finish_batch_,
        [](void* bctl, grpc_error_handle error) {
          static_cast<BatchControl*>(bctl)->FinishBatch(error);
        },
        bctl, grpc_schedule_on_exec_ctx);
    stream_op->on_complete = &bctl->finish_batch_;
  }

  if (grpc_call_trace.enabled()) {
    gpr_log(GPR_DEBUG, "BATCH:%p START:%s BATCH:%s (tag:%p)", bctl,
            PendingOpString(pending_ops).c_str(),
            grpc_transport_stream_op_batch_string(stream_op, false).c_str(),
            bctl->completion_data_.notify_tag.tag);
  }
  ExecuteBatch(stream_op, &bctl->start_batch_);

done:
  return error;

done_with_error:
  // reverse any mutations that occurred
  // The stream_op flags record which ops were already processed in this
  // batch, so only their per-call state is reset.
  if (stream_op->send_initial_metadata) {
    sent_initial_metadata_ = false;
    send_initial_metadata_.Clear();
  }
  if (stream_op->send_message) {
    sending_message_ = false;
  }
  if (stream_op->send_trailing_metadata) {
    sent_final_op_ = false;
    send_trailing_metadata_.Clear();
  }
  if (stream_op->recv_initial_metadata) {
    received_initial_metadata_ = false;
  }
  if (stream_op->recv_message) {
    receiving_message_ = false;
  }
  if (stream_op->recv_trailing_metadata) {
    requested_final_op_ = false;
  }
  goto done;
}
1930 
ContextSet(grpc_context_index elem,void * value,void (* destroy)(void *))1931 void FilterStackCall::ContextSet(grpc_context_index elem, void* value,
1932                                  void (*destroy)(void*)) {
1933   if (context_[elem].destroy) {
1934     context_[elem].destroy(context_[elem].value);
1935   }
1936   context_[elem].value = value;
1937   context_[elem].destroy = destroy;
1938 }
1939 
1940 ///////////////////////////////////////////////////////////////////////////////
1941 // Metadata validation helpers
1942 
1943 namespace {
ValidateMetadata(size_t count,grpc_metadata * metadata)1944 bool ValidateMetadata(size_t count, grpc_metadata* metadata) {
1945   if (count > INT_MAX) {
1946     return false;
1947   }
1948   for (size_t i = 0; i < count; i++) {
1949     grpc_metadata* md = &metadata[i];
1950     if (!GRPC_LOG_IF_ERROR("validate_metadata",
1951                            grpc_validate_header_key_is_legal(md->key))) {
1952       return false;
1953     } else if (!grpc_is_binary_header_internal(md->key) &&
1954                !GRPC_LOG_IF_ERROR(
1955                    "validate_metadata",
1956                    grpc_validate_header_nonbin_value_is_legal(md->value))) {
1957       return false;
1958     } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
1959       // HTTP2 hpack encoding has a maximum limit.
1960       return false;
1961     }
1962   }
1963   return true;
1964 }
1965 }  // namespace
1966 
1967 ///////////////////////////////////////////////////////////////////////////////
1968 // PromiseBasedCall
1969 // Will be folded into Call once the promise conversion is done
1970 
// Common base for promise-based calls. Combines three roles:
//  - Call:  the call interface (refcounting, context storage, completion
//           queue binding);
//  - Party: owner of the internal refcount and of the spawned participants;
//  - EventEngine::Closure: its Run() is the deadline timer callback.
class BasicPromiseBasedCall : public Call,
                              public Party,
                              public grpc_event_engine::experimental::
                                  EventEngine::Closure /* for deadlines */ {
 public:
  using Call::arena;

  // Builds the call from `args`, starting with the given external/internal
  // refcounts. A non-null args.cq is bound and gets a "bind" ref, which the
  // destructor releases.
  // NOTE(review): the second Call argument is presumably the is-client flag
  // (true when there is no server transport data) -- confirm against Call.
  BasicPromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
                        uint32_t initial_internal_refs,
                        const grpc_call_create_args& args)
      : Call(arena, args.server_transport_data == nullptr, args.send_deadline,
             args.channel->Ref()),
        Party(initial_internal_refs),
        external_refs_(initial_external_refs),
        cq_(args.cq) {
    if (args.cq != nullptr) {
      GRPC_CQ_INTERNAL_REF(args.cq, "bind");
    }
  }

  // Drops the "bind" ref on the completion queue (if any) and runs the
  // destroy callback of every context element that has one.
  ~BasicPromiseBasedCall() override {
    if (cq_) GRPC_CQ_INTERNAL_UNREF(cq_, "bind");
    for (int i = 0; i < GRPC_CONTEXT_COUNT; i++) {
      if (context_[i].destroy) {
        context_[i].destroy(context_[i].value);
      }
    }
  }

  // Implementation of EventEngine::Closure, called when deadline expires
  void Run() final;

  // Invoked from ExternalUnref() when the last external ref is released.
  virtual void OrphanCall() = 0;

  // Server call context for this call; this base implementation returns
  // nullptr.
  virtual ServerCallContext* server_call_context() { return nullptr; }
  // Binds `cq` to the call and takes a "bind" ref on it. `cq` is referenced
  // unconditionally, so it must not be null.
  void SetCompletionQueue(grpc_completion_queue* cq) final {
    cq_ = cq;
    GRPC_CQ_INTERNAL_REF(cq, "bind");
  }

  // Implementation of call refcounting: move this to DualRefCounted once we
  // don't need to maintain FilterStackCall compatibility
  // The first external ref also takes an internal ref ("external").
  void ExternalRef() final {
    if (external_refs_.fetch_add(1, std::memory_order_relaxed) == 0) {
      InternalRef("external");
    }
  }
  // Releasing the last external ref orphans the call and drops the internal
  // ref labelled "external".
  void ExternalUnref() final {
    if (external_refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      OrphanCall();
      InternalUnref("external");
    }
  }
  // Internal refs are the Party refcount; `reason` is only used for trace
  // logging.
  void InternalRef(const char* reason) final {
    if (grpc_call_refcount_trace.enabled()) {
      gpr_log(GPR_DEBUG, "INTERNAL_REF:%p:%s", this, reason);
    }
    Party::IncrementRefCount();
  }
  void InternalUnref(const char* reason) final {
    if (grpc_call_refcount_trace.enabled()) {
      gpr_log(GPR_DEBUG, "INTERNAL_UNREF:%p:%s", this, reason);
    }
    Party::Unref();
  }

  // Spawns a participant named "run_in_context" on this party that invokes
  // `fn` once and completes immediately.
  void RunInContext(absl::AnyInvocable<void()> fn) {
    Spawn(
        "run_in_context",
        [fn = std::move(fn)]() mutable {
          fn();
          return Empty{};
        },
        [](Empty) {});
  }

  // Stores `value` and its `destroy` callback in context slot `elem`, first
  // running the slot's existing destroy callback (if any) on the old value.
  void ContextSet(grpc_context_index elem, void* value,
                  void (*destroy)(void*)) final {
    if (context_[elem].destroy != nullptr) {
      context_[elem].destroy(context_[elem].value);
    }
    context_[elem].value = value;
    context_[elem].destroy = destroy;
  }

  // Returns the value currently stored in context slot `elem`.
  void* ContextGet(grpc_context_index elem) const final {
    return context_[elem].value;
  }

  // Deadline management; both take deadline_mu_ internally, so the caller
  // must not hold it.
  void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_);
  void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_);
  // Returns the current deadline, read under deadline_mu_.
  Timestamp deadline() {
    MutexLock lock(&deadline_mu_);
    return deadline_;
  }

  // Accept the stats from the context (call once we have proof the transport is
  // done with them).
  void AcceptTransportStatsFromContext() {
    final_stats_ = *call_context_.call_stats();
  }

  // This should return nullptr for the promise stack (and alternative means
  // for that functionality be invented)
  grpc_call_stack* call_stack() final { return nullptr; }

  // Default implementation crashes with "Not implemented".
  virtual RefCountedPtr<CallSpineInterface> MakeCallSpine(CallArgs) {
    Crash("Not implemented");
  }

 protected:
  // RAII scope that makes `call` the current activity and installs its arena,
  // context element array, CallContext and CallFinalization as the current
  // promise contexts.
  class ScopedContext
      : public ScopedActivity,
        public promise_detail::Context<Arena>,
        public promise_detail::Context<grpc_call_context_element>,
        public promise_detail::Context<CallContext>,
        public promise_detail::Context<CallFinalization> {
   public:
    explicit ScopedContext(BasicPromiseBasedCall* call)
        : ScopedActivity(call),
          promise_detail::Context<Arena>(call->arena()),
          promise_detail::Context<grpc_call_context_element>(call->context_),
          promise_detail::Context<CallContext>(&call->call_context_),
          promise_detail::Context<CallFinalization>(&call->finalization_) {}
  };

  // Pointer to the first of the GRPC_CONTEXT_COUNT context slots.
  grpc_call_context_element* context() { return context_; }

  // Completion queue bound to this call, or nullptr if none was bound.
  grpc_completion_queue* cq() { return cq_; }

  // At the end of the call run any finalization actions.
  // The status and details stored here are reported to the finalizers by
  // PartyOver().
  void SetFinalizationStatus(grpc_status_code status, Slice status_details) {
    final_message_ = std::move(status_details);
    final_status_ = status;
  }

  // The EventEngine of the channel this call belongs to.
  grpc_event_engine::experimental::EventEngine* event_engine() const override {
    return channel()->event_engine();
  }

 private:
  // Final teardown: builds the grpc_call_final_info (stats, status, optional
  // message, latency since start_time()), runs the finalizers, cancels the
  // remaining participants, destroys arena-managed objects, and finally
  // deletes this object. The inner scope ends the ScopedContext before
  // DeleteThis() runs.
  void PartyOver() final {
    {
      ScopedContext ctx(this);
      // Backing storage for final_info.error_string; must outlive
      // finalization_.Run().
      std::string message;
      grpc_call_final_info final_info;
      final_info.stats = final_stats_;
      final_info.final_status = final_status_;
      // TODO(ctiller): change type here so we don't need to copy this string.
      final_info.error_string = nullptr;
      if (!final_message_.empty()) {
        message = std::string(final_message_.begin(), final_message_.end());
        final_info.error_string = message.c_str();
      }
      final_info.stats.latency =
          gpr_cycle_counter_sub(gpr_get_cycle_counter(), start_time());
      finalization_.Run(&final_info);
      CancelRemainingParticipants();
      arena()->DestroyManagedNewObjects();
    }
    DeleteThis();
  }

  // Double refcounted for now: party owns the internal refcount, we track the
  // external refcount. Figure out a better scheme post-promise conversion.
  std::atomic<size_t> external_refs_;
  // Finalizers run from PartyOver().
  CallFinalization finalization_;
  CallContext call_context_{this};
  // Contexts for various subsystems (security, tracing, ...).
  grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
  // Stats copied by AcceptTransportStatsFromContext() and reported in
  // PartyOver().
  grpc_call_stats final_stats_{};
  // Current deadline.
  Mutex deadline_mu_;
  Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture();
  // Handle of the EventEngine timer scheduled for deadline_.
  grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY(
      deadline_mu_) deadline_task_;
  // Status details and code set by SetFinalizationStatus().
  Slice final_message_;
  grpc_status_code final_status_ = GRPC_STATUS_UNKNOWN;
  // Bound completion queue (holds a "bind" ref while non-null).
  grpc_completion_queue* cq_;
};
2151 
UpdateDeadline(Timestamp deadline)2152 void BasicPromiseBasedCall::UpdateDeadline(Timestamp deadline) {
2153   MutexLock lock(&deadline_mu_);
2154   if (grpc_call_trace.enabled()) {
2155     gpr_log(GPR_DEBUG, "%s[call] UpdateDeadline from=%s to=%s",
2156             DebugTag().c_str(), deadline_.ToString().c_str(),
2157             deadline.ToString().c_str());
2158   }
2159   if (deadline >= deadline_) return;
2160   auto* const event_engine = channel()->event_engine();
2161   if (deadline_ != Timestamp::InfFuture()) {
2162     if (!event_engine->Cancel(deadline_task_)) return;
2163   } else {
2164     InternalRef("deadline");
2165   }
2166   deadline_ = deadline;
2167   deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this);
2168 }
2169 
ResetDeadline()2170 void BasicPromiseBasedCall::ResetDeadline() {
2171   {
2172     MutexLock lock(&deadline_mu_);
2173     if (deadline_ == Timestamp::InfFuture()) return;
2174     auto* const event_engine = channel()->event_engine();
2175     if (!event_engine->Cancel(deadline_task_)) return;
2176     deadline_ = Timestamp::InfFuture();
2177   }
2178   InternalUnref("deadline[reset]");
2179 }
2180 
Run()2181 void BasicPromiseBasedCall::Run() {
2182   ApplicationCallbackExecCtx callback_exec_ctx;
2183   ExecCtx exec_ctx;
2184   CancelWithError(absl::DeadlineExceededError("Deadline exceeded"));
2185   InternalUnref("deadline[run]");
2186 }
2187 
2188 class PromiseBasedCall : public BasicPromiseBasedCall {
2189  public:
2190   PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
2191                    const grpc_call_create_args& args);
2192 
Completed()2193   bool Completed() final { return finished_.IsSet(); }
2194 
failed_before_recv_message() const2195   bool failed_before_recv_message() const final {
2196     return failed_before_recv_message_.load(std::memory_order_relaxed);
2197   }
2198 
2199   using Call::arena;
2200 
2201  protected:
2202   class ScopedContext : public BasicPromiseBasedCall::ScopedContext,
2203                         public BatchBuilder,
2204                         public promise_detail::Context<BatchBuilder> {
2205    public:
ScopedContext(PromiseBasedCall * call)2206     explicit ScopedContext(PromiseBasedCall* call)
2207         : BasicPromiseBasedCall::ScopedContext(call),
2208           BatchBuilder(&call->batch_payload_),
2209           promise_detail::Context<BatchBuilder>(this) {}
2210   };
2211 
2212   class Completion {
2213    public:
Completion()2214     Completion() : index_(kNullIndex) {}
~Completion()2215     ~Completion() { GPR_ASSERT(index_ == kNullIndex); }
Completion(uint8_t index)2216     explicit Completion(uint8_t index) : index_(index) {}
2217     Completion(const Completion& other) = delete;
2218     Completion& operator=(const Completion& other) = delete;
Completion(Completion && other)2219     Completion(Completion&& other) noexcept : index_(other.index_) {
2220       other.index_ = kNullIndex;
2221     }
operator =(Completion && other)2222     Completion& operator=(Completion&& other) noexcept {
2223       GPR_ASSERT(index_ == kNullIndex);
2224       index_ = other.index_;
2225       other.index_ = kNullIndex;
2226       return *this;
2227     }
2228 
index() const2229     uint8_t index() const { return index_; }
TakeIndex()2230     uint8_t TakeIndex() { return std::exchange(index_, kNullIndex); }
has_value() const2231     bool has_value() const { return index_ != kNullIndex; }
2232 
2233    private:
2234     enum : uint8_t { kNullIndex = 0xff };
2235     uint8_t index_;
2236   };
2237 
2238   // Enumerates why a Completion is still pending
2239   enum class PendingOp {
2240     // We're in the midst of starting a batch of operations
2241     kStartingBatch = 0,
2242     // The following correspond with the batch operations from above
2243     kSendInitialMetadata,
2244     kReceiveInitialMetadata,
2245     kReceiveStatusOnClient,
2246     kReceiveCloseOnServer = kReceiveStatusOnClient,
2247     kSendMessage,
2248     kReceiveMessage,
2249     kSendStatusFromServer,
2250     kSendCloseFromClient = kSendStatusFromServer,
2251   };
2252 
RunParty()2253   bool RunParty() override {
2254     ScopedContext ctx(this);
2255     return Party::RunParty();
2256   }
2257 
PendingOpString(PendingOp reason) const2258   const char* PendingOpString(PendingOp reason) const {
2259     switch (reason) {
2260       case PendingOp::kStartingBatch:
2261         return "StartingBatch";
2262       case PendingOp::kSendInitialMetadata:
2263         return "SendInitialMetadata";
2264       case PendingOp::kReceiveInitialMetadata:
2265         return "ReceiveInitialMetadata";
2266       case PendingOp::kReceiveStatusOnClient:
2267         return is_client() ? "ReceiveStatusOnClient" : "ReceiveCloseOnServer";
2268       case PendingOp::kSendMessage:
2269         return "SendMessage";
2270       case PendingOp::kReceiveMessage:
2271         return "ReceiveMessage";
2272       case PendingOp::kSendStatusFromServer:
2273         return is_client() ? "SendCloseFromClient" : "SendStatusFromServer";
2274     }
2275     return "Unknown";
2276   }
2277 
PendingOpBit(PendingOp reason)2278   static constexpr uint32_t PendingOpBit(PendingOp reason) {
2279     return 1 << static_cast<int>(reason);
2280   }
2281 
2282   // Begin work on a completion, recording the tag/closure to notify.
2283   // Use the op selected in \a ops to determine the index to allocate into.
2284   // Starts the "StartingBatch" PendingOp immediately.
2285   // Assumes at least one operation in \a ops.
2286   Completion StartCompletion(void* tag, bool is_closure, const grpc_op* ops);
2287   // Add one pending op to the completion, and return it.
2288   Completion AddOpToCompletion(const Completion& completion, PendingOp reason);
2289   // Stringify a completion
CompletionString(const Completion & completion) const2290   std::string CompletionString(const Completion& completion) const {
2291     return completion.has_value()
2292                ? completion_info_[completion.index()].pending.ToString(this)
2293                : "no-completion";
2294   }
2295   // Finish one op on the completion. Must have been previously been added.
2296   // The completion as a whole finishes when all pending ops finish.
2297   void FinishOpOnCompletion(Completion* completion, PendingOp reason);
2298   // Mark the completion as failed. Does not finish it.
2299   void FailCompletion(const Completion& completion,
2300                       SourceLocation source_location = {});
2301   // Mark the completion as infallible. Overrides FailCompletion to report
2302   // success always.
2303   void ForceCompletionSuccess(const Completion& completion);
2304 
PresentAndCompletionText(const char * caption,bool has,const Completion & completion) const2305   std::string PresentAndCompletionText(const char* caption, bool has,
2306                                        const Completion& completion) const {
2307     if (has) {
2308       if (completion.has_value()) {
2309         return absl::StrCat(caption, ":", CompletionString(completion), " ");
2310       } else {
2311         return absl::StrCat(caption,
2312                             ":!!BUG:operation is present, no completion!! ");
2313       }
2314     } else {
2315       if (!completion.has_value()) {
2316         return "";
2317       } else {
2318         return absl::StrCat(caption, ":no-op:", CompletionString(completion),
2319                             " ");
2320       }
2321     }
2322   }
2323 
2324   // Spawn a job that will first do FirstPromise then receive a message
2325   template <typename FirstPromise>
2326   void StartRecvMessage(const grpc_op& op, const Completion& completion,
2327                         FirstPromise first,
2328                         PipeReceiver<MessageHandle>* receiver,
2329                         bool cancel_on_error, Party::BulkSpawner& spawner);
2330   void StartSendMessage(const grpc_op& op, const Completion& completion,
2331                         PipeSender<MessageHandle>* sender,
2332                         Party::BulkSpawner& spawner);
2333 
set_completed()2334   void set_completed() { finished_.Set(); }
2335 
2336   // Returns a promise that resolves to Empty whenever the call is completed.
finished()2337   auto finished() { return finished_.Wait(); }
2338 
2339   // Returns a promise that resolves to Empty whenever there is no outstanding
2340   // send operation
WaitForSendingStarted()2341   auto WaitForSendingStarted() {
2342     return [this]() -> Poll<Empty> {
2343       int n = sends_queued_.load(std::memory_order_relaxed);
2344       if (grpc_call_trace.enabled()) {
2345         gpr_log(GPR_DEBUG, "%s[call] WaitForSendingStarted n=%d",
2346                 DebugTag().c_str(), n);
2347       }
2348       if (n != 0) return waiting_for_queued_sends_.pending();
2349       return Empty{};
2350     };
2351   }
2352 
2353   // Mark that a send has been queued - blocks sending trailing metadata.
QueueSend()2354   void QueueSend() {
2355     if (grpc_call_trace.enabled()) {
2356       gpr_log(GPR_DEBUG, "%s[call] QueueSend", DebugTag().c_str());
2357     }
2358     sends_queued_.fetch_add(1, std::memory_order_relaxed);
2359   }
2360   // Mark that a send has been dequeued - allows sending trailing metadata once
2361   // zero sends are queued.
EnactSend()2362   void EnactSend() {
2363     if (grpc_call_trace.enabled()) {
2364       gpr_log(GPR_DEBUG, "%s[call] EnactSend", DebugTag().c_str());
2365     }
2366     if (1 == sends_queued_.fetch_sub(1, std::memory_order_relaxed)) {
2367       waiting_for_queued_sends_.Wake();
2368     }
2369   }
2370 
set_failed_before_recv_message()2371   void set_failed_before_recv_message() {
2372     failed_before_recv_message_.store(true, std::memory_order_relaxed);
2373   }
2374 
2375  private:
2376   union CompletionInfo {
2377     static constexpr uint32_t kOpFailed = 0x8000'0000u;
2378     static constexpr uint32_t kOpForceSuccess = 0x4000'0000u;
CompletionInfo()2379     CompletionInfo() {}
2380     enum CompletionState {
2381       kPending,
2382       kSuccess,
2383       kFailure,
2384     };
2385     struct Pending {
2386       // Bitmask of PendingOps at the bottom, and kOpFailed, kOpForceSuccess at
2387       // the top.
2388       std::atomic<uint32_t> state;
2389       bool is_closure;
2390       // True if this completion was for a recv_message op.
2391       // In that case if the completion as a whole fails we need to cleanup the
2392       // returned message.
2393       bool is_recv_message;
2394       void* tag;
2395 
Startgrpc_core::PromiseBasedCall::CompletionInfo::Pending2396       void Start(bool is_closure, void* tag) {
2397         this->is_closure = is_closure;
2398         this->is_recv_message = false;
2399         this->tag = tag;
2400         state.store(PendingOpBit(PendingOp::kStartingBatch),
2401                     std::memory_order_release);
2402       }
2403 
AddPendingBitgrpc_core::PromiseBasedCall::CompletionInfo::Pending2404       void AddPendingBit(PendingOp reason) {
2405         if (reason == PendingOp::kReceiveMessage) is_recv_message = true;
2406         auto prev =
2407             state.fetch_or(PendingOpBit(reason), std::memory_order_relaxed);
2408         GPR_ASSERT((prev & PendingOpBit(reason)) == 0);
2409       }
2410 
RemovePendingBitgrpc_core::PromiseBasedCall::CompletionInfo::Pending2411       CompletionState RemovePendingBit(PendingOp reason) {
2412         const uint32_t mask = ~PendingOpBit(reason);
2413         auto prev = state.fetch_and(mask, std::memory_order_acq_rel);
2414         GPR_ASSERT((prev & PendingOpBit(reason)) != 0);
2415         switch (prev & mask) {
2416           case kOpFailed:
2417             return kFailure;
2418           case kOpFailed | kOpForceSuccess:
2419           case kOpForceSuccess:
2420           case 0:
2421             return kSuccess;
2422           default:
2423             return kPending;
2424         }
2425       }
2426 
MarkFailedgrpc_core::PromiseBasedCall::CompletionInfo::Pending2427       void MarkFailed() {
2428         state.fetch_or(kOpFailed, std::memory_order_relaxed);
2429       }
2430 
MarkForceSuccessgrpc_core::PromiseBasedCall::CompletionInfo::Pending2431       void MarkForceSuccess() {
2432         state.fetch_or(kOpForceSuccess, std::memory_order_relaxed);
2433       }
2434 
ToStringgrpc_core::PromiseBasedCall::CompletionInfo::Pending2435       std::string ToString(const PromiseBasedCall* call) const {
2436         auto state = this->state.load(std::memory_order_relaxed);
2437         std::vector<absl::string_view> pending_ops;
2438         for (size_t i = 0; i < 24; i++) {
2439           if (state & (1u << i)) {
2440             pending_ops.push_back(
2441                 call->PendingOpString(static_cast<PendingOp>(i)));
2442           }
2443         }
2444         return absl::StrFormat("{%s}%s:tag=%p", absl::StrJoin(pending_ops, ","),
2445                                (state & kOpForceSuccess) ? ":force-success"
2446                                : (state & kOpFailed)     ? ":failed"
2447                                                          : ":success",
2448                                tag);
2449       }
2450     } pending;
2451     grpc_cq_completion completion;
2452   };
2453 
2454   CompletionInfo completion_info_[6];
2455   ExternallyObservableLatch<void> finished_;
2456   // Non-zero with an outstanding GRPC_OP_SEND_INITIAL_METADATA or
2457   // GRPC_OP_SEND_MESSAGE (one count each), and 0 once those payloads have been
2458   // pushed onto the outgoing pipe.
2459   std::atomic<uint8_t> sends_queued_{0};
2460   std::atomic<bool> failed_before_recv_message_{false};
2461   // Waiter for when sends_queued_ becomes 0.
2462   IntraActivityWaiter waiting_for_queued_sends_;
2463   grpc_byte_buffer** recv_message_ = nullptr;
2464   grpc_transport_stream_op_batch_payload batch_payload_{context()};
2465 };
2466 
2467 template <typename T>
MakePromiseBasedCall(grpc_call_create_args * args,grpc_call ** out_call)2468 grpc_error_handle MakePromiseBasedCall(grpc_call_create_args* args,
2469                                        grpc_call** out_call) {
2470   Channel* channel = args->channel.get();
2471 
2472   auto* arena = channel->CreateArena();
2473   PromiseBasedCall* call = arena->New<T>(arena, args);
2474   *out_call = call->c_ptr();
2475   GPR_DEBUG_ASSERT(Call::FromC(*out_call) == call);
2476   return absl::OkStatus();
2477 }
2478 
PromiseBasedCall(Arena * arena,uint32_t initial_external_refs,const grpc_call_create_args & args)2479 PromiseBasedCall::PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
2480                                    const grpc_call_create_args& args)
2481     : BasicPromiseBasedCall(arena, initial_external_refs,
2482                             initial_external_refs != 0 ? 1 : 0, args) {}
2483 
CToMetadata(grpc_metadata * metadata,size_t count,grpc_metadata_batch * b)2484 static void CToMetadata(grpc_metadata* metadata, size_t count,
2485                         grpc_metadata_batch* b) {
2486   for (size_t i = 0; i < count; i++) {
2487     grpc_metadata* md = &metadata[i];
2488     auto key = StringViewFromSlice(md->key);
2489     // Filter "content-length metadata"
2490     if (key == "content-length") continue;
2491     b->Append(key, Slice(CSliceRef(md->value)),
2492               [md](absl::string_view error, const Slice& value) {
2493                 gpr_log(GPR_DEBUG, "Append error: %s",
2494                         absl::StrCat("key=", StringViewFromSlice(md->key),
2495                                      " error=", error,
2496                                      " value=", value.as_string_view())
2497                             .c_str());
2498               });
2499   }
2500 }
2501 
StartCompletion(void * tag,bool is_closure,const grpc_op * ops)2502 PromiseBasedCall::Completion PromiseBasedCall::StartCompletion(
2503     void* tag, bool is_closure, const grpc_op* ops) {
2504   Completion c(BatchSlotForOp(ops[0].op));
2505   if (!is_closure) {
2506     grpc_cq_begin_op(cq(), tag);
2507   }
2508   completion_info_[c.index()].pending.Start(is_closure, tag);
2509   if (grpc_call_trace.enabled()) {
2510     gpr_log(GPR_INFO, "%s[call] StartCompletion %s", DebugTag().c_str(),
2511             CompletionString(c).c_str());
2512   }
2513   return c;
2514 }
2515 
AddOpToCompletion(const Completion & completion,PendingOp reason)2516 PromiseBasedCall::Completion PromiseBasedCall::AddOpToCompletion(
2517     const Completion& completion, PendingOp reason) {
2518   if (grpc_call_trace.enabled()) {
2519     gpr_log(GPR_INFO, "%s[call] AddOpToCompletion %s %s", DebugTag().c_str(),
2520             CompletionString(completion).c_str(), PendingOpString(reason));
2521   }
2522   GPR_ASSERT(completion.has_value());
2523   completion_info_[completion.index()].pending.AddPendingBit(reason);
2524   return Completion(completion.index());
2525 }
2526 
FailCompletion(const Completion & completion,SourceLocation location)2527 void PromiseBasedCall::FailCompletion(const Completion& completion,
2528                                       SourceLocation location) {
2529   if (grpc_call_trace.enabled()) {
2530     gpr_log(location.file(), location.line(), GPR_LOG_SEVERITY_ERROR,
2531             "%s[call] FailCompletion %s", DebugTag().c_str(),
2532             CompletionString(completion).c_str());
2533   }
2534   completion_info_[completion.index()].pending.MarkFailed();
2535 }
2536 
ForceCompletionSuccess(const Completion & completion)2537 void PromiseBasedCall::ForceCompletionSuccess(const Completion& completion) {
2538   completion_info_[completion.index()].pending.MarkForceSuccess();
2539 }
2540 
FinishOpOnCompletion(Completion * completion,PendingOp reason)2541 void PromiseBasedCall::FinishOpOnCompletion(Completion* completion,
2542                                             PendingOp reason) {
2543   if (grpc_call_trace.enabled()) {
2544     gpr_log(GPR_INFO, "%s[call] FinishOpOnCompletion completion:%s finish:%s",
2545             DebugTag().c_str(), CompletionString(*completion).c_str(),
2546             PendingOpString(reason));
2547   }
2548   const uint8_t i = completion->TakeIndex();
2549   GPR_ASSERT(i < GPR_ARRAY_SIZE(completion_info_));
2550   CompletionInfo::Pending& pending = completion_info_[i].pending;
2551   bool success;
2552   switch (pending.RemovePendingBit(reason)) {
2553     case CompletionInfo::kPending:
2554       return;  // Early out
2555     case CompletionInfo::kSuccess:
2556       success = true;
2557       break;
2558     case CompletionInfo::kFailure:
2559       success = false;
2560       break;
2561   }
2562   if (pending.is_recv_message && !success && *recv_message_ != nullptr) {
2563     grpc_byte_buffer_destroy(*recv_message_);
2564     *recv_message_ = nullptr;
2565   }
2566   auto error = success ? absl::OkStatus() : absl::CancelledError();
2567   if (pending.is_closure) {
2568     ExecCtx::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(pending.tag),
2569                  error);
2570   } else {
2571     InternalRef("cq_end_op");
2572     grpc_cq_end_op(
2573         cq(), pending.tag, error,
2574         [](void* p, grpc_cq_completion*) {
2575           static_cast<PromiseBasedCall*>(p)->InternalUnref("cq_end_op");
2576         },
2577         this, &completion_info_[i].completion);
2578   }
2579 }
2580 
StartSendMessage(const grpc_op & op,const Completion & completion,PipeSender<MessageHandle> * sender,Party::BulkSpawner & spawner)2581 void PromiseBasedCall::StartSendMessage(const grpc_op& op,
2582                                         const Completion& completion,
2583                                         PipeSender<MessageHandle>* sender,
2584                                         Party::BulkSpawner& spawner) {
2585   QueueSend();
2586   SliceBuffer send;
2587   grpc_slice_buffer_swap(
2588       &op.data.send_message.send_message->data.raw.slice_buffer,
2589       send.c_slice_buffer());
2590   auto msg = arena()->MakePooled<Message>(std::move(send), op.flags);
2591   spawner.Spawn(
2592       "call_send_message",
2593       [this, sender, msg = std::move(msg)]() mutable {
2594         EnactSend();
2595         return sender->Push(std::move(msg));
2596       },
2597       [this, completion = AddOpToCompletion(
2598                  completion, PendingOp::kSendMessage)](bool result) mutable {
2599         if (grpc_call_trace.enabled()) {
2600           gpr_log(GPR_DEBUG, "%sSendMessage completes %s", DebugTag().c_str(),
2601                   result ? "successfully" : "with failure");
2602         }
2603         if (!result) FailCompletion(completion);
2604         FinishOpOnCompletion(&completion, PendingOp::kSendMessage);
2605       });
2606 }
2607 
2608 template <typename FirstPromiseFactory>
StartRecvMessage(const grpc_op & op,const Completion & completion,FirstPromiseFactory first_promise_factory,PipeReceiver<MessageHandle> * receiver,bool cancel_on_error,Party::BulkSpawner & spawner)2609 void PromiseBasedCall::StartRecvMessage(
2610     const grpc_op& op, const Completion& completion,
2611     FirstPromiseFactory first_promise_factory,
2612     PipeReceiver<MessageHandle>* receiver, bool cancel_on_error,
2613     Party::BulkSpawner& spawner) {
2614   if (grpc_call_trace.enabled()) {
2615     gpr_log(GPR_INFO, "%s[call] Start RecvMessage: %s", DebugTag().c_str(),
2616             CompletionString(completion).c_str());
2617   }
2618   recv_message_ = op.data.recv_message.recv_message;
2619   spawner.Spawn(
2620       "call_recv_message",
2621       [first_promise_factory = std::move(first_promise_factory), receiver]() {
2622         return Seq(first_promise_factory(), receiver->Next());
2623       },
2624       [this, cancel_on_error,
2625        completion = AddOpToCompletion(completion, PendingOp::kReceiveMessage)](
2626           NextResult<MessageHandle> result) mutable {
2627         if (result.has_value()) {
2628           MessageHandle& message = *result;
2629           NoteLastMessageFlags(message->flags());
2630           if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
2631               (incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
2632             *recv_message_ = grpc_raw_compressed_byte_buffer_create(
2633                 nullptr, 0, incoming_compression_algorithm());
2634           } else {
2635             *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0);
2636           }
2637           grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(),
2638                                       &(*recv_message_)->data.raw.slice_buffer);
2639           if (grpc_call_trace.enabled()) {
2640             gpr_log(GPR_INFO,
2641                     "%s[call] RecvMessage: outstanding_recv "
2642                     "finishes: received %" PRIdPTR " byte message",
2643                     DebugTag().c_str(),
2644                     (*recv_message_)->data.raw.slice_buffer.length);
2645           }
2646         } else if (result.cancelled()) {
2647           if (grpc_call_trace.enabled()) {
2648             gpr_log(GPR_INFO,
2649                     "%s[call] RecvMessage: outstanding_recv "
2650                     "finishes: received end-of-stream with error",
2651                     DebugTag().c_str());
2652           }
2653           set_failed_before_recv_message();
2654           FailCompletion(completion);
2655           if (cancel_on_error) CancelWithError(absl::CancelledError());
2656           *recv_message_ = nullptr;
2657         } else {
2658           if (grpc_call_trace.enabled()) {
2659             gpr_log(GPR_INFO,
2660                     "%s[call] RecvMessage: outstanding_recv "
2661                     "finishes: received end-of-stream",
2662                     DebugTag().c_str());
2663           }
2664           *recv_message_ = nullptr;
2665         }
2666         FinishOpOnCompletion(&completion, PendingOp::kReceiveMessage);
2667       });
2668 }
2669 
2670 ///////////////////////////////////////////////////////////////////////////////
2671 // CallContext
2672 
RunInContext(absl::AnyInvocable<void ()> fn)2673 void CallContext::RunInContext(absl::AnyInvocable<void()> fn) {
2674   call_->RunInContext(std::move(fn));
2675 }
2676 
IncrementRefCount(const char * reason)2677 void CallContext::IncrementRefCount(const char* reason) {
2678   call_->InternalRef(reason);
2679 }
2680 
Unref(const char * reason)2681 void CallContext::Unref(const char* reason) { call_->InternalUnref(reason); }
2682 
UpdateDeadline(Timestamp deadline)2683 void CallContext::UpdateDeadline(Timestamp deadline) {
2684   call_->UpdateDeadline(deadline);
2685 }
2686 
deadline() const2687 Timestamp CallContext::deadline() const { return call_->deadline(); }
2688 
server_call_context()2689 ServerCallContext* CallContext::server_call_context() {
2690   return call_->server_call_context();
2691 }
2692 
MakeCallSpine(CallArgs call_args)2693 RefCountedPtr<CallSpineInterface> CallContext::MakeCallSpine(
2694     CallArgs call_args) {
2695   return call_->MakeCallSpine(std::move(call_args));
2696 }
2697 
c_call()2698 grpc_call* CallContext::c_call() { return call_->c_ptr(); }
2699 
2700 ///////////////////////////////////////////////////////////////////////////////
2701 // PublishMetadataArray
2702 
2703 namespace {
PublishMetadataArray(grpc_metadata_batch * md,grpc_metadata_array * array,bool is_client)2704 void PublishMetadataArray(grpc_metadata_batch* md, grpc_metadata_array* array,
2705                           bool is_client) {
2706   const auto md_count = md->count();
2707   if (md_count > array->capacity) {
2708     array->capacity =
2709         std::max(array->capacity + md->count(), array->capacity * 3 / 2);
2710     array->metadata = static_cast<grpc_metadata*>(
2711         gpr_realloc(array->metadata, sizeof(grpc_metadata) * array->capacity));
2712   }
2713   PublishToAppEncoder encoder(array, md, is_client);
2714   md->Encode(&encoder);
2715 }
2716 }  // namespace
2717 
2718 ///////////////////////////////////////////////////////////////////////////////
2719 // ClientPromiseBasedCall
2720 
2721 #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
2722 class ClientPromiseBasedCall final : public PromiseBasedCall {
2723  public:
ClientPromiseBasedCall(Arena * arena,grpc_call_create_args * args)2724   ClientPromiseBasedCall(Arena* arena, grpc_call_create_args* args)
2725       : PromiseBasedCall(arena, 1, *args),
2726         polling_entity_(
2727             args->cq != nullptr
2728                 ? grpc_polling_entity_create_from_pollset(
2729                       grpc_cq_pollset(args->cq))
2730                 : (args->pollset_set_alternative != nullptr
2731                        ? grpc_polling_entity_create_from_pollset_set(
2732                              args->pollset_set_alternative)
2733                        : grpc_polling_entity{})) {
2734     global_stats().IncrementClientCallsCreated();
2735     if (args->cq != nullptr) {
2736       GPR_ASSERT(args->pollset_set_alternative == nullptr &&
2737                  "Only one of 'cq' and 'pollset_set_alternative' should be "
2738                  "non-nullptr.");
2739     }
2740     ScopedContext context(this);
2741     args->channel->channel_stack()->stats_plugin_group->AddClientCallTracers(
2742         *args->path, args->registered_method, this->context());
2743     send_initial_metadata_ = GetContext<Arena>()->MakePooled<ClientMetadata>();
2744     send_initial_metadata_->Set(HttpPathMetadata(), std::move(*args->path));
2745     if (args->authority.has_value()) {
2746       send_initial_metadata_->Set(HttpAuthorityMetadata(),
2747                                   std::move(*args->authority));
2748     }
2749     send_initial_metadata_->Set(GrpcRegisteredMethod(),
2750                                 reinterpret_cast<void*>(static_cast<uintptr_t>(
2751                                     args->registered_method)));
2752     if (auto* channelz_channel = channel()->channelz_node()) {
2753       channelz_channel->RecordCallStarted();
2754     }
2755     if (args->send_deadline != Timestamp::InfFuture()) {
2756       UpdateDeadline(args->send_deadline);
2757     }
2758     Call* parent = Call::FromC(args->parent);
2759     if (parent != nullptr) {
2760       auto parent_status = InitParent(parent, args->propagation_mask);
2761       if (!parent_status.ok()) {
2762         CancelWithError(std::move(parent_status));
2763       }
2764       PublishToParent(parent);
2765     }
2766   }
2767 
OrphanCall()2768   void OrphanCall() override { MaybeUnpublishFromParent(); }
2769 
~ClientPromiseBasedCall()2770   ~ClientPromiseBasedCall() override {
2771     ScopedContext context(this);
2772     send_initial_metadata_.reset();
2773     // Need to destroy the pipes under the ScopedContext above, so we
2774     // move them out here and then allow the destructors to run at
2775     // end of scope, but before context.
2776     auto c2s = std::move(client_to_server_messages_);
2777     auto s2c = std::move(server_to_client_messages_);
2778     auto sim = std::move(server_initial_metadata_);
2779   }
2780 
CancelWithError(absl::Status error)2781   void CancelWithError(absl::Status error) override {
2782     if (cancel_with_error_called_.exchange(true, std::memory_order_relaxed)) {
2783       return;
2784     }
2785     if (!started_.exchange(true, std::memory_order_relaxed)) {
2786       // Initial metadata not sent yet, so we can just fail the call.
2787       Spawn(
2788           "cancel_before_initial_metadata",
2789           [error = std::move(error), this]() {
2790             server_to_client_messages_.sender.Close();
2791             auto md = ServerMetadataFromStatus(error);
2792             md->Set(GrpcCallWasCancelled(), true);
2793             Finish(std::move(md));
2794             return Empty{};
2795           },
2796           [](Empty) {});
2797     } else {
2798       Spawn(
2799           "cancel_with_error",
2800           [error = std::move(error), this]() {
2801             if (!cancel_error_.is_set()) {
2802               auto md = ServerMetadataFromStatus(error);
2803               md->Set(GrpcCallWasCancelled(), true);
2804               cancel_error_.Set(std::move(md));
2805             }
2806             return Empty{};
2807           },
2808           [](Empty) {});
2809     }
2810   }
GetServerAuthority() const2811   absl::string_view GetServerAuthority() const override { abort(); }
is_trailers_only() const2812   bool is_trailers_only() const override { return is_trailers_only_; }
2813 
2814   grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
2815                              bool is_notify_tag_closure) override;
2816 
DebugTag() const2817   std::string DebugTag() const override {
2818     return absl::StrFormat("CLIENT_CALL[%p]: ", this);
2819   }
2820 
MakeCallSpine(CallArgs call_args)2821   RefCountedPtr<CallSpineInterface> MakeCallSpine(CallArgs call_args) final {
2822     class WrappingCallSpine final : public CallSpineInterface {
2823      public:
2824       WrappingCallSpine(ClientPromiseBasedCall* call,
2825                         ClientMetadataHandle metadata)
2826           : call_(call) {
2827         call_->InternalRef("call-spine");
2828         SpawnInfallible(
2829             "send_client_initial_metadata",
2830             [self = Ref(), metadata = std::move(metadata)]() mutable {
2831               return Map(self->client_initial_metadata_.sender.Push(
2832                              std::move(metadata)),
2833                          [self](bool) { return Empty{}; });
2834             });
2835       }
2836 
2837       ~WrappingCallSpine() override {
2838         {
2839           ScopedContext context(call_);
2840           // Move these out and destroy before the internal unref.
2841           auto client_initial_metadata = std::move(client_initial_metadata_);
2842           auto server_trailing_metadata = std::move(server_trailing_metadata_);
2843         }
2844         call_->InternalUnref("call-spine");
2845       }
2846 
2847       Pipe<ClientMetadataHandle>& client_initial_metadata() override {
2848         return client_initial_metadata_;
2849       }
2850 
2851       Pipe<MessageHandle>& client_to_server_messages() override {
2852         return call_->client_to_server_messages_;
2853       }
2854 
2855       Pipe<ServerMetadataHandle>& server_initial_metadata() override {
2856         return call_->server_initial_metadata_;
2857       }
2858 
2859       Pipe<MessageHandle>& server_to_client_messages() override {
2860         return call_->server_to_client_messages_;
2861       }
2862 
2863       Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
2864         return server_trailing_metadata_;
2865       }
2866 
2867       Latch<ServerMetadataHandle>& cancel_latch() override {
2868         return cancel_error_;
2869       }
2870 
2871       Party& party() override { return *call_; }
2872       Arena* arena() override { return call_->arena(); }
2873 
2874       void IncrementRefCount() override { refs_.Ref(); }
2875       void Unref() override {
2876         if (refs_.Unref()) delete this;
2877       }
2878       RefCountedPtr<WrappingCallSpine> Ref() {
2879         IncrementRefCount();
2880         return RefCountedPtr<WrappingCallSpine>(this);
2881       }
2882 
2883      private:
2884       RefCount refs_;
2885       ClientPromiseBasedCall* const call_;
2886       std::atomic<bool> sent_trailing_metadata_{false};
2887       Pipe<ClientMetadataHandle> client_initial_metadata_{call_->arena()};
2888       Pipe<ServerMetadataHandle> server_trailing_metadata_{call_->arena()};
2889       Latch<ServerMetadataHandle> cancel_error_;
2890     };
2891     GPR_ASSERT(call_args.server_initial_metadata ==
2892                &server_initial_metadata_.sender);
2893     GPR_ASSERT(call_args.client_to_server_messages ==
2894                &client_to_server_messages_.receiver);
2895     GPR_ASSERT(call_args.server_to_client_messages ==
2896                &server_to_client_messages_.sender);
2897     call_args.client_initial_metadata_outstanding.Complete(true);
2898     return MakeRefCounted<WrappingCallSpine>(
2899         this, std::move(call_args.client_initial_metadata));
2900   }
2901 
2902  private:
2903   // Finish the call with the given status/trailing metadata.
2904   void Finish(ServerMetadataHandle trailing_metadata);
2905   // Validate that a set of ops is valid for a client call.
2906   grpc_call_error ValidateBatch(const grpc_op* ops, size_t nops) const;
2907   // Commit a valid batch of operations to be executed.
2908   void CommitBatch(const grpc_op* ops, size_t nops,
2909                    const Completion& completion);
2910   // Start the underlying promise.
2911   void StartPromise(ClientMetadataHandle client_initial_metadata,
2912                     const Completion& completion, Party::BulkSpawner& spawner);
2913   // Start receiving initial metadata
2914   void StartRecvInitialMetadata(grpc_metadata_array* array,
2915                                 const Completion& completion,
2916                                 Party::BulkSpawner& spawner);
2917   void StartRecvStatusOnClient(
2918       const Completion& completion,
2919       grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args,
2920       Party::BulkSpawner& spawner);
2921   // Publish status out to the application.
2922   void PublishStatus(
2923       grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args,
2924       ServerMetadataHandle trailing_metadata);
2925   // Publish server initial metadata out to the application.
2926   void PublishInitialMetadata(ServerMetadata* metadata);
2927 
2928   ClientMetadataHandle send_initial_metadata_;
2929   Pipe<ServerMetadataHandle> server_initial_metadata_{arena()};
2930   Latch<ServerMetadataHandle> server_trailing_metadata_;
2931   Latch<ServerMetadataHandle> cancel_error_;
2932   Latch<grpc_polling_entity> polling_entity_;
2933   Pipe<MessageHandle> client_to_server_messages_{arena()};
2934   Pipe<MessageHandle> server_to_client_messages_{arena()};
2935   bool is_trailers_only_ = false;
2936   bool scheduled_receive_status_ = false;
2937   bool scheduled_send_close_ = false;
2938   // True once the promise for the call is started.
2939   // This corresponds to sending initial metadata, or cancelling before doing
2940   // so.
2941   // In the latter case real world code sometimes does not send the initial
2942   // metadata, and so gating based upon that does not work out.
2943   std::atomic<bool> started_{false};
2944   // True after the first CancelWithError call - prevents spamming cancels from
2945   // overflowing the party.
2946   std::atomic<bool> cancel_with_error_called_{false};
2947   // TODO(ctiller): delete when we remove the filter based API (may require some
2948   // cleanup in wrapped languages: they depend on this to hold slice refs)
2949   ServerMetadataHandle recv_initial_metadata_;
2950   ServerMetadataHandle recv_trailing_metadata_;
2951 };
2952 
StartPromise(ClientMetadataHandle client_initial_metadata,const Completion & completion,Party::BulkSpawner & spawner)2953 void ClientPromiseBasedCall::StartPromise(
2954     ClientMetadataHandle client_initial_metadata, const Completion& completion,
2955     Party::BulkSpawner& spawner) {
2956   auto token = ClientInitialMetadataOutstandingToken::New(arena());
2957   spawner.Spawn(
2958       "call_send_initial_metadata", token.Wait(),
2959       [this,
2960        completion = AddOpToCompletion(
2961            completion, PendingOp::kSendInitialMetadata)](bool result) mutable {
2962         if (!result) FailCompletion(completion);
2963         FinishOpOnCompletion(&completion, PendingOp::kSendInitialMetadata);
2964       });
2965   spawner.Spawn(
2966       "client_promise",
2967       [this, client_initial_metadata = std::move(client_initial_metadata),
2968        token = std::move(token)]() mutable {
2969         return Race(
2970             cancel_error_.Wait(),
2971             Map(channel()->channel_stack()->MakeClientCallPromise(CallArgs{
2972                     std::move(client_initial_metadata), std::move(token),
2973                     &polling_entity_, &server_initial_metadata_.sender,
2974                     &client_to_server_messages_.receiver,
2975                     &server_to_client_messages_.sender}),
2976                 [this](ServerMetadataHandle trailing_metadata) {
2977                   // If we're cancelled the transport doesn't get to return
2978                   // stats.
2979                   AcceptTransportStatsFromContext();
2980                   return trailing_metadata;
2981                 }));
2982       },
2983       [this](ServerMetadataHandle trailing_metadata) {
2984         Finish(std::move(trailing_metadata));
2985       });
2986 }
2987 
ValidateBatch(const grpc_op * ops,size_t nops) const2988 grpc_call_error ClientPromiseBasedCall::ValidateBatch(const grpc_op* ops,
2989                                                       size_t nops) const {
2990   BitSet<8> got_ops;
2991   for (size_t op_idx = 0; op_idx < nops; op_idx++) {
2992     const grpc_op& op = ops[op_idx];
2993     switch (op.op) {
2994       case GRPC_OP_SEND_INITIAL_METADATA:
2995         if (!AreInitialMetadataFlagsValid(op.flags)) {
2996           return GRPC_CALL_ERROR_INVALID_FLAGS;
2997         }
2998         if (!ValidateMetadata(op.data.send_initial_metadata.count,
2999                               op.data.send_initial_metadata.metadata)) {
3000           return GRPC_CALL_ERROR_INVALID_METADATA;
3001         }
3002         break;
3003       case GRPC_OP_SEND_MESSAGE:
3004         if (!AreWriteFlagsValid(op.flags)) {
3005           return GRPC_CALL_ERROR_INVALID_FLAGS;
3006         }
3007         break;
3008       case GRPC_OP_RECV_INITIAL_METADATA:
3009       case GRPC_OP_RECV_MESSAGE:
3010         if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
3011         break;
3012       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
3013         if (scheduled_send_close_) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
3014         if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
3015         break;
3016       case GRPC_OP_RECV_STATUS_ON_CLIENT:
3017         if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
3018         if (scheduled_receive_status_) {
3019           return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
3020         }
3021         break;
3022       case GRPC_OP_RECV_CLOSE_ON_SERVER:
3023       case GRPC_OP_SEND_STATUS_FROM_SERVER:
3024         return GRPC_CALL_ERROR_NOT_ON_CLIENT;
3025     }
3026     if (got_ops.is_set(op.op)) {
3027       return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
3028     }
3029     got_ops.set(op.op);
3030   }
3031   return GRPC_CALL_OK;
3032 }
3033 
CommitBatch(const grpc_op * ops,size_t nops,const Completion & completion)3034 void ClientPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
3035                                          const Completion& completion) {
3036   Party::BulkSpawner spawner(this);
3037   for (size_t op_idx = 0; op_idx < nops; op_idx++) {
3038     const grpc_op& op = ops[op_idx];
3039     switch (op.op) {
3040       case GRPC_OP_SEND_INITIAL_METADATA: {
3041         if (started_.exchange(true, std::memory_order_relaxed)) break;
3042         CToMetadata(op.data.send_initial_metadata.metadata,
3043                     op.data.send_initial_metadata.count,
3044                     send_initial_metadata_.get());
3045         PrepareOutgoingInitialMetadata(op, *send_initial_metadata_);
3046         if (send_deadline() != Timestamp::InfFuture()) {
3047           send_initial_metadata_->Set(GrpcTimeoutMetadata(), send_deadline());
3048         }
3049         send_initial_metadata_->Set(
3050             WaitForReady(),
3051             WaitForReady::ValueType{
3052                 (op.flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) != 0,
3053                 (op.flags &
3054                  GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) != 0});
3055         StartPromise(std::move(send_initial_metadata_), completion, spawner);
3056       } break;
3057       case GRPC_OP_RECV_INITIAL_METADATA: {
3058         StartRecvInitialMetadata(
3059             op.data.recv_initial_metadata.recv_initial_metadata, completion,
3060             spawner);
3061       } break;
3062       case GRPC_OP_RECV_STATUS_ON_CLIENT: {
3063         scheduled_receive_status_ = true;
3064         StartRecvStatusOnClient(completion, op.data.recv_status_on_client,
3065                                 spawner);
3066       } break;
3067       case GRPC_OP_SEND_MESSAGE:
3068         StartSendMessage(op, completion, &client_to_server_messages_.sender,
3069                          spawner);
3070         break;
3071       case GRPC_OP_RECV_MESSAGE:
3072         StartRecvMessage(
3073             op, completion,
3074             [this]() {
3075               return Race(server_initial_metadata_.receiver.AwaitClosed(),
3076                           server_to_client_messages_.receiver.AwaitClosed());
3077             },
3078             &server_to_client_messages_.receiver, false, spawner);
3079         break;
3080       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
3081         scheduled_send_close_ = true;
3082         spawner.Spawn(
3083             "send_close_from_client",
3084             [this]() {
3085               client_to_server_messages_.sender.Close();
3086               return Empty{};
3087             },
3088             [this,
3089              completion = AddOpToCompletion(
3090                  completion, PendingOp::kSendCloseFromClient)](Empty) mutable {
3091               FinishOpOnCompletion(&completion,
3092                                    PendingOp::kSendCloseFromClient);
3093             });
3094         break;
3095       case GRPC_OP_SEND_STATUS_FROM_SERVER:
3096       case GRPC_OP_RECV_CLOSE_ON_SERVER:
3097         abort();  // unreachable
3098     }
3099   }
3100 }
3101 
StartBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)3102 grpc_call_error ClientPromiseBasedCall::StartBatch(const grpc_op* ops,
3103                                                    size_t nops,
3104                                                    void* notify_tag,
3105                                                    bool is_notify_tag_closure) {
3106   if (nops == 0) {
3107     EndOpImmediately(cq(), notify_tag, is_notify_tag_closure);
3108     return GRPC_CALL_OK;
3109   }
3110   const grpc_call_error validation_result = ValidateBatch(ops, nops);
3111   if (validation_result != GRPC_CALL_OK) {
3112     return validation_result;
3113   }
3114   Completion completion =
3115       StartCompletion(notify_tag, is_notify_tag_closure, ops);
3116   CommitBatch(ops, nops, completion);
3117   FinishOpOnCompletion(&completion, PendingOp::kStartingBatch);
3118   return GRPC_CALL_OK;
3119 }
3120 
StartRecvInitialMetadata(grpc_metadata_array * array,const Completion & completion,Party::BulkSpawner & spawner)3121 void ClientPromiseBasedCall::StartRecvInitialMetadata(
3122     grpc_metadata_array* array, const Completion& completion,
3123     Party::BulkSpawner& spawner) {
3124   spawner.Spawn(
3125       "recv_initial_metadata",
3126       [this]() {
3127         return Race(server_initial_metadata_.receiver.Next(),
3128                     Map(finished(), [](Empty) {
3129                       return NextResult<ServerMetadataHandle>(true);
3130                     }));
3131       },
3132       [this, array,
3133        completion =
3134            AddOpToCompletion(completion, PendingOp::kReceiveInitialMetadata)](
3135           NextResult<ServerMetadataHandle> next_metadata) mutable {
3136         server_initial_metadata_.sender.Close();
3137         ServerMetadataHandle metadata;
3138         if (grpc_call_trace.enabled()) {
3139           gpr_log(GPR_INFO, "%s[call] RecvTrailingMetadata: %s",
3140                   DebugTag().c_str(),
3141                   next_metadata.has_value()
3142                       ? next_metadata.value()->DebugString().c_str()
3143                       : "null");
3144         }
3145         if (next_metadata.has_value()) {
3146           metadata = std::move(next_metadata.value());
3147           is_trailers_only_ = metadata->get(GrpcTrailersOnly()).value_or(false);
3148         } else {
3149           is_trailers_only_ = true;
3150           metadata = arena()->MakePooled<ServerMetadata>();
3151         }
3152         ProcessIncomingInitialMetadata(*metadata);
3153         PublishMetadataArray(metadata.get(), array, true);
3154         recv_initial_metadata_ = std::move(metadata);
3155         FinishOpOnCompletion(&completion, PendingOp::kReceiveInitialMetadata);
3156       });
3157 }
3158 
Finish(ServerMetadataHandle trailing_metadata)3159 void ClientPromiseBasedCall::Finish(ServerMetadataHandle trailing_metadata) {
3160   if (grpc_call_trace.enabled()) {
3161     gpr_log(GPR_INFO, "%s[call] Finish: %s", DebugTag().c_str(),
3162             trailing_metadata->DebugString().c_str());
3163   }
3164   ResetDeadline();
3165   set_completed();
3166   client_to_server_messages_.sender.CloseWithError();
3167   client_to_server_messages_.receiver.CloseWithError();
3168   if (trailing_metadata->get(GrpcCallWasCancelled()).value_or(false)) {
3169     server_to_client_messages_.receiver.CloseWithError();
3170     server_initial_metadata_.receiver.CloseWithError();
3171   }
3172   if (auto* channelz_channel = channel()->channelz_node()) {
3173     if (trailing_metadata->get(GrpcStatusMetadata())
3174             .value_or(GRPC_STATUS_UNKNOWN) == GRPC_STATUS_OK) {
3175       channelz_channel->RecordCallSucceeded();
3176     } else {
3177       channelz_channel->RecordCallFailed();
3178     }
3179   }
3180   server_trailing_metadata_.Set(std::move(trailing_metadata));
3181 }
3182 
3183 namespace {
MakeErrorString(const ServerMetadata * trailing_metadata)3184 std::string MakeErrorString(const ServerMetadata* trailing_metadata) {
3185   std::string out = absl::StrCat(
3186       trailing_metadata->get(GrpcStatusFromWire()).value_or(false)
3187           ? "Error received from peer"
3188           : "Error generated by client",
3189       "grpc_status: ",
3190       grpc_status_code_to_string(trailing_metadata->get(GrpcStatusMetadata())
3191                                      .value_or(GRPC_STATUS_UNKNOWN)));
3192   if (const Slice* message =
3193           trailing_metadata->get_pointer(GrpcMessageMetadata())) {
3194     absl::StrAppend(&out, "\ngrpc_message: ", message->as_string_view());
3195   }
3196   if (auto annotations = trailing_metadata->get_pointer(GrpcStatusContext())) {
3197     absl::StrAppend(&out, "\nStatus Context:");
3198     for (const std::string& annotation : *annotations) {
3199       absl::StrAppend(&out, "\n  ", annotation);
3200     }
3201   }
3202   return out;
3203 }
3204 }  // namespace
3205 
StartRecvStatusOnClient(const Completion & completion,grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args,Party::BulkSpawner & spawner)3206 void ClientPromiseBasedCall::StartRecvStatusOnClient(
3207     const Completion& completion,
3208     grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args,
3209     Party::BulkSpawner& spawner) {
3210   ForceCompletionSuccess(completion);
3211   spawner.Spawn(
3212       "recv_status_on_client", server_trailing_metadata_.Wait(),
3213       [this, op_args,
3214        completion =
3215            AddOpToCompletion(completion, PendingOp::kReceiveStatusOnClient)](
3216           ServerMetadataHandle trailing_metadata) mutable {
3217         const grpc_status_code status =
3218             trailing_metadata->get(GrpcStatusMetadata())
3219                 .value_or(GRPC_STATUS_UNKNOWN);
3220         *op_args.status = status;
3221         Slice message_slice;
3222         if (Slice* message =
3223                 trailing_metadata->get_pointer(GrpcMessageMetadata())) {
3224           message_slice = message->Ref();
3225         }
3226         SetFinalizationStatus(status, message_slice.Ref());
3227         *op_args.status_details = message_slice.TakeCSlice();
3228         if (op_args.error_string != nullptr && status != GRPC_STATUS_OK) {
3229           *op_args.error_string =
3230               gpr_strdup(MakeErrorString(trailing_metadata.get()).c_str());
3231         }
3232         PublishMetadataArray(trailing_metadata.get(), op_args.trailing_metadata,
3233                              true);
3234         recv_trailing_metadata_ = std::move(trailing_metadata);
3235         FinishOpOnCompletion(&completion, PendingOp::kReceiveStatusOnClient);
3236       });
3237 }
3238 #endif
3239 
3240 ///////////////////////////////////////////////////////////////////////////////
3241 // ServerPromiseBasedCall
3242 
3243 #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
3244 
3245 class ServerPromiseBasedCall final : public PromiseBasedCall,
3246                                      public ServerCallContext {
3247  public:
3248   ServerPromiseBasedCall(Arena* arena, grpc_call_create_args* args);
3249 
OrphanCall()3250   void OrphanCall() override {}
3251   void CancelWithError(grpc_error_handle) override;
3252   grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
3253                              bool is_notify_tag_closure) override;
is_trailers_only() const3254   bool is_trailers_only() const override {
3255     Crash("is_trailers_only not implemented for server calls");
3256   }
GetServerAuthority() const3257   absl::string_view GetServerAuthority() const override {
3258     const Slice* authority_metadata =
3259         client_initial_metadata_->get_pointer(HttpAuthorityMetadata());
3260     if (authority_metadata == nullptr) return "";
3261     return authority_metadata->as_string_view();
3262   }
3263 
3264   // Polling order for the server promise stack:
3265   //
3266   // │ ┌───────────────────────────────────────┐
3267   // │ │ ServerPromiseBasedCall                ├──► Lifetime management
3268   // │ ├───────────────────────────────────────┤
3269   // │ │ ConnectedChannel                      ├─┐
3270   // │ ├───────────────────────────────────────┤ └► Interactions with the
3271   // │ │ ... closest to transport filter       │    transport - send/recv msgs
3272   // │ ├───────────────────────────────────────┤    and metadata, call phase
3273   // │ │ ...                                   │    ordering
3274   // │ ├───────────────────────────────────────┤
3275   // │ │ ... closest to app filter             │ ┌► Request matching, initial
3276   // │ ├───────────────────────────────────────┤ │  setup, publishing call to
3277   // │ │ Server::ChannelData::MakeCallPromise  ├─┘  application
3278   // │ ├───────────────────────────────────────┤
3279   // │ │ MakeTopOfServerCallPromise            ├──► Send trailing metadata
3280   // ▼ └───────────────────────────────────────┘
3281   // Polling &
3282   // instantiation
3283   // order
3284 
DebugTag() const3285   std::string DebugTag() const override {
3286     return absl::StrFormat("SERVER_CALL[%p]: ", this);
3287   }
3288 
server_call_context()3289   ServerCallContext* server_call_context() override { return this; }
3290 
server_stream_data()3291   const void* server_stream_data() override { return server_transport_data_; }
3292   void PublishInitialMetadata(
3293       ClientMetadataHandle metadata,
3294       grpc_metadata_array* publish_initial_metadata) override;
3295   ArenaPromise<ServerMetadataHandle> MakeTopOfServerCallPromise(
3296       CallArgs call_args, grpc_completion_queue* cq,
3297       absl::FunctionRef<void(grpc_call* call)> publish) override;
3298 
3299  private:
3300   class RecvCloseOpCancelState {
3301    public:
3302     // Request that receiver be filled in per
3303     // grpc_op_recv_close_on_server. Returns true if the request can
3304     // be fulfilled immediately. Returns false if the request will be
3305     // fulfilled later.
ReceiveCloseOnServerOpStarted(int * receiver)3306     bool ReceiveCloseOnServerOpStarted(int* receiver) {
3307       uintptr_t state = state_.load(std::memory_order_acquire);
3308       uintptr_t new_state;
3309       do {
3310         switch (state) {
3311           case kUnset:
3312             new_state = reinterpret_cast<uintptr_t>(receiver);
3313             break;
3314           case kFinishedWithFailure:
3315             *receiver = 1;
3316             return true;
3317           case kFinishedWithSuccess:
3318             *receiver = 0;
3319             return true;
3320           default:
3321             Crash("Two threads offered ReceiveCloseOnServerOpStarted");
3322         }
3323       } while (!state_.compare_exchange_weak(state, new_state,
3324                                              std::memory_order_acq_rel,
3325                                              std::memory_order_acquire));
3326       return false;
3327     }
3328 
3329     // Mark the call as having completed.
3330     // Returns true if this finishes a previous
3331     // RequestReceiveCloseOnServer.
CompleteCallWithCancelledSetTo(bool cancelled)3332     bool CompleteCallWithCancelledSetTo(bool cancelled) {
3333       uintptr_t state = state_.load(std::memory_order_acquire);
3334       uintptr_t new_state;
3335       bool r;
3336       do {
3337         switch (state) {
3338           case kUnset:
3339             new_state = cancelled ? kFinishedWithFailure : kFinishedWithSuccess;
3340             r = false;
3341             break;
3342           case kFinishedWithFailure:
3343             return false;
3344           case kFinishedWithSuccess:
3345             Crash("unreachable");
3346           default:
3347             new_state = cancelled ? kFinishedWithFailure : kFinishedWithSuccess;
3348             r = true;
3349         }
3350       } while (!state_.compare_exchange_weak(state, new_state,
3351                                              std::memory_order_acq_rel,
3352                                              std::memory_order_acquire));
3353       if (r) *reinterpret_cast<int*>(state) = cancelled ? 1 : 0;
3354       return r;
3355     }
3356 
ToString() const3357     std::string ToString() const {
3358       auto state = state_.load(std::memory_order_relaxed);
3359       switch (state) {
3360         case kUnset:
3361           return "Unset";
3362         case kFinishedWithFailure:
3363           return "FinishedWithFailure";
3364         case kFinishedWithSuccess:
3365           return "FinishedWithSuccess";
3366         default:
3367           return absl::StrFormat("WaitingForReceiver(%p)",
3368                                  reinterpret_cast<void*>(state));
3369       }
3370     }
3371 
3372    private:
3373     static constexpr uintptr_t kUnset = 0;
3374     static constexpr uintptr_t kFinishedWithFailure = 1;
3375     static constexpr uintptr_t kFinishedWithSuccess = 2;
3376     // Holds one of kUnset, kFinishedWithFailure, or
3377     // kFinishedWithSuccess OR an int* that wants to receive the
3378     // final status.
3379     std::atomic<uintptr_t> state_{kUnset};
3380   };
3381 
3382   void CommitBatch(const grpc_op* ops, size_t nops,
3383                    const Completion& completion);
3384   void Finish(ServerMetadataHandle result);
3385 
3386   ServerInterface* const server_;
3387   const void* const server_transport_data_;
3388   PipeSender<ServerMetadataHandle>* server_initial_metadata_ = nullptr;
3389   PipeSender<MessageHandle>* server_to_client_messages_ = nullptr;
3390   PipeReceiver<MessageHandle>* client_to_server_messages_ = nullptr;
3391   Latch<ServerMetadataHandle> send_trailing_metadata_;
3392   RecvCloseOpCancelState recv_close_op_cancel_state_;
3393   ClientMetadataHandle client_initial_metadata_;
3394   Completion recv_close_completion_;
3395   std::atomic<bool> cancelled_{false};
3396 };
3397 
ServerPromiseBasedCall(Arena * arena,grpc_call_create_args * args)3398 ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena,
3399                                                grpc_call_create_args* args)
3400     : PromiseBasedCall(arena, 0, *args),
3401       server_(args->server),
3402       server_transport_data_(args->server_transport_data) {
3403   global_stats().IncrementServerCallsCreated();
3404   channelz::ServerNode* channelz_node = server_->channelz_node();
3405   if (channelz_node != nullptr) {
3406     channelz_node->RecordCallStarted();
3407   }
3408   ScopedContext activity_context(this);
3409   // TODO(yashykt): In the future, we want to also enable stats and trace
3410   // collecting from when the call is created at the transport. The idea is that
3411   // the transport would create the call tracer and pass it in as part of the
3412   // metadata.
3413   // TODO(yijiem): OpenCensus and internal Census is still using this way to
3414   // set server call tracer. We need to refactor them to stats plugins
3415   // (including removing the client channel filters).
3416   if (args->server != nullptr &&
3417       args->server->server_call_tracer_factory() != nullptr) {
3418     auto* server_call_tracer =
3419         args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
3420             arena, args->server->channel_args());
3421     if (server_call_tracer != nullptr) {
3422       // Note that we are setting both
3423       // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
3424       // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future
3425       // promise-based world, we would just a single tracer object for each
3426       // stack (call, subchannel_call, server_call.)
3427       ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE,
3428                  server_call_tracer, nullptr);
3429       ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr);
3430     }
3431   }
3432   args->channel->channel_stack()->stats_plugin_group->AddServerCallTracers(
3433       context());
3434   Spawn("server_promise",
3435         channel()->channel_stack()->MakeServerCallPromise(
3436             CallArgs{nullptr, ClientInitialMetadataOutstandingToken::Empty(),
3437                      nullptr, nullptr, nullptr, nullptr}),
3438         [this](ServerMetadataHandle result) { Finish(std::move(result)); });
3439 }
3440 
Finish(ServerMetadataHandle result)3441 void ServerPromiseBasedCall::Finish(ServerMetadataHandle result) {
3442   if (grpc_call_trace.enabled()) {
3443     gpr_log(GPR_INFO, "%s[call] Finish: recv_close_state:%s result:%s",
3444             DebugTag().c_str(), recv_close_op_cancel_state_.ToString().c_str(),
3445             result->DebugString().c_str());
3446   }
3447   const auto status =
3448       result->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
3449   channelz::ServerNode* channelz_node = server_->channelz_node();
3450   if (channelz_node != nullptr) {
3451     if (status == GRPC_STATUS_OK) {
3452       channelz_node->RecordCallSucceeded();
3453     } else {
3454       channelz_node->RecordCallFailed();
3455     }
3456   }
3457   bool was_cancelled = result->get(GrpcCallWasCancelled()).value_or(true);
3458   if (recv_close_op_cancel_state_.CompleteCallWithCancelledSetTo(
3459           was_cancelled)) {
3460     FinishOpOnCompletion(&recv_close_completion_,
3461                          PendingOp::kReceiveCloseOnServer);
3462   }
3463   if (was_cancelled) set_failed_before_recv_message();
3464   if (server_initial_metadata_ != nullptr) {
3465     server_initial_metadata_->Close();
3466   }
3467   Slice message_slice;
3468   if (Slice* message = result->get_pointer(GrpcMessageMetadata())) {
3469     message_slice = message->Ref();
3470   }
3471   AcceptTransportStatsFromContext();
3472   SetFinalizationStatus(status, std::move(message_slice));
3473   set_completed();
3474   ResetDeadline();
3475   PropagateCancellationToChildren();
3476 }
3477 
ValidateServerBatch(const grpc_op * ops,size_t nops)3478 grpc_call_error ValidateServerBatch(const grpc_op* ops, size_t nops) {
3479   BitSet<8> got_ops;
3480   for (size_t op_idx = 0; op_idx < nops; op_idx++) {
3481     const grpc_op& op = ops[op_idx];
3482     switch (op.op) {
3483       case GRPC_OP_SEND_INITIAL_METADATA:
3484         if (!AreInitialMetadataFlagsValid(op.flags)) {
3485           return GRPC_CALL_ERROR_INVALID_FLAGS;
3486         }
3487         if (!ValidateMetadata(op.data.send_initial_metadata.count,
3488                               op.data.send_initial_metadata.metadata)) {
3489           return GRPC_CALL_ERROR_INVALID_METADATA;
3490         }
3491         break;
3492       case GRPC_OP_SEND_MESSAGE:
3493         if (!AreWriteFlagsValid(op.flags)) {
3494           return GRPC_CALL_ERROR_INVALID_FLAGS;
3495         }
3496         break;
3497       case GRPC_OP_SEND_STATUS_FROM_SERVER:
3498         if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
3499         if (!ValidateMetadata(
3500                 op.data.send_status_from_server.trailing_metadata_count,
3501                 op.data.send_status_from_server.trailing_metadata)) {
3502           return GRPC_CALL_ERROR_INVALID_METADATA;
3503         }
3504         break;
3505       case GRPC_OP_RECV_MESSAGE:
3506       case GRPC_OP_RECV_CLOSE_ON_SERVER:
3507         if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
3508         break;
3509       case GRPC_OP_RECV_INITIAL_METADATA:
3510       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
3511       case GRPC_OP_RECV_STATUS_ON_CLIENT:
3512         return GRPC_CALL_ERROR_NOT_ON_SERVER;
3513     }
3514     if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
3515     got_ops.set(op.op);
3516   }
3517   return GRPC_CALL_OK;
3518 }
3519 
CommitBatch(const grpc_op * ops,size_t nops,const Completion & completion)3520 void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
3521                                          const Completion& completion) {
3522   Party::BulkSpawner spawner(this);
3523   for (size_t op_idx = 0; op_idx < nops; op_idx++) {
3524     const grpc_op& op = ops[op_idx];
3525     switch (op.op) {
3526       case GRPC_OP_SEND_INITIAL_METADATA: {
3527         auto metadata = arena()->MakePooled<ServerMetadata>();
3528         PrepareOutgoingInitialMetadata(op, *metadata);
3529         CToMetadata(op.data.send_initial_metadata.metadata,
3530                     op.data.send_initial_metadata.count, metadata.get());
3531         if (grpc_call_trace.enabled()) {
3532           gpr_log(GPR_INFO, "%s[call] Send initial metadata",
3533                   DebugTag().c_str());
3534         }
3535         QueueSend();
3536         spawner.Spawn(
3537             "call_send_initial_metadata",
3538             [this, metadata = std::move(metadata)]() mutable {
3539               EnactSend();
3540               return server_initial_metadata_->Push(std::move(metadata));
3541             },
3542             [this,
3543              completion = AddOpToCompletion(
3544                  completion, PendingOp::kSendInitialMetadata)](bool r) mutable {
3545               if (!r) {
3546                 set_failed_before_recv_message();
3547                 FailCompletion(completion);
3548               }
3549               FinishOpOnCompletion(&completion,
3550                                    PendingOp::kSendInitialMetadata);
3551             });
3552       } break;
3553       case GRPC_OP_SEND_MESSAGE:
3554         StartSendMessage(op, completion, server_to_client_messages_, spawner);
3555         break;
3556       case GRPC_OP_RECV_MESSAGE:
3557         if (cancelled_.load(std::memory_order_relaxed)) {
3558           set_failed_before_recv_message();
3559           FailCompletion(completion);
3560           break;
3561         }
3562         StartRecvMessage(
3563             op, completion, []() { return []() { return Empty{}; }; },
3564             client_to_server_messages_, true, spawner);
3565         break;
3566       case GRPC_OP_SEND_STATUS_FROM_SERVER: {
3567         auto metadata = arena()->MakePooled<ServerMetadata>();
3568         CToMetadata(op.data.send_status_from_server.trailing_metadata,
3569                     op.data.send_status_from_server.trailing_metadata_count,
3570                     metadata.get());
3571         metadata->Set(GrpcStatusMetadata(),
3572                       op.data.send_status_from_server.status);
3573         if (auto* details = op.data.send_status_from_server.status_details) {
3574           // TODO(ctiller): this should not be a copy, but we have callers that
3575           // allocate and pass in a slice created with
3576           // grpc_slice_from_static_string and then delete the string after
3577           // passing it in, which shouldn't be a supported API.
3578           metadata->Set(GrpcMessageMetadata(),
3579                         Slice(grpc_slice_copy(*details)));
3580         }
3581         spawner.Spawn(
3582             "call_send_status_from_server",
3583             [this, metadata = std::move(metadata)]() mutable {
3584               bool r = true;
3585               if (send_trailing_metadata_.is_set()) {
3586                 r = false;
3587               } else {
3588                 send_trailing_metadata_.Set(std::move(metadata));
3589               }
3590               return Map(WaitForSendingStarted(), [this, r](Empty) {
3591                 server_initial_metadata_->Close();
3592                 server_to_client_messages_->Close();
3593                 return r;
3594               });
3595             },
3596             [this, completion = AddOpToCompletion(
3597                        completion, PendingOp::kSendStatusFromServer)](
3598                 bool ok) mutable {
3599               if (!ok) {
3600                 set_failed_before_recv_message();
3601                 FailCompletion(completion);
3602               }
3603               FinishOpOnCompletion(&completion,
3604                                    PendingOp::kSendStatusFromServer);
3605             });
3606       } break;
3607       case GRPC_OP_RECV_CLOSE_ON_SERVER:
3608         if (grpc_call_trace.enabled()) {
3609           gpr_log(GPR_INFO, "%s[call] StartBatch: RecvClose %s",
3610                   DebugTag().c_str(),
3611                   recv_close_op_cancel_state_.ToString().c_str());
3612         }
3613         ForceCompletionSuccess(completion);
3614         recv_close_completion_ =
3615             AddOpToCompletion(completion, PendingOp::kReceiveCloseOnServer);
3616         if (recv_close_op_cancel_state_.ReceiveCloseOnServerOpStarted(
3617                 op.data.recv_close_on_server.cancelled)) {
3618           FinishOpOnCompletion(&recv_close_completion_,
3619                                PendingOp::kReceiveCloseOnServer);
3620         }
3621         break;
3622       case GRPC_OP_RECV_STATUS_ON_CLIENT:
3623       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
3624       case GRPC_OP_RECV_INITIAL_METADATA:
3625         abort();  // unreachable
3626     }
3627   }
3628 }
3629 
StartBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)3630 grpc_call_error ServerPromiseBasedCall::StartBatch(const grpc_op* ops,
3631                                                    size_t nops,
3632                                                    void* notify_tag,
3633                                                    bool is_notify_tag_closure) {
3634   if (nops == 0) {
3635     EndOpImmediately(cq(), notify_tag, is_notify_tag_closure);
3636     return GRPC_CALL_OK;
3637   }
3638   const grpc_call_error validation_result = ValidateServerBatch(ops, nops);
3639   if (validation_result != GRPC_CALL_OK) {
3640     return validation_result;
3641   }
3642   Completion completion =
3643       StartCompletion(notify_tag, is_notify_tag_closure, ops);
3644   CommitBatch(ops, nops, completion);
3645   FinishOpOnCompletion(&completion, PendingOp::kStartingBatch);
3646   return GRPC_CALL_OK;
3647 }
3648 
CancelWithError(absl::Status error)3649 void ServerPromiseBasedCall::CancelWithError(absl::Status error) {
3650   cancelled_.store(true, std::memory_order_relaxed);
3651   Spawn(
3652       "cancel_with_error",
3653       [this, error = std::move(error)]() {
3654         if (!send_trailing_metadata_.is_set()) {
3655           auto md = ServerMetadataFromStatus(error);
3656           md->Set(GrpcCallWasCancelled(), true);
3657           send_trailing_metadata_.Set(std::move(md));
3658         }
3659         if (server_to_client_messages_ != nullptr) {
3660           server_to_client_messages_->Close();
3661         }
3662         if (server_initial_metadata_ != nullptr) {
3663           server_initial_metadata_->Close();
3664         }
3665         return Empty{};
3666       },
3667       [](Empty) {});
3668 }
3669 #endif
3670 
3671 #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
PublishInitialMetadata(ClientMetadataHandle metadata,grpc_metadata_array * publish_initial_metadata)3672 void ServerPromiseBasedCall::PublishInitialMetadata(
3673     ClientMetadataHandle metadata,
3674     grpc_metadata_array* publish_initial_metadata) {
3675   if (grpc_call_trace.enabled()) {
3676     gpr_log(GPR_INFO, "%s[call] PublishInitialMetadata: %s", DebugTag().c_str(),
3677             metadata->DebugString().c_str());
3678   }
3679   PublishMetadataArray(metadata.get(), publish_initial_metadata, false);
3680   client_initial_metadata_ = std::move(metadata);
3681 }
3682 
3683 ArenaPromise<ServerMetadataHandle>
MakeTopOfServerCallPromise(CallArgs call_args,grpc_completion_queue * cq,absl::FunctionRef<void (grpc_call * call)> publish)3684 ServerPromiseBasedCall::MakeTopOfServerCallPromise(
3685     CallArgs call_args, grpc_completion_queue* cq,
3686     absl::FunctionRef<void(grpc_call* call)> publish) {
3687   SetCompletionQueue(cq);
3688   call_args.polling_entity->Set(
3689       grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)));
3690   server_to_client_messages_ = call_args.server_to_client_messages;
3691   client_to_server_messages_ = call_args.client_to_server_messages;
3692   server_initial_metadata_ = call_args.server_initial_metadata;
3693   set_send_deadline(deadline());
3694   ProcessIncomingInitialMetadata(*client_initial_metadata_);
3695   ExternalRef();
3696   publish(c_ptr());
3697   return Seq(server_to_client_messages_->AwaitClosed(),
3698              send_trailing_metadata_.Wait());
3699 }
3700 
3701 ///////////////////////////////////////////////////////////////////////////////
3702 // CallSpine based Server Call
3703 
3704 class ServerCallSpine final : public CallSpineInterface,
3705                               public ServerCallContext,
3706                               public BasicPromiseBasedCall {
3707  public:
3708   ServerCallSpine(ServerInterface* server, Channel* channel, Arena* arena);
3709 
3710   // CallSpineInterface
client_initial_metadata()3711   Pipe<ClientMetadataHandle>& client_initial_metadata() override {
3712     return client_initial_metadata_;
3713   }
server_initial_metadata()3714   Pipe<ServerMetadataHandle>& server_initial_metadata() override {
3715     return server_initial_metadata_;
3716   }
client_to_server_messages()3717   Pipe<MessageHandle>& client_to_server_messages() override {
3718     return client_to_server_messages_;
3719   }
server_to_client_messages()3720   Pipe<MessageHandle>& server_to_client_messages() override {
3721     return server_to_client_messages_;
3722   }
server_trailing_metadata()3723   Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
3724     return server_trailing_metadata_;
3725   }
cancel_latch()3726   Latch<ServerMetadataHandle>& cancel_latch() override { return cancel_latch_; }
party()3727   Party& party() override { return *this; }
arena()3728   Arena* arena() override { return BasicPromiseBasedCall::arena(); }
IncrementRefCount()3729   void IncrementRefCount() override { InternalRef("CallSpine"); }
Unref()3730   void Unref() override { InternalUnref("CallSpine"); }
3731 
3732   // PromiseBasedCall
OrphanCall()3733   void OrphanCall() override {
3734     ResetDeadline();
3735     CancelWithError(absl::CancelledError());
3736   }
CancelWithError(grpc_error_handle error)3737   void CancelWithError(grpc_error_handle error) override {
3738     SpawnInfallible("CancelWithError", [this, error = std::move(error)] {
3739       std::ignore = Cancel(ServerMetadataFromStatus(error));
3740       return Empty{};
3741     });
3742   }
is_trailers_only() const3743   bool is_trailers_only() const override {
3744     Crash("is_trailers_only not implemented for server calls");
3745   }
GetServerAuthority() const3746   absl::string_view GetServerAuthority() const override {
3747     Crash("unimplemented");
3748   }
3749   grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
3750                              bool is_notify_tag_closure) override;
3751 
Completed()3752   bool Completed() final { Crash("unimplemented"); }
failed_before_recv_message() const3753   bool failed_before_recv_message() const final { Crash("unimplemented"); }
3754 
server_call_context()3755   ServerCallContext* server_call_context() override { return this; }
server_stream_data()3756   const void* server_stream_data() override { Crash("unimplemented"); }
3757   void PublishInitialMetadata(
3758       ClientMetadataHandle metadata,
3759       grpc_metadata_array* publish_initial_metadata) override;
MakeTopOfServerCallPromise(CallArgs,grpc_completion_queue *,absl::FunctionRef<void (grpc_call * call)>)3760   ArenaPromise<ServerMetadataHandle> MakeTopOfServerCallPromise(
3761       CallArgs, grpc_completion_queue*,
3762       absl::FunctionRef<void(grpc_call* call)>) override {
3763     Crash("unimplemented");
3764   }
3765 
RunParty()3766   bool RunParty() override {
3767     ScopedContext ctx(this);
3768     return Party::RunParty();
3769   }
3770 
3771  private:
3772   void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
3773                    bool is_notify_tag_closure);
3774   StatusFlag FinishRecvMessage(NextResult<MessageHandle> result);
3775 
DebugTag() const3776   std::string DebugTag() const override {
3777     return absl::StrFormat("SERVER_CALL_SPINE[%p]: ", this);
3778   }
3779 
3780   // Initial metadata from client to server
3781   Pipe<ClientMetadataHandle> client_initial_metadata_;
3782   // Initial metadata from server to client
3783   Pipe<ServerMetadataHandle> server_initial_metadata_;
3784   // Messages travelling from the application to the transport.
3785   Pipe<MessageHandle> client_to_server_messages_;
3786   // Messages travelling from the transport to the application.
3787   Pipe<MessageHandle> server_to_client_messages_;
3788   // Trailing metadata from server to client
3789   Pipe<ServerMetadataHandle> server_trailing_metadata_;
3790   // Latch that can be set to terminate the call
3791   Latch<ServerMetadataHandle> cancel_latch_;
3792   grpc_byte_buffer** recv_message_ = nullptr;
3793   ClientMetadataHandle client_initial_metadata_stored_;
3794 };
3795 
ServerCallSpine(ServerInterface * server,Channel * channel,Arena * arena)3796 ServerCallSpine::ServerCallSpine(ServerInterface* server, Channel* channel,
3797                                  Arena* arena)
3798     : BasicPromiseBasedCall(arena, 0, 1,
3799                             [channel, server]() -> grpc_call_create_args {
3800                               grpc_call_create_args args;
3801                               args.channel = channel->Ref();
3802                               args.server = server;
3803                               args.parent = nullptr;
3804                               args.propagation_mask = 0;
3805                               args.cq = nullptr;
3806                               args.pollset_set_alternative = nullptr;
3807                               args.server_transport_data =
3808                                   &args;  // Arbitrary non-null pointer
3809                               args.send_deadline = Timestamp::InfFuture();
3810                               return args;
3811                             }()),
3812       client_initial_metadata_(arena),
3813       server_initial_metadata_(arena),
3814       client_to_server_messages_(arena),
3815       server_to_client_messages_(arena),
3816       server_trailing_metadata_(arena) {
3817   global_stats().IncrementServerCallsCreated();
3818   ScopedContext ctx(this);
3819   channel->channel_stack()->InitServerCallSpine(this);
3820 }
3821 
PublishInitialMetadata(ClientMetadataHandle metadata,grpc_metadata_array * publish_initial_metadata)3822 void ServerCallSpine::PublishInitialMetadata(
3823     ClientMetadataHandle metadata,
3824     grpc_metadata_array* publish_initial_metadata) {
3825   if (grpc_call_trace.enabled()) {
3826     gpr_log(GPR_INFO, "%s[call] PublishInitialMetadata: %s", DebugTag().c_str(),
3827             metadata->DebugString().c_str());
3828   }
3829   PublishMetadataArray(metadata.get(), publish_initial_metadata, false);
3830   client_initial_metadata_stored_ = std::move(metadata);
3831 }
3832 
StartBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)3833 grpc_call_error ServerCallSpine::StartBatch(const grpc_op* ops, size_t nops,
3834                                             void* notify_tag,
3835                                             bool is_notify_tag_closure) {
3836   if (nops == 0) {
3837     EndOpImmediately(cq(), notify_tag, is_notify_tag_closure);
3838     return GRPC_CALL_OK;
3839   }
3840   const grpc_call_error validation_result = ValidateServerBatch(ops, nops);
3841   if (validation_result != GRPC_CALL_OK) {
3842     return validation_result;
3843   }
3844   CommitBatch(ops, nops, notify_tag, is_notify_tag_closure);
3845   return GRPC_CALL_OK;
3846 }
3847 
3848 namespace {
3849 template <typename SetupFn>
3850 class MaybeOpImpl {
3851  public:
3852   using SetupResult = decltype(std::declval<SetupFn>()(grpc_op()));
3853   using PromiseFactory = promise_detail::OncePromiseFactory<void, SetupResult>;
3854   using Promise = typename PromiseFactory::Promise;
3855   struct Dismissed {};
3856   using State = absl::variant<Dismissed, PromiseFactory, Promise>;
3857 
3858   // op_ is garbage but shouldn't be uninitialized
MaybeOpImpl()3859   MaybeOpImpl() : state_(Dismissed{}), op_(GRPC_OP_RECV_STATUS_ON_CLIENT) {}
MaybeOpImpl(SetupResult result,grpc_op_type op)3860   MaybeOpImpl(SetupResult result, grpc_op_type op)
3861       : state_(PromiseFactory(std::move(result))), op_(op) {}
3862 
3863   MaybeOpImpl(const MaybeOpImpl&) = delete;
3864   MaybeOpImpl& operator=(const MaybeOpImpl&) = delete;
MaybeOpImpl(MaybeOpImpl && other)3865   MaybeOpImpl(MaybeOpImpl&& other) noexcept
3866       : state_(MoveState(other.state_)), op_(other.op_) {}
operator =(MaybeOpImpl && other)3867   MaybeOpImpl& operator=(MaybeOpImpl&& other) noexcept {
3868     op_ = other.op_;
3869     if (absl::holds_alternative<Dismissed>(state_)) {
3870       state_.template emplace<Dismissed>();
3871       return *this;
3872     }
3873     // Can't move after first poll => Promise is not an option
3874     state_.template emplace<PromiseFactory>(
3875         std::move(absl::get<PromiseFactory>(other.state_)));
3876     return *this;
3877   }
3878 
operator ()()3879   Poll<StatusFlag> operator()() {
3880     if (absl::holds_alternative<Dismissed>(state_)) return Success{};
3881     if (absl::holds_alternative<PromiseFactory>(state_)) {
3882       auto& factory = absl::get<PromiseFactory>(state_);
3883       auto promise = factory.Make();
3884       state_.template emplace<Promise>(std::move(promise));
3885     }
3886     if (grpc_call_trace.enabled()) {
3887       gpr_log(GPR_INFO, "%sBeginPoll %s",
3888               Activity::current()->DebugTag().c_str(), OpName(op_).c_str());
3889     }
3890     auto& promise = absl::get<Promise>(state_);
3891     auto r = poll_cast<StatusFlag>(promise());
3892     if (grpc_call_trace.enabled()) {
3893       gpr_log(GPR_INFO, "%sEndPoll %s --> %s",
3894               Activity::current()->DebugTag().c_str(), OpName(op_).c_str(),
3895               r.pending() ? "PENDING" : (r.value().ok() ? "OK" : "FAILURE"));
3896     }
3897     return r;
3898   }
3899 
3900  private:
3901   GPR_NO_UNIQUE_ADDRESS State state_;
3902   GPR_NO_UNIQUE_ADDRESS grpc_op_type op_;
3903 
OpName(grpc_op_type op)3904   static std::string OpName(grpc_op_type op) {
3905     switch (op) {
3906       case GRPC_OP_SEND_INITIAL_METADATA:
3907         return "SendInitialMetadata";
3908       case GRPC_OP_SEND_MESSAGE:
3909         return "SendMessage";
3910       case GRPC_OP_SEND_STATUS_FROM_SERVER:
3911         return "SendStatusFromServer";
3912       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
3913         return "SendCloseFromClient";
3914       case GRPC_OP_RECV_MESSAGE:
3915         return "RecvMessage";
3916       case GRPC_OP_RECV_CLOSE_ON_SERVER:
3917         return "RecvCloseOnServer";
3918       case GRPC_OP_RECV_INITIAL_METADATA:
3919         return "RecvInitialMetadata";
3920       case GRPC_OP_RECV_STATUS_ON_CLIENT:
3921         return "RecvStatusOnClient";
3922     }
3923     return absl::StrCat("UnknownOp(", op, ")");
3924   }
3925 
MoveState(State & state)3926   static State MoveState(State& state) {
3927     if (absl::holds_alternative<Dismissed>(state)) return Dismissed{};
3928     // Can't move after first poll => Promise is not an option
3929     return std::move(absl::get<PromiseFactory>(state));
3930   }
3931 };
3932 
3933 // MaybeOp captures a fairly complicated dance we need to do for the batch API.
3934 // We first check if an op is included or not, and if it is, we run the setup
3935 // function in the context of the API call (NOT in the call party).
3936 // This setup function returns a promise factory which we'll then run *in* the
3937 // party to do initial setup, and have it return the promise that we'll
3938 // ultimately poll on til completion.
3939 // Once we express our surface API in terms of core internal types this whole
3940 // dance will go away.
3941 template <typename SetupFn>
MaybeOp(const grpc_op * ops,uint8_t idx,SetupFn setup)3942 auto MaybeOp(const grpc_op* ops, uint8_t idx, SetupFn setup) {
3943   if (idx == 255) {
3944     return MaybeOpImpl<SetupFn>();
3945   } else {
3946     return MaybeOpImpl<SetupFn>(setup(ops[idx]), ops[idx].op);
3947   }
3948 }
3949 
3950 template <typename F>
3951 class PollBatchLogger {
3952  public:
PollBatchLogger(void * tag,F f)3953   PollBatchLogger(void* tag, F f) : tag_(tag), f_(std::move(f)) {}
3954 
operator ()()3955   auto operator()() {
3956     if (grpc_call_trace.enabled()) {
3957       gpr_log(GPR_INFO, "Poll batch %p", tag_);
3958     }
3959     auto r = f_();
3960     if (grpc_call_trace.enabled()) {
3961       gpr_log(GPR_INFO, "Poll batch %p --> %s", tag_, ResultString(r).c_str());
3962     }
3963     return r;
3964   }
3965 
3966  private:
3967   template <typename T>
ResultString(Poll<T> r)3968   static std::string ResultString(Poll<T> r) {
3969     if (r.pending()) return "PENDING";
3970     return ResultString(r.value());
3971   }
ResultString(Empty)3972   static std::string ResultString(Empty) { return "DONE"; }
3973 
3974   void* tag_;
3975   F f_;
3976 };
3977 
3978 template <typename F>
LogPollBatch(void * tag,F f)3979 PollBatchLogger<F> LogPollBatch(void* tag, F f) {
3980   return PollBatchLogger<F>(tag, std::move(f));
3981 }
3982 }  // namespace
3983 
FinishRecvMessage(NextResult<MessageHandle> result)3984 StatusFlag ServerCallSpine::FinishRecvMessage(
3985     NextResult<MessageHandle> result) {
3986   if (result.has_value()) {
3987     MessageHandle& message = *result;
3988     NoteLastMessageFlags(message->flags());
3989     if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
3990         (incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
3991       *recv_message_ = grpc_raw_compressed_byte_buffer_create(
3992           nullptr, 0, incoming_compression_algorithm());
3993     } else {
3994       *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0);
3995     }
3996     grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(),
3997                                 &(*recv_message_)->data.raw.slice_buffer);
3998     if (grpc_call_trace.enabled()) {
3999       gpr_log(GPR_INFO,
4000               "%s[call] RecvMessage: outstanding_recv "
4001               "finishes: received %" PRIdPTR " byte message",
4002               DebugTag().c_str(),
4003               (*recv_message_)->data.raw.slice_buffer.length);
4004     }
4005     recv_message_ = nullptr;
4006     return Success{};
4007   }
4008   if (result.cancelled()) {
4009     if (grpc_call_trace.enabled()) {
4010       gpr_log(GPR_INFO,
4011               "%s[call] RecvMessage: outstanding_recv "
4012               "finishes: received end-of-stream with error",
4013               DebugTag().c_str());
4014     }
4015     *recv_message_ = nullptr;
4016     recv_message_ = nullptr;
4017     return Failure{};
4018   }
4019   if (grpc_call_trace.enabled()) {
4020     gpr_log(GPR_INFO,
4021             "%s[call] RecvMessage: outstanding_recv "
4022             "finishes: received end-of-stream",
4023             DebugTag().c_str());
4024   }
4025   *recv_message_ = nullptr;
4026   recv_message_ = nullptr;
4027   return Success{};
4028 }
4029 
CommitBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)4030 void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
4031                                   void* notify_tag,
4032                                   bool is_notify_tag_closure) {
4033   std::array<uint8_t, 8> got_ops{255, 255, 255, 255, 255, 255, 255, 255};
4034   for (size_t op_idx = 0; op_idx < nops; op_idx++) {
4035     const grpc_op& op = ops[op_idx];
4036     got_ops[op.op] = op_idx;
4037   }
4038   if (!is_notify_tag_closure) grpc_cq_begin_op(cq(), notify_tag);
4039   auto send_initial_metadata = MaybeOp(
4040       ops, got_ops[GRPC_OP_SEND_INITIAL_METADATA], [this](const grpc_op& op) {
4041         auto metadata = arena()->MakePooled<ServerMetadata>();
4042         PrepareOutgoingInitialMetadata(op, *metadata);
4043         CToMetadata(op.data.send_initial_metadata.metadata,
4044                     op.data.send_initial_metadata.count, metadata.get());
4045         if (grpc_call_trace.enabled()) {
4046           gpr_log(GPR_INFO, "%s[call] Send initial metadata",
4047                   DebugTag().c_str());
4048         }
4049         return [this, metadata = std::move(metadata)]() mutable {
4050           return Map(server_initial_metadata_.sender.Push(std::move(metadata)),
4051                      [this](bool r) {
4052                        server_initial_metadata_.sender.Close();
4053                        return StatusFlag(r);
4054                      });
4055         };
4056       });
4057   auto send_message =
4058       MaybeOp(ops, got_ops[GRPC_OP_SEND_MESSAGE], [this](const grpc_op& op) {
4059         SliceBuffer send;
4060         grpc_slice_buffer_swap(
4061             &op.data.send_message.send_message->data.raw.slice_buffer,
4062             send.c_slice_buffer());
4063         auto msg = arena()->MakePooled<Message>(std::move(send), op.flags);
4064         return [this, msg = std::move(msg)]() mutable {
4065           return Map(server_to_client_messages_.sender.Push(std::move(msg)),
4066                      [](bool r) { return StatusFlag(r); });
4067         };
4068       });
4069   auto send_trailing_metadata = MaybeOp(
4070       ops, got_ops[GRPC_OP_SEND_STATUS_FROM_SERVER], [this](const grpc_op& op) {
4071         auto metadata = arena()->MakePooled<ServerMetadata>();
4072         CToMetadata(op.data.send_status_from_server.trailing_metadata,
4073                     op.data.send_status_from_server.trailing_metadata_count,
4074                     metadata.get());
4075         metadata->Set(GrpcStatusMetadata(),
4076                       op.data.send_status_from_server.status);
4077         if (auto* details = op.data.send_status_from_server.status_details) {
4078           // TODO(ctiller): this should not be a copy, but we have
4079           // callers that allocate and pass in a slice created with
4080           // grpc_slice_from_static_string and then delete the string
4081           // after passing it in, which shouldn't be a supported API.
4082           metadata->Set(GrpcMessageMetadata(),
4083                         Slice(grpc_slice_copy(*details)));
4084         }
4085         return [this, metadata = std::move(metadata)]() mutable {
4086           server_to_client_messages_.sender.Close();
4087           return Map(server_trailing_metadata_.sender.Push(std::move(metadata)),
4088                      [](bool r) { return StatusFlag(r); });
4089         };
4090       });
4091   auto recv_message =
4092       MaybeOp(ops, got_ops[GRPC_OP_RECV_MESSAGE], [this](const grpc_op& op) {
4093         GPR_ASSERT(recv_message_ == nullptr);
4094         recv_message_ = op.data.recv_message.recv_message;
4095         return [this]() mutable {
4096           return Map(client_to_server_messages_.receiver.Next(),
4097                      [this](NextResult<MessageHandle> msg) {
4098                        return FinishRecvMessage(std::move(msg));
4099                      });
4100         };
4101       });
4102   auto primary_ops = AllOk<StatusFlag>(
4103       std::move(send_initial_metadata), std::move(send_message),
4104       std::move(send_trailing_metadata), std::move(recv_message));
4105   if (got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER] != 255) {
4106     auto recv_trailing_metadata = MaybeOp(
4107         ops, got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER], [this](const grpc_op& op) {
4108           return [this, cancelled = op.data.recv_close_on_server.cancelled]() {
4109             return Map(server_trailing_metadata_.receiver.AwaitClosed(),
4110                        [cancelled, this](bool result) -> Success {
4111                          ResetDeadline();
4112                          *cancelled = result ? 1 : 0;
4113                          return Success{};
4114                        });
4115           };
4116         });
4117     SpawnInfallible(
4118         "final-batch",
4119         [primary_ops = std::move(primary_ops),
4120          recv_trailing_metadata = std::move(recv_trailing_metadata),
4121          is_notify_tag_closure, notify_tag, this]() mutable {
4122           return LogPollBatch(
4123               notify_tag,
4124               Seq(std::move(primary_ops), std::move(recv_trailing_metadata),
4125                   [is_notify_tag_closure, notify_tag, this](StatusFlag) {
4126                     return WaitForCqEndOp(is_notify_tag_closure, notify_tag,
4127                                           absl::OkStatus(), cq());
4128                   }));
4129         });
4130   } else {
4131     SpawnInfallible("batch", [primary_ops = std::move(primary_ops),
4132                               is_notify_tag_closure, notify_tag,
4133                               this]() mutable {
4134       return LogPollBatch(
4135           notify_tag,
4136           Seq(std::move(primary_ops),
4137               [is_notify_tag_closure, notify_tag, this](StatusFlag r) {
4138                 return WaitForCqEndOp(is_notify_tag_closure, notify_tag,
4139                                       StatusCast<grpc_error_handle>(r), cq());
4140               }));
4141     });
4142   }
4143 }
4144 
MakeServerCall(ServerInterface * server,Channel * channel,Arena * arena)4145 RefCountedPtr<CallSpineInterface> MakeServerCall(ServerInterface* server,
4146                                                  Channel* channel,
4147                                                  Arena* arena) {
4148   return RefCountedPtr<ServerCallSpine>(
4149       arena->New<ServerCallSpine>(server, channel, arena));
4150 }
4151 #else
MakeServerCall(ServerInterface *,Channel *,Arena *)4152 RefCountedPtr<CallSpineInterface> MakeServerCall(ServerInterface*, Channel*,
4153                                                  Arena*) {
4154   Crash("not implemented");
4155 }
4156 #endif
4157 
4158 }  // namespace grpc_core
4159 
4160 ///////////////////////////////////////////////////////////////////////////////
4161 // C-based API
4162 
grpc_call_arena_alloc(grpc_call * call,size_t size)4163 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
4164   grpc_core::ExecCtx exec_ctx;
4165   return grpc_core::Call::FromC(call)->arena()->Alloc(size);
4166 }
4167 
grpc_call_get_initial_size_estimate()4168 size_t grpc_call_get_initial_size_estimate() {
4169   return grpc_core::FilterStackCall::InitialSizeEstimate();
4170 }
4171 
grpc_call_create(grpc_call_create_args * args,grpc_call ** out_call)4172 grpc_error_handle grpc_call_create(grpc_call_create_args* args,
4173                                    grpc_call** out_call) {
4174 #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
4175   if (grpc_core::IsPromiseBasedClientCallEnabled() &&
4176       args->server_transport_data == nullptr && args->channel->is_promising()) {
4177     return grpc_core::MakePromiseBasedCall<grpc_core::ClientPromiseBasedCall>(
4178         args, out_call);
4179   }
4180 #endif
4181 #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
4182   if (grpc_core::IsPromiseBasedServerCallEnabled() &&
4183       args->server_transport_data != nullptr && args->channel->is_promising()) {
4184     return grpc_core::MakePromiseBasedCall<grpc_core::ServerPromiseBasedCall>(
4185         args, out_call);
4186   }
4187 #endif
4188   return grpc_core::FilterStackCall::Create(args, out_call);
4189 }
4190 
grpc_call_set_completion_queue(grpc_call * call,grpc_completion_queue * cq)4191 void grpc_call_set_completion_queue(grpc_call* call,
4192                                     grpc_completion_queue* cq) {
4193   grpc_core::Call::FromC(call)->SetCompletionQueue(cq);
4194 }
4195 
grpc_call_ref(grpc_call * c)4196 void grpc_call_ref(grpc_call* c) { grpc_core::Call::FromC(c)->ExternalRef(); }
4197 
grpc_call_unref(grpc_call * c)4198 void grpc_call_unref(grpc_call* c) {
4199   grpc_core::ExecCtx exec_ctx;
4200   grpc_core::Call::FromC(c)->ExternalUnref();
4201 }
4202 
grpc_call_get_peer(grpc_call * call)4203 char* grpc_call_get_peer(grpc_call* call) {
4204   return grpc_core::Call::FromC(call)->GetPeer();
4205 }
4206 
grpc_call_from_top_element(grpc_call_element * surface_element)4207 grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {
4208   return grpc_core::FilterStackCall::FromTopElem(surface_element)->c_ptr();
4209 }
4210 
grpc_call_cancel(grpc_call * call,void * reserved)4211 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
4212   GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
4213   GPR_ASSERT(reserved == nullptr);
4214   if (call == nullptr) {
4215     return GRPC_CALL_ERROR;
4216   }
4217   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
4218   grpc_core::ExecCtx exec_ctx;
4219   grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError());
4220   return GRPC_CALL_OK;
4221 }
4222 
grpc_call_cancel_with_status(grpc_call * c,grpc_status_code status,const char * description,void * reserved)4223 grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
4224                                              grpc_status_code status,
4225                                              const char* description,
4226                                              void* reserved) {
4227   GRPC_API_TRACE(
4228       "grpc_call_cancel_with_status("
4229       "c=%p, status=%d, description=%s, reserved=%p)",
4230       4, (c, (int)status, description, reserved));
4231   GPR_ASSERT(reserved == nullptr);
4232   if (c == nullptr) {
4233     return GRPC_CALL_ERROR;
4234   }
4235   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
4236   grpc_core::ExecCtx exec_ctx;
4237   grpc_core::Call::FromC(c)->CancelWithStatus(status, description);
4238   return GRPC_CALL_OK;
4239 }
4240 
grpc_call_cancel_internal(grpc_call * call)4241 void grpc_call_cancel_internal(grpc_call* call) {
4242   grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError());
4243 }
4244 
grpc_call_test_only_get_compression_algorithm(grpc_call * call)4245 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
4246     grpc_call* call) {
4247   return grpc_core::Call::FromC(call)->test_only_compression_algorithm();
4248 }
4249 
grpc_call_test_only_get_message_flags(grpc_call * call)4250 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
4251   return grpc_core::Call::FromC(call)->test_only_message_flags();
4252 }
4253 
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)4254 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
4255   return grpc_core::Call::FromC(call)
4256       ->encodings_accepted_by_peer()
4257       .ToLegacyBitmask();
4258 }
4259 
grpc_call_get_arena(grpc_call * call)4260 grpc_core::Arena* grpc_call_get_arena(grpc_call* call) {
4261   return grpc_core::Call::FromC(call)->arena();
4262 }
4263 
grpc_call_get_call_stack(grpc_call * call)4264 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
4265   return grpc_core::Call::FromC(call)->call_stack();
4266 }
4267 
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)4268 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
4269                                       size_t nops, void* tag, void* reserved) {
4270   GRPC_API_TRACE(
4271       "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
4272       "reserved=%p)",
4273       5, (call, ops, (unsigned long)nops, tag, reserved));
4274 
4275   if (reserved != nullptr || call == nullptr) {
4276     return GRPC_CALL_ERROR;
4277   } else {
4278     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
4279     grpc_core::ExecCtx exec_ctx;
4280     return grpc_core::Call::FromC(call)->StartBatch(ops, nops, tag, false);
4281   }
4282 }
4283 
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)4284 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
4285                                                   const grpc_op* ops,
4286                                                   size_t nops,
4287                                                   grpc_closure* closure) {
4288   return grpc_core::Call::FromC(call)->StartBatch(ops, nops, closure, true);
4289 }
4290 
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))4291 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
4292                            void* value, void (*destroy)(void* value)) {
4293   return grpc_core::Call::FromC(call)->ContextSet(elem, value, destroy);
4294 }
4295 
grpc_call_context_get(grpc_call * call,grpc_context_index elem)4296 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
4297   return grpc_core::Call::FromC(call)->ContextGet(elem);
4298 }
4299 
grpc_call_is_client(grpc_call * call)4300 uint8_t grpc_call_is_client(grpc_call* call) {
4301   return grpc_core::Call::FromC(call)->is_client();
4302 }
4303 
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)4304 grpc_compression_algorithm grpc_call_compression_for_level(
4305     grpc_call* call, grpc_compression_level level) {
4306   return grpc_core::Call::FromC(call)
4307       ->encodings_accepted_by_peer()
4308       .CompressionAlgorithmForLevel(level);
4309 }
4310 
grpc_call_is_trailers_only(const grpc_call * call)4311 bool grpc_call_is_trailers_only(const grpc_call* call) {
4312   return grpc_core::Call::FromC(call)->is_trailers_only();
4313 }
4314 
grpc_call_failed_before_recv_message(const grpc_call * c)4315 int grpc_call_failed_before_recv_message(const grpc_call* c) {
4316   return grpc_core::Call::FromC(c)->failed_before_recv_message();
4317 }
4318 
grpc_call_server_authority(const grpc_call * call)4319 absl::string_view grpc_call_server_authority(const grpc_call* call) {
4320   return grpc_core::Call::FromC(call)->GetServerAuthority();
4321 }
4322 
grpc_call_error_to_string(grpc_call_error error)4323 const char* grpc_call_error_to_string(grpc_call_error error) {
4324   switch (error) {
4325     case GRPC_CALL_ERROR:
4326       return "GRPC_CALL_ERROR";
4327     case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
4328       return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
4329     case GRPC_CALL_ERROR_ALREADY_FINISHED:
4330       return "GRPC_CALL_ERROR_ALREADY_FINISHED";
4331     case GRPC_CALL_ERROR_ALREADY_INVOKED:
4332       return "GRPC_CALL_ERROR_ALREADY_INVOKED";
4333     case GRPC_CALL_ERROR_BATCH_TOO_BIG:
4334       return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
4335     case GRPC_CALL_ERROR_INVALID_FLAGS:
4336       return "GRPC_CALL_ERROR_INVALID_FLAGS";
4337     case GRPC_CALL_ERROR_INVALID_MESSAGE:
4338       return "GRPC_CALL_ERROR_INVALID_MESSAGE";
4339     case GRPC_CALL_ERROR_INVALID_METADATA:
4340       return "GRPC_CALL_ERROR_INVALID_METADATA";
4341     case GRPC_CALL_ERROR_NOT_INVOKED:
4342       return "GRPC_CALL_ERROR_NOT_INVOKED";
4343     case GRPC_CALL_ERROR_NOT_ON_CLIENT:
4344       return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
4345     case GRPC_CALL_ERROR_NOT_ON_SERVER:
4346       return "GRPC_CALL_ERROR_NOT_ON_SERVER";
4347     case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
4348       return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
4349     case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
4350       return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
4351     case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
4352       return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
4353     case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
4354       return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
4355     case GRPC_CALL_OK:
4356       return "GRPC_CALL_OK";
4357   }
4358   GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
4359 }
4360 
grpc_call_run_in_event_engine(const grpc_call * call,absl::AnyInvocable<void ()> cb)4361 void grpc_call_run_in_event_engine(const grpc_call* call,
4362                                    absl::AnyInvocable<void()> cb) {
4363   grpc_core::Call::FromC(call)->event_engine()->Run(std::move(cb));
4364 }
4365