1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/surface/call.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdlib.h>
26 #include <string.h>
27
28 #include <algorithm>
29 #include <atomic>
30 #include <memory>
31 #include <new>
32 #include <string>
33 #include <type_traits>
34 #include <utility>
35 #include <vector>
36
37 #include "absl/base/thread_annotations.h"
38 #include "absl/status/status.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/str_format.h"
41 #include "absl/strings/str_join.h"
42 #include "absl/strings/string_view.h"
43
44 #include <grpc/byte_buffer.h>
45 #include <grpc/compression.h>
46 #include <grpc/event_engine/event_engine.h>
47 #include <grpc/grpc.h>
48 #include <grpc/impl/call.h>
49 #include <grpc/impl/propagation_bits.h>
50 #include <grpc/slice.h>
51 #include <grpc/slice_buffer.h>
52 #include <grpc/status.h>
53 #include <grpc/support/alloc.h>
54 #include <grpc/support/atm.h>
55 #include <grpc/support/log.h>
56 #include <grpc/support/string_util.h>
57
58 #include "src/core/lib/channel/call_finalization.h"
59 #include "src/core/lib/channel/call_tracer.h"
60 #include "src/core/lib/channel/channel_stack.h"
61 #include "src/core/lib/channel/channelz.h"
62 #include "src/core/lib/channel/context.h"
63 #include "src/core/lib/channel/status_util.h"
64 #include "src/core/lib/compression/compression_internal.h"
65 #include "src/core/lib/debug/stats.h"
66 #include "src/core/lib/debug/stats_data.h"
67 #include "src/core/lib/experiments/experiments.h"
68 #include "src/core/lib/gpr/alloc.h"
69 #include "src/core/lib/gpr/time_precise.h"
70 #include "src/core/lib/gpr/useful.h"
71 #include "src/core/lib/gprpp/bitset.h"
72 #include "src/core/lib/gprpp/cpp_impl_of.h"
73 #include "src/core/lib/gprpp/crash.h"
74 #include "src/core/lib/gprpp/debug_location.h"
75 #include "src/core/lib/gprpp/ref_counted.h"
76 #include "src/core/lib/gprpp/ref_counted_ptr.h"
77 #include "src/core/lib/gprpp/status_helper.h"
78 #include "src/core/lib/gprpp/sync.h"
79 #include "src/core/lib/iomgr/call_combiner.h"
80 #include "src/core/lib/iomgr/exec_ctx.h"
81 #include "src/core/lib/iomgr/polling_entity.h"
82 #include "src/core/lib/promise/activity.h"
83 #include "src/core/lib/promise/all_ok.h"
84 #include "src/core/lib/promise/arena_promise.h"
85 #include "src/core/lib/promise/context.h"
86 #include "src/core/lib/promise/latch.h"
87 #include "src/core/lib/promise/map.h"
88 #include "src/core/lib/promise/party.h"
89 #include "src/core/lib/promise/pipe.h"
90 #include "src/core/lib/promise/poll.h"
91 #include "src/core/lib/promise/race.h"
92 #include "src/core/lib/promise/seq.h"
93 #include "src/core/lib/promise/status_flag.h"
94 #include "src/core/lib/resource_quota/arena.h"
95 #include "src/core/lib/slice/slice_buffer.h"
96 #include "src/core/lib/slice/slice_internal.h"
97 #include "src/core/lib/surface/api_trace.h"
98 #include "src/core/lib/surface/call_test_only.h"
99 #include "src/core/lib/surface/channel.h"
100 #include "src/core/lib/surface/completion_queue.h"
101 #include "src/core/lib/surface/server_interface.h"
102 #include "src/core/lib/surface/validate_metadata.h"
103 #include "src/core/lib/surface/wait_for_cq_end_op.h"
104 #include "src/core/lib/transport/batch_builder.h"
105 #include "src/core/lib/transport/error_utils.h"
106 #include "src/core/lib/transport/metadata_batch.h"
107 #include "src/core/lib/transport/transport.h"
108
109 grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
110 grpc_core::TraceFlag grpc_compression_trace(false, "compression");
111 grpc_core::TraceFlag grpc_call_trace(false, "call");
112 grpc_core::DebugOnlyTraceFlag grpc_call_refcount_trace(false, "call_refcount");
113
114 namespace grpc_core {
115
116 ///////////////////////////////////////////////////////////////////////////////
117 // Call
118
119 class Call : public CppImplOf<Call, grpc_call> {
120 public:
121 Arena* arena() { return arena_; }
122 bool is_client() const { return is_client_; }
123
124 virtual void ContextSet(grpc_context_index elem, void* value,
125 void (*destroy)(void* value)) = 0;
126 virtual void* ContextGet(grpc_context_index elem) const = 0;
127 virtual bool Completed() = 0;
128 void CancelWithStatus(grpc_status_code status, const char* description);
129 virtual void CancelWithError(grpc_error_handle error) = 0;
130 virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0;
131 char* GetPeer();
132 virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops,
133 void* notify_tag,
134 bool is_notify_tag_closure) = 0;
135 virtual bool failed_before_recv_message() const = 0;
136 virtual bool is_trailers_only() const = 0;
137 virtual absl::string_view GetServerAuthority() const = 0;
138 virtual void ExternalRef() = 0;
139 virtual void ExternalUnref() = 0;
140 virtual void InternalRef(const char* reason) = 0;
141 virtual void InternalUnref(const char* reason) = 0;
142
143 grpc_compression_algorithm test_only_compression_algorithm() {
144 return incoming_compression_algorithm_;
145 }
146 uint32_t test_only_message_flags() { return test_only_last_message_flags_; }
147 CompressionAlgorithmSet encodings_accepted_by_peer() {
148 return encodings_accepted_by_peer_;
149 }
150
151 // This should return nullptr for the promise stack (and an alternative means
152 // of providing that functionality should be invented)
153 virtual grpc_call_stack* call_stack() = 0;
154
155 // Return the EventEngine used for this call's async execution.
156 virtual grpc_event_engine::experimental::EventEngine* event_engine()
157 const = 0;
158
159 protected:
160 // The maximum number of concurrent batches possible.
161 // Based upon the maximum number of individually queueable ops in the batch
162 // api:
163 // - initial metadata send
164 // - message send
165 // - status/close send (depending on client/server)
166 // - initial metadata recv
167 // - message recv
168 // - status/close recv (depending on client/server)
169 static constexpr size_t kMaxConcurrentBatches = 6;
170
171 struct ParentCall {
172 Mutex child_list_mu;
173 Call* first_child ABSL_GUARDED_BY(child_list_mu) = nullptr;
174 };
175
176 struct ChildCall {
177 explicit ChildCall(Call* parent) : parent(parent) {}
178 Call* parent;
179 /// siblings: children of the same parent form a list, and this list is
180 /// protected by the parent's
181 /// ParentCall::child_list_mu
182 Call* sibling_next = nullptr;
183 Call* sibling_prev = nullptr;
184 };
185
186 Call(Arena* arena, bool is_client, Timestamp send_deadline,
187 RefCountedPtr<Channel> channel)
188 : channel_(std::move(channel)),
189 arena_(arena),
190 send_deadline_(send_deadline),
191 is_client_(is_client) {
192 GPR_DEBUG_ASSERT(arena_ != nullptr);
193 GPR_DEBUG_ASSERT(channel_ != nullptr);
194 }
195 virtual ~Call() = default;
196
197 void DeleteThis();
198
199 ParentCall* GetOrCreateParentCall();
200 ParentCall* parent_call();
201 Channel* channel() const {
202 GPR_DEBUG_ASSERT(channel_ != nullptr);
203 return channel_.get();
204 }
205
206 absl::Status InitParent(Call* parent, uint32_t propagation_mask);
207 void PublishToParent(Call* parent);
208 void MaybeUnpublishFromParent();
209 void PropagateCancellationToChildren();
210
211 Timestamp send_deadline() const { return send_deadline_; }
212 void set_send_deadline(Timestamp send_deadline) {
213 send_deadline_ = send_deadline;
214 }
215
216 Slice GetPeerString() const {
217 MutexLock lock(&peer_mu_);
218 return peer_string_.Ref();
219 }
220
221 void SetPeerString(Slice peer_string) {
222 MutexLock lock(&peer_mu_);
223 peer_string_ = std::move(peer_string);
224 }
225
226 void ClearPeerString() { SetPeerString(Slice(grpc_empty_slice())); }
227
228 // TODO(ctiller): cancel_func is for cancellation of the call - filter stack
229 // holds no mutexes here, promise stack does, and so locking is different.
230 // Remove this and cancel directly once promise conversion is done.
231 void ProcessIncomingInitialMetadata(grpc_metadata_batch& md);
232 // Fixup outgoing metadata before sending - adds compression, protects
233 // internal headers against external modification.
234 void PrepareOutgoingInitialMetadata(const grpc_op& op,
235 grpc_metadata_batch& md);
236 void NoteLastMessageFlags(uint32_t flags) {
237 test_only_last_message_flags_ = flags;
238 }
239 grpc_compression_algorithm incoming_compression_algorithm() const {
240 return incoming_compression_algorithm_;
241 }
242
243 void HandleCompressionAlgorithmDisabled(
244 grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
245 void HandleCompressionAlgorithmNotAccepted(
246 grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
247
248 gpr_cycle_counter start_time() const { return start_time_; }
249
250 private:
251 RefCountedPtr<Channel> channel_;
252 Arena* const arena_;
253 std::atomic<ParentCall*> parent_call_{nullptr};
254 ChildCall* child_ = nullptr;
255 Timestamp send_deadline_;
256 const bool is_client_;
257 // flag indicating that cancellation is inherited
258 bool cancellation_is_inherited_ = false;
259 // Compression algorithm for *incoming* data
260 grpc_compression_algorithm incoming_compression_algorithm_ =
261 GRPC_COMPRESS_NONE;
262 // Supported encodings (compression algorithms), a bitset.
263 // Always support no compression.
264 CompressionAlgorithmSet encodings_accepted_by_peer_{GRPC_COMPRESS_NONE};
265 uint32_t test_only_last_message_flags_ = 0;
266 // Peer name is protected by a mutex because it can be accessed by the
267 // application at the same moment as it is being set by the completion
268 // of the recv_initial_metadata op. The mutex should be mostly uncontended.
269 mutable Mutex peer_mu_;
270 Slice peer_string_;
271 gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
272 };
273
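// Lazily allocates the ParentCall tracking structure on the call's arena. A
// losing racer in the compare-and-swap below destroys its own copy and adopts
// the winner's pointer, so no mutex is needed on this path.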
274 Call::ParentCall* Call::GetOrCreateParentCall() {
275 ParentCall* p = parent_call_.load(std::memory_order_acquire);
276 if (p == nullptr) {
277 p = arena_->New<ParentCall>();
278 ParentCall* expected = nullptr;
279 if (!parent_call_.compare_exchange_strong(expected, p,
280 std::memory_order_release,
281 std::memory_order_relaxed)) {
282 p->~ParentCall();
283 p = expected;
284 }
285 }
286 return p;
287 }
288
289 Call::ParentCall* Call::parent_call() {
290 return parent_call_.load(std::memory_order_acquire);
291 }
292
293 absl::Status Call::InitParent(Call* parent, uint32_t propagation_mask) {
294 child_ = arena()->New<ChildCall>(parent);
295
296 parent->InternalRef("child");
297 GPR_ASSERT(is_client_);
298 GPR_ASSERT(!parent->is_client_);
299
300 if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
301 send_deadline_ = std::min(send_deadline_, parent->send_deadline_);
302 }
303 // for now GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT *MUST* be passed with
304 // GRPC_PROPAGATE_CENSUS_STATS_CONTEXT
305 // TODO(ctiller): This should change to use the appropriate census start_op
306 // call.
307 if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
308 if (0 == (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
309 return absl::UnknownError(
310 "Census tracing propagation requested without Census context "
311 "propagation");
312 }
313 ContextSet(GRPC_CONTEXT_TRACING, parent->ContextGet(GRPC_CONTEXT_TRACING),
314 nullptr);
315 } else if (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
316 return absl::UnknownError(
317 "Census context propagation requested without Census tracing "
318 "propagation");
319 }
320 if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
321 cancellation_is_inherited_ = true;
322 }
323 return absl::OkStatus();
324 }
325
326 void Call::PublishToParent(Call* parent) {
327 ChildCall* cc = child_;
328 ParentCall* pc = parent->GetOrCreateParentCall();
329 MutexLock lock(&pc->child_list_mu);
330 if (pc->first_child == nullptr) {
331 pc->first_child = this;
332 cc->sibling_next = cc->sibling_prev = this;
333 } else {
334 cc->sibling_next = pc->first_child;
335 cc->sibling_prev = pc->first_child->child_->sibling_prev;
336 cc->sibling_next->child_->sibling_prev =
337 cc->sibling_prev->child_->sibling_next = this;
338 }
339 if (parent->Completed()) {
340 CancelWithError(absl::CancelledError());
341 }
342 }
343
344 void Call::MaybeUnpublishFromParent() {
345 ChildCall* cc = child_;
346 if (cc == nullptr) return;
347
348 ParentCall* pc = cc->parent->parent_call();
349 {
350 MutexLock lock(&pc->child_list_mu);
351 if (this == pc->first_child) {
352 pc->first_child = cc->sibling_next;
353 if (this == pc->first_child) {
354 pc->first_child = nullptr;
355 }
356 }
357 cc->sibling_prev->child_->sibling_next = cc->sibling_next;
358 cc->sibling_next->child_->sibling_prev = cc->sibling_prev;
359 }
360 cc->parent->InternalUnref("child");
361 }
362
363 void Call::CancelWithStatus(grpc_status_code status, const char* description) {
364 // Copying 'description' is needed to honor the grpc_call_cancel_with_status
365 // guarantee that 'description' need only live for the duration of that call.
366 // TODO(ctiller): change to
367 // absl::Status(static_cast<absl::StatusCode>(status), description)
368 // (ie remove the set_int, set_str).
369 CancelWithError(grpc_error_set_int(
370 grpc_error_set_str(
371 absl::Status(static_cast<absl::StatusCode>(status), description),
372 StatusStrProperty::kGrpcMessage, description),
373 StatusIntProperty::kRpcStatus, status));
374 }
375
376 void Call::PropagateCancellationToChildren() {
377 ParentCall* pc = parent_call();
378 if (pc != nullptr) {
379 Call* child;
380 MutexLock lock(&pc->child_list_mu);
381 child = pc->first_child;
382 if (child != nullptr) {
383 do {
384 Call* next_child_call = child->child_->sibling_next;
385 if (child->cancellation_is_inherited_) {
386 child->InternalRef("propagate_cancel");
387 child->CancelWithError(absl::CancelledError());
388 child->InternalUnref("propagate_cancel");
389 }
390 child = next_child_call;
391 } while (child != pc->first_child);
392 }
393 }
394 }
395
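// Returns a heap-allocated copy of the peer address, falling back to the
// channel target (or "unknown") when the peer is not yet known. The caller
// owns the returned string and must release it with gpr_free().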
396 char* Call::GetPeer() {
397 Slice peer_slice = GetPeerString();
398 if (!peer_slice.empty()) {
399 absl::string_view peer_string_view = peer_slice.as_string_view();
400 char* peer_string =
401 static_cast<char*>(gpr_malloc(peer_string_view.size() + 1));
402 memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
403 peer_string[peer_string_view.size()] = '\0';
404 return peer_string;
405 }
406 char* peer_string = grpc_channel_get_target(channel_->c_ptr());
407 if (peer_string != nullptr) return peer_string;
408 return gpr_strdup("unknown");
409 }
410
411 void Call::DeleteThis() {
412 RefCountedPtr<Channel> channel = std::move(channel_);
413 Arena* arena = arena_;
414 this->~Call();
415 channel->DestroyArena(arena);
416 }
417
418 void Call::PrepareOutgoingInitialMetadata(const grpc_op& op,
419 grpc_metadata_batch& md) {
420 // TODO(juanlishen): If the user has already specified a compression
421 // algorithm by setting the initial metadata with key of
422 // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
423 // with the compression algorithm mapped from compression level.
424 // process compression level
425 grpc_compression_level effective_compression_level = GRPC_COMPRESS_LEVEL_NONE;
426 bool level_set = false;
427 if (op.data.send_initial_metadata.maybe_compression_level.is_set) {
428 effective_compression_level =
429 op.data.send_initial_metadata.maybe_compression_level.level;
430 level_set = true;
431 } else {
432 const grpc_compression_options copts = channel()->compression_options();
433 if (copts.default_level.is_set) {
434 level_set = true;
435 effective_compression_level = copts.default_level.level;
436 }
437 }
438 // Currently, only the server side supports compression level settings.
439 if (level_set && !is_client()) {
440 const grpc_compression_algorithm calgo =
441 encodings_accepted_by_peer().CompressionAlgorithmForLevel(
442 effective_compression_level);
443 // The following metadata will be checked and removed by the message
444 // compression filter. It will be used as the call's compression
445 // algorithm.
446 md.Set(GrpcInternalEncodingRequest(), calgo);
447 }
448 // Ignore any 'te' metadata key/value pairs specified by the application.
449 md.Remove(TeMetadata());
450 // Should never come from applications
451 md.Remove(GrpcLbClientStatsMetadata());
452 }
453
454 void Call::ProcessIncomingInitialMetadata(grpc_metadata_batch& md) {
455 Slice* peer_string = md.get_pointer(PeerString());
456 if (peer_string != nullptr) SetPeerString(peer_string->Ref());
457
458 incoming_compression_algorithm_ =
459 md.Take(GrpcEncodingMetadata()).value_or(GRPC_COMPRESS_NONE);
460 encodings_accepted_by_peer_ =
461 md.Take(GrpcAcceptEncodingMetadata())
462 .value_or(CompressionAlgorithmSet{GRPC_COMPRESS_NONE});
463
464 const grpc_compression_options compression_options =
465 channel_->compression_options();
466 const grpc_compression_algorithm compression_algorithm =
467 incoming_compression_algorithm_;
468 if (GPR_UNLIKELY(!CompressionAlgorithmSet::FromUint32(
469 compression_options.enabled_algorithms_bitset)
470 .IsSet(compression_algorithm))) {
471 // check if algorithm is supported by current channel config
472 HandleCompressionAlgorithmDisabled(compression_algorithm);
473 }
474 // GRPC_COMPRESS_NONE is always set.
475 GPR_DEBUG_ASSERT(encodings_accepted_by_peer_.IsSet(GRPC_COMPRESS_NONE));
476 if (GPR_UNLIKELY(!encodings_accepted_by_peer_.IsSet(compression_algorithm))) {
477 if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
478 HandleCompressionAlgorithmNotAccepted(compression_algorithm);
479 }
480 }
481 }
482
483 void Call::HandleCompressionAlgorithmNotAccepted(
484 grpc_compression_algorithm compression_algorithm) {
485 const char* algo_name = nullptr;
486 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
487 gpr_log(GPR_ERROR,
488 "Compression algorithm ('%s') not present in the "
489 "accepted encodings (%s)",
490 algo_name,
491 std::string(encodings_accepted_by_peer_.ToString()).c_str());
492 }
493
494 void Call::HandleCompressionAlgorithmDisabled(
495 grpc_compression_algorithm compression_algorithm) {
496 const char* algo_name = nullptr;
497 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
498 std::string error_msg =
499 absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
500 gpr_log(GPR_ERROR, "%s", error_msg.c_str());
501 CancelWithError(grpc_error_set_int(absl::UnimplementedError(error_msg),
502 StatusIntProperty::kRpcStatus,
503 GRPC_STATUS_UNIMPLEMENTED));
504 }
505
506 ///////////////////////////////////////////////////////////////////////////////
507 // FilterStackCall
508 // To be removed once promise conversion is complete
509
510 class FilterStackCall final : public Call {
511 public:
512 ~FilterStackCall() override {
513 for (int i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
514 if (context_[i].destroy) {
515 context_[i].destroy(context_[i].value);
516 }
517 }
518 gpr_free(static_cast<void*>(const_cast<char*>(final_info_.error_string)));
519 }
520
521 bool Completed() override {
522 return gpr_atm_acq_load(&received_final_op_atm_) != 0;
523 }
524
525 // TODO(ctiller): return absl::StatusOr<SomeSmartPointer<Call>>?
526 static grpc_error_handle Create(grpc_call_create_args* args,
527 grpc_call** out_call);
528
529 static Call* FromTopElem(grpc_call_element* elem) {
530 return FromCallStack(grpc_call_stack_from_top_element(elem));
531 }
532
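// The grpc_call_stack for this call is laid out immediately after this object
// within the same arena allocation (see Create()), so it can be recovered with
// pointer arithmetic here and, in the reverse direction, in FromCallStack().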
533 grpc_call_stack* call_stack() override {
534 return reinterpret_cast<grpc_call_stack*>(
535 reinterpret_cast<char*>(this) +
536 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this)));
537 }
538
539 grpc_event_engine::experimental::EventEngine* event_engine() const override {
540 return channel()->event_engine();
541 }
542
543 grpc_call_element* call_elem(size_t idx) {
544 return grpc_call_stack_element(call_stack(), idx);
545 }
546
547 CallCombiner* call_combiner() { return &call_combiner_; }
548
549 void CancelWithError(grpc_error_handle error) override;
550 void SetCompletionQueue(grpc_completion_queue* cq) override;
551 grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
552 bool is_notify_tag_closure) override;
553 void ExternalRef() override { ext_ref_.Ref(); }
554 void ExternalUnref() override;
555 void InternalRef(const char* reason) override {
556 GRPC_CALL_STACK_REF(call_stack(), reason);
557 }
558 void InternalUnref(const char* reason) override {
559 GRPC_CALL_STACK_UNREF(call_stack(), reason);
560 }
561
562 void ContextSet(grpc_context_index elem, void* value,
563 void (*destroy)(void* value)) override;
564 void* ContextGet(grpc_context_index elem) const override {
565 return context_[elem].value;
566 }
567
568 bool is_trailers_only() const override {
569 bool result = is_trailers_only_;
570 GPR_DEBUG_ASSERT(!result || recv_initial_metadata_.TransportSize() == 0);
571 return result;
572 }
573
574 bool failed_before_recv_message() const override {
575 return call_failed_before_recv_message_;
576 }
577
578 absl::string_view GetServerAuthority() const override {
579 const Slice* authority_metadata =
580 recv_initial_metadata_.get_pointer(HttpAuthorityMetadata());
581 if (authority_metadata == nullptr) return "";
582 return authority_metadata->as_string_view();
583 }
584
585 static size_t InitialSizeEstimate() {
586 return sizeof(FilterStackCall) +
587 sizeof(BatchControl) * kMaxConcurrentBatches;
588 }
589
590 private:
591 class ScopedContext : public promise_detail::Context<Arena> {
592 public:
593 explicit ScopedContext(FilterStackCall* call)
594 : promise_detail::Context<Arena>(call->arena()) {}
595 };
596
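// Sentinel values for recv_state_ (see the state-machine comment near its
// declaration below); any other value stored there is a BatchControl*.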
597 static constexpr gpr_atm kRecvNone = 0;
598 static constexpr gpr_atm kRecvInitialMetadataFirst = 1;
599
600 enum class PendingOp {
601 kRecvMessage,
602 kRecvInitialMetadata,
603 kRecvTrailingMetadata,
604 kSends
605 };
606 static intptr_t PendingOpMask(PendingOp op) {
607 return static_cast<intptr_t>(1) << static_cast<intptr_t>(op);
608 }
609 static std::string PendingOpString(intptr_t pending_ops) {
610 std::vector<absl::string_view> pending_op_strings;
611 if (pending_ops & PendingOpMask(PendingOp::kRecvMessage)) {
612 pending_op_strings.push_back("kRecvMessage");
613 }
614 if (pending_ops & PendingOpMask(PendingOp::kRecvInitialMetadata)) {
615 pending_op_strings.push_back("kRecvInitialMetadata");
616 }
617 if (pending_ops & PendingOpMask(PendingOp::kRecvTrailingMetadata)) {
618 pending_op_strings.push_back("kRecvTrailingMetadata");
619 }
620 if (pending_ops & PendingOpMask(PendingOp::kSends)) {
621 pending_op_strings.push_back("kSends");
622 }
623 return absl::StrCat("{", absl::StrJoin(pending_op_strings, ","), "}");
624 }
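// Illustrative example of the two helpers above: with a receive-message op and
// the sends still pending,
//   PendingOpString(PendingOpMask(PendingOp::kRecvMessage) |
//                   PendingOpMask(PendingOp::kSends))
// returns "{kRecvMessage,kSends}".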
625 struct BatchControl {
626 FilterStackCall* call_ = nullptr;
627 CallTracerAnnotationInterface* call_tracer_ = nullptr;
628 grpc_transport_stream_op_batch op_;
629 // Share memory for cq_completion and notify_tag as they are never needed
630 // simultaneously. Each byte used in this data structure counts as six bytes
631 // per call, so any savings we can make are worthwhile.
632
633 // We use notify_tag to determine whether or not to send notification to the
634 // completion queue. Once we've made that determination, we can reuse the
635 // memory for cq_completion.
636 union {
637 grpc_cq_completion cq_completion;
638 struct {
639 // Any given op indicates completion by either (a) calling a closure or
640 // (b) sending a notification on the call's completion queue. If
641 // \a is_closure is true, \a tag indicates a closure to be invoked;
642 // otherwise, \a tag indicates the tag to be used in the notification to
643 // be sent to the completion queue.
644 void* tag;
645 bool is_closure;
646 } notify_tag;
647 } completion_data_;
648 grpc_closure start_batch_;
649 grpc_closure finish_batch_;
650 std::atomic<intptr_t> ops_pending_{0};
651 AtomicError batch_error_;
652 void set_pending_ops(uintptr_t ops) {
653 ops_pending_.store(ops, std::memory_order_release);
654 }
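// Clears `op`'s bit from the ops_pending_ bitmask and returns true when this
// was the last pending op, i.e. when the whole batch has finished and its
// completion may be posted.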
655 bool completed_batch_step(PendingOp op) {
656 auto mask = PendingOpMask(op);
657 auto r = ops_pending_.fetch_sub(mask, std::memory_order_acq_rel);
658 if (grpc_call_trace.enabled()) {
659 gpr_log(GPR_DEBUG, "BATCH:%p COMPLETE:%s REMAINING:%s (tag:%p)", this,
660 PendingOpString(mask).c_str(),
661 PendingOpString(r & ~mask).c_str(),
662 completion_data_.notify_tag.tag);
663 }
664 GPR_ASSERT((r & mask) != 0);
665 return r == mask;
666 }
667
668 void PostCompletion();
669 void FinishStep(PendingOp op);
670 void ProcessDataAfterMetadata();
671 void ReceivingStreamReady(grpc_error_handle error);
672 void ReceivingInitialMetadataReady(grpc_error_handle error);
673 void ReceivingTrailingMetadataReady(grpc_error_handle error);
674 void FinishBatch(grpc_error_handle error);
675 };
676
677 FilterStackCall(Arena* arena, const grpc_call_create_args& args)
678 : Call(arena, args.server_transport_data == nullptr, args.send_deadline,
679 args.channel->Ref()),
680 cq_(args.cq),
681 stream_op_payload_(context_) {}
682
683 static void ReleaseCall(void* call, grpc_error_handle);
684 static void DestroyCall(void* call, grpc_error_handle);
685
686 static FilterStackCall* FromCallStack(grpc_call_stack* call_stack) {
687 return reinterpret_cast<FilterStackCall*>(
688 reinterpret_cast<char*>(call_stack) -
689 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)));
690 }
691
692 void ExecuteBatch(grpc_transport_stream_op_batch* batch,
693 grpc_closure* start_batch_closure);
694 void SetFinalStatus(grpc_error_handle error);
695 BatchControl* ReuseOrAllocateBatchControl(const grpc_op* ops);
696 bool PrepareApplicationMetadata(size_t count, grpc_metadata* metadata,
697 bool is_trailing);
698 void PublishAppMetadata(grpc_metadata_batch* b, bool is_trailing);
699 void RecvInitialFilter(grpc_metadata_batch* b);
700 void RecvTrailingFilter(grpc_metadata_batch* b,
701 grpc_error_handle batch_error);
702
703 RefCount ext_ref_;
704 CallCombiner call_combiner_;
705 grpc_completion_queue* cq_;
706 grpc_polling_entity pollent_;
707
708 /// has grpc_call_unref been called
709 bool destroy_called_ = false;
710 // Trailers-only response status
711 bool is_trailers_only_ = false;
712 /// which ops are in-flight
713 bool sent_initial_metadata_ = false;
714 bool sending_message_ = false;
715 bool sent_final_op_ = false;
716 bool received_initial_metadata_ = false;
717 bool receiving_message_ = false;
718 bool requested_final_op_ = false;
719 gpr_atm received_final_op_atm_ = 0;
720
721 BatchControl* active_batches_[kMaxConcurrentBatches] = {};
722 grpc_transport_stream_op_batch_payload stream_op_payload_;
723
724 // Outgoing and incoming metadata batches (initial and trailing).
725 grpc_metadata_batch send_initial_metadata_;
726 grpc_metadata_batch send_trailing_metadata_;
727 grpc_metadata_batch recv_initial_metadata_;
728 grpc_metadata_batch recv_trailing_metadata_;
729
730 // Buffered read metadata waiting to be returned to the application.
731 // Element 0 is initial metadata, element 1 is trailing metadata.
732 grpc_metadata_array* buffered_metadata_[2] = {};
733
734 // Call data useful for reporting. Only valid after the call has
735 // completed
736 grpc_call_final_info final_info_;
737
738 // Contexts for various subsystems (security, tracing, ...).
739 grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
740
741 SliceBuffer send_slice_buffer_;
742 absl::optional<SliceBuffer> receiving_slice_buffer_;
743 uint32_t receiving_stream_flags_;
744
745 bool call_failed_before_recv_message_ = false;
746 grpc_byte_buffer** receiving_buffer_ = nullptr;
747 grpc_slice receiving_slice_ = grpc_empty_slice();
748 grpc_closure receiving_stream_ready_;
749 grpc_closure receiving_initial_metadata_ready_;
750 grpc_closure receiving_trailing_metadata_ready_;
751 // Status about operation of call
752 bool sent_server_trailing_metadata_ = false;
753 gpr_atm cancelled_with_error_ = 0;
754
755 grpc_closure release_call_;
756
757 union {
758 struct {
759 grpc_status_code* status;
760 grpc_slice* status_details;
761 const char** error_string;
762 } client;
763 struct {
764 int* cancelled;
765 // backpointer to owning server if this is a server side call.
766 ServerInterface* core_server;
767 } server;
768 } final_op_;
769 AtomicError status_error_;
770
771 // recv_state_ can contain one of the following values:
772 // kRecvNone                 : no initial metadata or messages received yet
773 // kRecvInitialMetadataFirst : initial metadata was received first
774 // a BatchControl*           : a message was received first
775
776 //             +------1------kRecvNone------3-----+
777 //             |                                  |
778 //             |                                  |
779 //             v                                  v
780 // kRecvInitialMetadataFirst     BatchControl* from ReceivingStreamReady()
781 //       |           ^                      |           ^
782 //       |           |                      |           |
783 //       +-----2-----+                      +-----4-----+
784
785 // For 1, 4: see ReceivingInitialMetadataReady()
786 // For 2, 3: see ReceivingStreamReady()
787 gpr_atm recv_state_ = 0;
788 };
789
790 grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
791 grpc_call** out_call) {
792 Channel* channel = args->channel.get();
793
794 auto add_init_error = [](grpc_error_handle* composite,
795 grpc_error_handle new_err) {
796 if (new_err.ok()) return;
797 if (composite->ok()) {
798 *composite = GRPC_ERROR_CREATE("Call creation failed");
799 }
800 *composite = grpc_error_add_child(*composite, new_err);
801 };
802
803 FilterStackCall* call;
804 grpc_error_handle error;
805 grpc_channel_stack* channel_stack = channel->channel_stack();
806 size_t call_alloc_size =
807 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)) +
808 channel_stack->call_stack_size;
809
810 Arena* arena = channel->CreateArena();
811 call = new (arena->Alloc(call_alloc_size)) FilterStackCall(arena, *args);
812 GPR_DEBUG_ASSERT(FromC(call->c_ptr()) == call);
813 GPR_DEBUG_ASSERT(FromCallStack(call->call_stack()) == call);
814 *out_call = call->c_ptr();
815 grpc_slice path = grpc_empty_slice();
816 ScopedContext ctx(call);
817 if (call->is_client()) {
818 call->final_op_.client.status_details = nullptr;
819 call->final_op_.client.status = nullptr;
820 call->final_op_.client.error_string = nullptr;
821 global_stats().IncrementClientCallsCreated();
822 path = CSliceRef(args->path->c_slice());
823 call->send_initial_metadata_.Set(HttpPathMetadata(),
824 std::move(*args->path));
825 if (args->authority.has_value()) {
826 call->send_initial_metadata_.Set(HttpAuthorityMetadata(),
827 std::move(*args->authority));
828 }
829 call->send_initial_metadata_.Set(
830 GrpcRegisteredMethod(), reinterpret_cast<void*>(static_cast<uintptr_t>(
831 args->registered_method)));
832 channel_stack->stats_plugin_group->AddClientCallTracers(
833 Slice(CSliceRef(path)), args->registered_method, call->context_);
834 } else {
835 global_stats().IncrementServerCallsCreated();
836 call->final_op_.server.cancelled = nullptr;
837 call->final_op_.server.core_server = args->server;
838 // TODO(yashykt): In the future, we want to also enable stats and trace
839 // collecting from when the call is created at the transport. The idea is
840 // that the transport would create the call tracer and pass it in as part of
841 // the metadata.
842 // TODO(yijiem): OpenCensus and internal Census is still using this way to
843 // set server call tracer. We need to refactor them to stats plugins
844 // (including removing the client channel filters).
845 if (args->server != nullptr &&
846 args->server->server_call_tracer_factory() != nullptr) {
847 auto* server_call_tracer =
848 args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
849 arena, args->server->channel_args());
850 if (server_call_tracer != nullptr) {
851 // Note that we are setting both
852 // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
853 // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future
854 // promise-based world, we would just use a single tracer object for each
855 // stack (call, subchannel_call, server_call).
856 call->ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE,
857 server_call_tracer, nullptr);
858 call->ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr);
859 }
860 }
861 channel_stack->stats_plugin_group->AddServerCallTracers(call->context_);
862 }
863
864 Call* parent = Call::FromC(args->parent);
865 if (parent != nullptr) {
866 add_init_error(&error, absl_status_to_grpc_error(call->InitParent(
867 parent, args->propagation_mask)));
868 }
869 // initial refcount dropped by grpc_call_unref
870 grpc_call_element_args call_args = {
871 call->call_stack(), args->server_transport_data,
872 call->context_, path,
873 call->start_time(), call->send_deadline(),
874 call->arena(), &call->call_combiner_};
875 add_init_error(&error, grpc_call_stack_init(channel_stack, 1, DestroyCall,
876 call, &call_args));
877 // Publish this call to parent only after the call stack has been initialized.
878 if (parent != nullptr) {
879 call->PublishToParent(parent);
880 }
881
882 if (!error.ok()) {
883 call->CancelWithError(error);
884 }
885 if (args->cq != nullptr) {
886 GPR_ASSERT(args->pollset_set_alternative == nullptr &&
887 "Only one of 'cq' and 'pollset_set_alternative' should be "
888 "non-nullptr.");
889 GRPC_CQ_INTERNAL_REF(args->cq, "bind");
890 call->pollent_ =
891 grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
892 }
893 if (args->pollset_set_alternative != nullptr) {
894 call->pollent_ = grpc_polling_entity_create_from_pollset_set(
895 args->pollset_set_alternative);
896 }
897 if (!grpc_polling_entity_is_empty(&call->pollent_)) {
898 grpc_call_stack_set_pollset_or_pollset_set(call->call_stack(),
899 &call->pollent_);
900 }
901
902 if (call->is_client()) {
903 channelz::ChannelNode* channelz_channel = channel->channelz_node();
904 if (channelz_channel != nullptr) {
905 channelz_channel->RecordCallStarted();
906 }
907 } else if (call->final_op_.server.core_server != nullptr) {
908 channelz::ServerNode* channelz_node =
909 call->final_op_.server.core_server->channelz_node();
910 if (channelz_node != nullptr) {
911 channelz_node->RecordCallStarted();
912 }
913 }
914
915 CSliceUnref(path);
916
917 return error;
918 }
919
920 void FilterStackCall::SetCompletionQueue(grpc_completion_queue* cq) {
921 GPR_ASSERT(cq);
922
923 if (grpc_polling_entity_pollset_set(&pollent_) != nullptr) {
924 Crash("A pollset_set is already registered for this call.");
925 }
926 cq_ = cq;
927 GRPC_CQ_INTERNAL_REF(cq, "bind");
928 pollent_ = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
929 grpc_call_stack_set_pollset_or_pollset_set(call_stack(), &pollent_);
930 }
931
932 void FilterStackCall::ReleaseCall(void* call, grpc_error_handle /*error*/) {
933 static_cast<FilterStackCall*>(call)->DeleteThis();
934 }
935
936 void FilterStackCall::DestroyCall(void* call, grpc_error_handle /*error*/) {
937 auto* c = static_cast<FilterStackCall*>(call);
938 c->recv_initial_metadata_.Clear();
939 c->recv_trailing_metadata_.Clear();
940 c->receiving_slice_buffer_.reset();
941 ParentCall* pc = c->parent_call();
942 if (pc != nullptr) {
943 pc->~ParentCall();
944 }
945 if (c->cq_) {
946 GRPC_CQ_INTERNAL_UNREF(c->cq_, "bind");
947 }
948
949 grpc_error_handle status_error = c->status_error_.get();
950 grpc_error_get_status(status_error, c->send_deadline(),
951 &c->final_info_.final_status, nullptr, nullptr,
952 &(c->final_info_.error_string));
953 c->status_error_.set(absl::OkStatus());
954 c->final_info_.stats.latency =
955 gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time());
956 grpc_call_stack_destroy(c->call_stack(), &c->final_info_,
957 GRPC_CLOSURE_INIT(&c->release_call_, ReleaseCall, c,
958 grpc_schedule_on_exec_ctx));
959 }
960
961 void FilterStackCall::ExternalUnref() {
962 if (GPR_LIKELY(!ext_ref_.Unref())) return;
963
964 ApplicationCallbackExecCtx callback_exec_ctx;
965 ExecCtx exec_ctx;
966
967 GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (this));
968
969 MaybeUnpublishFromParent();
970
971 GPR_ASSERT(!destroy_called_);
972 destroy_called_ = true;
973 bool cancel = gpr_atm_acq_load(&received_final_op_atm_) == 0;
974 if (cancel) {
975 CancelWithError(absl::CancelledError());
976 } else {
977 // Unset the call combiner cancellation closure. This has the
978 // effect of scheduling the previously set cancellation closure, if
979 // any, so that it can release any internal references it may be
980 // holding to the call stack.
981 call_combiner_.SetNotifyOnCancel(nullptr);
982 }
983 InternalUnref("destroy");
984 }
985
986 // start_batch_closure points to a caller-allocated closure to be used
987 // for entering the call combiner.
988 void FilterStackCall::ExecuteBatch(grpc_transport_stream_op_batch* batch,
989 grpc_closure* start_batch_closure) {
990 // This is called via the call combiner to start sending a batch down
991 // the filter stack.
992 auto execute_batch_in_call_combiner = [](void* arg, grpc_error_handle) {
993 grpc_transport_stream_op_batch* batch =
994 static_cast<grpc_transport_stream_op_batch*>(arg);
995 auto* call =
996 static_cast<FilterStackCall*>(batch->handler_private.extra_arg);
997 grpc_call_element* elem = call->call_elem(0);
998 GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
999 elem->filter->start_transport_stream_op_batch(elem, batch);
1000 };
1001 batch->handler_private.extra_arg = this;
1002 GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
1003 grpc_schedule_on_exec_ctx);
1004 GRPC_CALL_COMBINER_START(call_combiner(), start_batch_closure,
1005 absl::OkStatus(), "executing batch");
1006 }
1007
1008 namespace {
1009 struct CancelState {
1010 FilterStackCall* call;
1011 grpc_closure start_batch;
1012 grpc_closure finish_batch;
1013 };
1014 } // namespace
1015
1016 // The on_complete callback used when sending a cancel_stream batch down
1017 // the filter stack. Yields the call combiner when the batch is done.
1018 static void done_termination(void* arg, grpc_error_handle /*error*/) {
1019 CancelState* state = static_cast<CancelState*>(arg);
1020 GRPC_CALL_COMBINER_STOP(state->call->call_combiner(),
1021 "on_complete for cancel_stream op");
1022 state->call->InternalUnref("termination");
1023 delete state;
1024 }
1025
1026 void FilterStackCall::CancelWithError(grpc_error_handle error) {
1027 if (!gpr_atm_rel_cas(&cancelled_with_error_, 0, 1)) {
1028 return;
1029 }
1030 ClearPeerString();
1031 InternalRef("termination");
1032 // Inform the call combiner of the cancellation, so that it can cancel
1033 // any in-flight asynchronous actions that may be holding the call
1034 // combiner. This ensures that the cancel_stream batch can be sent
1035 // down the filter stack in a timely manner.
1036 call_combiner_.Cancel(error);
1037 CancelState* state = new CancelState;
1038 state->call = this;
1039 GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
1040 grpc_schedule_on_exec_ctx);
1041 grpc_transport_stream_op_batch* op =
1042 grpc_make_transport_stream_op(&state->finish_batch);
1043 op->cancel_stream = true;
1044 op->payload->cancel_stream.cancel_error = error;
1045 ExecuteBatch(op, &state->start_batch);
1046 }
1047
1048 void FilterStackCall::SetFinalStatus(grpc_error_handle error) {
1049 if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
1050 gpr_log(GPR_DEBUG, "set_final_status %s %s", is_client() ? "CLI" : "SVR",
1051 StatusToString(error).c_str());
1052 }
1053 if (is_client()) {
1054 std::string status_details;
1055 grpc_error_get_status(error, send_deadline(), final_op_.client.status,
1056 &status_details, nullptr,
1057 final_op_.client.error_string);
1058 *final_op_.client.status_details =
1059 grpc_slice_from_cpp_string(std::move(status_details));
1060 status_error_.set(error);
1061 channelz::ChannelNode* channelz_channel = channel()->channelz_node();
1062 if (channelz_channel != nullptr) {
1063 if (*final_op_.client.status != GRPC_STATUS_OK) {
1064 channelz_channel->RecordCallFailed();
1065 } else {
1066 channelz_channel->RecordCallSucceeded();
1067 }
1068 }
1069 } else {
1070 *final_op_.server.cancelled =
1071 !error.ok() || !sent_server_trailing_metadata_;
1072 channelz::ServerNode* channelz_node =
1073 final_op_.server.core_server->channelz_node();
1074 if (channelz_node != nullptr) {
1075 if (*final_op_.server.cancelled || !status_error_.ok()) {
1076 channelz_node->RecordCallFailed();
1077 } else {
1078 channelz_node->RecordCallSucceeded();
1079 }
1080 }
1081 }
1082 }
1083
1084 bool FilterStackCall::PrepareApplicationMetadata(size_t count,
1085 grpc_metadata* metadata,
1086 bool is_trailing) {
1087 grpc_metadata_batch* batch =
1088 is_trailing ? &send_trailing_metadata_ : &send_initial_metadata_;
1089 for (size_t i = 0; i < count; i++) {
1090 grpc_metadata* md = &metadata[i];
1091 if (!GRPC_LOG_IF_ERROR("validate_metadata",
1092 grpc_validate_header_key_is_legal(md->key))) {
1093 return false;
1094 } else if (!grpc_is_binary_header_internal(md->key) &&
1095 !GRPC_LOG_IF_ERROR(
1096 "validate_metadata",
1097 grpc_validate_header_nonbin_value_is_legal(md->value))) {
1098 return false;
1099 } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
1100 // HTTP/2 HPACK encoding has a maximum size limit for metadata values.
1101 return false;
1102 } else if (grpc_slice_str_cmp(md->key, "content-length") == 0) {
1103 // Filter out "content-length" metadata.
1104 continue;
1105 }
1106 batch->Append(StringViewFromSlice(md->key), Slice(CSliceRef(md->value)),
1107 [md](absl::string_view error, const Slice& value) {
1108 gpr_log(GPR_DEBUG, "Append error: %s",
1109 absl::StrCat("key=", StringViewFromSlice(md->key),
1110 " error=", error,
1111 " value=", value.as_string_view())
1112 .c_str());
1113 });
1114 }
1115
1116 return true;
1117 }
1118
1119 namespace {
1120 class PublishToAppEncoder {
1121 public:
1122 explicit PublishToAppEncoder(grpc_metadata_array* dest,
1123 const grpc_metadata_batch* encoding,
1124 bool is_client)
1125 : dest_(dest), encoding_(encoding), is_client_(is_client) {}
1126
1127 void Encode(const Slice& key, const Slice& value) {
1128 Append(key.c_slice(), value.c_slice());
1129 }
1130
1131 // Catch anything that is not explicitly handled, and do not publish it to the
1132 // application. If new metadata is added to a batch that needs to be
1133 // published, it should be called out here.
1134 template <typename Which>
1135 void Encode(Which, const typename Which::ValueType&) {}
1136
1137 void Encode(UserAgentMetadata, const Slice& slice) {
1138 Append(UserAgentMetadata::key(), slice);
1139 }
1140
1141 void Encode(HostMetadata, const Slice& slice) {
1142 Append(HostMetadata::key(), slice);
1143 }
1144
1145 void Encode(GrpcPreviousRpcAttemptsMetadata, uint32_t count) {
1146 Append(GrpcPreviousRpcAttemptsMetadata::key(), count);
1147 }
1148
1149 void Encode(GrpcRetryPushbackMsMetadata, Duration count) {
1150 Append(GrpcRetryPushbackMsMetadata::key(), count.millis());
1151 }
1152
1153 void Encode(LbTokenMetadata, const Slice& slice) {
1154 Append(LbTokenMetadata::key(), slice);
1155 }
1156
1157 private:
1158 void Append(absl::string_view key, int64_t value) {
1159 Append(StaticSlice::FromStaticString(key).c_slice(),
1160 Slice::FromInt64(value).c_slice());
1161 }
1162
1163 void Append(absl::string_view key, const Slice& value) {
1164 Append(StaticSlice::FromStaticString(key).c_slice(), value.c_slice());
1165 }
1166
1167 void Append(grpc_slice key, grpc_slice value) {
1168 if (dest_->count == dest_->capacity) {
1169 Crash(absl::StrCat(
1170 "Too many metadata entries: capacity=", dest_->capacity, " on ",
1171 is_client_ ? "client" : "server", " encoding ", encoding_->count(),
1172 " elements: ", encoding_->DebugString().c_str()));
1173 }
1174 auto* mdusr = &dest_->metadata[dest_->count++];
1175 mdusr->key = key;
1176 mdusr->value = value;
1177 }
1178
1179 grpc_metadata_array* const dest_;
1180 const grpc_metadata_batch* const encoding_;
1181 const bool is_client_;
1182 };
1183 } // namespace
1184
1185 void FilterStackCall::PublishAppMetadata(grpc_metadata_batch* b,
1186 bool is_trailing) {
1187 if (b->count() == 0) return;
1188 if (!is_client() && is_trailing) return;
1189 if (is_trailing && buffered_metadata_[1] == nullptr) return;
1190 grpc_metadata_array* dest;
1191 dest = buffered_metadata_[is_trailing];
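// Grow the destination array geometrically (by at least 1.5x, and by at least
// enough to hold this batch) so that repeated publishes stay amortized O(1).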
1192 if (dest->count + b->count() > dest->capacity) {
1193 dest->capacity =
1194 std::max(dest->capacity + b->count(), dest->capacity * 3 / 2);
1195 dest->metadata = static_cast<grpc_metadata*>(
1196 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
1197 }
1198 PublishToAppEncoder encoder(dest, b, is_client());
1199 b->Encode(&encoder);
1200 }
1201
1202 void FilterStackCall::RecvInitialFilter(grpc_metadata_batch* b) {
1203 ProcessIncomingInitialMetadata(*b);
1204 PublishAppMetadata(b, false);
1205 }
1206
1207 void FilterStackCall::RecvTrailingFilter(grpc_metadata_batch* b,
1208 grpc_error_handle batch_error) {
1209 if (!batch_error.ok()) {
1210 SetFinalStatus(batch_error);
1211 } else {
1212 absl::optional<grpc_status_code> grpc_status =
1213 b->Take(GrpcStatusMetadata());
1214 if (grpc_status.has_value()) {
1215 grpc_status_code status_code = *grpc_status;
1216 grpc_error_handle error;
1217 if (status_code != GRPC_STATUS_OK) {
1218 Slice peer = GetPeerString();
1219 error = grpc_error_set_int(
1220 GRPC_ERROR_CREATE(absl::StrCat("Error received from peer ",
1221 peer.as_string_view())),
1222 StatusIntProperty::kRpcStatus, static_cast<intptr_t>(status_code));
1223 }
1224 auto grpc_message = b->Take(GrpcMessageMetadata());
1225 if (grpc_message.has_value()) {
1226 error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage,
1227 grpc_message->as_string_view());
1228 } else if (!error.ok()) {
1229 error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage, "");
1230 }
1231 SetFinalStatus(error);
1232 } else if (!is_client()) {
1233 SetFinalStatus(absl::OkStatus());
1234 } else {
1235 gpr_log(GPR_DEBUG,
1236 "Received trailing metadata with no error and no status");
1237 SetFinalStatus(grpc_error_set_int(GRPC_ERROR_CREATE("No status received"),
1238 StatusIntProperty::kRpcStatus,
1239 GRPC_STATUS_UNKNOWN));
1240 }
1241 }
1242 PublishAppMetadata(b, true);
1243 }
1244
1245 namespace {
1246 bool AreWriteFlagsValid(uint32_t flags) {
1247 // check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set
1248 const uint32_t allowed_write_positions =
1249 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1250 const uint32_t invalid_positions = ~allowed_write_positions;
1251 return !(flags & invalid_positions);
1252 }
1253
1254 bool AreInitialMetadataFlagsValid(uint32_t flags) {
1255 // check that only bits in GRPC_INITIAL_METADATA_USED_MASK are set
1256 uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1257 return !(flags & invalid_positions);
1258 }
1259
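// Maps a batch to one of the kMaxConcurrentBatches slots in active_batches_,
// keyed by the batch's first op. Client-only and server-only variants of the
// same logical step (e.g. SEND_CLOSE_FROM_CLIENT vs SEND_STATUS_FROM_SERVER)
// share a slot since they can never occur on the same call.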
1260 size_t BatchSlotForOp(grpc_op_type type) {
1261 switch (type) {
1262 case GRPC_OP_SEND_INITIAL_METADATA:
1263 return 0;
1264 case GRPC_OP_SEND_MESSAGE:
1265 return 1;
1266 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1267 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1268 return 2;
1269 case GRPC_OP_RECV_INITIAL_METADATA:
1270 return 3;
1271 case GRPC_OP_RECV_MESSAGE:
1272 return 4;
1273 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1274 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1275 return 5;
1276 }
1277 GPR_UNREACHABLE_CODE(return 123456789);
1278 }
1279 } // namespace
1280
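// Picks the BatchControl slot for this batch: reuses the slot's previous
// BatchControl once its batch has completed (signalled by call_ being reset to
// nullptr), or returns nullptr if that slot is still in use by an outstanding
// batch.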
1281 FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl(
1282 const grpc_op* ops) {
1283 size_t slot_idx = BatchSlotForOp(ops[0].op);
1284 BatchControl** pslot = &active_batches_[slot_idx];
1285 BatchControl* bctl;
1286 if (*pslot != nullptr) {
1287 bctl = *pslot;
1288 if (bctl->call_ != nullptr) {
1289 return nullptr;
1290 }
1291 bctl->~BatchControl();
1292 bctl->op_ = {};
1293 new (&bctl->batch_error_) AtomicError();
1294 } else {
1295 bctl = arena()->New<BatchControl>();
1296 *pslot = bctl;
1297 }
1298 bctl->call_ = this;
1299 bctl->call_tracer_ = static_cast<CallTracerAnnotationInterface*>(
1300 ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE));
1301 bctl->op_.payload = &stream_op_payload_;
1302 return bctl;
1303 }
1304
1305 void FilterStackCall::BatchControl::PostCompletion() {
1306 FilterStackCall* call = call_;
1307 grpc_error_handle error = batch_error_.get();
1308
1309 if (IsCallStatusOverrideOnCancellationEnabled()) {
1310 // On the client side, if final call status is already known (i.e. if this op
1311 // includes recv_trailing_metadata) and if the call status is known to be
1312 // OK, then disregard the batch error to ensure call->receiving_buffer_ is
1313 // not cleared.
1314 if (op_.recv_trailing_metadata && call->is_client() &&
1315 call->status_error_.ok()) {
1316 error = absl::OkStatus();
1317 }
1318 }
1319
1320 if (grpc_call_trace.enabled()) {
1321 gpr_log(GPR_DEBUG, "tag:%p batch_error=%s op:%s",
1322 completion_data_.notify_tag.tag, error.ToString().c_str(),
1323 grpc_transport_stream_op_batch_string(&op_, false).c_str());
1324 }
1325
1326 if (op_.send_initial_metadata) {
1327 call->send_initial_metadata_.Clear();
1328 }
1329 if (op_.send_message) {
1330 if (op_.payload->send_message.stream_write_closed && error.ok()) {
1331 error = grpc_error_add_child(
1332 error, GRPC_ERROR_CREATE(
1333 "Attempt to send message after stream was closed."));
1334 }
1335 call->sending_message_ = false;
1336 call->send_slice_buffer_.Clear();
1337 }
1338 if (op_.send_trailing_metadata) {
1339 call->send_trailing_metadata_.Clear();
1340 }
1341
1342 if (!error.ok() && op_.recv_message && *call->receiving_buffer_ != nullptr) {
1343 grpc_byte_buffer_destroy(*call->receiving_buffer_);
1344 *call->receiving_buffer_ = nullptr;
1345 }
1346 if (op_.recv_trailing_metadata) {
1347 // propagate cancellation to any interested children
1348 gpr_atm_rel_store(&call->received_final_op_atm_, 1);
1349 call->PropagateCancellationToChildren();
1350 error = absl::OkStatus();
1351 }
1352 batch_error_.set(absl::OkStatus());
1353
1354 if (completion_data_.notify_tag.is_closure) {
1355 call_ = nullptr;
1356 Closure::Run(DEBUG_LOCATION,
1357 static_cast<grpc_closure*>(completion_data_.notify_tag.tag),
1358 error);
1359 call->InternalUnref("completion");
1360 } else {
1361 grpc_cq_end_op(
1362 call->cq_, completion_data_.notify_tag.tag, error,
1363 [](void* user_data, grpc_cq_completion* /*storage*/) {
1364 BatchControl* bctl = static_cast<BatchControl*>(user_data);
1365 Call* call = bctl->call_;
1366 bctl->call_ = nullptr;
1367 call->InternalUnref("completion");
1368 },
1369 this, &completion_data_.cq_completion);
1370 }
1371 }
1372
1373 void FilterStackCall::BatchControl::FinishStep(PendingOp op) {
1374 if (GPR_UNLIKELY(completed_batch_step(op))) {
1375 PostCompletion();
1376 }
1377 }
1378
1379 void FilterStackCall::BatchControl::ProcessDataAfterMetadata() {
1380 FilterStackCall* call = call_;
1381 if (!call->receiving_slice_buffer_.has_value()) {
1382 *call->receiving_buffer_ = nullptr;
1383 call->receiving_message_ = false;
1384 FinishStep(PendingOp::kRecvMessage);
1385 } else {
1386 call->NoteLastMessageFlags(call->receiving_stream_flags_);
1387 if ((call->receiving_stream_flags_ & GRPC_WRITE_INTERNAL_COMPRESS) &&
1388 (call->incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
1389 *call->receiving_buffer_ = grpc_raw_compressed_byte_buffer_create(
1390 nullptr, 0, call->incoming_compression_algorithm());
1391 } else {
1392 *call->receiving_buffer_ = grpc_raw_byte_buffer_create(nullptr, 0);
1393 }
1394 grpc_slice_buffer_move_into(
1395 call->receiving_slice_buffer_->c_slice_buffer(),
1396 &(*call->receiving_buffer_)->data.raw.slice_buffer);
1397 call->receiving_message_ = false;
1398 call->receiving_slice_buffer_.reset();
1399 FinishStep(PendingOp::kRecvMessage);
1400 }
1401 }
1402
1403 void FilterStackCall::BatchControl::ReceivingStreamReady(
1404 grpc_error_handle error) {
1405 if (grpc_call_trace.enabled()) {
1406 gpr_log(GPR_DEBUG,
1407 "tag:%p ReceivingStreamReady error=%s "
1408 "receiving_slice_buffer.has_value=%d recv_state=%" PRIdPTR,
1409 completion_data_.notify_tag.tag, error.ToString().c_str(),
1410 call_->receiving_slice_buffer_.has_value(),
1411 gpr_atm_no_barrier_load(&call_->recv_state_));
1412 }
1413 FilterStackCall* call = call_;
1414 if (!error.ok()) {
1415 call->receiving_slice_buffer_.reset();
1416 if (batch_error_.ok()) {
1417 batch_error_.set(error);
1418 }
1419 call->CancelWithError(error);
1420 }
1421 // If recv_state is kRecvNone, we will save the batch_control
1422 // object with rel_cas, and will not use it after the cas. Its corresponding
1423 // acq_load is in ReceivingInitialMetadataReady().
1424 if (!error.ok() || !call->receiving_slice_buffer_.has_value() ||
1425 !gpr_atm_rel_cas(&call->recv_state_, kRecvNone,
1426 reinterpret_cast<gpr_atm>(this))) {
1427 ProcessDataAfterMetadata();
1428 }
1429 }
1430
1431 void FilterStackCall::BatchControl::ReceivingInitialMetadataReady(
1432 grpc_error_handle error) {
1433 FilterStackCall* call = call_;
1434
1435 GRPC_CALL_COMBINER_STOP(call->call_combiner(), "recv_initial_metadata_ready");
1436
1437 if (error.ok()) {
1438 grpc_metadata_batch* md = &call->recv_initial_metadata_;
1439 call->RecvInitialFilter(md);
1440
1441 absl::optional<Timestamp> deadline = md->get(GrpcTimeoutMetadata());
1442 if (deadline.has_value() && !call->is_client()) {
1443 call_->set_send_deadline(*deadline);
1444 }
1445 } else {
1446 if (batch_error_.ok()) {
1447 batch_error_.set(error);
1448 }
1449 call->CancelWithError(error);
1450 }
1451
1452 grpc_closure* saved_rsr_closure = nullptr;
1453 while (true) {
1454 gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state_);
1455 // Should only receive initial metadata once
1456 GPR_ASSERT(rsr_bctlp != 1);
1457 if (rsr_bctlp == 0) {
1458 // We have seen neither initial metadata nor any message yet, so
1459 // initial metadata is being received first.
1460 // A no_barrier_cas suffices: this function will not access the
1461 // batch_control object saved by ReceivingStreamReady() when initial
1462 // metadata is received first.
1463 if (gpr_atm_no_barrier_cas(&call->recv_state_, kRecvNone,
1464 kRecvInitialMetadataFirst)) {
1465 break;
1466 }
1467 } else {
1468 // Already received messages
1469 saved_rsr_closure = GRPC_CLOSURE_CREATE(
1470 [](void* bctl, grpc_error_handle error) {
1471 static_cast<BatchControl*>(bctl)->ReceivingStreamReady(error);
1472 },
1473 reinterpret_cast<BatchControl*>(rsr_bctlp),
1474 grpc_schedule_on_exec_ctx);
1475 // No need to modify recv_state
1476 break;
1477 }
1478 }
1479 if (saved_rsr_closure != nullptr) {
1480 Closure::Run(DEBUG_LOCATION, saved_rsr_closure, error);
1481 }
1482
1483 FinishStep(PendingOp::kRecvInitialMetadata);
1484 }
1485
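// Transport callback for received trailing metadata: runs the trailing
// metadata filter and finishes the kRecvTrailingMetadata step of the batch.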
1486 void FilterStackCall::BatchControl::ReceivingTrailingMetadataReady(
1487 grpc_error_handle error) {
1488 GRPC_CALL_COMBINER_STOP(call_->call_combiner(),
1489 "recv_trailing_metadata_ready");
1490 grpc_metadata_batch* md = &call_->recv_trailing_metadata_;
1491 call_->RecvTrailingFilter(md, error);
1492 FinishStep(PendingOp::kRecvTrailingMetadata);
1493 }
1494
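// Completion callback for the send side of the batch: records the first error
// seen, cancels the call on failure, and finishes the kSends step.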
1495 void FilterStackCall::BatchControl::FinishBatch(grpc_error_handle error) {
1496 GRPC_CALL_COMBINER_STOP(call_->call_combiner(), "on_complete");
1497 if (batch_error_.ok()) {
1498 batch_error_.set(error);
1499 }
1500 if (!error.ok()) {
1501 call_->CancelWithError(error);
1502 }
1503 FinishStep(PendingOp::kSends);
1504 }
1505
1506 namespace {
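// Completes a batch that requires no transport work (e.g. zero ops) by
// notifying the tag or closure immediately with an OK status.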
1507 void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag,
1508 bool is_notify_tag_closure) {
1509 if (!is_notify_tag_closure) {
1510 GPR_ASSERT(grpc_cq_begin_op(cq, notify_tag));
1511 grpc_cq_end_op(
1512 cq, notify_tag, absl::OkStatus(),
1513 [](void*, grpc_cq_completion* completion) { gpr_free(completion); },
1514 nullptr,
1515 static_cast<grpc_cq_completion*>(
1516 gpr_malloc(sizeof(grpc_cq_completion))));
1517 } else {
1518 Closure::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(notify_tag),
1519 absl::OkStatus());
1520 }
1521 }
1522 } // namespace
1523
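// Entry point for a batch of operations on a filter-stack call. Batches reach
// this method from the public C surface; an illustrative sketch (not code
// from this file; metadata_array is a caller-owned grpc_metadata_array):
//
//   grpc_op ops[2] = {};
//   ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
//   ops[0].data.send_initial_metadata.count = 0;
//   ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
//   ops[1].data.recv_initial_metadata.recv_initial_metadata = &metadata_array;
//   grpc_call_start_batch(call, ops, 2, tag, nullptr);
//
// Each op is validated and folded into a single grpc_transport_stream_op_batch
// which is executed against the filter stack; the batch is reported complete
// on the call's completion queue (or closure) once every constituent
// operation has finished.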
1524 grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
1525 void* notify_tag,
1526 bool is_notify_tag_closure) {
1527 size_t i;
1528 const grpc_op* op;
1529 BatchControl* bctl;
1530 grpc_call_error error = GRPC_CALL_OK;
1531 grpc_transport_stream_op_batch* stream_op;
1532 grpc_transport_stream_op_batch_payload* stream_op_payload;
1533 uint32_t seen_ops = 0;
1534 intptr_t pending_ops = 0;
1535
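// Each op kind may appear at most once per batch: track which kinds have been
// seen in a bitmask keyed by the op enum value.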
1536 for (i = 0; i < nops; i++) {
1537 if (seen_ops & (1u << ops[i].op)) {
1538 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1539 }
1540 seen_ops |= (1u << ops[i].op);
1541 }
1542
1543 if (!is_client() &&
1544 (seen_ops & (1u << GRPC_OP_SEND_STATUS_FROM_SERVER)) != 0 &&
1545 (seen_ops & (1u << GRPC_OP_RECV_MESSAGE)) != 0) {
1546 gpr_log(GPR_ERROR,
1547 "******************* SEND_STATUS WITH RECV_MESSAGE "
1548 "*******************");
1549 return GRPC_CALL_ERROR;
1550 }
1551
1552 GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);
1553
1554 if (nops == 0) {
1555 EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
1556 error = GRPC_CALL_OK;
1557 goto done;
1558 }
1559
1560 bctl = ReuseOrAllocateBatchControl(ops);
1561 if (bctl == nullptr) {
1562 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1563 }
1564 bctl->completion_data_.notify_tag.tag = notify_tag;
1565 bctl->completion_data_.notify_tag.is_closure =
1566 static_cast<uint8_t>(is_notify_tag_closure != 0);
1567
1568 stream_op = &bctl->op_;
1569 stream_op_payload = &stream_op_payload_;
1570
1571 // rewrite batch ops into a transport op
1572 for (i = 0; i < nops; i++) {
1573 op = &ops[i];
1574 if (op->reserved != nullptr) {
1575 error = GRPC_CALL_ERROR;
1576 goto done_with_error;
1577 }
1578 switch (op->op) {
1579 case GRPC_OP_SEND_INITIAL_METADATA: {
1580 // Flag validation: currently allow no flags
1581 if (!AreInitialMetadataFlagsValid(op->flags)) {
1582 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1583 goto done_with_error;
1584 }
1585 if (sent_initial_metadata_) {
1586 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1587 goto done_with_error;
1588 }
1589 if (op->data.send_initial_metadata.count > INT_MAX) {
1590 error = GRPC_CALL_ERROR_INVALID_METADATA;
1591 goto done_with_error;
1592 }
1593 stream_op->send_initial_metadata = true;
1594 sent_initial_metadata_ = true;
1595 if (!PrepareApplicationMetadata(op->data.send_initial_metadata.count,
1596 op->data.send_initial_metadata.metadata,
1597 false)) {
1598 error = GRPC_CALL_ERROR_INVALID_METADATA;
1599 goto done_with_error;
1600 }
1601 PrepareOutgoingInitialMetadata(*op, send_initial_metadata_);
1602 // TODO(ctiller): just make these the same variable?
1603 if (is_client() && send_deadline() != Timestamp::InfFuture()) {
1604 send_initial_metadata_.Set(GrpcTimeoutMetadata(), send_deadline());
1605 }
1606 if (is_client()) {
1607 send_initial_metadata_.Set(
1608 WaitForReady(),
1609 WaitForReady::ValueType{
1610 (op->flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) != 0,
1611 (op->flags &
1612 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) != 0});
1613 }
1614 stream_op_payload->send_initial_metadata.send_initial_metadata =
1615 &send_initial_metadata_;
1616 pending_ops |= PendingOpMask(PendingOp::kSends);
1617 break;
1618 }
1619 case GRPC_OP_SEND_MESSAGE: {
1620 if (!AreWriteFlagsValid(op->flags)) {
1621 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1622 goto done_with_error;
1623 }
1624 if (op->data.send_message.send_message == nullptr) {
1625 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1626 goto done_with_error;
1627 }
1628 if (sending_message_) {
1629 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1630 goto done_with_error;
1631 }
1632 uint32_t flags = op->flags;
1633 // If the outgoing buffer is already compressed, mark it as such in the
1634 // flags. The compression filter picks this up and skips further
1635 // (wasteful) attempts at compression.
1636 if (op->data.send_message.send_message->data.raw.compression >
1637 GRPC_COMPRESS_NONE) {
1638 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1639 }
1640 stream_op->send_message = true;
1641 sending_message_ = true;
1642 send_slice_buffer_.Clear();
1643 grpc_slice_buffer_move_into(
1644 &op->data.send_message.send_message->data.raw.slice_buffer,
1645 send_slice_buffer_.c_slice_buffer());
1646 stream_op_payload->send_message.flags = flags;
1647 stream_op_payload->send_message.send_message = &send_slice_buffer_;
1648 pending_ops |= PendingOpMask(PendingOp::kSends);
1649 break;
1650 }
1651 case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
1652 // Flag validation: currently allow no flags
1653 if (op->flags != 0) {
1654 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1655 goto done_with_error;
1656 }
1657 if (!is_client()) {
1658 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1659 goto done_with_error;
1660 }
1661 if (sent_final_op_) {
1662 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1663 goto done_with_error;
1664 }
1665 stream_op->send_trailing_metadata = true;
1666 sent_final_op_ = true;
1667 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1668 &send_trailing_metadata_;
1669 pending_ops |= PendingOpMask(PendingOp::kSends);
1670 break;
1671 }
1672 case GRPC_OP_SEND_STATUS_FROM_SERVER: {
1673 // Flag validation: currently allow no flags
1674 if (op->flags != 0) {
1675 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1676 goto done_with_error;
1677 }
1678 if (is_client()) {
1679 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1680 goto done_with_error;
1681 }
1682 if (sent_final_op_) {
1683 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1684 goto done_with_error;
1685 }
1686 if (op->data.send_status_from_server.trailing_metadata_count >
1687 INT_MAX) {
1688 error = GRPC_CALL_ERROR_INVALID_METADATA;
1689 goto done_with_error;
1690 }
1691 stream_op->send_trailing_metadata = true;
1692 sent_final_op_ = true;
1693
1694 if (!PrepareApplicationMetadata(
1695 op->data.send_status_from_server.trailing_metadata_count,
1696 op->data.send_status_from_server.trailing_metadata, true)) {
1697 error = GRPC_CALL_ERROR_INVALID_METADATA;
1698 goto done_with_error;
1699 }
1700
1701 grpc_error_handle status_error =
1702 op->data.send_status_from_server.status == GRPC_STATUS_OK
1703 ? absl::OkStatus()
1704 : grpc_error_set_int(
1705 GRPC_ERROR_CREATE("Server returned error"),
1706 StatusIntProperty::kRpcStatus,
1707 static_cast<intptr_t>(
1708 op->data.send_status_from_server.status));
1709 if (op->data.send_status_from_server.status_details != nullptr) {
1710 send_trailing_metadata_.Set(
1711 GrpcMessageMetadata(),
1712 Slice(grpc_slice_copy(
1713 *op->data.send_status_from_server.status_details)));
1714 if (!status_error.ok()) {
1715 status_error = grpc_error_set_str(
1716 status_error, StatusStrProperty::kGrpcMessage,
1717 StringViewFromSlice(
1718 *op->data.send_status_from_server.status_details));
1719 }
1720 }
1721
1722 status_error_.set(status_error);
1723
1724 send_trailing_metadata_.Set(GrpcStatusMetadata(),
1725 op->data.send_status_from_server.status);
1726
1727 // Ignore any te metadata key value pairs specified.
1728 send_trailing_metadata_.Remove(TeMetadata());
1729 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1730 &send_trailing_metadata_;
1731 stream_op_payload->send_trailing_metadata.sent =
1732 &sent_server_trailing_metadata_;
1733 pending_ops |= PendingOpMask(PendingOp::kSends);
1734 break;
1735 }
1736 case GRPC_OP_RECV_INITIAL_METADATA: {
1737 // Flag validation: currently allow no flags
1738 if (op->flags != 0) {
1739 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1740 goto done_with_error;
1741 }
1742 if (received_initial_metadata_) {
1743 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1744 goto done_with_error;
1745 }
1746 received_initial_metadata_ = true;
1747 buffered_metadata_[0] =
1748 op->data.recv_initial_metadata.recv_initial_metadata;
1749 GRPC_CLOSURE_INIT(
1750 &receiving_initial_metadata_ready_,
1751 [](void* bctl, grpc_error_handle error) {
1752 static_cast<BatchControl*>(bctl)->ReceivingInitialMetadataReady(
1753 error);
1754 },
1755 bctl, grpc_schedule_on_exec_ctx);
1756 stream_op->recv_initial_metadata = true;
1757 stream_op_payload->recv_initial_metadata.recv_initial_metadata =
1758 &recv_initial_metadata_;
1759 stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
1760 &receiving_initial_metadata_ready_;
1761 if (is_client()) {
1762 stream_op_payload->recv_initial_metadata.trailing_metadata_available =
1763 &is_trailers_only_;
1764 }
1765 pending_ops |= PendingOpMask(PendingOp::kRecvInitialMetadata);
1766 break;
1767 }
1768 case GRPC_OP_RECV_MESSAGE: {
1769 // Flag validation: currently allow no flags
1770 if (op->flags != 0) {
1771 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1772 goto done_with_error;
1773 }
1774 if (receiving_message_) {
1775 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1776 goto done_with_error;
1777 }
1778 receiving_message_ = true;
1779 stream_op->recv_message = true;
1780 receiving_slice_buffer_.reset();
1781 receiving_buffer_ = op->data.recv_message.recv_message;
1782 stream_op_payload->recv_message.recv_message = &receiving_slice_buffer_;
1783 receiving_stream_flags_ = 0;
1784 stream_op_payload->recv_message.flags = &receiving_stream_flags_;
1785 stream_op_payload->recv_message.call_failed_before_recv_message =
1786 &call_failed_before_recv_message_;
1787 GRPC_CLOSURE_INIT(
1788 &receiving_stream_ready_,
1789 [](void* bctlp, grpc_error_handle error) {
1790 auto* bctl = static_cast<BatchControl*>(bctlp);
1791 auto* call = bctl->call_;
1792 // Yields the call combiner before processing the received
1793 // message.
1794 GRPC_CALL_COMBINER_STOP(call->call_combiner(),
1795 "recv_message_ready");
1796 bctl->ReceivingStreamReady(error);
1797 },
1798 bctl, grpc_schedule_on_exec_ctx);
1799 stream_op_payload->recv_message.recv_message_ready =
1800 &receiving_stream_ready_;
1801 pending_ops |= PendingOpMask(PendingOp::kRecvMessage);
1802 break;
1803 }
1804 case GRPC_OP_RECV_STATUS_ON_CLIENT: {
1805 // Flag validation: currently allow no flags
1806 if (op->flags != 0) {
1807 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1808 goto done_with_error;
1809 }
1810 if (!is_client()) {
1811 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1812 goto done_with_error;
1813 }
1814 if (requested_final_op_) {
1815 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1816 goto done_with_error;
1817 }
1818 requested_final_op_ = true;
1819 buffered_metadata_[1] =
1820 op->data.recv_status_on_client.trailing_metadata;
1821 final_op_.client.status = op->data.recv_status_on_client.status;
1822 final_op_.client.status_details =
1823 op->data.recv_status_on_client.status_details;
1824 final_op_.client.error_string =
1825 op->data.recv_status_on_client.error_string;
1826 stream_op->recv_trailing_metadata = true;
1827 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1828 &recv_trailing_metadata_;
1829 stream_op_payload->recv_trailing_metadata.collect_stats =
1830 &final_info_.stats.transport_stream_stats;
1831 GRPC_CLOSURE_INIT(
1832 &receiving_trailing_metadata_ready_,
1833 [](void* bctl, grpc_error_handle error) {
1834 static_cast<BatchControl*>(bctl)->ReceivingTrailingMetadataReady(
1835 error);
1836 },
1837 bctl, grpc_schedule_on_exec_ctx);
1838 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1839 &receiving_trailing_metadata_ready_;
1840 pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata);
1841 break;
1842 }
1843 case GRPC_OP_RECV_CLOSE_ON_SERVER: {
1844 // Flag validation: currently allow no flags
1845 if (op->flags != 0) {
1846 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1847 goto done_with_error;
1848 }
1849 if (is_client()) {
1850 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1851 goto done_with_error;
1852 }
1853 if (requested_final_op_) {
1854 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1855 goto done_with_error;
1856 }
1857 requested_final_op_ = true;
1858 final_op_.server.cancelled = op->data.recv_close_on_server.cancelled;
1859 stream_op->recv_trailing_metadata = true;
1860 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1861 &recv_trailing_metadata_;
1862 stream_op_payload->recv_trailing_metadata.collect_stats =
1863 &final_info_.stats.transport_stream_stats;
1864 GRPC_CLOSURE_INIT(
1865 &receiving_trailing_metadata_ready_,
1866 [](void* bctl, grpc_error_handle error) {
1867 static_cast<BatchControl*>(bctl)->ReceivingTrailingMetadataReady(
1868 error);
1869 },
1870 bctl, grpc_schedule_on_exec_ctx);
1871 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1872 &receiving_trailing_metadata_ready_;
1873 pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata);
1874 break;
1875 }
1876 }
1877 }
1878
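// Hold a ref on the call until the batch completes, and (for tag-based
// notification) register the pending operation with the completion queue.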
1879 InternalRef("completion");
1880 if (!is_notify_tag_closure) {
1881 GPR_ASSERT(grpc_cq_begin_op(cq_, notify_tag));
1882 }
1883 bctl->set_pending_ops(pending_ops);
1884
1885 if (pending_ops & PendingOpMask(PendingOp::kSends)) {
1886 GRPC_CLOSURE_INIT(
1887 &bctl->finish_batch_,
1888 [](void* bctl, grpc_error_handle error) {
1889 static_cast<BatchControl*>(bctl)->FinishBatch(error);
1890 },
1891 bctl, grpc_schedule_on_exec_ctx);
1892 stream_op->on_complete = &bctl->finish_batch_;
1893 }
1894
1895 if (grpc_call_trace.enabled()) {
1896 gpr_log(GPR_DEBUG, "BATCH:%p START:%s BATCH:%s (tag:%p)", bctl,
1897 PendingOpString(pending_ops).c_str(),
1898 grpc_transport_stream_op_batch_string(stream_op, false).c_str(),
1899 bctl->completion_data_.notify_tag.tag);
1900 }
1901 ExecuteBatch(stream_op, &bctl->start_batch_);
1902
1903 done:
1904 return error;
1905
1906 done_with_error:
1907 // reverse any mutations that occurred
1908 if (stream_op->send_initial_metadata) {
1909 sent_initial_metadata_ = false;
1910 send_initial_metadata_.Clear();
1911 }
1912 if (stream_op->send_message) {
1913 sending_message_ = false;
1914 }
1915 if (stream_op->send_trailing_metadata) {
1916 sent_final_op_ = false;
1917 send_trailing_metadata_.Clear();
1918 }
1919 if (stream_op->recv_initial_metadata) {
1920 received_initial_metadata_ = false;
1921 }
1922 if (stream_op->recv_message) {
1923 receiving_message_ = false;
1924 }
1925 if (stream_op->recv_trailing_metadata) {
1926 requested_final_op_ = false;
1927 }
1928 goto done;
1929 }
1930
1931 void FilterStackCall::ContextSet(grpc_context_index elem, void* value,
1932 void (*destroy)(void*)) {
1933 if (context_[elem].destroy) {
1934 context_[elem].destroy(context_[elem].value);
1935 }
1936 context_[elem].value = value;
1937 context_[elem].destroy = destroy;
1938 }
1939
1940 ///////////////////////////////////////////////////////////////////////////////
1941 // Metadata validation helpers
1942
1943 namespace {
1944 bool ValidateMetadata(size_t count, grpc_metadata* metadata) {
1945 if (count > INT_MAX) {
1946 return false;
1947 }
1948 for (size_t i = 0; i < count; i++) {
1949 grpc_metadata* md = &metadata[i];
1950 if (!GRPC_LOG_IF_ERROR("validate_metadata",
1951 grpc_validate_header_key_is_legal(md->key))) {
1952 return false;
1953 } else if (!grpc_is_binary_header_internal(md->key) &&
1954 !GRPC_LOG_IF_ERROR(
1955 "validate_metadata",
1956 grpc_validate_header_nonbin_value_is_legal(md->value))) {
1957 return false;
1958 } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
1959 // HTTP2 hpack encoding has a maximum limit.
1960 return false;
1961 }
1962 }
1963 return true;
1964 }
1965 } // namespace
1966
1967 ///////////////////////////////////////////////////////////////////////////////
1968 // PromiseBasedCall
1969 // Will be folded into Call once the promise conversion is done
1970
1971 class BasicPromiseBasedCall : public Call,
1972 public Party,
1973 public grpc_event_engine::experimental::
1974 EventEngine::Closure /* for deadlines */ {
1975 public:
1976 using Call::arena;
1977
1978 BasicPromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
1979 uint32_t initial_internal_refs,
1980 const grpc_call_create_args& args)
1981 : Call(arena, args.server_transport_data == nullptr, args.send_deadline,
1982 args.channel->Ref()),
1983 Party(initial_internal_refs),
1984 external_refs_(initial_external_refs),
1985 cq_(args.cq) {
1986 if (args.cq != nullptr) {
1987 GRPC_CQ_INTERNAL_REF(args.cq, "bind");
1988 }
1989 }
1990
1991 ~BasicPromiseBasedCall() override {
1992 if (cq_) GRPC_CQ_INTERNAL_UNREF(cq_, "bind");
1993 for (int i = 0; i < GRPC_CONTEXT_COUNT; i++) {
1994 if (context_[i].destroy) {
1995 context_[i].destroy(context_[i].value);
1996 }
1997 }
1998 }
1999
2000 // Implementation of EventEngine::Closure, called when deadline expires
2001 void Run() final;
2002
2003 virtual void OrphanCall() = 0;
2004
2005 virtual ServerCallContext* server_call_context() { return nullptr; }
2006 void SetCompletionQueue(grpc_completion_queue* cq) final {
2007 cq_ = cq;
2008 GRPC_CQ_INTERNAL_REF(cq, "bind");
2009 }
2010
2011 // Implementation of call refcounting: move this to DualRefCounted once we
2012 // don't need to maintain FilterStackCall compatibility
2013 void ExternalRef() final {
2014 if (external_refs_.fetch_add(1, std::memory_order_relaxed) == 0) {
2015 InternalRef("external");
2016 }
2017 }
2018 void ExternalUnref() final {
2019 if (external_refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
2020 OrphanCall();
2021 InternalUnref("external");
2022 }
2023 }
2024 void InternalRef(const char* reason) final {
2025 if (grpc_call_refcount_trace.enabled()) {
2026 gpr_log(GPR_DEBUG, "INTERNAL_REF:%p:%s", this, reason);
2027 }
2028 Party::IncrementRefCount();
2029 }
2030 void InternalUnref(const char* reason) final {
2031 if (grpc_call_refcount_trace.enabled()) {
2032 gpr_log(GPR_DEBUG, "INTERNAL_UNREF:%p:%s", this, reason);
2033 }
2034 Party::Unref();
2035 }
2036
2037 void RunInContext(absl::AnyInvocable<void()> fn) {
2038 Spawn(
2039 "run_in_context",
2040 [fn = std::move(fn)]() mutable {
2041 fn();
2042 return Empty{};
2043 },
2044 [](Empty) {});
2045 }
2046
2047 void ContextSet(grpc_context_index elem, void* value,
2048 void (*destroy)(void*)) final {
2049 if (context_[elem].destroy != nullptr) {
2050 context_[elem].destroy(context_[elem].value);
2051 }
2052 context_[elem].value = value;
2053 context_[elem].destroy = destroy;
2054 }
2055
2056 void* ContextGet(grpc_context_index elem) const final {
2057 return context_[elem].value;
2058 }
2059
2060 void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_);
2061 void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_);
2062 Timestamp deadline() {
2063 MutexLock lock(&deadline_mu_);
2064 return deadline_;
2065 }
2066
2067 // Accept the stats from the context (call once we have proof the transport is
2068 // done with them).
2069 void AcceptTransportStatsFromContext() {
2070 final_stats_ = *call_context_.call_stats();
2071 }
2072
2073 // This should return nullptr for the promise stack (and an alternative
2074 // means for that functionality should be invented)
2075 grpc_call_stack* call_stack() final { return nullptr; }
2076
2077 virtual RefCountedPtr<CallSpineInterface> MakeCallSpine(CallArgs) {
2078 Crash("Not implemented");
2079 }
2080
2081 protected:
2082 class ScopedContext
2083 : public ScopedActivity,
2084 public promise_detail::Context<Arena>,
2085 public promise_detail::Context<grpc_call_context_element>,
2086 public promise_detail::Context<CallContext>,
2087 public promise_detail::Context<CallFinalization> {
2088 public:
2089 explicit ScopedContext(BasicPromiseBasedCall* call)
2090 : ScopedActivity(call),
2091 promise_detail::Context<Arena>(call->arena()),
2092 promise_detail::Context<grpc_call_context_element>(call->context_),
2093 promise_detail::Context<CallContext>(&call->call_context_),
2094 promise_detail::Context<CallFinalization>(&call->finalization_) {}
2095 };
2096
2097 grpc_call_context_element* context() { return context_; }
2098
2099 grpc_completion_queue* cq() { return cq_; }
2100
2101 // At the end of the call run any finalization actions.
2102 void SetFinalizationStatus(grpc_status_code status, Slice status_details) {
2103 final_message_ = std::move(status_details);
2104 final_status_ = status;
2105 }
2106
2107 grpc_event_engine::experimental::EventEngine* event_engine() const override {
2108 return channel()->event_engine();
2109 }
2110
2111 private:
2112 void PartyOver() final {
2113 {
2114 ScopedContext ctx(this);
2115 std::string message;
2116 grpc_call_final_info final_info;
2117 final_info.stats = final_stats_;
2118 final_info.final_status = final_status_;
2119 // TODO(ctiller): change type here so we don't need to copy this string.
2120 final_info.error_string = nullptr;
2121 if (!final_message_.empty()) {
2122 message = std::string(final_message_.begin(), final_message_.end());
2123 final_info.error_string = message.c_str();
2124 }
2125 final_info.stats.latency =
2126 gpr_cycle_counter_sub(gpr_get_cycle_counter(), start_time());
2127 finalization_.Run(&final_info);
2128 CancelRemainingParticipants();
2129 arena()->DestroyManagedNewObjects();
2130 }
2131 DeleteThis();
2132 }
2133
2134 // Double refcounted for now: party owns the internal refcount, we track the
2135 // external refcount. Figure out a better scheme post-promise conversion.
2136 std::atomic<size_t> external_refs_;
2137 CallFinalization finalization_;
2138 CallContext call_context_{this};
2139 // Contexts for various subsystems (security, tracing, ...).
2140 grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
2141 grpc_call_stats final_stats_{};
2142 // Current deadline.
2143 Mutex deadline_mu_;
2144 Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture();
2145 grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY(
2146 deadline_mu_) deadline_task_;
2147 Slice final_message_;
2148 grpc_status_code final_status_ = GRPC_STATUS_UNKNOWN;
2149 grpc_completion_queue* cq_;
2150 };
2151
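// Tightens the call deadline; deadlines only ever move earlier. The first time
// a finite deadline is set an internal ref is taken and a timer is scheduled
// on the EventEngine; later updates cancel and reschedule that timer.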
2152 void BasicPromiseBasedCall::UpdateDeadline(Timestamp deadline) {
2153 MutexLock lock(&deadline_mu_);
2154 if (grpc_call_trace.enabled()) {
2155 gpr_log(GPR_DEBUG, "%s[call] UpdateDeadline from=%s to=%s",
2156 DebugTag().c_str(), deadline_.ToString().c_str(),
2157 deadline.ToString().c_str());
2158 }
2159 if (deadline >= deadline_) return;
2160 auto* const event_engine = channel()->event_engine();
2161 if (deadline_ != Timestamp::InfFuture()) {
2162 if (!event_engine->Cancel(deadline_task_)) return;
2163 } else {
2164 InternalRef("deadline");
2165 }
2166 deadline_ = deadline;
2167 deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this);
2168 }
2169
2170 void BasicPromiseBasedCall::ResetDeadline() {
2171 {
2172 MutexLock lock(&deadline_mu_);
2173 if (deadline_ == Timestamp::InfFuture()) return;
2174 auto* const event_engine = channel()->event_engine();
2175 if (!event_engine->Cancel(deadline_task_)) return;
2176 deadline_ = Timestamp::InfFuture();
2177 }
2178 InternalUnref("deadline[reset]");
2179 }
2180
2181 void BasicPromiseBasedCall::Run() {
2182 ApplicationCallbackExecCtx callback_exec_ctx;
2183 ExecCtx exec_ctx;
2184 CancelWithError(absl::DeadlineExceededError("Deadline exceeded"));
2185 InternalUnref("deadline[run]");
2186 }
2187
2188 class PromiseBasedCall : public BasicPromiseBasedCall {
2189 public:
2190 PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
2191 const grpc_call_create_args& args);
2192
2193 bool Completed() final { return finished_.IsSet(); }
2194
2195 bool failed_before_recv_message() const final {
2196 return failed_before_recv_message_.load(std::memory_order_relaxed);
2197 }
2198
2199 using Call::arena;
2200
2201 protected:
2202 class ScopedContext : public BasicPromiseBasedCall::ScopedContext,
2203 public BatchBuilder,
2204 public promise_detail::Context<BatchBuilder> {
2205 public:
2206 explicit ScopedContext(PromiseBasedCall* call)
2207 : BasicPromiseBasedCall::ScopedContext(call),
2208 BatchBuilder(&call->batch_payload_),
2209 promise_detail::Context<BatchBuilder>(this) {}
2210 };
2211
2212 class Completion {
2213 public:
2214 Completion() : index_(kNullIndex) {}
2215 ~Completion() { GPR_ASSERT(index_ == kNullIndex); }
2216 explicit Completion(uint8_t index) : index_(index) {}
2217 Completion(const Completion& other) = delete;
2218 Completion& operator=(const Completion& other) = delete;
2219 Completion(Completion&& other) noexcept : index_(other.index_) {
2220 other.index_ = kNullIndex;
2221 }
2222 Completion& operator=(Completion&& other) noexcept {
2223 GPR_ASSERT(index_ == kNullIndex);
2224 index_ = other.index_;
2225 other.index_ = kNullIndex;
2226 return *this;
2227 }
2228
2229 uint8_t index() const { return index_; }
2230 uint8_t TakeIndex() { return std::exchange(index_, kNullIndex); }
2231 bool has_value() const { return index_ != kNullIndex; }
2232
2233 private:
2234 enum : uint8_t { kNullIndex = 0xff };
2235 uint8_t index_;
2236 };
2237
2238 // Enumerates why a Completion is still pending
2239 enum class PendingOp {
2240 // We're in the midst of starting a batch of operations
2241 kStartingBatch = 0,
2242 // The following correspond with the batch operations from above
2243 kSendInitialMetadata,
2244 kReceiveInitialMetadata,
2245 kReceiveStatusOnClient,
2246 kReceiveCloseOnServer = kReceiveStatusOnClient,
2247 kSendMessage,
2248 kReceiveMessage,
2249 kSendStatusFromServer,
2250 kSendCloseFromClient = kSendStatusFromServer,
2251 };
2252
2253 bool RunParty() override {
2254 ScopedContext ctx(this);
2255 return Party::RunParty();
2256 }
2257
2258 const char* PendingOpString(PendingOp reason) const {
2259 switch (reason) {
2260 case PendingOp::kStartingBatch:
2261 return "StartingBatch";
2262 case PendingOp::kSendInitialMetadata:
2263 return "SendInitialMetadata";
2264 case PendingOp::kReceiveInitialMetadata:
2265 return "ReceiveInitialMetadata";
2266 case PendingOp::kReceiveStatusOnClient:
2267 return is_client() ? "ReceiveStatusOnClient" : "ReceiveCloseOnServer";
2268 case PendingOp::kSendMessage:
2269 return "SendMessage";
2270 case PendingOp::kReceiveMessage:
2271 return "ReceiveMessage";
2272 case PendingOp::kSendStatusFromServer:
2273 return is_client() ? "SendCloseFromClient" : "SendStatusFromServer";
2274 }
2275 return "Unknown";
2276 }
2277
2278 static constexpr uint32_t PendingOpBit(PendingOp reason) {
2279 return 1 << static_cast<int>(reason);
2280 }
2281
2282 // Begin work on a completion, recording the tag/closure to notify.
2283 // Use the op selected in \a ops to determine the index to allocate into.
2284 // Starts the "StartingBatch" PendingOp immediately.
2285 // Assumes at least one operation in \a ops.
2286 Completion StartCompletion(void* tag, bool is_closure, const grpc_op* ops);
2287 // Add one pending op to the completion, and return it.
2288 Completion AddOpToCompletion(const Completion& completion, PendingOp reason);
2289 // Stringify a completion
2290 std::string CompletionString(const Completion& completion) const {
2291 return completion.has_value()
2292 ? completion_info_[completion.index()].pending.ToString(this)
2293 : "no-completion";
2294 }
2295 // Finish one op on the completion. Must have previously been added.
2296 // The completion as a whole finishes when all pending ops finish.
2297 void FinishOpOnCompletion(Completion* completion, PendingOp reason);
2298 // Mark the completion as failed. Does not finish it.
2299 void FailCompletion(const Completion& completion,
2300 SourceLocation source_location = {});
2301 // Mark the completion as infallible. Overrides FailCompletion to report
2302 // success always.
2303 void ForceCompletionSuccess(const Completion& completion);
2304
2305 std::string PresentAndCompletionText(const char* caption, bool has,
2306 const Completion& completion) const {
2307 if (has) {
2308 if (completion.has_value()) {
2309 return absl::StrCat(caption, ":", CompletionString(completion), " ");
2310 } else {
2311 return absl::StrCat(caption,
2312 ":!!BUG:operation is present, no completion!! ");
2313 }
2314 } else {
2315 if (!completion.has_value()) {
2316 return "";
2317 } else {
2318 return absl::StrCat(caption, ":no-op:", CompletionString(completion),
2319 " ");
2320 }
2321 }
2322 }
2323
2324 // Spawn a job that will first do FirstPromise then receive a message
2325 template <typename FirstPromise>
2326 void StartRecvMessage(const grpc_op& op, const Completion& completion,
2327 FirstPromise first,
2328 PipeReceiver<MessageHandle>* receiver,
2329 bool cancel_on_error, Party::BulkSpawner& spawner);
2330 void StartSendMessage(const grpc_op& op, const Completion& completion,
2331 PipeSender<MessageHandle>* sender,
2332 Party::BulkSpawner& spawner);
2333
2334 void set_completed() { finished_.Set(); }
2335
2336 // Returns a promise that resolves to Empty whenever the call is completed.
2337 auto finished() { return finished_.Wait(); }
2338
2339 // Returns a promise that resolves to Empty whenever there is no outstanding
2340 // send operation
2341 auto WaitForSendingStarted() {
2342 return [this]() -> Poll<Empty> {
2343 int n = sends_queued_.load(std::memory_order_relaxed);
2344 if (grpc_call_trace.enabled()) {
2345 gpr_log(GPR_DEBUG, "%s[call] WaitForSendingStarted n=%d",
2346 DebugTag().c_str(), n);
2347 }
2348 if (n != 0) return waiting_for_queued_sends_.pending();
2349 return Empty{};
2350 };
2351 }
2352
2353 // Mark that a send has been queued - blocks sending trailing metadata.
2354 void QueueSend() {
2355 if (grpc_call_trace.enabled()) {
2356 gpr_log(GPR_DEBUG, "%s[call] QueueSend", DebugTag().c_str());
2357 }
2358 sends_queued_.fetch_add(1, std::memory_order_relaxed);
2359 }
2360 // Mark that a send has been dequeued - allows sending trailing metadata once
2361 // zero sends are queued.
2362 void EnactSend() {
2363 if (grpc_call_trace.enabled()) {
2364 gpr_log(GPR_DEBUG, "%s[call] EnactSend", DebugTag().c_str());
2365 }
2366 if (1 == sends_queued_.fetch_sub(1, std::memory_order_relaxed)) {
2367 waiting_for_queued_sends_.Wake();
2368 }
2369 }
2370
2371 void set_failed_before_recv_message() {
2372 failed_before_recv_message_.store(true, std::memory_order_relaxed);
2373 }
2374
2375 private:
2376 union CompletionInfo {
2377 static constexpr uint32_t kOpFailed = 0x8000'0000u;
2378 static constexpr uint32_t kOpForceSuccess = 0x4000'0000u;
2379 CompletionInfo() {}
2380 enum CompletionState {
2381 kPending,
2382 kSuccess,
2383 kFailure,
2384 };
2385 struct Pending {
2386 // Bitmask of PendingOps at the bottom, and kOpFailed, kOpForceSuccess at
2387 // the top.
2388 std::atomic<uint32_t> state;
2389 bool is_closure;
2390 // True if this completion was for a recv_message op.
2391 // In that case if the completion as a whole fails we need to cleanup the
2392 // returned message.
2393 bool is_recv_message;
2394 void* tag;
2395
2396 void Start(bool is_closure, void* tag) {
2397 this->is_closure = is_closure;
2398 this->is_recv_message = false;
2399 this->tag = tag;
2400 state.store(PendingOpBit(PendingOp::kStartingBatch),
2401 std::memory_order_release);
2402 }
2403
2404 void AddPendingBit(PendingOp reason) {
2405 if (reason == PendingOp::kReceiveMessage) is_recv_message = true;
2406 auto prev =
2407 state.fetch_or(PendingOpBit(reason), std::memory_order_relaxed);
2408 GPR_ASSERT((prev & PendingOpBit(reason)) == 0);
2409 }
2410
2411 CompletionState RemovePendingBit(PendingOp reason) {
2412 const uint32_t mask = ~PendingOpBit(reason);
2413 auto prev = state.fetch_and(mask, std::memory_order_acq_rel);
2414 GPR_ASSERT((prev & PendingOpBit(reason)) != 0);
2415 switch (prev & mask) {
2416 case kOpFailed:
2417 return kFailure;
2418 case kOpFailed | kOpForceSuccess:
2419 case kOpForceSuccess:
2420 case 0:
2421 return kSuccess;
2422 default:
2423 return kPending;
2424 }
2425 }
2426
2427 void MarkFailed() {
2428 state.fetch_or(kOpFailed, std::memory_order_relaxed);
2429 }
2430
2431 void MarkForceSuccess() {
2432 state.fetch_or(kOpForceSuccess, std::memory_order_relaxed);
2433 }
2434
2435 std::string ToString(const PromiseBasedCall* call) const {
2436 auto state = this->state.load(std::memory_order_relaxed);
2437 std::vector<absl::string_view> pending_ops;
2438 for (size_t i = 0; i < 24; i++) {
2439 if (state & (1u << i)) {
2440 pending_ops.push_back(
2441 call->PendingOpString(static_cast<PendingOp>(i)));
2442 }
2443 }
2444 return absl::StrFormat("{%s}%s:tag=%p", absl::StrJoin(pending_ops, ","),
2445 (state & kOpForceSuccess) ? ":force-success"
2446 : (state & kOpFailed) ? ":failed"
2447 : ":success",
2448 tag);
2449 }
2450 } pending;
2451 grpc_cq_completion completion;
2452 };
2453
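// One slot per potentially outstanding completion; the slot for a batch is
// chosen by BatchSlotForOp() from the first op in the batch (see
// StartCompletion below).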
2454 CompletionInfo completion_info_[6];
2455 ExternallyObservableLatch<void> finished_;
2456 // Non-zero with an outstanding GRPC_OP_SEND_INITIAL_METADATA or
2457 // GRPC_OP_SEND_MESSAGE (one count each), and 0 once those payloads have been
2458 // pushed onto the outgoing pipe.
2459 std::atomic<uint8_t> sends_queued_{0};
2460 std::atomic<bool> failed_before_recv_message_{false};
2461 // Waiter for when sends_queued_ becomes 0.
2462 IntraActivityWaiter waiting_for_queued_sends_;
2463 grpc_byte_buffer** recv_message_ = nullptr;
2464 grpc_transport_stream_op_batch_payload batch_payload_{context()};
2465 };
2466
2467 template <typename T>
2468 grpc_error_handle MakePromiseBasedCall(grpc_call_create_args* args,
2469 grpc_call** out_call) {
2470 Channel* channel = args->channel.get();
2471
2472 auto* arena = channel->CreateArena();
2473 PromiseBasedCall* call = arena->New<T>(arena, args);
2474 *out_call = call->c_ptr();
2475 GPR_DEBUG_ASSERT(Call::FromC(*out_call) == call);
2476 return absl::OkStatus();
2477 }
2478
2479 PromiseBasedCall::PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
2480 const grpc_call_create_args& args)
2481 : BasicPromiseBasedCall(arena, initial_external_refs,
2482 initial_external_refs != 0 ? 1 : 0, args) {}
2483
2484 static void CToMetadata(grpc_metadata* metadata, size_t count,
2485 grpc_metadata_batch* b) {
2486 for (size_t i = 0; i < count; i++) {
2487 grpc_metadata* md = &metadata[i];
2488 auto key = StringViewFromSlice(md->key);
2489 // Filter out "content-length" metadata.
2490 if (key == "content-length") continue;
2491 b->Append(key, Slice(CSliceRef(md->value)),
2492 [md](absl::string_view error, const Slice& value) {
2493 gpr_log(GPR_DEBUG, "Append error: %s",
2494 absl::StrCat("key=", StringViewFromSlice(md->key),
2495 " error=", error,
2496 " value=", value.as_string_view())
2497 .c_str());
2498 });
2499 }
2500 }
2501
2502 PromiseBasedCall::Completion PromiseBasedCall::StartCompletion(
2503 void* tag, bool is_closure, const grpc_op* ops) {
2504 Completion c(BatchSlotForOp(ops[0].op));
2505 if (!is_closure) {
2506 grpc_cq_begin_op(cq(), tag);
2507 }
2508 completion_info_[c.index()].pending.Start(is_closure, tag);
2509 if (grpc_call_trace.enabled()) {
2510 gpr_log(GPR_INFO, "%s[call] StartCompletion %s", DebugTag().c_str(),
2511 CompletionString(c).c_str());
2512 }
2513 return c;
2514 }
2515
2516 PromiseBasedCall::Completion PromiseBasedCall::AddOpToCompletion(
2517 const Completion& completion, PendingOp reason) {
2518 if (grpc_call_trace.enabled()) {
2519 gpr_log(GPR_INFO, "%s[call] AddOpToCompletion %s %s", DebugTag().c_str(),
2520 CompletionString(completion).c_str(), PendingOpString(reason));
2521 }
2522 GPR_ASSERT(completion.has_value());
2523 completion_info_[completion.index()].pending.AddPendingBit(reason);
2524 return Completion(completion.index());
2525 }
2526
2527 void PromiseBasedCall::FailCompletion(const Completion& completion,
2528 SourceLocation location) {
2529 if (grpc_call_trace.enabled()) {
2530 gpr_log(location.file(), location.line(), GPR_LOG_SEVERITY_ERROR,
2531 "%s[call] FailCompletion %s", DebugTag().c_str(),
2532 CompletionString(completion).c_str());
2533 }
2534 completion_info_[completion.index()].pending.MarkFailed();
2535 }
2536
2537 void PromiseBasedCall::ForceCompletionSuccess(const Completion& completion) {
2538 completion_info_[completion.index()].pending.MarkForceSuccess();
2539 }
2540
2541 void PromiseBasedCall::FinishOpOnCompletion(Completion* completion,
2542 PendingOp reason) {
2543 if (grpc_call_trace.enabled()) {
2544 gpr_log(GPR_INFO, "%s[call] FinishOpOnCompletion completion:%s finish:%s",
2545 DebugTag().c_str(), CompletionString(*completion).c_str(),
2546 PendingOpString(reason));
2547 }
2548 const uint8_t i = completion->TakeIndex();
2549 GPR_ASSERT(i < GPR_ARRAY_SIZE(completion_info_));
2550 CompletionInfo::Pending& pending = completion_info_[i].pending;
2551 bool success;
2552 switch (pending.RemovePendingBit(reason)) {
2553 case CompletionInfo::kPending:
2554 return; // Early out
2555 case CompletionInfo::kSuccess:
2556 success = true;
2557 break;
2558 case CompletionInfo::kFailure:
2559 success = false;
2560 break;
2561 }
2562 if (pending.is_recv_message && !success && *recv_message_ != nullptr) {
2563 grpc_byte_buffer_destroy(*recv_message_);
2564 *recv_message_ = nullptr;
2565 }
2566 auto error = success ? absl::OkStatus() : absl::CancelledError();
2567 if (pending.is_closure) {
2568 ExecCtx::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(pending.tag),
2569 error);
2570 } else {
2571 InternalRef("cq_end_op");
2572 grpc_cq_end_op(
2573 cq(), pending.tag, error,
2574 [](void* p, grpc_cq_completion*) {
2575 static_cast<PromiseBasedCall*>(p)->InternalUnref("cq_end_op");
2576 },
2577 this, &completion_info_[i].completion);
2578 }
2579 }
2580
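// Queues an outbound message: the payload is moved out of the application's
// byte buffer and pushed onto the outgoing message pipe from a spawned
// promise; the completion is marked failed if the push fails.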
2581 void PromiseBasedCall::StartSendMessage(const grpc_op& op,
2582 const Completion& completion,
2583 PipeSender<MessageHandle>* sender,
2584 Party::BulkSpawner& spawner) {
2585 QueueSend();
2586 SliceBuffer send;
2587 grpc_slice_buffer_swap(
2588 &op.data.send_message.send_message->data.raw.slice_buffer,
2589 send.c_slice_buffer());
2590 auto msg = arena()->MakePooled<Message>(std::move(send), op.flags);
2591 spawner.Spawn(
2592 "call_send_message",
2593 [this, sender, msg = std::move(msg)]() mutable {
2594 EnactSend();
2595 return sender->Push(std::move(msg));
2596 },
2597 [this, completion = AddOpToCompletion(
2598 completion, PendingOp::kSendMessage)](bool result) mutable {
2599 if (grpc_call_trace.enabled()) {
2600 gpr_log(GPR_DEBUG, "%sSendMessage completes %s", DebugTag().c_str(),
2601 result ? "successfully" : "with failure");
2602 }
2603 if (!result) FailCompletion(completion);
2604 FinishOpOnCompletion(&completion, PendingOp::kSendMessage);
2605 });
2606 }
2607
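// Spawns a promise that runs first_promise_factory() and then reads one
// message from the pipe. A received message is converted into a
// grpc_byte_buffer for the application; end-of-stream and cancellation are
// reported through the completion (optionally cancelling the call).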
2608 template <typename FirstPromiseFactory>
2609 void PromiseBasedCall::StartRecvMessage(
2610 const grpc_op& op, const Completion& completion,
2611 FirstPromiseFactory first_promise_factory,
2612 PipeReceiver<MessageHandle>* receiver, bool cancel_on_error,
2613 Party::BulkSpawner& spawner) {
2614 if (grpc_call_trace.enabled()) {
2615 gpr_log(GPR_INFO, "%s[call] Start RecvMessage: %s", DebugTag().c_str(),
2616 CompletionString(completion).c_str());
2617 }
2618 recv_message_ = op.data.recv_message.recv_message;
2619 spawner.Spawn(
2620 "call_recv_message",
2621 [first_promise_factory = std::move(first_promise_factory), receiver]() {
2622 return Seq(first_promise_factory(), receiver->Next());
2623 },
2624 [this, cancel_on_error,
2625 completion = AddOpToCompletion(completion, PendingOp::kReceiveMessage)](
2626 NextResult<MessageHandle> result) mutable {
2627 if (result.has_value()) {
2628 MessageHandle& message = *result;
2629 NoteLastMessageFlags(message->flags());
2630 if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
2631 (incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
2632 *recv_message_ = grpc_raw_compressed_byte_buffer_create(
2633 nullptr, 0, incoming_compression_algorithm());
2634 } else {
2635 *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0);
2636 }
2637 grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(),
2638 &(*recv_message_)->data.raw.slice_buffer);
2639 if (grpc_call_trace.enabled()) {
2640 gpr_log(GPR_INFO,
2641 "%s[call] RecvMessage: outstanding_recv "
2642 "finishes: received %" PRIdPTR " byte message",
2643 DebugTag().c_str(),
2644 (*recv_message_)->data.raw.slice_buffer.length);
2645 }
2646 } else if (result.cancelled()) {
2647 if (grpc_call_trace.enabled()) {
2648 gpr_log(GPR_INFO,
2649 "%s[call] RecvMessage: outstanding_recv "
2650 "finishes: received end-of-stream with error",
2651 DebugTag().c_str());
2652 }
2653 set_failed_before_recv_message();
2654 FailCompletion(completion);
2655 if (cancel_on_error) CancelWithError(absl::CancelledError());
2656 *recv_message_ = nullptr;
2657 } else {
2658 if (grpc_call_trace.enabled()) {
2659 gpr_log(GPR_INFO,
2660 "%s[call] RecvMessage: outstanding_recv "
2661 "finishes: received end-of-stream",
2662 DebugTag().c_str());
2663 }
2664 *recv_message_ = nullptr;
2665 }
2666 FinishOpOnCompletion(&completion, PendingOp::kReceiveMessage);
2667 });
2668 }
2669
2670 ///////////////////////////////////////////////////////////////////////////////
2671 // CallContext
2672
2673 void CallContext::RunInContext(absl::AnyInvocable<void()> fn) {
2674 call_->RunInContext(std::move(fn));
2675 }
2676
2677 void CallContext::IncrementRefCount(const char* reason) {
2678 call_->InternalRef(reason);
2679 }
2680
2681 void CallContext::Unref(const char* reason) { call_->InternalUnref(reason); }
2682
2683 void CallContext::UpdateDeadline(Timestamp deadline) {
2684 call_->UpdateDeadline(deadline);
2685 }
2686
2687 Timestamp CallContext::deadline() const { return call_->deadline(); }
2688
2689 ServerCallContext* CallContext::server_call_context() {
2690 return call_->server_call_context();
2691 }
2692
2693 RefCountedPtr<CallSpineInterface> CallContext::MakeCallSpine(
2694 CallArgs call_args) {
2695 return call_->MakeCallSpine(std::move(call_args));
2696 }
2697
2698 grpc_call* CallContext::c_call() { return call_->c_ptr(); }
2699
2700 ///////////////////////////////////////////////////////////////////////////////
2701 // PublishMetadataArray
2702
2703 namespace {
2704 void PublishMetadataArray(grpc_metadata_batch* md, grpc_metadata_array* array,
2705 bool is_client) {
2706 const auto md_count = md->count();
2707 if (md_count > array->capacity) {
2708 array->capacity =
2709 std::max(array->capacity + md->count(), array->capacity * 3 / 2);
2710 array->metadata = static_cast<grpc_metadata*>(
2711 gpr_realloc(array->metadata, sizeof(grpc_metadata) * array->capacity));
2712 }
2713 PublishToAppEncoder encoder(array, md, is_client);
2714 md->Encode(&encoder);
2715 }
2716 } // namespace
2717
2718 ///////////////////////////////////////////////////////////////////////////////
2719 // ClientPromiseBasedCall
2720
2721 #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
2722 class ClientPromiseBasedCall final : public PromiseBasedCall {
2723 public:
2724 ClientPromiseBasedCall(Arena* arena, grpc_call_create_args* args)
2725 : PromiseBasedCall(arena, 1, *args),
2726 polling_entity_(
2727 args->cq != nullptr
2728 ? grpc_polling_entity_create_from_pollset(
2729 grpc_cq_pollset(args->cq))
2730 : (args->pollset_set_alternative != nullptr
2731 ? grpc_polling_entity_create_from_pollset_set(
2732 args->pollset_set_alternative)
2733 : grpc_polling_entity{})) {
2734 global_stats().IncrementClientCallsCreated();
2735 if (args->cq != nullptr) {
2736 GPR_ASSERT(args->pollset_set_alternative == nullptr &&
2737 "Only one of 'cq' and 'pollset_set_alternative' should be "
2738 "non-nullptr.");
2739 }
2740 ScopedContext context(this);
2741 args->channel->channel_stack()->stats_plugin_group->AddClientCallTracers(
2742 *args->path, args->registered_method, this->context());
2743 send_initial_metadata_ = GetContext<Arena>()->MakePooled<ClientMetadata>();
2744 send_initial_metadata_->Set(HttpPathMetadata(), std::move(*args->path));
2745 if (args->authority.has_value()) {
2746 send_initial_metadata_->Set(HttpAuthorityMetadata(),
2747 std::move(*args->authority));
2748 }
2749 send_initial_metadata_->Set(GrpcRegisteredMethod(),
2750 reinterpret_cast<void*>(static_cast<uintptr_t>(
2751 args->registered_method)));
2752 if (auto* channelz_channel = channel()->channelz_node()) {
2753 channelz_channel->RecordCallStarted();
2754 }
2755 if (args->send_deadline != Timestamp::InfFuture()) {
2756 UpdateDeadline(args->send_deadline);
2757 }
2758 Call* parent = Call::FromC(args->parent);
2759 if (parent != nullptr) {
2760 auto parent_status = InitParent(parent, args->propagation_mask);
2761 if (!parent_status.ok()) {
2762 CancelWithError(std::move(parent_status));
2763 }
2764 PublishToParent(parent);
2765 }
2766 }
2767
2768 void OrphanCall() override { MaybeUnpublishFromParent(); }
2769
2770 ~ClientPromiseBasedCall() override {
2771 ScopedContext context(this);
2772 send_initial_metadata_.reset();
2773 // Need to destroy the pipes under the ScopedContext above, so we
2774 // move them out here and then allow the destructors to run at the
2775 // end of this scope, but before the context.
2776 auto c2s = std::move(client_to_server_messages_);
2777 auto s2c = std::move(server_to_client_messages_);
2778 auto sim = std::move(server_initial_metadata_);
2779 }
2780
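// Cancellation takes one of two paths: if the call promise has not started
// yet, finish the call directly with the cancellation status; otherwise set
// the cancellation latch that the in-flight promise races against.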
2781 void CancelWithError(absl::Status error) override {
2782 if (cancel_with_error_called_.exchange(true, std::memory_order_relaxed)) {
2783 return;
2784 }
2785 if (!started_.exchange(true, std::memory_order_relaxed)) {
2786 // Initial metadata not sent yet, so we can just fail the call.
2787 Spawn(
2788 "cancel_before_initial_metadata",
2789 [error = std::move(error), this]() {
2790 server_to_client_messages_.sender.Close();
2791 auto md = ServerMetadataFromStatus(error);
2792 md->Set(GrpcCallWasCancelled(), true);
2793 Finish(std::move(md));
2794 return Empty{};
2795 },
2796 [](Empty) {});
2797 } else {
2798 Spawn(
2799 "cancel_with_error",
2800 [error = std::move(error), this]() {
2801 if (!cancel_error_.is_set()) {
2802 auto md = ServerMetadataFromStatus(error);
2803 md->Set(GrpcCallWasCancelled(), true);
2804 cancel_error_.Set(std::move(md));
2805 }
2806 return Empty{};
2807 },
2808 [](Empty) {});
2809 }
2810 }
2811 absl::string_view GetServerAuthority() const override { abort(); }
2812 bool is_trailers_only() const override { return is_trailers_only_; }
2813
2814 grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
2815 bool is_notify_tag_closure) override;
2816
2817 std::string DebugTag() const override {
2818 return absl::StrFormat("CLIENT_CALL[%p]: ", this);
2819 }
2820
2821 RefCountedPtr<CallSpineInterface> MakeCallSpine(CallArgs call_args) final {
2822 class WrappingCallSpine final : public CallSpineInterface {
2823 public:
2824 WrappingCallSpine(ClientPromiseBasedCall* call,
2825 ClientMetadataHandle metadata)
2826 : call_(call) {
2827 call_->InternalRef("call-spine");
2828 SpawnInfallible(
2829 "send_client_initial_metadata",
2830 [self = Ref(), metadata = std::move(metadata)]() mutable {
2831 return Map(self->client_initial_metadata_.sender.Push(
2832 std::move(metadata)),
2833 [self](bool) { return Empty{}; });
2834 });
2835 }
2836
2837 ~WrappingCallSpine() override {
2838 {
2839 ScopedContext context(call_);
2840 // Move these out and destroy before the internal unref.
2841 auto client_initial_metadata = std::move(client_initial_metadata_);
2842 auto server_trailing_metadata = std::move(server_trailing_metadata_);
2843 }
2844 call_->InternalUnref("call-spine");
2845 }
2846
2847 Pipe<ClientMetadataHandle>& client_initial_metadata() override {
2848 return client_initial_metadata_;
2849 }
2850
2851 Pipe<MessageHandle>& client_to_server_messages() override {
2852 return call_->client_to_server_messages_;
2853 }
2854
2855 Pipe<ServerMetadataHandle>& server_initial_metadata() override {
2856 return call_->server_initial_metadata_;
2857 }
2858
2859 Pipe<MessageHandle>& server_to_client_messages() override {
2860 return call_->server_to_client_messages_;
2861 }
2862
2863 Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
2864 return server_trailing_metadata_;
2865 }
2866
2867 Latch<ServerMetadataHandle>& cancel_latch() override {
2868 return cancel_error_;
2869 }
2870
2871 Party& party() override { return *call_; }
2872 Arena* arena() override { return call_->arena(); }
2873
2874 void IncrementRefCount() override { refs_.Ref(); }
2875 void Unref() override {
2876 if (refs_.Unref()) delete this;
2877 }
2878 RefCountedPtr<WrappingCallSpine> Ref() {
2879 IncrementRefCount();
2880 return RefCountedPtr<WrappingCallSpine>(this);
2881 }
2882
2883 private:
2884 RefCount refs_;
2885 ClientPromiseBasedCall* const call_;
2886 std::atomic<bool> sent_trailing_metadata_{false};
2887 Pipe<ClientMetadataHandle> client_initial_metadata_{call_->arena()};
2888 Pipe<ServerMetadataHandle> server_trailing_metadata_{call_->arena()};
2889 Latch<ServerMetadataHandle> cancel_error_;
2890 };
2891 GPR_ASSERT(call_args.server_initial_metadata ==
2892 &server_initial_metadata_.sender);
2893 GPR_ASSERT(call_args.client_to_server_messages ==
2894 &client_to_server_messages_.receiver);
2895 GPR_ASSERT(call_args.server_to_client_messages ==
2896 &server_to_client_messages_.sender);
2897 call_args.client_initial_metadata_outstanding.Complete(true);
2898 return MakeRefCounted<WrappingCallSpine>(
2899 this, std::move(call_args.client_initial_metadata));
2900 }
2901
2902 private:
2903 // Finish the call with the given status/trailing metadata.
2904 void Finish(ServerMetadataHandle trailing_metadata);
2905 // Validate that a set of ops is valid for a client call.
2906 grpc_call_error ValidateBatch(const grpc_op* ops, size_t nops) const;
2907 // Commit a valid batch of operations to be executed.
2908 void CommitBatch(const grpc_op* ops, size_t nops,
2909 const Completion& completion);
2910 // Start the underlying promise.
2911 void StartPromise(ClientMetadataHandle client_initial_metadata,
2912 const Completion& completion, Party::BulkSpawner& spawner);
2913 // Start receiving initial metadata
2914 void StartRecvInitialMetadata(grpc_metadata_array* array,
2915 const Completion& completion,
2916 Party::BulkSpawner& spawner);
2917 void StartRecvStatusOnClient(
2918 const Completion& completion,
2919 grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args,
2920 Party::BulkSpawner& spawner);
2921 // Publish status out to the application.
2922 void PublishStatus(
2923 grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args,
2924 ServerMetadataHandle trailing_metadata);
2925 // Publish server initial metadata out to the application.
2926 void PublishInitialMetadata(ServerMetadata* metadata);
2927
2928 ClientMetadataHandle send_initial_metadata_;
2929 Pipe<ServerMetadataHandle> server_initial_metadata_{arena()};
2930 Latch<ServerMetadataHandle> server_trailing_metadata_;
2931 Latch<ServerMetadataHandle> cancel_error_;
2932 Latch<grpc_polling_entity> polling_entity_;
2933 Pipe<MessageHandle> client_to_server_messages_{arena()};
2934 Pipe<MessageHandle> server_to_client_messages_{arena()};
2935 bool is_trailers_only_ = false;
2936 bool scheduled_receive_status_ = false;
2937 bool scheduled_send_close_ = false;
2938 // True once the promise for the call is started.
2939 // This corresponds to sending initial metadata, or cancelling before doing
2940 // so.
2941 // In the latter case real-world code sometimes does not send the initial
2942 // metadata, and so gating based upon that does not work out.
2943 std::atomic<bool> started_{false};
2944 // True after the first CancelWithError call - prevents spamming cancels from
2945 // overflowing the party.
2946 std::atomic<bool> cancel_with_error_called_{false};
2947 // TODO(ctiller): delete when we remove the filter based API (may require some
2948 // cleanup in wrapped languages: they depend on this to hold slice refs)
2949 ServerMetadataHandle recv_initial_metadata_;
2950 ServerMetadataHandle recv_trailing_metadata_;
2951 };
2952
2953 void ClientPromiseBasedCall::StartPromise(
2954 ClientMetadataHandle client_initial_metadata, const Completion& completion,
2955 Party::BulkSpawner& spawner) {
2956 auto token = ClientInitialMetadataOutstandingToken::New(arena());
2957 spawner.Spawn(
2958 "call_send_initial_metadata", token.Wait(),
2959 [this,
2960 completion = AddOpToCompletion(
2961 completion, PendingOp::kSendInitialMetadata)](bool result) mutable {
2962 if (!result) FailCompletion(completion);
2963 FinishOpOnCompletion(&completion, PendingOp::kSendInitialMetadata);
2964 });
2965 spawner.Spawn(
2966 "client_promise",
2967 [this, client_initial_metadata = std::move(client_initial_metadata),
2968 token = std::move(token)]() mutable {
2969 return Race(
2970 cancel_error_.Wait(),
2971 Map(channel()->channel_stack()->MakeClientCallPromise(CallArgs{
2972 std::move(client_initial_metadata), std::move(token),
2973 &polling_entity_, &server_initial_metadata_.sender,
2974 &client_to_server_messages_.receiver,
2975 &server_to_client_messages_.sender}),
2976 [this](ServerMetadataHandle trailing_metadata) {
2977 // If we're cancelled the transport doesn't get to return
2978 // stats.
2979 AcceptTransportStatsFromContext();
2980 return trailing_metadata;
2981 }));
2982 },
2983 [this](ServerMetadataHandle trailing_metadata) {
2984 Finish(std::move(trailing_metadata));
2985 });
2986 }
2987
2988 grpc_call_error ClientPromiseBasedCall::ValidateBatch(const grpc_op* ops,
2989 size_t nops) const {
2990 BitSet<8> got_ops;
2991 for (size_t op_idx = 0; op_idx < nops; op_idx++) {
2992 const grpc_op& op = ops[op_idx];
2993 switch (op.op) {
2994 case GRPC_OP_SEND_INITIAL_METADATA:
2995 if (!AreInitialMetadataFlagsValid(op.flags)) {
2996 return GRPC_CALL_ERROR_INVALID_FLAGS;
2997 }
2998 if (!ValidateMetadata(op.data.send_initial_metadata.count,
2999 op.data.send_initial_metadata.metadata)) {
3000 return GRPC_CALL_ERROR_INVALID_METADATA;
3001 }
3002 break;
3003 case GRPC_OP_SEND_MESSAGE:
3004 if (!AreWriteFlagsValid(op.flags)) {
3005 return GRPC_CALL_ERROR_INVALID_FLAGS;
3006 }
3007 break;
3008 case GRPC_OP_RECV_INITIAL_METADATA:
3009 case GRPC_OP_RECV_MESSAGE:
3010 if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
3011 break;
3012 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
3013 if (scheduled_send_close_) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
3014 if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
3015 break;
3016 case GRPC_OP_RECV_STATUS_ON_CLIENT:
3017 if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
3018 if (scheduled_receive_status_) {
3019 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
3020 }
3021 break;
3022 case GRPC_OP_RECV_CLOSE_ON_SERVER:
3023 case GRPC_OP_SEND_STATUS_FROM_SERVER:
3024 return GRPC_CALL_ERROR_NOT_ON_CLIENT;
3025 }
3026 if (got_ops.is_set(op.op)) {
3027 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
3028 }
3029 got_ops.set(op.op);
3030 }
3031 return GRPC_CALL_OK;
3032 }
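// Illustrative sketch only (hypothetical application code, not part of this
// file): a typical unary client batch that passes the validation above - each
// op type appears at most once, flags are valid, and no server-only ops are
// present. `call`, `tag`, and the op data fields are assumed values:
//
//   grpc_op ops[6];
//   memset(ops, 0, sizeof(ops));
//   ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;  // .data.send_initial_metadata
//   ops[1].op = GRPC_OP_SEND_MESSAGE;           // .data.send_message
//   ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
//   ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;  // .data.recv_initial_metadata
//   ops[4].op = GRPC_OP_RECV_MESSAGE;           // .data.recv_message
//   ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;  // status/details/trailing md
//   grpc_call_error err = grpc_call_start_batch(call, ops, 6, tag, nullptr);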
3033
3034 void ClientPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
3035 const Completion& completion) {
3036 Party::BulkSpawner spawner(this);
3037 for (size_t op_idx = 0; op_idx < nops; op_idx++) {
3038 const grpc_op& op = ops[op_idx];
3039 switch (op.op) {
3040 case GRPC_OP_SEND_INITIAL_METADATA: {
3041 if (started_.exchange(true, std::memory_order_relaxed)) break;
3042 CToMetadata(op.data.send_initial_metadata.metadata,
3043 op.data.send_initial_metadata.count,
3044 send_initial_metadata_.get());
3045 PrepareOutgoingInitialMetadata(op, *send_initial_metadata_);
3046 if (send_deadline() != Timestamp::InfFuture()) {
3047 send_initial_metadata_->Set(GrpcTimeoutMetadata(), send_deadline());
3048 }
3049 send_initial_metadata_->Set(
3050 WaitForReady(),
3051 WaitForReady::ValueType{
3052 (op.flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) != 0,
3053 (op.flags &
3054 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) != 0});
3055 StartPromise(std::move(send_initial_metadata_), completion, spawner);
3056 } break;
3057 case GRPC_OP_RECV_INITIAL_METADATA: {
3058 StartRecvInitialMetadata(
3059 op.data.recv_initial_metadata.recv_initial_metadata, completion,
3060 spawner);
3061 } break;
3062 case GRPC_OP_RECV_STATUS_ON_CLIENT: {
3063 scheduled_receive_status_ = true;
3064 StartRecvStatusOnClient(completion, op.data.recv_status_on_client,
3065 spawner);
3066 } break;
3067 case GRPC_OP_SEND_MESSAGE:
3068 StartSendMessage(op, completion, &client_to_server_messages_.sender,
3069 spawner);
3070 break;
3071 case GRPC_OP_RECV_MESSAGE:
3072 StartRecvMessage(
3073 op, completion,
3074 [this]() {
3075 return Race(server_initial_metadata_.receiver.AwaitClosed(),
3076 server_to_client_messages_.receiver.AwaitClosed());
3077 },
3078 &server_to_client_messages_.receiver, false, spawner);
3079 break;
3080 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
3081 scheduled_send_close_ = true;
3082 spawner.Spawn(
3083 "send_close_from_client",
3084 [this]() {
3085 client_to_server_messages_.sender.Close();
3086 return Empty{};
3087 },
3088 [this,
3089 completion = AddOpToCompletion(
3090 completion, PendingOp::kSendCloseFromClient)](Empty) mutable {
3091 FinishOpOnCompletion(&completion,
3092 PendingOp::kSendCloseFromClient);
3093 });
3094 break;
3095 case GRPC_OP_SEND_STATUS_FROM_SERVER:
3096 case GRPC_OP_RECV_CLOSE_ON_SERVER:
3097 abort(); // unreachable
3098 }
3099 }
3100 }
3101
3102 grpc_call_error ClientPromiseBasedCall::StartBatch(const grpc_op* ops,
3103 size_t nops,
3104 void* notify_tag,
3105 bool is_notify_tag_closure) {
3106 if (nops == 0) {
3107 EndOpImmediately(cq(), notify_tag, is_notify_tag_closure);
3108 return GRPC_CALL_OK;
3109 }
3110 const grpc_call_error validation_result = ValidateBatch(ops, nops);
3111 if (validation_result != GRPC_CALL_OK) {
3112 return validation_result;
3113 }
3114 Completion completion =
3115 StartCompletion(notify_tag, is_notify_tag_closure, ops);
3116 CommitBatch(ops, nops, completion);
3117 FinishOpOnCompletion(&completion, PendingOp::kStartingBatch);
3118 return GRPC_CALL_OK;
3119 }
3120
3121 void ClientPromiseBasedCall::StartRecvInitialMetadata(
3122 grpc_metadata_array* array, const Completion& completion,
3123 Party::BulkSpawner& spawner) {
3124 spawner.Spawn(
3125 "recv_initial_metadata",
3126 [this]() {
3127 return Race(server_initial_metadata_.receiver.Next(),
3128 Map(finished(), [](Empty) {
3129 return NextResult<ServerMetadataHandle>(true);
3130 }));
3131 },
3132 [this, array,
3133 completion =
3134 AddOpToCompletion(completion, PendingOp::kReceiveInitialMetadata)](
3135 NextResult<ServerMetadataHandle> next_metadata) mutable {
3136 server_initial_metadata_.sender.Close();
3137 ServerMetadataHandle metadata;
3138 if (grpc_call_trace.enabled()) {
3139 gpr_log(GPR_INFO, "%s[call] RecvInitialMetadata: %s",
3140 DebugTag().c_str(),
3141 next_metadata.has_value()
3142 ? next_metadata.value()->DebugString().c_str()
3143 : "null");
3144 }
3145 if (next_metadata.has_value()) {
3146 metadata = std::move(next_metadata.value());
3147 is_trailers_only_ = metadata->get(GrpcTrailersOnly()).value_or(false);
3148 } else {
3149 is_trailers_only_ = true;
3150 metadata = arena()->MakePooled<ServerMetadata>();
3151 }
3152 ProcessIncomingInitialMetadata(*metadata);
3153 PublishMetadataArray(metadata.get(), array, true);
3154 recv_initial_metadata_ = std::move(metadata);
3155 FinishOpOnCompletion(&completion, PendingOp::kReceiveInitialMetadata);
3156 });
3157 }
3158
3159 void ClientPromiseBasedCall::Finish(ServerMetadataHandle trailing_metadata) {
3160 if (grpc_call_trace.enabled()) {
3161 gpr_log(GPR_INFO, "%s[call] Finish: %s", DebugTag().c_str(),
3162 trailing_metadata->DebugString().c_str());
3163 }
3164 ResetDeadline();
3165 set_completed();
3166 client_to_server_messages_.sender.CloseWithError();
3167 client_to_server_messages_.receiver.CloseWithError();
3168 if (trailing_metadata->get(GrpcCallWasCancelled()).value_or(false)) {
3169 server_to_client_messages_.receiver.CloseWithError();
3170 server_initial_metadata_.receiver.CloseWithError();
3171 }
3172 if (auto* channelz_channel = channel()->channelz_node()) {
3173 if (trailing_metadata->get(GrpcStatusMetadata())
3174 .value_or(GRPC_STATUS_UNKNOWN) == GRPC_STATUS_OK) {
3175 channelz_channel->RecordCallSucceeded();
3176 } else {
3177 channelz_channel->RecordCallFailed();
3178 }
3179 }
3180 server_trailing_metadata_.Set(std::move(trailing_metadata));
3181 }
3182
3183 namespace {
3184 std::string MakeErrorString(const ServerMetadata* trailing_metadata) {
3185 std::string out = absl::StrCat(
3186 trailing_metadata->get(GrpcStatusFromWire()).value_or(false)
3187 ? "Error received from peer"
3188 : "Error generated by client",
3189 "grpc_status: ",
3190 grpc_status_code_to_string(trailing_metadata->get(GrpcStatusMetadata())
3191 .value_or(GRPC_STATUS_UNKNOWN)));
3192 if (const Slice* message =
3193 trailing_metadata->get_pointer(GrpcMessageMetadata())) {
3194 absl::StrAppend(&out, "\ngrpc_message: ", message->as_string_view());
3195 }
3196 if (auto annotations = trailing_metadata->get_pointer(GrpcStatusContext())) {
3197 absl::StrAppend(&out, "\nStatus Context:");
3198 for (const std::string& annotation : *annotations) {
3199 absl::StrAppend(&out, "\n ", annotation);
3200 }
3201 }
3202 return out;
3203 }
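// Illustrative sketch (not part of the build): the string built above starts
// with an origin marker ("Error received from peer" or "Error generated by
// client") and the status name, then appends optional grpc_message and
// Status Context sections, e.g. (hypothetical values):
//
//   ...grpc_status: UNAVAILABLE
//   grpc_message: connection refused
//   Status Context:
//     <annotation text>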
3204 } // namespace
3205
3206 void ClientPromiseBasedCall::StartRecvStatusOnClient(
3207 const Completion& completion,
3208 grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args,
3209 Party::BulkSpawner& spawner) {
3210 ForceCompletionSuccess(completion);
3211 spawner.Spawn(
3212 "recv_status_on_client", server_trailing_metadata_.Wait(),
3213 [this, op_args,
3214 completion =
3215 AddOpToCompletion(completion, PendingOp::kReceiveStatusOnClient)](
3216 ServerMetadataHandle trailing_metadata) mutable {
3217 const grpc_status_code status =
3218 trailing_metadata->get(GrpcStatusMetadata())
3219 .value_or(GRPC_STATUS_UNKNOWN);
3220 *op_args.status = status;
3221 Slice message_slice;
3222 if (Slice* message =
3223 trailing_metadata->get_pointer(GrpcMessageMetadata())) {
3224 message_slice = message->Ref();
3225 }
3226 SetFinalizationStatus(status, message_slice.Ref());
3227 *op_args.status_details = message_slice.TakeCSlice();
3228 if (op_args.error_string != nullptr && status != GRPC_STATUS_OK) {
3229 *op_args.error_string =
3230 gpr_strdup(MakeErrorString(trailing_metadata.get()).c_str());
3231 }
3232 PublishMetadataArray(trailing_metadata.get(), op_args.trailing_metadata,
3233 true);
3234 recv_trailing_metadata_ = std::move(trailing_metadata);
3235 FinishOpOnCompletion(&completion, PendingOp::kReceiveStatusOnClient);
3236 });
3237 }
3238 #endif
3239
3240 ///////////////////////////////////////////////////////////////////////////////
3241 // ServerPromiseBasedCall
3242
3243 #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
3244
3245 class ServerPromiseBasedCall final : public PromiseBasedCall,
3246 public ServerCallContext {
3247 public:
3248 ServerPromiseBasedCall(Arena* arena, grpc_call_create_args* args);
3249
3250 void OrphanCall() override {}
3251 void CancelWithError(grpc_error_handle) override;
3252 grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
3253 bool is_notify_tag_closure) override;
3254 bool is_trailers_only() const override {
3255 Crash("is_trailers_only not implemented for server calls");
3256 }
3257 absl::string_view GetServerAuthority() const override {
3258 const Slice* authority_metadata =
3259 client_initial_metadata_->get_pointer(HttpAuthorityMetadata());
3260 if (authority_metadata == nullptr) return "";
3261 return authority_metadata->as_string_view();
3262 }
3263
3264 // Polling order for the server promise stack:
3265 //
3266 // │ ┌───────────────────────────────────────┐
3267 // │ │ ServerPromiseBasedCall ├──► Lifetime management
3268 // │ ├───────────────────────────────────────┤
3269 // │ │ ConnectedChannel ├─┐
3270 // │ ├───────────────────────────────────────┤ └► Interactions with the
3271 // │ │ ... closest to transport filter │ transport - send/recv msgs
3272 // │ ├───────────────────────────────────────┤ and metadata, call phase
3273 // │ │ ... │ ordering
3274 // │ ├───────────────────────────────────────┤
3275 // │ │ ... closest to app filter │ ┌► Request matching, initial
3276 // │ ├───────────────────────────────────────┤ │ setup, publishing call to
3277 // │ │ Server::ChannelData::MakeCallPromise ├─┘ application
3278 // │ ├───────────────────────────────────────┤
3279 // │ │ MakeTopOfServerCallPromise ├──► Send trailing metadata
3280 // ▼ └───────────────────────────────────────┘
3281 // Polling &
3282 // instantiation
3283 // order
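// (The promise polled in this order is the "server_promise" spawned in the
// ServerPromiseBasedCall constructor below; MakeTopOfServerCallPromise is the
// innermost layer and resolves once server-to-client messages are closed and
// trailing metadata has been set.)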
3284
3285 std::string DebugTag() const override {
3286 return absl::StrFormat("SERVER_CALL[%p]: ", this);
3287 }
3288
3289 ServerCallContext* server_call_context() override { return this; }
3290
3291 const void* server_stream_data() override { return server_transport_data_; }
3292 void PublishInitialMetadata(
3293 ClientMetadataHandle metadata,
3294 grpc_metadata_array* publish_initial_metadata) override;
3295 ArenaPromise<ServerMetadataHandle> MakeTopOfServerCallPromise(
3296 CallArgs call_args, grpc_completion_queue* cq,
3297 absl::FunctionRef<void(grpc_call* call)> publish) override;
3298
3299 private:
3300 class RecvCloseOpCancelState {
3301 public:
3302 // Request that receiver be filled in per
3303 // grpc_op_recv_close_on_server. Returns true if the request can
3304 // be fulfilled immediately. Returns false if the request will be
3305 // fulfilled later.
3306 bool ReceiveCloseOnServerOpStarted(int* receiver) {
3307 uintptr_t state = state_.load(std::memory_order_acquire);
3308 uintptr_t new_state;
3309 do {
3310 switch (state) {
3311 case kUnset:
3312 new_state = reinterpret_cast<uintptr_t>(receiver);
3313 break;
3314 case kFinishedWithFailure:
3315 *receiver = 1;
3316 return true;
3317 case kFinishedWithSuccess:
3318 *receiver = 0;
3319 return true;
3320 default:
3321 Crash("Two threads offered ReceiveCloseOnServerOpStarted");
3322 }
3323 } while (!state_.compare_exchange_weak(state, new_state,
3324 std::memory_order_acq_rel,
3325 std::memory_order_acquire));
3326 return false;
3327 }
3328
3329 // Mark the call as having completed.
3330 // Returns true if this finishes a previous
3331 // ReceiveCloseOnServerOpStarted request.
3332 bool CompleteCallWithCancelledSetTo(bool cancelled) {
3333 uintptr_t state = state_.load(std::memory_order_acquire);
3334 uintptr_t new_state;
3335 bool r;
3336 do {
3337 switch (state) {
3338 case kUnset:
3339 new_state = cancelled ? kFinishedWithFailure : kFinishedWithSuccess;
3340 r = false;
3341 break;
3342 case kFinishedWithFailure:
3343 return false;
3344 case kFinishedWithSuccess:
3345 Crash("unreachable");
3346 default:
3347 new_state = cancelled ? kFinishedWithFailure : kFinishedWithSuccess;
3348 r = true;
3349 }
3350 } while (!state_.compare_exchange_weak(state, new_state,
3351 std::memory_order_acq_rel,
3352 std::memory_order_acquire));
3353 if (r) *reinterpret_cast<int*>(state) = cancelled ? 1 : 0;
3354 return r;
3355 }
3356
3357 std::string ToString() const {
3358 auto state = state_.load(std::memory_order_relaxed);
3359 switch (state) {
3360 case kUnset:
3361 return "Unset";
3362 case kFinishedWithFailure:
3363 return "FinishedWithFailure";
3364 case kFinishedWithSuccess:
3365 return "FinishedWithSuccess";
3366 default:
3367 return absl::StrFormat("WaitingForReceiver(%p)",
3368 reinterpret_cast<void*>(state));
3369 }
3370 }
3371
3372 private:
3373 static constexpr uintptr_t kUnset = 0;
3374 static constexpr uintptr_t kFinishedWithFailure = 1;
3375 static constexpr uintptr_t kFinishedWithSuccess = 2;
3376 // Holds one of kUnset, kFinishedWithFailure, or
3377 // kFinishedWithSuccess OR an int* that wants to receive the
3378 // final status.
3379 std::atomic<uintptr_t> state_{kUnset};
3380 };
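// Illustrative state walk for RecvCloseOpCancelState (comment only): state_
// starts at kUnset. If GRPC_OP_RECV_CLOSE_ON_SERVER arrives first,
// ReceiveCloseOnServerOpStarted stores the int* receiver in state_ and returns
// false; when the call later finishes, CompleteCallWithCancelledSetTo writes
// 0/1 through that pointer and returns true so the pending completion can be
// finished. If the call finishes first, state_ goes straight to
// kFinishedWithSuccess/kFinishedWithFailure and a later
// ReceiveCloseOnServerOpStarted fills in the receiver immediately and returns
// true.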
3381
3382 void CommitBatch(const grpc_op* ops, size_t nops,
3383 const Completion& completion);
3384 void Finish(ServerMetadataHandle result);
3385
3386 ServerInterface* const server_;
3387 const void* const server_transport_data_;
3388 PipeSender<ServerMetadataHandle>* server_initial_metadata_ = nullptr;
3389 PipeSender<MessageHandle>* server_to_client_messages_ = nullptr;
3390 PipeReceiver<MessageHandle>* client_to_server_messages_ = nullptr;
3391 Latch<ServerMetadataHandle> send_trailing_metadata_;
3392 RecvCloseOpCancelState recv_close_op_cancel_state_;
3393 ClientMetadataHandle client_initial_metadata_;
3394 Completion recv_close_completion_;
3395 std::atomic<bool> cancelled_{false};
3396 };
3397
3398 ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena,
3399 grpc_call_create_args* args)
3400 : PromiseBasedCall(arena, 0, *args),
3401 server_(args->server),
3402 server_transport_data_(args->server_transport_data) {
3403 global_stats().IncrementServerCallsCreated();
3404 channelz::ServerNode* channelz_node = server_->channelz_node();
3405 if (channelz_node != nullptr) {
3406 channelz_node->RecordCallStarted();
3407 }
3408 ScopedContext activity_context(this);
3409 // TODO(yashykt): In the future, we want to also enable stats and trace
3410 // collecting from when the call is created at the transport. The idea is that
3411 // the transport would create the call tracer and pass it in as part of the
3412 // metadata.
3413 // TODO(yijiem): OpenCensus and internal Census are still using this
3414 // mechanism to set the server call tracer. We need to refactor them into
3415 // stats plugins (including removing the client channel filters).
3416 if (args->server != nullptr &&
3417 args->server->server_call_tracer_factory() != nullptr) {
3418 auto* server_call_tracer =
3419 args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
3420 arena, args->server->channel_args());
3421 if (server_call_tracer != nullptr) {
3422 // Note that we are setting both
3423 // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
3424 // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future
3425 // promise-based world, we would use just a single tracer object for each
3426 // stack (call, subchannel_call, server_call).
3427 ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE,
3428 server_call_tracer, nullptr);
3429 ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr);
3430 }
3431 }
3432 args->channel->channel_stack()->stats_plugin_group->AddServerCallTracers(
3433 context());
3434 Spawn("server_promise",
3435 channel()->channel_stack()->MakeServerCallPromise(
3436 CallArgs{nullptr, ClientInitialMetadataOutstandingToken::Empty(),
3437 nullptr, nullptr, nullptr, nullptr}),
3438 [this](ServerMetadataHandle result) { Finish(std::move(result)); });
3439 }
3440
3441 void ServerPromiseBasedCall::Finish(ServerMetadataHandle result) {
3442 if (grpc_call_trace.enabled()) {
3443 gpr_log(GPR_INFO, "%s[call] Finish: recv_close_state:%s result:%s",
3444 DebugTag().c_str(), recv_close_op_cancel_state_.ToString().c_str(),
3445 result->DebugString().c_str());
3446 }
3447 const auto status =
3448 result->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
3449 channelz::ServerNode* channelz_node = server_->channelz_node();
3450 if (channelz_node != nullptr) {
3451 if (status == GRPC_STATUS_OK) {
3452 channelz_node->RecordCallSucceeded();
3453 } else {
3454 channelz_node->RecordCallFailed();
3455 }
3456 }
3457 bool was_cancelled = result->get(GrpcCallWasCancelled()).value_or(true);
3458 if (recv_close_op_cancel_state_.CompleteCallWithCancelledSetTo(
3459 was_cancelled)) {
3460 FinishOpOnCompletion(&recv_close_completion_,
3461 PendingOp::kReceiveCloseOnServer);
3462 }
3463 if (was_cancelled) set_failed_before_recv_message();
3464 if (server_initial_metadata_ != nullptr) {
3465 server_initial_metadata_->Close();
3466 }
3467 Slice message_slice;
3468 if (Slice* message = result->get_pointer(GrpcMessageMetadata())) {
3469 message_slice = message->Ref();
3470 }
3471 AcceptTransportStatsFromContext();
3472 SetFinalizationStatus(status, std::move(message_slice));
3473 set_completed();
3474 ResetDeadline();
3475 PropagateCancellationToChildren();
3476 }
3477
3478 grpc_call_error ValidateServerBatch(const grpc_op* ops, size_t nops) {
3479 BitSet<8> got_ops;
3480 for (size_t op_idx = 0; op_idx < nops; op_idx++) {
3481 const grpc_op& op = ops[op_idx];
3482 switch (op.op) {
3483 case GRPC_OP_SEND_INITIAL_METADATA:
3484 if (!AreInitialMetadataFlagsValid(op.flags)) {
3485 return GRPC_CALL_ERROR_INVALID_FLAGS;
3486 }
3487 if (!ValidateMetadata(op.data.send_initial_metadata.count,
3488 op.data.send_initial_metadata.metadata)) {
3489 return GRPC_CALL_ERROR_INVALID_METADATA;
3490 }
3491 break;
3492 case GRPC_OP_SEND_MESSAGE:
3493 if (!AreWriteFlagsValid(op.flags)) {
3494 return GRPC_CALL_ERROR_INVALID_FLAGS;
3495 }
3496 break;
3497 case GRPC_OP_SEND_STATUS_FROM_SERVER:
3498 if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
3499 if (!ValidateMetadata(
3500 op.data.send_status_from_server.trailing_metadata_count,
3501 op.data.send_status_from_server.trailing_metadata)) {
3502 return GRPC_CALL_ERROR_INVALID_METADATA;
3503 }
3504 break;
3505 case GRPC_OP_RECV_MESSAGE:
3506 case GRPC_OP_RECV_CLOSE_ON_SERVER:
3507 if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
3508 break;
3509 case GRPC_OP_RECV_INITIAL_METADATA:
3510 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
3511 case GRPC_OP_RECV_STATUS_ON_CLIENT:
3512 return GRPC_CALL_ERROR_NOT_ON_SERVER;
3513 }
3514 if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
3515 got_ops.set(op.op);
3516 }
3517 return GRPC_CALL_OK;
3518 }
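// Illustrative sketch only (hypothetical handler code, not part of this
// file): a server batch for a unary response that passes the validation
// above; `call`, `tag`, and the op data fields are assumed values:
//
//   grpc_op ops[4];
//   memset(ops, 0, sizeof(ops));
//   ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
//   ops[1].op = GRPC_OP_SEND_MESSAGE;             // response payload
//   ops[2].op = GRPC_OP_SEND_STATUS_FROM_SERVER;  // status + trailing md
//   ops[3].op = GRPC_OP_RECV_CLOSE_ON_SERVER;     // &cancelled out-param
//   grpc_call_error err = grpc_call_start_batch(call, ops, 4, tag, nullptr);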
3519
3520 void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
3521 const Completion& completion) {
3522 Party::BulkSpawner spawner(this);
3523 for (size_t op_idx = 0; op_idx < nops; op_idx++) {
3524 const grpc_op& op = ops[op_idx];
3525 switch (op.op) {
3526 case GRPC_OP_SEND_INITIAL_METADATA: {
3527 auto metadata = arena()->MakePooled<ServerMetadata>();
3528 PrepareOutgoingInitialMetadata(op, *metadata);
3529 CToMetadata(op.data.send_initial_metadata.metadata,
3530 op.data.send_initial_metadata.count, metadata.get());
3531 if (grpc_call_trace.enabled()) {
3532 gpr_log(GPR_INFO, "%s[call] Send initial metadata",
3533 DebugTag().c_str());
3534 }
3535 QueueSend();
3536 spawner.Spawn(
3537 "call_send_initial_metadata",
3538 [this, metadata = std::move(metadata)]() mutable {
3539 EnactSend();
3540 return server_initial_metadata_->Push(std::move(metadata));
3541 },
3542 [this,
3543 completion = AddOpToCompletion(
3544 completion, PendingOp::kSendInitialMetadata)](bool r) mutable {
3545 if (!r) {
3546 set_failed_before_recv_message();
3547 FailCompletion(completion);
3548 }
3549 FinishOpOnCompletion(&completion,
3550 PendingOp::kSendInitialMetadata);
3551 });
3552 } break;
3553 case GRPC_OP_SEND_MESSAGE:
3554 StartSendMessage(op, completion, server_to_client_messages_, spawner);
3555 break;
3556 case GRPC_OP_RECV_MESSAGE:
3557 if (cancelled_.load(std::memory_order_relaxed)) {
3558 set_failed_before_recv_message();
3559 FailCompletion(completion);
3560 break;
3561 }
3562 StartRecvMessage(
3563 op, completion, []() { return []() { return Empty{}; }; },
3564 client_to_server_messages_, true, spawner);
3565 break;
3566 case GRPC_OP_SEND_STATUS_FROM_SERVER: {
3567 auto metadata = arena()->MakePooled<ServerMetadata>();
3568 CToMetadata(op.data.send_status_from_server.trailing_metadata,
3569 op.data.send_status_from_server.trailing_metadata_count,
3570 metadata.get());
3571 metadata->Set(GrpcStatusMetadata(),
3572 op.data.send_status_from_server.status);
3573 if (auto* details = op.data.send_status_from_server.status_details) {
3574 // TODO(ctiller): this should not be a copy, but we have callers that
3575 // allocate and pass in a slice created with
3576 // grpc_slice_from_static_string and then delete the string after
3577 // passing it in, which shouldn't be a supported API.
3578 metadata->Set(GrpcMessageMetadata(),
3579 Slice(grpc_slice_copy(*details)));
3580 }
3581 spawner.Spawn(
3582 "call_send_status_from_server",
3583 [this, metadata = std::move(metadata)]() mutable {
3584 bool r = true;
3585 if (send_trailing_metadata_.is_set()) {
3586 r = false;
3587 } else {
3588 send_trailing_metadata_.Set(std::move(metadata));
3589 }
3590 return Map(WaitForSendingStarted(), [this, r](Empty) {
3591 server_initial_metadata_->Close();
3592 server_to_client_messages_->Close();
3593 return r;
3594 });
3595 },
3596 [this, completion = AddOpToCompletion(
3597 completion, PendingOp::kSendStatusFromServer)](
3598 bool ok) mutable {
3599 if (!ok) {
3600 set_failed_before_recv_message();
3601 FailCompletion(completion);
3602 }
3603 FinishOpOnCompletion(&completion,
3604 PendingOp::kSendStatusFromServer);
3605 });
3606 } break;
3607 case GRPC_OP_RECV_CLOSE_ON_SERVER:
3608 if (grpc_call_trace.enabled()) {
3609 gpr_log(GPR_INFO, "%s[call] StartBatch: RecvClose %s",
3610 DebugTag().c_str(),
3611 recv_close_op_cancel_state_.ToString().c_str());
3612 }
3613 ForceCompletionSuccess(completion);
3614 recv_close_completion_ =
3615 AddOpToCompletion(completion, PendingOp::kReceiveCloseOnServer);
3616 if (recv_close_op_cancel_state_.ReceiveCloseOnServerOpStarted(
3617 op.data.recv_close_on_server.cancelled)) {
3618 FinishOpOnCompletion(&recv_close_completion_,
3619 PendingOp::kReceiveCloseOnServer);
3620 }
3621 break;
3622 case GRPC_OP_RECV_STATUS_ON_CLIENT:
3623 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
3624 case GRPC_OP_RECV_INITIAL_METADATA:
3625 abort(); // unreachable
3626 }
3627 }
3628 }
3629
3630 grpc_call_error ServerPromiseBasedCall::StartBatch(const grpc_op* ops,
3631 size_t nops,
3632 void* notify_tag,
3633 bool is_notify_tag_closure) {
3634 if (nops == 0) {
3635 EndOpImmediately(cq(), notify_tag, is_notify_tag_closure);
3636 return GRPC_CALL_OK;
3637 }
3638 const grpc_call_error validation_result = ValidateServerBatch(ops, nops);
3639 if (validation_result != GRPC_CALL_OK) {
3640 return validation_result;
3641 }
3642 Completion completion =
3643 StartCompletion(notify_tag, is_notify_tag_closure, ops);
3644 CommitBatch(ops, nops, completion);
3645 FinishOpOnCompletion(&completion, PendingOp::kStartingBatch);
3646 return GRPC_CALL_OK;
3647 }
3648
3649 void ServerPromiseBasedCall::CancelWithError(absl::Status error) {
3650 cancelled_.store(true, std::memory_order_relaxed);
3651 Spawn(
3652 "cancel_with_error",
3653 [this, error = std::move(error)]() {
3654 if (!send_trailing_metadata_.is_set()) {
3655 auto md = ServerMetadataFromStatus(error);
3656 md->Set(GrpcCallWasCancelled(), true);
3657 send_trailing_metadata_.Set(std::move(md));
3658 }
3659 if (server_to_client_messages_ != nullptr) {
3660 server_to_client_messages_->Close();
3661 }
3662 if (server_initial_metadata_ != nullptr) {
3663 server_initial_metadata_->Close();
3664 }
3665 return Empty{};
3666 },
3667 [](Empty) {});
3668 }
3669 #endif
3670
3671 #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
3672 void ServerPromiseBasedCall::PublishInitialMetadata(
3673 ClientMetadataHandle metadata,
3674 grpc_metadata_array* publish_initial_metadata) {
3675 if (grpc_call_trace.enabled()) {
3676 gpr_log(GPR_INFO, "%s[call] PublishInitialMetadata: %s", DebugTag().c_str(),
3677 metadata->DebugString().c_str());
3678 }
3679 PublishMetadataArray(metadata.get(), publish_initial_metadata, false);
3680 client_initial_metadata_ = std::move(metadata);
3681 }
3682
3683 ArenaPromise<ServerMetadataHandle>
3684 ServerPromiseBasedCall::MakeTopOfServerCallPromise(
3685 CallArgs call_args, grpc_completion_queue* cq,
3686 absl::FunctionRef<void(grpc_call* call)> publish) {
3687 SetCompletionQueue(cq);
3688 call_args.polling_entity->Set(
3689 grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)));
3690 server_to_client_messages_ = call_args.server_to_client_messages;
3691 client_to_server_messages_ = call_args.client_to_server_messages;
3692 server_initial_metadata_ = call_args.server_initial_metadata;
3693 set_send_deadline(deadline());
3694 ProcessIncomingInitialMetadata(*client_initial_metadata_);
3695 ExternalRef();
3696 publish(c_ptr());
3697 return Seq(server_to_client_messages_->AwaitClosed(),
3698 send_trailing_metadata_.Wait());
3699 }
3700
3701 ///////////////////////////////////////////////////////////////////////////////
3702 // CallSpine based Server Call
3703
3704 class ServerCallSpine final : public CallSpineInterface,
3705 public ServerCallContext,
3706 public BasicPromiseBasedCall {
3707 public:
3708 ServerCallSpine(ServerInterface* server, Channel* channel, Arena* arena);
3709
3710 // CallSpineInterface
3711 Pipe<ClientMetadataHandle>& client_initial_metadata() override {
3712 return client_initial_metadata_;
3713 }
3714 Pipe<ServerMetadataHandle>& server_initial_metadata() override {
3715 return server_initial_metadata_;
3716 }
3717 Pipe<MessageHandle>& client_to_server_messages() override {
3718 return client_to_server_messages_;
3719 }
3720 Pipe<MessageHandle>& server_to_client_messages() override {
3721 return server_to_client_messages_;
3722 }
3723 Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
3724 return server_trailing_metadata_;
3725 }
3726 Latch<ServerMetadataHandle>& cancel_latch() override { return cancel_latch_; }
3727 Party& party() override { return *this; }
3728 Arena* arena() override { return BasicPromiseBasedCall::arena(); }
3729 void IncrementRefCount() override { InternalRef("CallSpine"); }
3730 void Unref() override { InternalUnref("CallSpine"); }
3731
3732 // PromiseBasedCall
3733 void OrphanCall() override {
3734 ResetDeadline();
3735 CancelWithError(absl::CancelledError());
3736 }
3737 void CancelWithError(grpc_error_handle error) override {
3738 SpawnInfallible("CancelWithError", [this, error = std::move(error)] {
3739 std::ignore = Cancel(ServerMetadataFromStatus(error));
3740 return Empty{};
3741 });
3742 }
3743 bool is_trailers_only() const override {
3744 Crash("is_trailers_only not implemented for server calls");
3745 }
3746 absl::string_view GetServerAuthority() const override {
3747 Crash("unimplemented");
3748 }
3749 grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
3750 bool is_notify_tag_closure) override;
3751
3752 bool Completed() final { Crash("unimplemented"); }
3753 bool failed_before_recv_message() const final { Crash("unimplemented"); }
3754
3755 ServerCallContext* server_call_context() override { return this; }
3756 const void* server_stream_data() override { Crash("unimplemented"); }
3757 void PublishInitialMetadata(
3758 ClientMetadataHandle metadata,
3759 grpc_metadata_array* publish_initial_metadata) override;
3760 ArenaPromise<ServerMetadataHandle> MakeTopOfServerCallPromise(
3761 CallArgs, grpc_completion_queue*,
3762 absl::FunctionRef<void(grpc_call* call)>) override {
3763 Crash("unimplemented");
3764 }
3765
3766 bool RunParty() override {
3767 ScopedContext ctx(this);
3768 return Party::RunParty();
3769 }
3770
3771 private:
3772 void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
3773 bool is_notify_tag_closure);
3774 StatusFlag FinishRecvMessage(NextResult<MessageHandle> result);
3775
3776 std::string DebugTag() const override {
3777 return absl::StrFormat("SERVER_CALL_SPINE[%p]: ", this);
3778 }
3779
3780 // Initial metadata from client to server
3781 Pipe<ClientMetadataHandle> client_initial_metadata_;
3782 // Initial metadata from server to client
3783 Pipe<ServerMetadataHandle> server_initial_metadata_;
3784 // Messages travelling from the application to the transport.
3785 Pipe<MessageHandle> client_to_server_messages_;
3786 // Messages travelling from the transport to the application.
3787 Pipe<MessageHandle> server_to_client_messages_;
3788 // Trailing metadata from server to client
3789 Pipe<ServerMetadataHandle> server_trailing_metadata_;
3790 // Latch that can be set to terminate the call
3791 Latch<ServerMetadataHandle> cancel_latch_;
3792 grpc_byte_buffer** recv_message_ = nullptr;
3793 ClientMetadataHandle client_initial_metadata_stored_;
3794 };
3795
3796 ServerCallSpine::ServerCallSpine(ServerInterface* server, Channel* channel,
3797 Arena* arena)
3798 : BasicPromiseBasedCall(arena, 0, 1,
3799 [channel, server]() -> grpc_call_create_args {
3800 grpc_call_create_args args;
3801 args.channel = channel->Ref();
3802 args.server = server;
3803 args.parent = nullptr;
3804 args.propagation_mask = 0;
3805 args.cq = nullptr;
3806 args.pollset_set_alternative = nullptr;
3807 args.server_transport_data =
3808 &args; // Arbitrary non-null pointer
3809 args.send_deadline = Timestamp::InfFuture();
3810 return args;
3811 }()),
3812 client_initial_metadata_(arena),
3813 server_initial_metadata_(arena),
3814 client_to_server_messages_(arena),
3815 server_to_client_messages_(arena),
3816 server_trailing_metadata_(arena) {
3817 global_stats().IncrementServerCallsCreated();
3818 ScopedContext ctx(this);
3819 channel->channel_stack()->InitServerCallSpine(this);
3820 }
3821
3822 void ServerCallSpine::PublishInitialMetadata(
3823 ClientMetadataHandle metadata,
3824 grpc_metadata_array* publish_initial_metadata) {
3825 if (grpc_call_trace.enabled()) {
3826 gpr_log(GPR_INFO, "%s[call] PublishInitialMetadata: %s", DebugTag().c_str(),
3827 metadata->DebugString().c_str());
3828 }
3829 PublishMetadataArray(metadata.get(), publish_initial_metadata, false);
3830 client_initial_metadata_stored_ = std::move(metadata);
3831 }
3832
3833 grpc_call_error ServerCallSpine::StartBatch(const grpc_op* ops, size_t nops,
3834 void* notify_tag,
3835 bool is_notify_tag_closure) {
3836 if (nops == 0) {
3837 EndOpImmediately(cq(), notify_tag, is_notify_tag_closure);
3838 return GRPC_CALL_OK;
3839 }
3840 const grpc_call_error validation_result = ValidateServerBatch(ops, nops);
3841 if (validation_result != GRPC_CALL_OK) {
3842 return validation_result;
3843 }
3844 CommitBatch(ops, nops, notify_tag, is_notify_tag_closure);
3845 return GRPC_CALL_OK;
3846 }
3847
3848 namespace {
3849 template <typename SetupFn>
3850 class MaybeOpImpl {
3851 public:
3852 using SetupResult = decltype(std::declval<SetupFn>()(grpc_op()));
3853 using PromiseFactory = promise_detail::OncePromiseFactory<void, SetupResult>;
3854 using Promise = typename PromiseFactory::Promise;
3855 struct Dismissed {};
3856 using State = absl::variant<Dismissed, PromiseFactory, Promise>;
3857
3858 // op_ is garbage but shouldn't be uninitialized
3859 MaybeOpImpl() : state_(Dismissed{}), op_(GRPC_OP_RECV_STATUS_ON_CLIENT) {}
3860 MaybeOpImpl(SetupResult result, grpc_op_type op)
3861 : state_(PromiseFactory(std::move(result))), op_(op) {}
3862
3863 MaybeOpImpl(const MaybeOpImpl&) = delete;
3864 MaybeOpImpl& operator=(const MaybeOpImpl&) = delete;
3865 MaybeOpImpl(MaybeOpImpl&& other) noexcept
3866 : state_(MoveState(other.state_)), op_(other.op_) {}
3867 MaybeOpImpl& operator=(MaybeOpImpl&& other) noexcept {
3868 op_ = other.op_;
3869 if (absl::holds_alternative<Dismissed>(state_)) {
3870 state_.template emplace<Dismissed>();
3871 return *this;
3872 }
3873 // Can't move after first poll => Promise is not an option
3874 state_.template emplace<PromiseFactory>(
3875 std::move(absl::get<PromiseFactory>(other.state_)));
3876 return *this;
3877 }
3878
3879 Poll<StatusFlag> operator()() {
3880 if (absl::holds_alternative<Dismissed>(state_)) return Success{};
3881 if (absl::holds_alternative<PromiseFactory>(state_)) {
3882 auto& factory = absl::get<PromiseFactory>(state_);
3883 auto promise = factory.Make();
3884 state_.template emplace<Promise>(std::move(promise));
3885 }
3886 if (grpc_call_trace.enabled()) {
3887 gpr_log(GPR_INFO, "%sBeginPoll %s",
3888 Activity::current()->DebugTag().c_str(), OpName(op_).c_str());
3889 }
3890 auto& promise = absl::get<Promise>(state_);
3891 auto r = poll_cast<StatusFlag>(promise());
3892 if (grpc_call_trace.enabled()) {
3893 gpr_log(GPR_INFO, "%sEndPoll %s --> %s",
3894 Activity::current()->DebugTag().c_str(), OpName(op_).c_str(),
3895 r.pending() ? "PENDING" : (r.value().ok() ? "OK" : "FAILURE"));
3896 }
3897 return r;
3898 }
3899
3900 private:
3901 GPR_NO_UNIQUE_ADDRESS State state_;
3902 GPR_NO_UNIQUE_ADDRESS grpc_op_type op_;
3903
3904 static std::string OpName(grpc_op_type op) {
3905 switch (op) {
3906 case GRPC_OP_SEND_INITIAL_METADATA:
3907 return "SendInitialMetadata";
3908 case GRPC_OP_SEND_MESSAGE:
3909 return "SendMessage";
3910 case GRPC_OP_SEND_STATUS_FROM_SERVER:
3911 return "SendStatusFromServer";
3912 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
3913 return "SendCloseFromClient";
3914 case GRPC_OP_RECV_MESSAGE:
3915 return "RecvMessage";
3916 case GRPC_OP_RECV_CLOSE_ON_SERVER:
3917 return "RecvCloseOnServer";
3918 case GRPC_OP_RECV_INITIAL_METADATA:
3919 return "RecvInitialMetadata";
3920 case GRPC_OP_RECV_STATUS_ON_CLIENT:
3921 return "RecvStatusOnClient";
3922 }
3923 return absl::StrCat("UnknownOp(", op, ")");
3924 }
3925
3926 static State MoveState(State& state) {
3927 if (absl::holds_alternative<Dismissed>(state)) return Dismissed{};
3928 // Can't move after first poll => Promise is not an option
3929 return std::move(absl::get<PromiseFactory>(state));
3930 }
3931 };
3932
3933 // MaybeOp captures a fairly complicated dance we need to do for the batch API.
3934 // We first check if an op is included or not, and if it is, we run the setup
3935 // function in the context of the API call (NOT in the call party).
3936 // This setup function returns a promise factory which we'll then run *in* the
3937 // party to do initial setup, and have it return the promise that we'll
3938 // ultimately poll on until completion.
3939 // Once we express our surface API in terms of core internal types this whole
3940 // dance will go away.
3941 template <typename SetupFn>
3942 auto MaybeOp(const grpc_op* ops, uint8_t idx, SetupFn setup) {
3943 if (idx == 255) {
3944 return MaybeOpImpl<SetupFn>();
3945 } else {
3946 return MaybeOpImpl<SetupFn>(setup(ops[idx]), ops[idx].op);
3947 }
3948 }
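// Illustrative sketch (mirrors the real usage in ServerCallSpine::CommitBatch
// below): the setup lambda runs on the API thread and captures what it needs
// from the grpc_op, then returns a promise factory that is later run inside
// the call party, e.g.:
//
//   auto recv_message =
//       MaybeOp(ops, got_ops[GRPC_OP_RECV_MESSAGE], [this](const grpc_op& op) {
//         recv_message_ = op.data.recv_message.recv_message;  // API thread
//         return [this]() {                                   // in the party
//           return Map(client_to_server_messages_.receiver.Next(),
//                      [this](NextResult<MessageHandle> msg) {
//                        return FinishRecvMessage(std::move(msg));
//                      });
//         };
//       });
//
// An index of 255 yields a dismissed MaybeOpImpl whose poll returns Success{}
// immediately.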
3949
3950 template <typename F>
3951 class PollBatchLogger {
3952 public:
3953 PollBatchLogger(void* tag, F f) : tag_(tag), f_(std::move(f)) {}
3954
3955 auto operator()() {
3956 if (grpc_call_trace.enabled()) {
3957 gpr_log(GPR_INFO, "Poll batch %p", tag_);
3958 }
3959 auto r = f_();
3960 if (grpc_call_trace.enabled()) {
3961 gpr_log(GPR_INFO, "Poll batch %p --> %s", tag_, ResultString(r).c_str());
3962 }
3963 return r;
3964 }
3965
3966 private:
3967 template <typename T>
3968 static std::string ResultString(Poll<T> r) {
3969 if (r.pending()) return "PENDING";
3970 return ResultString(r.value());
3971 }
3972 static std::string ResultString(Empty) { return "DONE"; }
3973
3974 void* tag_;
3975 F f_;
3976 };
3977
3978 template <typename F>
3979 PollBatchLogger<F> LogPollBatch(void* tag, F f) {
3980 return PollBatchLogger<F>(tag, std::move(f));
3981 }
3982 } // namespace
3983
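// Translate the result of reading the next client-to-server message into the
// grpc_byte_buffer the application asked for via recv_message_: a present
// message becomes a (possibly compressed) byte buffer and yields Success, a
// cancelled read yields a null buffer and Failure, and a clean end-of-stream
// yields a null buffer and Success.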
3984 StatusFlag ServerCallSpine::FinishRecvMessage(
3985 NextResult<MessageHandle> result) {
3986 if (result.has_value()) {
3987 MessageHandle& message = *result;
3988 NoteLastMessageFlags(message->flags());
3989 if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
3990 (incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
3991 *recv_message_ = grpc_raw_compressed_byte_buffer_create(
3992 nullptr, 0, incoming_compression_algorithm());
3993 } else {
3994 *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0);
3995 }
3996 grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(),
3997 &(*recv_message_)->data.raw.slice_buffer);
3998 if (grpc_call_trace.enabled()) {
3999 gpr_log(GPR_INFO,
4000 "%s[call] RecvMessage: outstanding_recv "
4001 "finishes: received %" PRIdPTR " byte message",
4002 DebugTag().c_str(),
4003 (*recv_message_)->data.raw.slice_buffer.length);
4004 }
4005 recv_message_ = nullptr;
4006 return Success{};
4007 }
4008 if (result.cancelled()) {
4009 if (grpc_call_trace.enabled()) {
4010 gpr_log(GPR_INFO,
4011 "%s[call] RecvMessage: outstanding_recv "
4012 "finishes: received end-of-stream with error",
4013 DebugTag().c_str());
4014 }
4015 *recv_message_ = nullptr;
4016 recv_message_ = nullptr;
4017 return Failure{};
4018 }
4019 if (grpc_call_trace.enabled()) {
4020 gpr_log(GPR_INFO,
4021 "%s[call] RecvMessage: outstanding_recv "
4022 "finishes: received end-of-stream",
4023 DebugTag().c_str());
4024 }
4025 *recv_message_ = nullptr;
4026 recv_message_ = nullptr;
4027 return Success{};
4028 }
4029
4030 void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
4031 void* notify_tag,
4032 bool is_notify_tag_closure) {
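// Index each supplied op by its op type; 255 marks "not present in this
// batch" and is what MaybeOp checks to produce a dismissed no-op entry.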
4033 std::array<uint8_t, 8> got_ops{255, 255, 255, 255, 255, 255, 255, 255};
4034 for (size_t op_idx = 0; op_idx < nops; op_idx++) {
4035 const grpc_op& op = ops[op_idx];
4036 got_ops[op.op] = op_idx;
4037 }
4038 if (!is_notify_tag_closure) grpc_cq_begin_op(cq(), notify_tag);
4039 auto send_initial_metadata = MaybeOp(
4040 ops, got_ops[GRPC_OP_SEND_INITIAL_METADATA], [this](const grpc_op& op) {
4041 auto metadata = arena()->MakePooled<ServerMetadata>();
4042 PrepareOutgoingInitialMetadata(op, *metadata);
4043 CToMetadata(op.data.send_initial_metadata.metadata,
4044 op.data.send_initial_metadata.count, metadata.get());
4045 if (grpc_call_trace.enabled()) {
4046 gpr_log(GPR_INFO, "%s[call] Send initial metadata",
4047 DebugTag().c_str());
4048 }
4049 return [this, metadata = std::move(metadata)]() mutable {
4050 return Map(server_initial_metadata_.sender.Push(std::move(metadata)),
4051 [this](bool r) {
4052 server_initial_metadata_.sender.Close();
4053 return StatusFlag(r);
4054 });
4055 };
4056 });
4057 auto send_message =
4058 MaybeOp(ops, got_ops[GRPC_OP_SEND_MESSAGE], [this](const grpc_op& op) {
4059 SliceBuffer send;
4060 grpc_slice_buffer_swap(
4061 &op.data.send_message.send_message->data.raw.slice_buffer,
4062 send.c_slice_buffer());
4063 auto msg = arena()->MakePooled<Message>(std::move(send), op.flags);
4064 return [this, msg = std::move(msg)]() mutable {
4065 return Map(server_to_client_messages_.sender.Push(std::move(msg)),
4066 [](bool r) { return StatusFlag(r); });
4067 };
4068 });
4069 auto send_trailing_metadata = MaybeOp(
4070 ops, got_ops[GRPC_OP_SEND_STATUS_FROM_SERVER], [this](const grpc_op& op) {
4071 auto metadata = arena()->MakePooled<ServerMetadata>();
4072 CToMetadata(op.data.send_status_from_server.trailing_metadata,
4073 op.data.send_status_from_server.trailing_metadata_count,
4074 metadata.get());
4075 metadata->Set(GrpcStatusMetadata(),
4076 op.data.send_status_from_server.status);
4077 if (auto* details = op.data.send_status_from_server.status_details) {
4078 // TODO(ctiller): this should not be a copy, but we have
4079 // callers that allocate and pass in a slice created with
4080 // grpc_slice_from_static_string and then delete the string
4081 // after passing it in, which shouldn't be a supported API.
4082 metadata->Set(GrpcMessageMetadata(),
4083 Slice(grpc_slice_copy(*details)));
4084 }
4085 return [this, metadata = std::move(metadata)]() mutable {
4086 server_to_client_messages_.sender.Close();
4087 return Map(server_trailing_metadata_.sender.Push(std::move(metadata)),
4088 [](bool r) { return StatusFlag(r); });
4089 };
4090 });
4091 auto recv_message =
4092 MaybeOp(ops, got_ops[GRPC_OP_RECV_MESSAGE], [this](const grpc_op& op) {
4093 GPR_ASSERT(recv_message_ == nullptr);
4094 recv_message_ = op.data.recv_message.recv_message;
4095 return [this]() mutable {
4096 return Map(client_to_server_messages_.receiver.Next(),
4097 [this](NextResult<MessageHandle> msg) {
4098 return FinishRecvMessage(std::move(msg));
4099 });
4100 };
4101 });
4102 auto primary_ops = AllOk<StatusFlag>(
4103 std::move(send_initial_metadata), std::move(send_message),
4104 std::move(send_trailing_metadata), std::move(recv_message));
4105 if (got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER] != 255) {
4106 auto recv_trailing_metadata = MaybeOp(
4107 ops, got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER], [this](const grpc_op& op) {
4108 return [this, cancelled = op.data.recv_close_on_server.cancelled]() {
4109 return Map(server_trailing_metadata_.receiver.AwaitClosed(),
4110 [cancelled, this](bool result) -> Success {
4111 ResetDeadline();
4112 *cancelled = result ? 1 : 0;
4113 return Success{};
4114 });
4115 };
4116 });
4117 SpawnInfallible(
4118 "final-batch",
4119 [primary_ops = std::move(primary_ops),
4120 recv_trailing_metadata = std::move(recv_trailing_metadata),
4121 is_notify_tag_closure, notify_tag, this]() mutable {
4122 return LogPollBatch(
4123 notify_tag,
4124 Seq(std::move(primary_ops), std::move(recv_trailing_metadata),
4125 [is_notify_tag_closure, notify_tag, this](StatusFlag) {
4126 return WaitForCqEndOp(is_notify_tag_closure, notify_tag,
4127 absl::OkStatus(), cq());
4128 }));
4129 });
4130 } else {
4131 SpawnInfallible("batch", [primary_ops = std::move(primary_ops),
4132 is_notify_tag_closure, notify_tag,
4133 this]() mutable {
4134 return LogPollBatch(
4135 notify_tag,
4136 Seq(std::move(primary_ops),
4137 [is_notify_tag_closure, notify_tag, this](StatusFlag r) {
4138 return WaitForCqEndOp(is_notify_tag_closure, notify_tag,
4139 StatusCast<grpc_error_handle>(r), cq());
4140 }));
4141 });
4142 }
4143 }
4144
4145 RefCountedPtr<CallSpineInterface> MakeServerCall(ServerInterface* server,
4146 Channel* channel,
4147 Arena* arena) {
4148 return RefCountedPtr<ServerCallSpine>(
4149 arena->New<ServerCallSpine>(server, channel, arena));
4150 }
4151 #else
4152 RefCountedPtr<CallSpineInterface> MakeServerCall(ServerInterface*, Channel*,
4153 Arena*) {
4154 Crash("not implemented");
4155 }
4156 #endif
4157
4158 } // namespace grpc_core
4159
4160 ///////////////////////////////////////////////////////////////////////////////
4161 // C-based API
4162
4163 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
4164 grpc_core::ExecCtx exec_ctx;
4165 return grpc_core::Call::FromC(call)->arena()->Alloc(size);
4166 }
4167
4168 size_t grpc_call_get_initial_size_estimate() {
4169 return grpc_core::FilterStackCall::InitialSizeEstimate();
4170 }
4171
4172 grpc_error_handle grpc_call_create(grpc_call_create_args* args,
4173 grpc_call** out_call) {
4174 #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
4175 if (grpc_core::IsPromiseBasedClientCallEnabled() &&
4176 args->server_transport_data == nullptr && args->channel->is_promising()) {
4177 return grpc_core::MakePromiseBasedCall<grpc_core::ClientPromiseBasedCall>(
4178 args, out_call);
4179 }
4180 #endif
4181 #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
4182 if (grpc_core::IsPromiseBasedServerCallEnabled() &&
4183 args->server_transport_data != nullptr && args->channel->is_promising()) {
4184 return grpc_core::MakePromiseBasedCall<grpc_core::ServerPromiseBasedCall>(
4185 args, out_call);
4186 }
4187 #endif
4188 return grpc_core::FilterStackCall::Create(args, out_call);
4189 }
4190
4191 void grpc_call_set_completion_queue(grpc_call* call,
4192 grpc_completion_queue* cq) {
4193 grpc_core::Call::FromC(call)->SetCompletionQueue(cq);
4194 }
4195
4196 void grpc_call_ref(grpc_call* c) { grpc_core::Call::FromC(c)->ExternalRef(); }
4197
4198 void grpc_call_unref(grpc_call* c) {
4199 grpc_core::ExecCtx exec_ctx;
4200 grpc_core::Call::FromC(c)->ExternalUnref();
4201 }
4202
4203 char* grpc_call_get_peer(grpc_call* call) {
4204 return grpc_core::Call::FromC(call)->GetPeer();
4205 }
4206
4207 grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {
4208 return grpc_core::FilterStackCall::FromTopElem(surface_element)->c_ptr();
4209 }
4210
4211 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
4212 GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
4213 GPR_ASSERT(reserved == nullptr);
4214 if (call == nullptr) {
4215 return GRPC_CALL_ERROR;
4216 }
4217 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
4218 grpc_core::ExecCtx exec_ctx;
4219 grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError());
4220 return GRPC_CALL_OK;
4221 }
4222
4223 grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
4224 grpc_status_code status,
4225 const char* description,
4226 void* reserved) {
4227 GRPC_API_TRACE(
4228 "grpc_call_cancel_with_status("
4229 "c=%p, status=%d, description=%s, reserved=%p)",
4230 4, (c, (int)status, description, reserved));
4231 GPR_ASSERT(reserved == nullptr);
4232 if (c == nullptr) {
4233 return GRPC_CALL_ERROR;
4234 }
4235 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
4236 grpc_core::ExecCtx exec_ctx;
4237 grpc_core::Call::FromC(c)->CancelWithStatus(status, description);
4238 return GRPC_CALL_OK;
4239 }
4240
4241 void grpc_call_cancel_internal(grpc_call* call) {
4242 grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError());
4243 }
4244
4245 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
4246 grpc_call* call) {
4247 return grpc_core::Call::FromC(call)->test_only_compression_algorithm();
4248 }
4249
4250 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
4251 return grpc_core::Call::FromC(call)->test_only_message_flags();
4252 }
4253
4254 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
4255 return grpc_core::Call::FromC(call)
4256 ->encodings_accepted_by_peer()
4257 .ToLegacyBitmask();
4258 }
4259
4260 grpc_core::Arena* grpc_call_get_arena(grpc_call* call) {
4261 return grpc_core::Call::FromC(call)->arena();
4262 }
4263
4264 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
4265 return grpc_core::Call::FromC(call)->call_stack();
4266 }
4267
4268 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
4269 size_t nops, void* tag, void* reserved) {
4270 GRPC_API_TRACE(
4271 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
4272 "reserved=%p)",
4273 5, (call, ops, (unsigned long)nops, tag, reserved));
4274
4275 if (reserved != nullptr || call == nullptr) {
4276 return GRPC_CALL_ERROR;
4277 } else {
4278 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
4279 grpc_core::ExecCtx exec_ctx;
4280 return grpc_core::Call::FromC(call)->StartBatch(ops, nops, tag, false);
4281 }
4282 }
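// Illustrative sketch only (hypothetical application code, not part of this
// file): the tag passed to grpc_call_start_batch is surfaced by the
// completion queue once every op in the batch has finished:
//
//   grpc_call_error err = grpc_call_start_batch(call, ops, nops, tag, nullptr);
//   if (err == GRPC_CALL_OK) {
//     grpc_event ev = grpc_completion_queue_next(
//         cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
//     // ev.tag == tag; ev.success reports whether the batch succeeded.
//   }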
4283
4284 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
4285 const grpc_op* ops,
4286 size_t nops,
4287 grpc_closure* closure) {
4288 return grpc_core::Call::FromC(call)->StartBatch(ops, nops, closure, true);
4289 }
4290
4291 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
4292 void* value, void (*destroy)(void* value)) {
4293 return grpc_core::Call::FromC(call)->ContextSet(elem, value, destroy);
4294 }
4295
4296 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
4297 return grpc_core::Call::FromC(call)->ContextGet(elem);
4298 }
4299
4300 uint8_t grpc_call_is_client(grpc_call* call) {
4301 return grpc_core::Call::FromC(call)->is_client();
4302 }
4303
4304 grpc_compression_algorithm grpc_call_compression_for_level(
4305 grpc_call* call, grpc_compression_level level) {
4306 return grpc_core::Call::FromC(call)
4307 ->encodings_accepted_by_peer()
4308 .CompressionAlgorithmForLevel(level);
4309 }
4310
4311 bool grpc_call_is_trailers_only(const grpc_call* call) {
4312 return grpc_core::Call::FromC(call)->is_trailers_only();
4313 }
4314
4315 int grpc_call_failed_before_recv_message(const grpc_call* c) {
4316 return grpc_core::Call::FromC(c)->failed_before_recv_message();
4317 }
4318
4319 absl::string_view grpc_call_server_authority(const grpc_call* call) {
4320 return grpc_core::Call::FromC(call)->GetServerAuthority();
4321 }
4322
4323 const char* grpc_call_error_to_string(grpc_call_error error) {
4324 switch (error) {
4325 case GRPC_CALL_ERROR:
4326 return "GRPC_CALL_ERROR";
4327 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
4328 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
4329 case GRPC_CALL_ERROR_ALREADY_FINISHED:
4330 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
4331 case GRPC_CALL_ERROR_ALREADY_INVOKED:
4332 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
4333 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
4334 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
4335 case GRPC_CALL_ERROR_INVALID_FLAGS:
4336 return "GRPC_CALL_ERROR_INVALID_FLAGS";
4337 case GRPC_CALL_ERROR_INVALID_MESSAGE:
4338 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
4339 case GRPC_CALL_ERROR_INVALID_METADATA:
4340 return "GRPC_CALL_ERROR_INVALID_METADATA";
4341 case GRPC_CALL_ERROR_NOT_INVOKED:
4342 return "GRPC_CALL_ERROR_NOT_INVOKED";
4343 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
4344 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
4345 case GRPC_CALL_ERROR_NOT_ON_SERVER:
4346 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
4347 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
4348 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
4349 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
4350 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
4351 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
4352 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
4353 case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
4354 return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
4355 case GRPC_CALL_OK:
4356 return "GRPC_CALL_OK";
4357 }
4358 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOWN");
4359 }
4360
4361 void grpc_call_run_in_event_engine(const grpc_call* call,
4362 absl::AnyInvocable<void()> cb) {
4363 grpc_core::Call::FromC(call)->event_engine()->Run(std::move(cb));
4364 }
4365