//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/retry_filter.h"

#include <inttypes.h>
#include <limits.h>
#include <stddef.h>

#include <memory>
#include <new>
#include <string>
#include <utility>

#include "absl/container/inlined_vector.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/retry_service_config.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"

//
// Retry filter
//

// This filter is intended to be used in the DynamicFilter stack in the
// client channel, which is situated between the name resolver and the
// LB policy. Normally, the last filter in the DynamicFilter stack is
// the DynamicTerminationFilter (see client_channel.cc), which creates a
// LoadBalancedCall and delegates to it. However, when retries are
// enabled, this filter is used instead of the DynamicTerminationFilter.
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on an LB call. When the child batches return, we
// then decide which pending batches have been completed and schedule their
// callbacks accordingly. If a call attempt fails and we want to retry it,
// we create a new LB call and start again, constructing new "child" batches
// for the new LB call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses). However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry. To deal
// with this, we cache data for send ops, so that we can replay them on a
// different LB call even after we have completed the original batches.
//
// The code is structured as follows:
// - In CallData (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - There is a CallData::CallAttempt object for each retry attempt.
//   This object contains the LB call for that attempt and state to indicate
//   which ops from the CallData object have already been sent down to that
//   LB call.
// - There is a CallData::CallAttempt::BatchData object for each "child"
//   batch sent on the LB call.
//
// When constructing the "child" batches, we compare the state in the
// CallAttempt object against the state in the CallData object to see
// which batches need to be sent on the LB call for a given attempt.
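
// For orientation, this filter only takes effect when a retry policy is
// present in the service config. A minimal sketch of such a policy (field
// names per the gRPC retry design; the service name and values here are
// hypothetical):
//
//   {
//     "methodConfig": [{
//       "name": [{"service": "my.package.MyService"}],
//       "retryPolicy": {
//         "maxAttempts": 3,
//         "initialBackoff": "0.1s",
//         "maxBackoff": "1s",
//         "backoffMultiplier": 2,
//         "retryableStatusCodes": ["UNAVAILABLE"]
//       }
//     }]
//   }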

// TODO(roth): In subsequent PRs:
// - implement hedging

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
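
// A minimal sketch (assuming the C++ surface API; `target` and `creds` are
// hypothetical placeholders) of overriding this limit on a channel via the
// corresponding channel arg:
//
//   grpc::ChannelArguments channel_args;
//   channel_args.SetInt(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE, 512 << 10);
//   auto channel = grpc::CreateCustomChannel(target, creds, channel_args);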

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
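
// For reference (a sketch, not normative): with the retry policy fields
// shown above, the base delay before retry attempt n grows as
//   backoff(n) = min(initialBackoff * backoffMultiplier^(n-1), maxBackoff),
// and the jitter here perturbs the actual delay to approximately a value in
//   backoff(n) * [1 - RETRY_BACKOFF_JITTER, 1 + RETRY_BACKOFF_JITTER],
// per the BackOff implementation used by retry_backoff_.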

namespace grpc_core {

namespace {

using grpc_event_engine::experimental::EventEngine;
using internal::RetryGlobalConfig;
using internal::RetryMethodConfig;
using internal::RetryServiceConfigParser;
using internal::ServerRetryThrottleData;

TraceFlag grpc_retry_trace(false, "retry");

//
// RetryFilter
//

class RetryFilter {
 public:
  class CallData;

  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    GPR_ASSERT(args->is_last);
    GPR_ASSERT(elem->filter == &kRetryFilterVtable);
    grpc_error_handle error;
    new (elem->channel_data) RetryFilter(args->channel_args, &error);
    return error;
  }

  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<RetryFilter*>(elem->channel_data);
    chand->~RetryFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

 private:
  static size_t GetMaxPerRpcRetryBufferSize(const ChannelArgs& args) {
    return Clamp(args.GetInt(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE)
                     .value_or(DEFAULT_PER_RPC_RETRY_BUFFER_SIZE),
                 0, INT_MAX);
  }

  RetryFilter(const ChannelArgs& args, grpc_error_handle* error)
      : client_channel_(args.GetObject<ClientChannel>()),
        event_engine_(args.GetObject<EventEngine>()),
        per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
        service_config_parser_index_(
            internal::RetryServiceConfigParser::ParserIndex()) {
    // Get retry throttling parameters from service config.
    auto* service_config = args.GetObject<ServiceConfig>();
    if (service_config == nullptr) return;
    const auto* config = static_cast<const RetryGlobalConfig*>(
        service_config->GetGlobalParsedConfig(
            RetryServiceConfigParser::ParserIndex()));
    if (config == nullptr) return;
    // Get server name from target URI.
    auto server_uri = args.GetString(GRPC_ARG_SERVER_URI);
    if (!server_uri.has_value()) {
      *error = GRPC_ERROR_CREATE(
          "server URI channel arg missing or wrong type in client channel "
          "filter");
      return;
    }
    absl::StatusOr<URI> uri = URI::Parse(*server_uri);
    if (!uri.ok() || uri->path().empty()) {
      *error =
          GRPC_ERROR_CREATE("could not extract server name from target URI");
      return;
    }
    std::string server_name(absl::StripPrefix(uri->path(), "/"));
    // Get throttling config for server_name.
    retry_throttle_data_ =
        internal::ServerRetryThrottleMap::Get()->GetDataForServer(
            server_name, config->max_milli_tokens(),
            config->milli_token_ratio());
  }
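
  // For orientation, max_milli_tokens() and milli_token_ratio() above are
  // parsed from the top-level "retryThrottling" block of the service
  // config. A minimal sketch (hypothetical values):
  //
  //   {
  //     "retryThrottling": {
  //       "maxTokens": 10,
  //       "tokenRatio": 0.1
  //     }
  //   }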

  const RetryMethodConfig* GetRetryPolicy(
      const grpc_call_context_element* context);

  ClientChannel* client_channel_;
  EventEngine* const event_engine_;
  size_t per_rpc_retry_buffer_size_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  const size_t service_config_parser_index_;
};

//
// RetryFilter::CallData
//

class RetryFilter::CallData {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* /*final_info*/,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class CallStackDestructionBarrier;

  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch. If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch = nullptr;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached = false;
  };

  // State associated with each call attempt.
  class CallAttempt : public RefCounted<CallAttempt> {
   public:
    CallAttempt(CallData* calld, bool is_transparent_retry);
    ~CallAttempt() override;

    bool lb_call_committed() const { return lb_call_committed_; }

    // Constructs and starts whatever batches are needed on this call
    // attempt.
    void StartRetriableBatches();

    // Frees cached send ops that have already been completed after
    // committing the call.
    void FreeCachedSendOpDataAfterCommit();

    // Cancels the call attempt.
    void CancelFromSurface(grpc_transport_stream_op_batch* cancel_batch);

   private:
    // State used for starting a retryable batch on the call attempt's LB call.
    // This provides its own grpc_transport_stream_op_batch and other data
    // structures needed to populate the ops in the batch.
    // We allocate one struct on the arena for each attempt at starting a
    // batch on a given LB call.
    class BatchData
        : public RefCounted<BatchData, PolymorphicRefCount, UnrefCallDtor> {
     public:
      BatchData(RefCountedPtr<CallAttempt> call_attempt, int refcount,
                bool set_on_complete);
      ~BatchData() override;

      grpc_transport_stream_op_batch* batch() { return &batch_; }

      // Adds retriable send_initial_metadata op.
      void AddRetriableSendInitialMetadataOp();
      // Adds retriable send_message op.
      void AddRetriableSendMessageOp();
      // Adds retriable send_trailing_metadata op.
      void AddRetriableSendTrailingMetadataOp();
      // Adds retriable recv_initial_metadata op.
      void AddRetriableRecvInitialMetadataOp();
      // Adds retriable recv_message op.
      void AddRetriableRecvMessageOp();
      // Adds retriable recv_trailing_metadata op.
      void AddRetriableRecvTrailingMetadataOp();
      // Adds cancel_stream op.
      void AddCancelStreamOp(grpc_error_handle error);

     private:
      // Frees cached send ops that were completed by the completed batch in
      // batch_data. Used when batches are completed after the call is
      // committed.
      void FreeCachedSendOpDataForCompletedBatch();

      // If there is a pending recv_initial_metadata op, adds a closure
      // to closures for recv_initial_metadata_ready.
      void MaybeAddClosureForRecvInitialMetadataCallback(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Intercepts recv_initial_metadata_ready callback for retries.
      // Commits the call and returns the initial metadata up the stack.
      static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);

      // If there is a pending recv_message op, adds a closure to closures
      // for recv_message_ready.
      void MaybeAddClosureForRecvMessageCallback(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Intercepts recv_message_ready callback for retries.
      // Commits the call and returns the message up the stack.
      static void RecvMessageReady(void* arg, grpc_error_handle error);

      // If there is a pending recv_trailing_metadata op, adds a closure to
      // closures for recv_trailing_metadata_ready.
      void MaybeAddClosureForRecvTrailingMetadataReady(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Adds any necessary closures for deferred batch completion
      // callbacks to closures.
      void AddClosuresForDeferredCompletionCallbacks(
          CallCombinerClosureList* closures);
      // For any pending batch containing an op that has not yet been started,
      // adds the pending batch's completion closures to closures.
      void AddClosuresToFailUnstartedPendingBatches(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Runs necessary closures upon completion of a call attempt.
      void RunClosuresForCompletedCall(grpc_error_handle error);
      // Intercepts recv_trailing_metadata_ready callback for retries.
      // Commits the call and returns the trailing metadata up the stack.
      static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);

      // Adds the on_complete closure for the pending batch completed in
      // batch_data to closures.
      void AddClosuresForCompletedPendingBatch(
          grpc_error_handle error, CallCombinerClosureList* closures);

      // If there are any cached ops to replay or pending ops to start on the
      // LB call, adds them to closures.
      void AddClosuresForReplayOrPendingSendOps(
          CallCombinerClosureList* closures);

      // Callback used to intercept on_complete from LB calls.
      static void OnComplete(void* arg, grpc_error_handle error);

      // Callback used to handle on_complete for internally generated
      // cancel_stream op.
      static void OnCompleteForCancelOp(void* arg, grpc_error_handle error);

      // This DOES hold a ref, but it cannot be a RefCountedPtr<>, because
      // our dtor unrefs the owning call, which may delete the arena in
      // which we are allocated, which means that running the dtor of any
      // data members after that would cause a crash.
      CallAttempt* call_attempt_;
      // The batch to use in the LB call.
      // Its payload field points to CallAttempt::batch_payload_.
      grpc_transport_stream_op_batch batch_;
      // For intercepting on_complete.
      grpc_closure on_complete_;
    };

    // Creates a BatchData object on the call's arena with the
    // specified refcount. If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    BatchData* CreateBatch(int refcount, bool set_on_complete) {
      return calld_->arena_->New<BatchData>(Ref(DEBUG_LOCATION, "CreateBatch"),
                                            refcount, set_on_complete);
    }

    // If there are any cached send ops that need to be replayed on this
    // call attempt, creates and returns a new batch to replay those ops.
    // Otherwise, returns nullptr.
    BatchData* MaybeCreateBatchForReplay();

    // Adds a closure to closures that will execute batch in the call combiner.
    void AddClosureForBatch(grpc_transport_stream_op_batch* batch,
                            const char* reason,
                            CallCombinerClosureList* closures);

    // Helper function used to start a recv_trailing_metadata batch. This
    // is used in the case where a recv_initial_metadata or recv_message
    // op fails in a way that we know the call is over but when the application
    // has not yet started its own recv_trailing_metadata op.
    void AddBatchForInternalRecvTrailingMetadata(
        CallCombinerClosureList* closures);

    // Adds a batch to closures to cancel this call attempt, if
    // cancellation has not already been sent on the LB call.
    void MaybeAddBatchForCancelOp(grpc_error_handle error,
                                  CallCombinerClosureList* closures);

    // Adds batches for pending batches to closures.
    void AddBatchesForPendingBatches(CallCombinerClosureList* closures);

    // Adds whatever batches are needed on this attempt to closures.
    void AddRetriableBatches(CallCombinerClosureList* closures);

    // Returns true if any send op in the batch was not yet started on this
    // attempt.
    bool PendingBatchContainsUnstartedSendOps(PendingBatch* pending);

    // Returns true if there are cached send ops to replay.
    bool HaveSendOpsToReplay();

    // If our retry state is no longer needed, switch to fast path by moving
    // our LB call into calld_->committed_call_ and having calld_ drop
    // its ref to us.
    void MaybeSwitchToFastPath();

    // Returns true if the call should be retried.
    bool ShouldRetry(absl::optional<grpc_status_code> status,
                     absl::optional<Duration> server_pushback);

    // Abandons the call attempt. Unrefs any deferred batches.
    void Abandon();

    void OnPerAttemptRecvTimer();
    static void OnPerAttemptRecvTimerLocked(void* arg, grpc_error_handle error);
    void MaybeCancelPerAttemptRecvTimer();

    CallData* calld_;
    OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> lb_call_;
    bool lb_call_committed_ = false;

    grpc_closure on_per_attempt_recv_timer_;
    absl::optional<EventEngine::TaskHandle> per_attempt_recv_timer_handle_;

    // BatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload_;
    // For send_initial_metadata.
    grpc_metadata_batch send_initial_metadata_{calld_->arena_};
    // For send_trailing_metadata.
    grpc_metadata_batch send_trailing_metadata_{calld_->arena_};
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata_{calld_->arena_};
    grpc_closure recv_initial_metadata_ready_;
    bool trailing_metadata_available_ = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready_;
    absl::optional<SliceBuffer> recv_message_;
    uint32_t recv_message_flags_;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata_{calld_->arena_};
    grpc_transport_stream_stats collect_stats_;
    grpc_closure recv_trailing_metadata_ready_;
    // These fields indicate which ops have been started and completed on
    // this call attempt.
    size_t started_send_message_count_ = 0;
    size_t completed_send_message_count_ = 0;
    size_t started_recv_message_count_ = 0;
    size_t completed_recv_message_count_ = 0;
    bool started_send_initial_metadata_ : 1;
    bool completed_send_initial_metadata_ : 1;
    bool started_send_trailing_metadata_ : 1;
    bool completed_send_trailing_metadata_ : 1;
    bool started_recv_initial_metadata_ : 1;
    bool completed_recv_initial_metadata_ : 1;
    bool started_recv_trailing_metadata_ : 1;
    bool completed_recv_trailing_metadata_ : 1;
    bool sent_cancel_stream_ : 1;
    // State for callback processing.
    RefCountedPtr<BatchData> recv_initial_metadata_ready_deferred_batch_;
    grpc_error_handle recv_initial_metadata_error_;
    RefCountedPtr<BatchData> recv_message_ready_deferred_batch_;
    grpc_error_handle recv_message_error_;
    struct OnCompleteDeferredBatch {
      OnCompleteDeferredBatch(RefCountedPtr<BatchData> batch,
                              grpc_error_handle error)
          : batch(std::move(batch)), error(error) {}
      RefCountedPtr<BatchData> batch;
      grpc_error_handle error;
    };
    // There cannot be more than 3 pending send op batches at a time.
    absl::InlinedVector<OnCompleteDeferredBatch, 3>
        on_complete_deferred_batches_;
    RefCountedPtr<BatchData> recv_trailing_metadata_internal_batch_;
    grpc_error_handle recv_trailing_metadata_error_;
    bool seen_recv_trailing_metadata_from_surface_ : 1;
    // NOTE: Do not move this next to the metadata bitfields above. That
    // would save space but would also result in a data race, because the
    // compiler would generate a 2-byte store that overwrites the metadata
    // bitfields when setting this field.
    bool abandoned_ : 1;
  };

  CallData(RetryFilter* chand, const grpc_call_element_args& args);
  ~CallData();

  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  PendingBatch* PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // Fails all pending batches. Does NOT yield call combiner.
  void PendingBatchesFail(grpc_error_handle error);
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata();
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(size_t idx);
  void FreeCachedSendTrailingMetadata();
  void FreeAllCachedSendOpData();

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(CallAttempt* call_attempt);

  // Starts a timer to retry after appropriate back-off.
  // If server_pushback is nullopt, retry_backoff_ is used.
  void StartRetryTimer(absl::optional<Duration> server_pushback);

  void OnRetryTimer();
  static void OnRetryTimerLocked(void* arg, grpc_error_handle /*error*/);

  // Adds a closure to closures to start a transparent retry.
  void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
  static void StartTransparentRetry(void* arg, grpc_error_handle error);

  OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
  CreateLoadBalancedCall(absl::AnyInvocable<void()> on_commit,
                         bool is_transparent_retry);

  void CreateCallAttempt(bool is_transparent_retry);

  RetryFilter* chand_;
  grpc_polling_entity* pollent_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  const RetryMethodConfig* retry_policy_ = nullptr;
  BackOff retry_backoff_;

  grpc_slice path_;  // Request path.
  Timestamp deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  grpc_error_handle cancelled_from_surface_;

  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_;

  // TODO(roth): As part of implementing hedging, we will need to maintain a
  // list of all pending attempts, so that we can cancel them all if the call
  // gets cancelled.
  RefCountedPtr<CallAttempt> call_attempt_;

  // LB call used when we've committed to a call attempt and the retry
  // state for that attempt is no longer needed. This provides a fast
  // path for long-running streaming calls that minimizes overhead.
  OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> committed_call_;

  // When we are not yet fully committed to a particular call (i.e.,
  // either we might still retry or we have committed to the call but
  // there are still some cached ops to be replayed on the call),
  // batches received from above will be added to this list, and they
  // will not be removed until we have invoked their completion callbacks.
  size_t bytes_buffered_for_retry_ = 0;
  PendingBatch pending_batches_[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Retry state.
  bool retry_committed_ : 1;
  bool retry_codepath_started_ : 1;
  bool sent_transparent_retry_not_seen_by_server_ : 1;
  int num_attempts_completed_ = 0;
  absl::optional<EventEngine::TaskHandle> retry_timer_handle_;
  grpc_closure retry_closure_;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_metadata_batch send_initial_metadata_{arena_};
  // send_message
  // When we get a send_message op, we cache the message payload in a
  // SliceBuffer, so that it can be replayed on a later attempt even after
  // the original batch has completed.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that. This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  struct CachedSendMessage {
    SliceBuffer* slices;
    uint32_t flags;
  };
  absl::InlinedVector<CachedSendMessage, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_metadata_batch send_trailing_metadata_{arena_};
};

//
// RetryFilter::CallData::CallStackDestructionBarrier
//

// A class to track the existence of LoadBalancedCall call stacks that
// we've created. We wait until all such call stacks have been
// destroyed before we return the on_call_stack_destruction closure up
// to the surface.
//
// The parent RetryFilter::CallData object holds a ref to this object.
// When it is destroyed, it will store the on_call_stack_destruction
// closure from the surface in this object and then release its ref.
// We also take a ref to this object for each LB call we create, and
// those refs are not released until the LB call stack is destroyed.
// When this object is destroyed, it will invoke the
// on_call_stack_destruction closure from the surface.
class RetryFilter::CallData::CallStackDestructionBarrier
    : public RefCounted<CallStackDestructionBarrier, PolymorphicRefCount,
                        UnrefCallDtor> {
 public:
  CallStackDestructionBarrier() {}

  ~CallStackDestructionBarrier() override {
    // TODO(yashkt): This can potentially be a Closure::Run.
    ExecCtx::Run(DEBUG_LOCATION, on_call_stack_destruction_, absl::OkStatus());
  }

  // Set the closure from the surface. This closure will be invoked
  // when this object is destroyed.
  void set_on_call_stack_destruction(grpc_closure* on_call_stack_destruction) {
    on_call_stack_destruction_ = on_call_stack_destruction;
  }

  // Invoked to get an on_call_stack_destruction closure for a new LB call.
  grpc_closure* MakeLbCallDestructionClosure(CallData* calld) {
    Ref().release();  // Ref held by callback.
    grpc_closure* on_lb_call_destruction_complete =
        calld->arena_->New<grpc_closure>();
    GRPC_CLOSURE_INIT(on_lb_call_destruction_complete,
                      OnLbCallDestructionComplete, this, nullptr);
    return on_lb_call_destruction_complete;
  }

 private:
  static void OnLbCallDestructionComplete(void* arg,
                                          grpc_error_handle /*error*/) {
    auto* self = static_cast<CallStackDestructionBarrier*>(arg);
    self->Unref();
  }

  grpc_closure* on_call_stack_destruction_ = nullptr;
};
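
// For orientation, a sketch of the ref-count flow described in the class
// comment above (the CallData ctor/dtor are defined later in this file):
//   1. CallData creation       -> holds the initial ref to the barrier.
//   2. Each new LB call        -> MakeLbCallDestructionClosure() takes a ref,
//                                 released when that LB call's stack is
//                                 destroyed (OnLbCallDestructionComplete).
//   3. CallData destruction    -> stores on_call_stack_destruction and
//                                 releases its ref.
//   4. Last ref released       -> the dtor above runs the surface's
//                                 on_call_stack_destruction closure.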

//
// RetryFilter::CallData::CallAttempt
//

RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
                                                bool is_transparent_retry)
    : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "CallAttempt"
                                                           : nullptr),
      calld_(calld),
      batch_payload_(calld->call_context_),
      started_send_initial_metadata_(false),
      completed_send_initial_metadata_(false),
      started_send_trailing_metadata_(false),
      completed_send_trailing_metadata_(false),
      started_recv_initial_metadata_(false),
      completed_recv_initial_metadata_(false),
      started_recv_trailing_metadata_(false),
      completed_recv_trailing_metadata_(false),
      sent_cancel_stream_(false),
      seen_recv_trailing_metadata_from_surface_(false),
      abandoned_(false) {
  lb_call_ = calld->CreateLoadBalancedCall(
      [this]() {
        lb_call_committed_ = true;
        if (calld_->retry_committed_) {
          auto* service_config_call_data =
              static_cast<ClientChannelServiceConfigCallData*>(
                  calld_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA]
                      .value);
          service_config_call_data->Commit();
        }
      },
      is_transparent_retry);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: created attempt, lb_call=%p",
            calld->chand_, calld, this, lb_call_.get());
  }
  // If per_attempt_recv_timeout is set, start a timer.
  if (calld->retry_policy_ != nullptr &&
      calld->retry_policy_->per_attempt_recv_timeout().has_value()) {
    const Duration per_attempt_recv_timeout =
        *calld->retry_policy_->per_attempt_recv_timeout();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: per-attempt timeout in %" PRId64
              " ms",
              calld->chand_, calld, this, per_attempt_recv_timeout.millis());
    }
    // Start the per-attempt timeout timer.
    GRPC_CALL_STACK_REF(calld->owning_call_, "OnPerAttemptRecvTimer");
    Ref(DEBUG_LOCATION, "OnPerAttemptRecvTimer").release();
    per_attempt_recv_timer_handle_ = calld_->chand_->event_engine_->RunAfter(
        per_attempt_recv_timeout, [this] {
          ApplicationCallbackExecCtx callback_exec_ctx;
          ExecCtx exec_ctx;
          OnPerAttemptRecvTimer();
        });
  }
}

RetryFilter::CallData::CallAttempt::~CallAttempt() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: destroying call attempt",
            calld_->chand_, calld_, this);
  }
}

void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() {
  // TODO(roth): When we implement hedging, this logic will need to get
  // a bit more complex, because there may be other (now abandoned) call
  // attempts still using this data. We may need to do some sort of
  // ref-counting instead.
  if (completed_send_initial_metadata_) {
    calld_->FreeCachedSendInitialMetadata();
  }
  for (size_t i = 0; i < completed_send_message_count_; ++i) {
    calld_->FreeCachedSendMessage(i);
  }
  if (completed_send_trailing_metadata_) {
    calld_->FreeCachedSendTrailingMetadata();
  }
}

bool RetryFilter::CallData::CallAttempt::PendingBatchContainsUnstartedSendOps(
    PendingBatch* pending) {
  if (pending->batch->on_complete == nullptr) return false;
  if (pending->batch->send_initial_metadata &&
      !started_send_initial_metadata_) {
    return true;
  }
  if (pending->batch->send_message &&
      started_send_message_count_ < calld_->send_messages_.size()) {
    return true;
  }
  if (pending->batch->send_trailing_metadata &&
      !started_send_trailing_metadata_) {
    return true;
  }
  return false;
}

bool RetryFilter::CallData::CallAttempt::HaveSendOpsToReplay() {
  // We don't check send_initial_metadata here, because that op will always
  // be started as soon as it is received from the surface, so it will
  // never need to be started at this point.
  return started_send_message_count_ < calld_->send_messages_.size() ||
         (calld_->seen_send_trailing_metadata_ &&
          !started_send_trailing_metadata_);
}

void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {
  // If we're not yet committed, we can't switch yet.
  // TODO(roth): As part of implementing hedging, this logic needs to
  // check that *this* call attempt is the one that we've committed to.
  // Might need to replace abandoned_ with an enum indicating whether we're
  // in flight, abandoned, or the winning call attempt.
  if (!calld_->retry_committed_) return;
  // If we've already switched to fast path, there's nothing to do here.
  if (calld_->committed_call_ != nullptr) return;
  // If the perAttemptRecvTimeout timer is pending, we can't switch yet.
  if (per_attempt_recv_timer_handle_.has_value()) return;
  // If there are still send ops to replay, we can't switch yet.
  if (HaveSendOpsToReplay()) return;
  // If we started an internal batch for recv_trailing_metadata but have not
  // yet seen that op from the surface, we can't switch yet.
  if (recv_trailing_metadata_internal_batch_ != nullptr) return;
  // Switch to fast path.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: retry state no longer needed; "
            "moving LB call to parent and unreffing the call attempt",
            calld_->chand_, calld_, this);
  }
  calld_->committed_call_ = std::move(lb_call_);
  calld_->call_attempt_.reset(DEBUG_LOCATION, "MaybeSwitchToFastPath");
}

// If there are any cached send ops that need to be replayed on the
// current call attempt, creates and returns a new batch to replay those ops.
// Otherwise, returns nullptr.
RetryFilter::CallData::CallAttempt::BatchData*
RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() {
  BatchData* replay_batch_data = nullptr;
  // send_initial_metadata.
  if (calld_->seen_send_initial_metadata_ && !started_send_initial_metadata_ &&
      !calld_->pending_send_initial_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: replaying previously completed "
              "send_initial_metadata op",
              calld_->chand_, calld_, this);
    }
    replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    replay_batch_data->AddRetriableSendInitialMetadataOp();
  }
  // send_message.
  // Note that we can only have one send_message op in flight at a time.
  if (started_send_message_count_ < calld_->send_messages_.size() &&
      started_send_message_count_ == completed_send_message_count_ &&
      !calld_->pending_send_message_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: replaying previously completed "
              "send_message op",
              calld_->chand_, calld_, this);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    }
    replay_batch_data->AddRetriableSendMessageOp();
  }
  // send_trailing_metadata.
  // Note that we only add this op if we have no more send_message ops
  // to start, since we can't send down any more send_message ops after
  // send_trailing_metadata.
  if (calld_->seen_send_trailing_metadata_ &&
      started_send_message_count_ == calld_->send_messages_.size() &&
      !started_send_trailing_metadata_ &&
      !calld_->pending_send_trailing_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: replaying previously completed "
              "send_trailing_metadata op",
              calld_->chand_, calld_, this);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    }
    replay_batch_data->AddRetriableSendTrailingMetadataOp();
  }
  return replay_batch_data;
}

namespace {

void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* lb_call = static_cast<ClientChannel::FilterBasedLoadBalancedCall*>(
      batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  lb_call->StartTransportStreamOpBatch(batch);
}

}  // namespace

void RetryFilter::CallData::CallAttempt::AddClosureForBatch(
    grpc_transport_stream_op_batch* batch, const char* reason,
    CallCombinerClosureList* closures) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: adding batch (%s): %s",
            calld_->chand_, calld_, this, reason,
            grpc_transport_stream_op_batch_string(batch, false).c_str());
  }
  batch->handler_private.extra_arg = lb_call_.get();
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  closures->Add(&batch->handler_private.closure, absl::OkStatus(), reason);
}

void RetryFilter::CallData::CallAttempt::
    AddBatchForInternalRecvTrailingMetadata(CallCombinerClosureList* closures) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: call failed but "
            "recv_trailing_metadata not started; starting it internally",
            calld_->chand_, calld_, this);
  }
  // Create batch_data with 2 refs, since this batch will be unreffed twice:
  // once for the recv_trailing_metadata_ready callback when the batch
  // completes, and again when we actually get a recv_trailing_metadata
  // op from the surface.
  BatchData* batch_data = CreateBatch(2, false /* set_on_complete */);
  batch_data->AddRetriableRecvTrailingMetadataOp();
  recv_trailing_metadata_internal_batch_.reset(batch_data);
  AddClosureForBatch(batch_data->batch(),
                     "starting internal recv_trailing_metadata", closures);
}

void RetryFilter::CallData::CallAttempt::MaybeAddBatchForCancelOp(
    grpc_error_handle error, CallCombinerClosureList* closures) {
  if (sent_cancel_stream_) {
    return;
  }
  sent_cancel_stream_ = true;
  BatchData* cancel_batch_data = CreateBatch(1, /*set_on_complete=*/true);
  cancel_batch_data->AddCancelStreamOp(error);
  AddClosureForBatch(cancel_batch_data->batch(),
                     "start cancellation batch on call attempt", closures);
}

void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
    CallCombinerClosureList* closures) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld_->pending_batches_); ++i) {
    PendingBatch* pending = &calld_->pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    bool has_send_ops = false;
    // Skip any batch that either (a) has already been started on this
    // call attempt or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch. This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op. If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata) {
      if (started_send_initial_metadata_) continue;
      has_send_ops = true;
    }
    if (batch->send_message) {
      // Cases where we can't start this send_message op:
      // - We are currently replaying a previous cached send_message op.
      // - We have already replayed all send_message ops, including this
      //   one. (This can happen if a send_message op is in the same
      //   batch as a recv op, the send_message op has already completed
      //   but the recv op hasn't, and then a subsequent batch with another
      //   recv op is started from the surface.)
      if (completed_send_message_count_ < started_send_message_count_ ||
          completed_send_message_count_ ==
              (calld_->send_messages_.size() + !pending->send_ops_cached)) {
        continue;
      }
      has_send_ops = true;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      if (started_send_message_count_ + batch->send_message <
              calld_->send_messages_.size() ||
          started_send_trailing_metadata_) {
        continue;
      }
      has_send_ops = true;
    }
    int num_callbacks = has_send_ops;  // All send ops share one callback.
    if (batch->recv_initial_metadata) {
      if (started_recv_initial_metadata_) continue;
      ++num_callbacks;
    }
    if (batch->recv_message) {
      // Skip if the op is already in flight, or if it has already completed
      // but the completion has not yet been sent to the surface.
      if (completed_recv_message_count_ < started_recv_message_count_ ||
          recv_message_ready_deferred_batch_ != nullptr) {
        continue;
      }
      ++num_callbacks;
    }
    if (batch->recv_trailing_metadata) {
      if (started_recv_trailing_metadata_) {
        seen_recv_trailing_metadata_from_surface_ = true;
        // If we previously completed a recv_trailing_metadata op
        // initiated by AddBatchForInternalRecvTrailingMetadata(), use the
        // result of that instead of trying to re-start this op.
        if (GPR_UNLIKELY(recv_trailing_metadata_internal_batch_ != nullptr)) {
          // If the batch completed, then trigger the completion callback
          // directly, so that we return the previously returned results to
          // the application. Otherwise, just unref the internally started
          // batch, since we'll propagate the completion when it completes.
          if (completed_recv_trailing_metadata_) {
            closures->Add(
                &recv_trailing_metadata_ready_, recv_trailing_metadata_error_,
                "re-executing recv_trailing_metadata_ready to propagate "
                "internally triggered result");
            // Ref will be released by callback.
            recv_trailing_metadata_internal_batch_.release();
          } else {
            recv_trailing_metadata_internal_batch_.reset(
                DEBUG_LOCATION,
                "internally started recv_trailing_metadata batch pending and "
                "recv_trailing_metadata started from surface");
          }
          recv_trailing_metadata_error_ = absl::OkStatus();
        }
        // We don't want the fact that we've already started this op internally
        // to prevent us from adding a batch that may contain other ops.
        // Instead, we'll just skip adding this op below.
        if (num_callbacks == 0) continue;
      } else {
        ++num_callbacks;
      }
    }
    // If we're already committed and the following conditions are met,
    // just send the batch down as-is:
    // - The batch contains no cached send ops. (If it does, we need
    //   the logic below to use the cached payloads.)
    // - The batch does not contain recv_trailing_metadata when we have
    //   already started an internal recv_trailing_metadata batch. (If
    //   we've already started an internal recv_trailing_metadata batch,
    //   then we need the logic below to send all ops in the batch
    //   *except* the recv_trailing_metadata op.)
    if (calld_->retry_committed_ && !pending->send_ops_cached &&
        (!batch->recv_trailing_metadata || !started_recv_trailing_metadata_)) {
      AddClosureForBatch(
          batch,
          "start non-replayable pending batch on call attempt after commit",
          closures);
      calld_->PendingBatchClear(pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    BatchData* batch_data =
        CreateBatch(num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed.
    calld_->MaybeCacheSendOpsForBatch(pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      batch_data->AddRetriableSendInitialMetadataOp();
    }
    // send_message.
    if (batch->send_message) {
      batch_data->AddRetriableSendMessageOp();
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      batch_data->AddRetriableSendTrailingMetadataOp();
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      batch_data->AddRetriableRecvInitialMetadataOp();
    }
    // recv_message.
    if (batch->recv_message) {
      batch_data->AddRetriableRecvMessageOp();
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata && !started_recv_trailing_metadata_) {
      batch_data->AddRetriableRecvTrailingMetadataOp();
    }
    AddClosureForBatch(batch_data->batch(),
                       "start replayable pending batch on call attempt",
                       closures);
  }
}

void RetryFilter::CallData::CallAttempt::AddRetriableBatches(
    CallCombinerClosureList* closures) {
  // Replay previously-returned send_* ops if needed.
  BatchData* replay_batch_data = MaybeCreateBatchForReplay();
  if (replay_batch_data != nullptr) {
    AddClosureForBatch(replay_batch_data->batch(),
                       "start replay batch on call attempt", closures);
  }
  // Now add pending batches.
  AddBatchesForPendingBatches(closures);
}

void RetryFilter::CallData::CallAttempt::StartRetriableBatches() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: constructing retriable batches",
            calld_->chand_, calld_, this);
  }
  // Construct list of closures to execute, one for each pending batch.
  CallCombinerClosureList closures;
  AddRetriableBatches(&closures);
  // Note: This will yield the call combiner.
  // Start batches on LB call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: starting %" PRIuPTR
            " retriable batches on lb_call=%p",
            calld_->chand_, calld_, this, closures.size(), lb_call_.get());
  }
  closures.RunClosures(calld_->call_combiner_);
}

void RetryFilter::CallData::CallAttempt::CancelFromSurface(
    grpc_transport_stream_op_batch* cancel_batch) {
  MaybeCancelPerAttemptRecvTimer();
  Abandon();
  // Propagate cancellation to LB call.
  lb_call_->StartTransportStreamOpBatch(cancel_batch);
}

bool RetryFilter::CallData::CallAttempt::ShouldRetry(
    absl::optional<grpc_status_code> status,
    absl::optional<Duration> server_pushback) {
  // If no retry policy, don't retry.
  if (calld_->retry_policy_ == nullptr) return false;
  // Check status.
  if (status.has_value()) {
    if (GPR_LIKELY(*status == GRPC_STATUS_OK)) {
      if (calld_->retry_throttle_data_ != nullptr) {
        calld_->retry_throttle_data_->RecordSuccess();
      }
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: call succeeded",
                calld_->chand_, calld_, this);
      }
      return false;
    }
    // Status is not OK. Check whether the status is retryable.
    if (!calld_->retry_policy_->retryable_status_codes().Contains(*status)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p attempt=%p: status %s not configured as "
                "retryable",
                calld_->chand_, calld_, this,
                grpc_status_code_to_string(*status));
      }
      return false;
    }
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (calld_->retry_throttle_data_ != nullptr &&
      !calld_->retry_throttle_data_->RecordFailure()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: retries throttled",
              calld_->chand_, calld_, this);
    }
    return false;
  }
  // Check whether the call is committed.
  if (calld_->retry_committed_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: retries already committed",
              calld_->chand_, calld_, this);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++calld_->num_attempts_completed_;
  if (calld_->num_attempts_completed_ >=
      calld_->retry_policy_->max_attempts()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(
          GPR_INFO, "chand=%p calld=%p attempt=%p: exceeded %d retry attempts",
          calld_->chand_, calld_, this, calld_->retry_policy_->max_attempts());
    }
    return false;
  }
  // Check server push-back.
  if (server_pushback.has_value()) {
    if (*server_pushback < Duration::Zero()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p attempt=%p: not retrying due to server "
                "push-back",
                calld_->chand_, calld_, this);
      }
      return false;
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(
            GPR_INFO,
            "chand=%p calld=%p attempt=%p: server push-back: retry in %" PRIu64
            " ms",
            calld_->chand_, calld_, this, server_pushback->millis());
      }
    }
  }
  // We should retry.
  return true;
}

void RetryFilter::CallData::CallAttempt::Abandon() {
  abandoned_ = true;
  // Unref batches for deferred completion callbacks that will now never
  // be invoked.
  if (started_recv_trailing_metadata_ &&
      !seen_recv_trailing_metadata_from_surface_) {
    recv_trailing_metadata_internal_batch_.reset(
        DEBUG_LOCATION,
        "unref internal recv_trailing_metadata_ready batch; attempt abandoned");
  }
  recv_trailing_metadata_error_ = absl::OkStatus();
  recv_initial_metadata_ready_deferred_batch_.reset(
      DEBUG_LOCATION,
      "unref deferred recv_initial_metadata_ready batch; attempt abandoned");
  recv_initial_metadata_error_ = absl::OkStatus();
  recv_message_ready_deferred_batch_.reset(
      DEBUG_LOCATION,
      "unref deferred recv_message_ready batch; attempt abandoned");
  recv_message_error_ = absl::OkStatus();
  for (auto& on_complete_deferred_batch : on_complete_deferred_batches_) {
    on_complete_deferred_batch.batch.reset(
        DEBUG_LOCATION, "unref deferred on_complete batch; attempt abandoned");
  }
  on_complete_deferred_batches_.clear();
}

void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimer() {
  GRPC_CLOSURE_INIT(&on_per_attempt_recv_timer_, OnPerAttemptRecvTimerLocked,
                    this, nullptr);
  GRPC_CALL_COMBINER_START(calld_->call_combiner_, &on_per_attempt_recv_timer_,
                           absl::OkStatus(), "per-attempt timer fired");
}

void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
    void* arg, grpc_error_handle error) {
  auto* call_attempt = static_cast<CallAttempt*>(arg);
  auto* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: perAttemptRecvTimeout timer fired: "
            "error=%s, per_attempt_recv_timer_handle_.has_value()=%d",
            calld->chand_, calld, call_attempt, StatusToString(error).c_str(),
            call_attempt->per_attempt_recv_timer_handle_.has_value());
  }
  CallCombinerClosureList closures;
  call_attempt->per_attempt_recv_timer_handle_.reset();
  // Cancel this attempt.
  // TODO(roth): When implementing hedging, we should not cancel the
  // current attempt.
  call_attempt->MaybeAddBatchForCancelOp(
      grpc_error_set_int(
          GRPC_ERROR_CREATE("retry perAttemptRecvTimeout exceeded"),
          StatusIntProperty::kRpcStatus, GRPC_STATUS_CANCELLED),
      &closures);
  // Check whether we should retry.
  if (call_attempt->ShouldRetry(/*status=*/absl::nullopt,
                                /*server_pushback=*/absl::nullopt)) {
    // Mark current attempt as abandoned.
    call_attempt->Abandon();
    // We are retrying. Start backoff timer.
    calld->StartRetryTimer(/*server_pushback=*/absl::nullopt);
  } else {
    // Not retrying, so commit the call.
    calld->RetryCommit(call_attempt);
    // If retry state is no longer needed, switch to fast path for
    // subsequent batches.
    call_attempt->MaybeSwitchToFastPath();
  }
  closures.RunClosures(calld->call_combiner_);
  call_attempt->Unref(DEBUG_LOCATION, "OnPerAttemptRecvTimer");
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnPerAttemptRecvTimer");
}

void RetryFilter::CallData::CallAttempt::MaybeCancelPerAttemptRecvTimer() {
  if (per_attempt_recv_timer_handle_.has_value()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: cancelling "
              "perAttemptRecvTimeout timer",
              calld_->chand_, calld_, this);
    }
    if (calld_->chand_->event_engine_->Cancel(
            *per_attempt_recv_timer_handle_)) {
      Unref(DEBUG_LOCATION, "OnPerAttemptRecvTimer");
      GRPC_CALL_STACK_UNREF(calld_->owning_call_, "OnPerAttemptRecvTimer");
    }
    per_attempt_recv_timer_handle_.reset();
  }
}

//
// RetryFilter::CallData::CallAttempt::BatchData
//

RetryFilter::CallData::CallAttempt::BatchData::BatchData(
    RefCountedPtr<CallAttempt> attempt, int refcount, bool set_on_complete)
    : RefCounted(
          GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "BatchData" : nullptr,
          refcount),
      call_attempt_(attempt.release()) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: creating batch %p",
            call_attempt_->calld_->chand_, call_attempt_->calld_, call_attempt_,
            this);
  }
  // We hold a ref to the call stack for every batch sent on a call attempt.
  // This is because some batches on the call attempt may not complete
  // until after all of the batches are completed at the surface (because
  // each batch that is pending at the surface holds a ref). This
  // can happen for replayed send ops, and it can happen for
  // recv_initial_metadata and recv_message ops on a call attempt that has
  // been abandoned.
  GRPC_CALL_STACK_REF(call_attempt_->calld_->owning_call_, "Retry BatchData");
  batch_.payload = &call_attempt_->batch_payload_;
  if (set_on_complete) {
    GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this, nullptr);
    batch_.on_complete = &on_complete_;
  }
}

RetryFilter::CallData::CallAttempt::BatchData::~BatchData() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: destroying batch %p",
            call_attempt_->calld_->chand_, call_attempt_->calld_, call_attempt_,
            this);
  }
  CallAttempt* call_attempt = std::exchange(call_attempt_, nullptr);
  grpc_call_stack* owning_call = call_attempt->calld_->owning_call_;
  call_attempt->Unref(DEBUG_LOCATION, "~BatchData");
  GRPC_CALL_STACK_UNREF(owning_call, "Retry BatchData");
}
1319
1320 void RetryFilter::CallData::CallAttempt::BatchData::
FreeCachedSendOpDataForCompletedBatch()1321 FreeCachedSendOpDataForCompletedBatch() {
1322 auto* calld = call_attempt_->calld_;
1323 // TODO(roth): When we implement hedging, this logic will need to get
1324 // a bit more complex, because there may be other (now abandoned) call
1325 // attempts still using this data. We may need to do some sort of
1326 // ref-counting instead.
1327 if (batch_.send_initial_metadata) {
1328 calld->FreeCachedSendInitialMetadata();
1329 }
1330 if (batch_.send_message) {
1331 calld->FreeCachedSendMessage(call_attempt_->completed_send_message_count_ -
1332 1);
1333 }
1334 if (batch_.send_trailing_metadata) {
1335 calld->FreeCachedSendTrailingMetadata();
1336 }
1337 }
1338
1339 //
1340 // recv_initial_metadata callback handling
1341 //
1342
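// A "Trailers-Only" response is one in which the server returns status in
// trailing metadata without ever sending initial metadata or messages (on
// HTTP/2, a single HEADERS frame with END_STREAM). In that case, and on
// error, we defer surfacing recv_initial_metadata_ready until we see
// recv_trailing_metadata, since the final status determines whether we
// retry instead of propagating results to the surface.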
void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvInitialMetadataCallback(
        grpc_error_handle error, CallCombinerClosureList* closures) {
  // Find pending batch.
  PendingBatch* pending = call_attempt_->calld_->PendingBatchFind(
      "invoking recv_initial_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_initial_metadata &&
               batch->payload->recv_initial_metadata
                       .recv_initial_metadata_ready != nullptr;
      });
  if (pending == nullptr) {
    return;
  }
  // Return metadata.
  *pending->batch->payload->recv_initial_metadata.recv_initial_metadata =
      std::move(call_attempt_->recv_initial_metadata_);
  // Propagate trailing_metadata_available.
  *pending->batch->payload->recv_initial_metadata.trailing_metadata_available =
      call_attempt_->trailing_metadata_available_;
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  call_attempt_->calld_->MaybeClearPendingBatch(pending);
  // Add callback to closures.
  closures->Add(recv_initial_metadata_ready, error,
                "recv_initial_metadata_ready for pending batch");
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_;
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got recv_initial_metadata_ready, error=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            StatusToString(error).c_str());
  }
  call_attempt->completed_recv_initial_metadata_ = true;
  // If this attempt has been abandoned, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing.
  if (call_attempt->abandoned_) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner_,
        "recv_initial_metadata_ready for abandoned attempt");
    return;
  }
  // Cancel per-attempt recv timer, if any.
  call_attempt->MaybeCancelPerAttemptRecvTimer();
  // If we're not committed, check the response to see if we need to commit.
  if (!calld->retry_committed_) {
    // If we got an error or a Trailers-Only response and have not yet gotten
    // the recv_trailing_metadata_ready callback, then defer propagating this
    // callback back to the surface. We can evaluate whether to retry when
    // recv_trailing_metadata comes back.
    if (GPR_UNLIKELY(
            (call_attempt->trailing_metadata_available_ || !error.ok()) &&
            !call_attempt->completed_recv_trailing_metadata_)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p attempt=%p: deferring "
                "recv_initial_metadata_ready (Trailers-Only)",
                calld->chand_, calld, call_attempt);
      }
      call_attempt->recv_initial_metadata_ready_deferred_batch_ =
          std::move(batch_data);
      call_attempt->recv_initial_metadata_error_ = error;
      CallCombinerClosureList closures;
      if (!error.ok()) {
        call_attempt->MaybeAddBatchForCancelOp(error, &closures);
      }
      if (!call_attempt->started_recv_trailing_metadata_) {
        // recv_trailing_metadata not yet started by application; start it
        // ourselves to get status.
        call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
      }
      closures.RunClosures(calld->call_combiner_);
      return;
    }
    // Received valid initial metadata, so commit the call.
    calld->RetryCommit(call_attempt);
    // If retry state is no longer needed, switch to fast path for
    // subsequent batches.
    call_attempt->MaybeSwitchToFastPath();
  }
  // Invoke the callback to return the result to the surface.
  CallCombinerClosureList closures;
  batch_data->MaybeAddClosureForRecvInitialMetadataCallback(error, &closures);
  closures.RunClosures(calld->call_combiner_);
}

//
// recv_message callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvMessageCallback(grpc_error_handle error,
                                          CallCombinerClosureList* closures) {
  // Find pending op.
  PendingBatch* pending = call_attempt_->calld_->PendingBatchFind(
      "invoking recv_message_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_message &&
               batch->payload->recv_message.recv_message_ready != nullptr;
      });
  if (pending == nullptr) {
    return;
  }
  // Return payload.
  *pending->batch->payload->recv_message.recv_message =
      std::move(call_attempt_->recv_message_);
  *pending->batch->payload->recv_message.flags =
      call_attempt_->recv_message_flags_;
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  call_attempt_->calld_->MaybeClearPendingBatch(pending);
  // Add callback to closures.
  closures->Add(recv_message_ready, error,
                "recv_message_ready for pending batch");
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_;
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got recv_message_ready, error=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            StatusToString(error).c_str());
  }
  ++call_attempt->completed_recv_message_count_;
  // If this attempt has been abandoned, then we're not going to use the
  // result of this recv_message op, so do nothing.
  if (call_attempt->abandoned_) {
    // The transport will not invoke recv_trailing_metadata_ready until the
    // byte stream for any recv_message op is orphaned, so we do that here
    // to ensure that any pending recv_trailing_metadata op can complete.
    call_attempt->recv_message_.reset();
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "recv_message_ready for abandoned attempt");
    return;
  }
  // Cancel per-attempt recv timer, if any.
  call_attempt->MaybeCancelPerAttemptRecvTimer();
  // If we're not committed, check the response to see if we need to commit.
  if (!calld->retry_committed_) {
    // If we got an error or there is no message and we have not yet gotten
    // the recv_trailing_metadata_ready callback, then defer propagating this
    // callback back to the surface. We can evaluate whether to retry when
    // recv_trailing_metadata comes back.
    if (GPR_UNLIKELY(
            (!call_attempt->recv_message_.has_value() || !error.ok()) &&
            !call_attempt->completed_recv_trailing_metadata_)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p attempt=%p: deferring recv_message_ready "
                "(nullptr message and recv_trailing_metadata pending)",
                calld->chand_, calld, call_attempt);
      }
      call_attempt->recv_message_ready_deferred_batch_ = std::move(batch_data);
      call_attempt->recv_message_error_ = error;
      CallCombinerClosureList closures;
      if (!error.ok()) {
        call_attempt->MaybeAddBatchForCancelOp(error, &closures);
      }
      if (!call_attempt->started_recv_trailing_metadata_) {
        // recv_trailing_metadata not yet started by application; start it
        // ourselves to get status.
        call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
      }
      closures.RunClosures(calld->call_combiner_);
      return;
    }
    // Received a valid message, so commit the call.
    calld->RetryCommit(call_attempt);
    // If retry state is no longer needed, switch to fast path for
    // subsequent batches.
    call_attempt->MaybeSwitchToFastPath();
  }
  // Invoke the callback to return the result to the surface.
  CallCombinerClosureList closures;
  batch_data->MaybeAddClosureForRecvMessageCallback(error, &closures);
  closures.RunClosures(calld->call_combiner_);
}

//
// recv_trailing_metadata handling
//

namespace {

// Sets *status, *server_pushback, and *is_lb_drop based on md_batch
// and error.
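// For example (illustrative), a failed attempt's trailing metadata might
// contain:
//   grpc-status: 14                 (UNAVAILABLE)
//   grpc-retry-pushback-ms: 1000
// which yields *status = GRPC_STATUS_UNAVAILABLE and a 1-second
// *server_pushback.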
void GetCallStatus(
    Timestamp deadline, grpc_metadata_batch* md_batch, grpc_error_handle error,
    grpc_status_code* status, absl::optional<Duration>* server_pushback,
    bool* is_lb_drop,
    absl::optional<GrpcStreamNetworkState::ValueType>* stream_network_state) {
  if (!error.ok()) {
    grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
    intptr_t value = 0;
    if (grpc_error_get_int(error, StatusIntProperty::kLbPolicyDrop, &value) &&
        value != 0) {
      *is_lb_drop = true;
    }
  } else {
    *status = *md_batch->get(GrpcStatusMetadata());
  }
  *server_pushback = md_batch->get(GrpcRetryPushbackMsMetadata());
  *stream_network_state = md_batch->get(GrpcStreamNetworkState());
}

}  // namespace

void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvTrailingMetadataReady(
        grpc_error_handle error, CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  // Find pending batch.
  PendingBatch* pending = calld->PendingBatchFind(
      "invoking recv_trailing_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_trailing_metadata &&
               batch->payload->recv_trailing_metadata
                       .recv_trailing_metadata_ready != nullptr;
      });
  // If we generated the recv_trailing_metadata op internally via
  // AddBatchForInternalRecvTrailingMetadata(), then there will be no
  // pending batch.
  if (pending == nullptr) {
    call_attempt_->recv_trailing_metadata_error_ = error;
    return;
  }
  // Copy transport stats to be delivered up to the surface.
  grpc_transport_move_stats(
      &call_attempt_->collect_stats_,
      pending->batch->payload->recv_trailing_metadata.collect_stats);
  // Return metadata.
  *pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata =
      std::move(call_attempt_->recv_trailing_metadata_);
  // Add closure.
  closures->Add(pending->batch->payload->recv_trailing_metadata
                    .recv_trailing_metadata_ready,
                error, "recv_trailing_metadata_ready for pending batch");
  // Update bookkeeping.
  pending->batch->payload->recv_trailing_metadata
      .recv_trailing_metadata_ready = nullptr;
  calld->MaybeClearPendingBatch(pending);
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForDeferredCompletionCallbacks(
        CallCombinerClosureList* closures) {
  // Add closure for deferred recv_initial_metadata_ready.
  if (GPR_UNLIKELY(
          call_attempt_->recv_initial_metadata_ready_deferred_batch_ !=
          nullptr)) {
    MaybeAddClosureForRecvInitialMetadataCallback(
        call_attempt_->recv_initial_metadata_error_, closures);
    call_attempt_->recv_initial_metadata_ready_deferred_batch_.reset(
        DEBUG_LOCATION, "resuming deferred recv_initial_metadata_ready");
    call_attempt_->recv_initial_metadata_error_ = absl::OkStatus();
  }
  // Add closure for deferred recv_message_ready.
  if (GPR_UNLIKELY(call_attempt_->recv_message_ready_deferred_batch_ !=
                   nullptr)) {
    MaybeAddClosureForRecvMessageCallback(call_attempt_->recv_message_error_,
                                          closures);
    call_attempt_->recv_message_ready_deferred_batch_.reset(
        DEBUG_LOCATION, "resuming deferred recv_message_ready");
    call_attempt_->recv_message_error_ = absl::OkStatus();
  }
  // Add closures for deferred on_complete callbacks.
  for (auto& on_complete_deferred_batch :
       call_attempt_->on_complete_deferred_batches_) {
    closures->Add(&on_complete_deferred_batch.batch->on_complete_,
                  on_complete_deferred_batch.error, "resuming on_complete");
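    // Note: release() drops our RefCountedPtr without unreffing; ownership
    // of that ref passes to the on_complete closure, which re-adopts it in
    // OnComplete().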
    on_complete_deferred_batch.batch.release();
  }
  call_attempt_->on_complete_deferred_batches_.clear();
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresToFailUnstartedPendingBatches(
        grpc_error_handle error, CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
    PendingBatch* pending = &calld->pending_batches_[i];
    if (pending->batch == nullptr) continue;
    if (call_attempt_->PendingBatchContainsUnstartedSendOps(pending)) {
      closures->Add(pending->batch->on_complete, error,
                    "failing on_complete for pending batch");
      pending->batch->on_complete = nullptr;
      calld->MaybeClearPendingBatch(pending);
    }
  }
}

void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall(
    grpc_error_handle error) {
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // First, add closure for recv_trailing_metadata_ready.
  MaybeAddClosureForRecvTrailingMetadataReady(error, &closures);
  // If there are deferred batch completion callbacks, add them to closures.
  AddClosuresForDeferredCompletionCallbacks(&closures);
  // Add closures to fail any pending batches that have not yet been started.
  AddClosuresToFailUnstartedPendingBatches(error, &closures);
  // Schedule all of the closures identified above.
  // Note: This will release the call combiner.
  closures.RunClosures(call_attempt_->calld_->call_combiner_);
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_;
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got recv_trailing_metadata_ready, error=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            StatusToString(error).c_str());
  }
  call_attempt->completed_recv_trailing_metadata_ = true;
  // If this attempt has been abandoned, then we're not going to use the
  // result of this recv_trailing_metadata op, so do nothing.
  if (call_attempt->abandoned_) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner_,
        "recv_trailing_metadata_ready for abandoned attempt");
    return;
  }
  // Cancel per-attempt recv timer, if any.
  call_attempt->MaybeCancelPerAttemptRecvTimer();
  // Get the call's status and check for server pushback metadata.
  grpc_status_code status = GRPC_STATUS_OK;
  absl::optional<Duration> server_pushback;
  bool is_lb_drop = false;
  absl::optional<GrpcStreamNetworkState::ValueType> stream_network_state;
  grpc_metadata_batch* md_batch =
      batch_data->batch_.payload->recv_trailing_metadata
          .recv_trailing_metadata;
  GetCallStatus(calld->deadline_, md_batch, error, &status, &server_pushback,
                &is_lb_drop, &stream_network_state);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: call finished, status=%s "
            "server_pushback=%s is_lb_drop=%d stream_network_state=%s",
            calld->chand_, calld, call_attempt,
            grpc_status_code_to_string(status),
            server_pushback.has_value() ? server_pushback->ToString().c_str()
                                        : "N/A",
            is_lb_drop,
            stream_network_state.has_value()
                ? absl::StrCat(*stream_network_state).c_str()
                : "N/A");
  }
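  // Check whether we should retry. The decision, as implemented below:
  //   1. LB drops are never retried.
  //   2. If the retry code is not yet committed and the stream was never
  //      sent on the wire, always retry transparently.
  //   3. If the stream was sent but never seen by the server application,
  //      retry transparently, but at most once per call.
  //   4. Otherwise, consult the configured retry policy via ShouldRetry().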
  if (!is_lb_drop) {  // Never retry on LB drops.
    enum { kNoRetry, kTransparentRetry, kConfigurableRetry } retry = kNoRetry;
    // Handle transparent retries.
    if (stream_network_state.has_value() && !calld->retry_committed_) {
      // If not sent on wire, then always retry.
      // If sent on wire but not seen by server, retry exactly once.
      if (*stream_network_state == GrpcStreamNetworkState::kNotSentOnWire) {
        retry = kTransparentRetry;
      } else if (*stream_network_state ==
                     GrpcStreamNetworkState::kNotSeenByServer &&
                 !calld->sent_transparent_retry_not_seen_by_server_) {
        calld->sent_transparent_retry_not_seen_by_server_ = true;
        retry = kTransparentRetry;
      }
    }
    // If not transparently retrying, check for configurable retry.
    if (retry == kNoRetry &&
        call_attempt->ShouldRetry(status, server_pushback)) {
      retry = kConfigurableRetry;
    }
    // If we're retrying, do so.
    if (retry != kNoRetry) {
      CallCombinerClosureList closures;
      // Cancel call attempt.
      call_attempt->MaybeAddBatchForCancelOp(
          error.ok() ? grpc_error_set_int(
                           GRPC_ERROR_CREATE("call attempt failed"),
                           StatusIntProperty::kRpcStatus, GRPC_STATUS_CANCELLED)
                     : error,
          &closures);
      // For transparent retries, add a closure to immediately start a new
      // call attempt.
      // For configurable retries, start retry timer.
      if (retry == kTransparentRetry) {
        calld->AddClosureToStartTransparentRetry(&closures);
      } else {
        calld->StartRetryTimer(server_pushback);
      }
      // Record that this attempt has been abandoned.
      call_attempt->Abandon();
      // Yields call combiner.
      closures.RunClosures(calld->call_combiner_);
      return;
    }
  }
  // Not retrying, so commit the call.
  calld->RetryCommit(call_attempt);
  // If retry state is no longer needed, switch to fast path for
  // subsequent batches.
  call_attempt->MaybeSwitchToFastPath();
  // Run any necessary closures.
  batch_data->RunClosuresForCompletedCall(error);
}

//
// on_complete callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForCompletedPendingBatch(grpc_error_handle error,
                                        CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  PendingBatch* pending = calld->PendingBatchFind(
      "completed", [this](grpc_transport_stream_op_batch* batch) {
        // Match the pending batch with the same set of send ops as the
        // batch we've just completed.
        return batch->on_complete != nullptr &&
               batch_.send_initial_metadata == batch->send_initial_metadata &&
               batch_.send_message == batch->send_message &&
               batch_.send_trailing_metadata == batch->send_trailing_metadata;
      });
  // If batch_data is a replay batch, then there will be no pending
  // batch to complete.
  if (pending == nullptr) {
    return;
  }
  // Propagate payload.
  if (batch_.send_message) {
    pending->batch->payload->send_message.stream_write_closed =
        batch_.payload->send_message.stream_write_closed;
  }
  // Add closure.
  closures->Add(pending->batch->on_complete, error,
                "on_complete for pending batch");
  pending->batch->on_complete = nullptr;
  calld->MaybeClearPendingBatch(pending);
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  bool have_pending_send_ops = call_attempt_->HaveSendOpsToReplay();
  // We don't check send_initial_metadata here, because that op will always
  // be started as soon as it is received from the surface, so it will
  // never need to be started at this point.
  if (!have_pending_send_ops) {
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
      PendingBatch* pending = &calld->pending_batches_[i];
      grpc_transport_stream_op_batch* batch = pending->batch;
      if (batch == nullptr || pending->send_ops_cached) continue;
      if (batch->send_message || batch->send_trailing_metadata) {
        have_pending_send_ops = true;
        break;
      }
    }
  }
  if (have_pending_send_ops) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: starting next batch for pending "
              "send op(s)",
              calld->chand_, calld, call_attempt_);
    }
    call_attempt_->AddRetriableBatches(closures);
  }
}

void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_;
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got on_complete, error=%s, batch=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            StatusToString(error).c_str(),
            grpc_transport_stream_op_batch_string(&batch_data->batch_, false)
                .c_str());
  }
  // If this attempt has been abandoned, then we're not going to propagate
  // the completion of this batch, so do nothing.
  if (call_attempt->abandoned_) {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "on_complete for abandoned attempt");
    return;
  }
  // If we got an error and have not yet gotten the
  // recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface. We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY(!calld->retry_committed_ && !error.ok() &&
                   !call_attempt->completed_recv_trailing_metadata_)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: deferring on_complete",
              calld->chand_, calld, call_attempt);
    }
    call_attempt->on_complete_deferred_batches_.emplace_back(
        std::move(batch_data), error);
    CallCombinerClosureList closures;
    call_attempt->MaybeAddBatchForCancelOp(error, &closures);
    if (!call_attempt->started_recv_trailing_metadata_) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
    }
    closures.RunClosures(calld->call_combiner_);
    return;
  }
  // Update bookkeeping in call_attempt.
  if (batch_data->batch_.send_initial_metadata) {
    call_attempt->completed_send_initial_metadata_ = true;
  }
  if (batch_data->batch_.send_message) {
    ++call_attempt->completed_send_message_count_;
  }
  if (batch_data->batch_.send_trailing_metadata) {
    call_attempt->completed_send_trailing_metadata_ = true;
  }
  // If the call is committed, free cached data for send ops that we've just
  // completed.
  if (calld->retry_committed_) {
    batch_data->FreeCachedSendOpDataForCompletedBatch();
  }
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // Add closure for the completed pending batch, if any.
  batch_data->AddClosuresForCompletedPendingBatch(error, &closures);
  // If needed, add a callback to start any replay or pending send ops on
  // the LB call.
  if (!call_attempt->completed_recv_trailing_metadata_) {
    batch_data->AddClosuresForReplayOrPendingSendOps(&closures);
  }
  // If retry state is no longer needed (i.e., we're committed and there
  // are no more send ops to replay), switch to fast path for subsequent
  // batches.
  call_attempt->MaybeSwitchToFastPath();
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(calld->call_combiner_);
}

void RetryFilter::CallData::CallAttempt::BatchData::OnCompleteForCancelOp(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_;
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got on_complete for cancel_stream batch, error=%s, batch=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            StatusToString(error).c_str(),
            grpc_transport_stream_op_batch_string(&batch_data->batch_, false)
                .c_str());
  }
  GRPC_CALL_COMBINER_STOP(
      calld->call_combiner_,
      "on_complete for internally generated cancel_stream op");
}

//
// retriable batch construction
//

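// Each attempt sends fresh copies of the cached send ops: metadata batches
// are copied per attempt (downstream filters may mutate them), and cached
// message slices are replayed in order. After the first attempt, the
// GrpcPreviousRpcAttemptsMetadata header ("grpc-previous-rpc-attempts")
// tells the server how many attempts have already completed.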
void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendInitialMetadataOp() {
  auto* calld = call_attempt_->calld_;
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // If we've already completed one or more attempts, add the
  // grpc-previous-rpc-attempts header.
  call_attempt_->send_initial_metadata_ = calld->send_initial_metadata_.Copy();
  if (GPR_UNLIKELY(calld->num_attempts_completed_ > 0)) {
    call_attempt_->send_initial_metadata_.Set(GrpcPreviousRpcAttemptsMetadata(),
                                              calld->num_attempts_completed_);
  } else {
    call_attempt_->send_initial_metadata_.Remove(
        GrpcPreviousRpcAttemptsMetadata());
  }
  call_attempt_->started_send_initial_metadata_ = true;
  batch_.send_initial_metadata = true;
  batch_.payload->send_initial_metadata.send_initial_metadata =
      &call_attempt_->send_initial_metadata_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendMessageOp() {
  auto* calld = call_attempt_->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(
        GPR_INFO,
        "chand=%p calld=%p attempt=%p: starting calld->send_messages[%" PRIuPTR
        "]",
        calld->chand_, calld, call_attempt_,
        call_attempt_->started_send_message_count_);
  }
  CachedSendMessage cache =
      calld->send_messages_[call_attempt_->started_send_message_count_];
  ++call_attempt_->started_send_message_count_;
  batch_.send_message = true;
  batch_.payload->send_message.send_message = cache.slices;
  batch_.payload->send_message.flags = cache.flags;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendTrailingMetadataOp() {
  auto* calld = call_attempt_->calld_;
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  call_attempt_->send_trailing_metadata_ =
      calld->send_trailing_metadata_.Copy();
  call_attempt_->started_send_trailing_metadata_ = true;
  batch_.send_trailing_metadata = true;
  batch_.payload->send_trailing_metadata.send_trailing_metadata =
      &call_attempt_->send_trailing_metadata_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvInitialMetadataOp() {
  call_attempt_->started_recv_initial_metadata_ = true;
  batch_.recv_initial_metadata = true;
  call_attempt_->recv_initial_metadata_.Clear();
  batch_.payload->recv_initial_metadata.recv_initial_metadata =
      &call_attempt_->recv_initial_metadata_;
  batch_.payload->recv_initial_metadata.trailing_metadata_available =
      &call_attempt_->trailing_metadata_available_;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_initial_metadata_ready_,
                    RecvInitialMetadataReady, this, grpc_schedule_on_exec_ctx);
  batch_.payload->recv_initial_metadata.recv_initial_metadata_ready =
      &call_attempt_->recv_initial_metadata_ready_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvMessageOp() {
  ++call_attempt_->started_recv_message_count_;
  batch_.recv_message = true;
  batch_.payload->recv_message.recv_message = &call_attempt_->recv_message_;
  batch_.payload->recv_message.flags = &call_attempt_->recv_message_flags_;
  batch_.payload->recv_message.call_failed_before_recv_message = nullptr;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_, RecvMessageReady,
                    this, grpc_schedule_on_exec_ctx);
  batch_.payload->recv_message.recv_message_ready =
      &call_attempt_->recv_message_ready_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvTrailingMetadataOp() {
  call_attempt_->started_recv_trailing_metadata_ = true;
  batch_.recv_trailing_metadata = true;
  call_attempt_->recv_trailing_metadata_.Clear();
  batch_.payload->recv_trailing_metadata.recv_trailing_metadata =
      &call_attempt_->recv_trailing_metadata_;
  batch_.payload->recv_trailing_metadata.collect_stats =
      &call_attempt_->collect_stats_;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_trailing_metadata_ready_,
                    RecvTrailingMetadataReady, this, grpc_schedule_on_exec_ctx);
  batch_.payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &call_attempt_->recv_trailing_metadata_ready_;
}

void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp(
    grpc_error_handle error) {
  batch_.cancel_stream = true;
  batch_.payload->cancel_stream.cancel_error = error;
  // Override on_complete callback.
  GRPC_CLOSURE_INIT(&on_complete_, OnCompleteForCancelOp, this, nullptr);
}

//
// CallData vtable functions
//

grpc_error_handle RetryFilter::CallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  auto* chand = static_cast<RetryFilter*>(elem->channel_data);
  new (elem->call_data) CallData(chand, *args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand,
            elem->call_data);
  }
  return absl::OkStatus();
}

void RetryFilter::CallData::Destroy(grpc_call_element* elem,
                                    const grpc_call_final_info* /*final_info*/,
                                    grpc_closure* then_schedule_closure) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  // Save our ref to the CallStackDestructionBarrier until after our
  // dtor is invoked.
  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier =
      std::move(calld->call_stack_destruction_barrier_);
  calld->~CallData();
  // Now set the callback in the CallStackDestructionBarrier object,
  // right before we release our ref to it (implicitly upon returning).
  // The callback will be invoked when the CallStackDestructionBarrier
  // is destroyed.
  call_stack_destruction_barrier->set_on_call_stack_destruction(
      then_schedule_closure);
}

void RetryFilter::CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->StartTransportStreamOpBatch(batch);
}

void RetryFilter::CallData::SetPollent(grpc_call_element* elem,
                                       grpc_polling_entity* pollent) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->pollent_ = pollent;
}

//
// CallData implementation
//

const RetryMethodConfig* RetryFilter::GetRetryPolicy(
    const grpc_call_context_element* context) {
  if (context == nullptr) return nullptr;
  auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
      context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
  if (svc_cfg_call_data == nullptr) return nullptr;
  return static_cast<const RetryMethodConfig*>(
      svc_cfg_call_data->GetMethodParsedConfig(service_config_parser_index_));
}
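
// For reference, the retry policy arrives via the per-method service
// config. An illustrative example (service name is hypothetical):
//
//   {
//     "methodConfig": [{
//       "name": [{"service": "my.package.MyService"}],
//       "retryPolicy": {
//         "maxAttempts": 4,
//         "initialBackoff": "0.1s",
//         "maxBackoff": "1s",
//         "backoffMultiplier": 2,
//         "retryableStatusCodes": ["UNAVAILABLE"]
//       }
//     }]
//   }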

RetryFilter::CallData::CallData(RetryFilter* chand,
                                const grpc_call_element_args& args)
    : chand_(chand),
      retry_throttle_data_(chand->retry_throttle_data_),
      retry_policy_(chand->GetRetryPolicy(args.context)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(retry_policy_ == nullptr
                                       ? Duration::Zero()
                                       : retry_policy_->initial_backoff())
              .set_multiplier(retry_policy_ == nullptr
                                  ? 0
                                  : retry_policy_->backoff_multiplier())
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(retry_policy_ == nullptr
                                   ? Duration::Zero()
                                   : retry_policy_->max_backoff())),
      path_(CSliceRef(args.path)),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      call_stack_destruction_barrier_(
          arena_->New<CallStackDestructionBarrier>()),
      pending_send_initial_metadata_(false),
      pending_send_message_(false),
      pending_send_trailing_metadata_(false),
      retry_committed_(false),
      retry_codepath_started_(false),
      sent_transparent_retry_not_seen_by_server_(false) {}
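
// Note: The BackOff state seeded above drives the retry delay: it starts
// at the policy's initialBackoff, grows by backoffMultiplier per attempt
// up to maxBackoff, and is jittered by RETRY_BACKOFF_JITTER (see BackOff
// for the exact jitter semantics). When no retry policy is configured, the
// zeroed options are never consulted, since no retry timer is scheduled.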

RetryFilter::CallData::~CallData() {
  FreeAllCachedSendOpData();
  CSliceUnref(path_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i].batch == nullptr);
  }
}

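// Routing for batches from the surface, in the order checked below:
// delegate to the committed LB call if we have one; fail immediately if
// the call was already cancelled from the surface; handle cancel_stream
// specially; otherwise queue the batch as pending and either wait for the
// retry timer, create a new call attempt (or, when already committed, an
// LB call directly), or hand the pending batches to the current attempt.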
void RetryFilter::CallData::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) &&
      !GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from surface: %s",
            chand_, this,
            grpc_transport_stream_op_batch_string(batch, false).c_str());
  }
  // If we have an LB call, delegate to the LB call.
  if (committed_call_ != nullptr) {
    // Note: This will release the call combiner.
    committed_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // If we were previously cancelled from the surface, fail this
  // batch immediately.
  if (!cancelled_from_surface_.ok()) {
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, cancelled_from_surface_, call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Save cancel_error in case subsequent batches are started.
    cancelled_from_surface_ = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: cancelled from surface: %s",
              chand_, this, StatusToString(cancelled_from_surface_).c_str());
    }
    // Fail any pending batches.
    PendingBatchesFail(cancelled_from_surface_);
    // If we have a current call attempt, commit the call, then send
    // the cancellation down to that attempt. When the call fails, it
    // will not be retried, because we have committed it here.
    if (call_attempt_ != nullptr) {
      RetryCommit(call_attempt_.get());
      // TODO(roth): When implementing hedging, this will get more
      // complex, because instead of just passing the batch down to a
      // single call attempt, we'll need to cancel multiple call
      // attempts and wait for the cancellation on_complete from each call
      // attempt before we propagate the on_complete from this batch
      // back to the surface.
      // Note: This will release the call combiner.
      call_attempt_->CancelFromSurface(batch);
      return;
    }
    // Cancel retry timer if needed.
    if (retry_timer_handle_.has_value()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling retry timer", chand_,
                this);
      }
      if (chand_->event_engine_->Cancel(*retry_timer_handle_)) {
        GRPC_CALL_STACK_UNREF(owning_call_, "OnRetryTimer");
      }
      retry_timer_handle_.reset();
      FreeAllCachedSendOpData();
    }
    // We have no call attempt, so there's nowhere to send the cancellation
    // batch. Return it back to the surface immediately.
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, cancelled_from_surface_, call_combiner_);
    return;
  }
  // Add the batch to the pending list.
  PendingBatch* pending = PendingBatchesAdd(batch);
  // If the timer is pending, yield the call combiner and wait for it to
  // run, since we don't want to start another call attempt until it does.
  if (retry_timer_handle_.has_value()) {
    GRPC_CALL_COMBINER_STOP(call_combiner_,
                            "added pending batch while retry timer pending");
    return;
  }
  // If we do not yet have a call attempt, create one.
  if (call_attempt_ == nullptr) {
    // If this is the first batch and retries are already committed
    // (e.g., if this batch put the call above the buffer size limit), then
    // immediately create an LB call and delegate the batch to it. This
    // avoids the overhead of unnecessarily allocating a CallAttempt
    // object or caching any of the send op data.
    // Note that we would ideally like to do this also on subsequent
    // attempts (e.g., if a batch puts the call above the buffer size
    // limit since the last attempt was complete), but in practice that's
    // not really worthwhile, because we will almost always have cached and
    // completed at least the send_initial_metadata op on the previous
    // attempt, which means that we'd need special logic to replay the
    // batch anyway, which is exactly what the CallAttempt object provides.
    // We also skip this optimization if perAttemptRecvTimeout is set in the
    // retry policy, because we need the code in CallAttempt to handle
    // the associated timer.
    if (!retry_codepath_started_ && retry_committed_ &&
        (retry_policy_ == nullptr ||
         !retry_policy_->per_attempt_recv_timeout().has_value())) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: retry committed before first attempt; "
                "creating LB call",
                chand_, this);
      }
      PendingBatchClear(pending);
      auto* service_config_call_data =
          static_cast<ClientChannelServiceConfigCallData*>(
              call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
      committed_call_ = CreateLoadBalancedCall(
          [service_config_call_data]() { service_config_call_data->Commit(); },
          /*is_transparent_retry=*/false);
      committed_call_->StartTransportStreamOpBatch(batch);
      return;
    }
    // Otherwise, create a call attempt.
    // The attempt will automatically start any necessary replays or
    // pending batches.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_,
              this);
    }
    retry_codepath_started_ = true;
    CreateCallAttempt(/*is_transparent_retry=*/false);
    return;
  }
  // Send batches to call attempt.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on attempt=%p",
            chand_, this, call_attempt_.get());
  }
  call_attempt_->StartRetriableBatches();
}

OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall(
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) {
  grpc_call_element_args args = {owning_call_, nullptr, call_context_,
                                 path_, /*start_time=*/0, deadline_,
                                 arena_, call_combiner_};
  return chand_->client_channel_->CreateLoadBalancedCall(
      args, pollent_,
      // This callback holds a ref to the CallStackDestructionBarrier
      // object until the LB call is destroyed.
      call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this),
      std::move(on_commit), is_transparent_retry);
}

void RetryFilter::CallData::CreateCallAttempt(bool is_transparent_retry) {
  call_attempt_ = MakeRefCounted<CallAttempt>(this, is_transparent_retry);
  call_attempt_->StartRetriableBatches();
}

//
// send op data caching
//

void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    seen_send_initial_metadata_ = true;
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    send_initial_metadata_ = send_initial_metadata->Copy();
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    SliceBuffer* cache = arena_->New<SliceBuffer>(std::move(
        *std::exchange(batch->payload->send_message.send_message, nullptr)));
    send_messages_.push_back({cache, batch->payload->send_message.flags});
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    seen_send_trailing_metadata_ = true;
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    send_trailing_metadata_ = send_trailing_metadata->Copy();
  }
}

void RetryFilter::CallData::FreeCachedSendInitialMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_initial_metadata",
            chand_, this);
  }
  send_initial_metadata_.Clear();
}

void RetryFilter::CallData::FreeCachedSendMessage(size_t idx) {
  if (send_messages_[idx].slices != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: destroying send_messages[%" PRIuPTR "]",
              chand_, this, idx);
    }
    Destruct(std::exchange(send_messages_[idx].slices, nullptr));
  }
}

void RetryFilter::CallData::FreeCachedSendTrailingMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_trailing_metadata",
            chand_, this);
  }
  send_trailing_metadata_.Clear();
}

void RetryFilter::CallData::FreeAllCachedSendOpData() {
  if (seen_send_initial_metadata_) {
    FreeCachedSendInitialMetadata();
  }
  for (size_t i = 0; i < send_messages_.size(); ++i) {
    FreeCachedSendMessage(i);
  }
  if (seen_send_trailing_metadata_) {
    FreeCachedSendTrailingMetadata();
  }
}

//
// pending_batches management
//

size_t RetryFilter::CallData::GetBatchIndex(
    grpc_transport_stream_op_batch* batch) {
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return static_cast<size_t>(-1));
}
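
// Note: Six slots suffice given the surface API's batching rules: at most
// one in-flight batch can contain any given op type, so keying each batch
// by the first op flag checked above yields a unique, unoccupied slot
// (asserted in PendingBatchesAdd()).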

// This is called via the call combiner, so access to calld is synchronized.
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
            chand_, this, idx);
  }
  PendingBatch* pending = &pending_batches_[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  // Update state in calld about pending batches.
  // Also check if the batch takes us over the retry buffer limit.
  // Note: We don't check the size of trailing metadata here, because
  // gRPC clients do not send trailing metadata.
  if (batch->send_initial_metadata) {
    pending_send_initial_metadata_ = true;
    bytes_buffered_for_retry_ += batch->payload->send_initial_metadata
                                     .send_initial_metadata->TransportSize();
  }
  if (batch->send_message) {
    pending_send_message_ = true;
    bytes_buffered_for_retry_ +=
        batch->payload->send_message.send_message->Length();
  }
  if (batch->send_trailing_metadata) {
    pending_send_trailing_metadata_ = true;
  }
  // TODO(roth): When we implement hedging, if there are currently attempts
  // in flight, we will need to pick the one on which the max number of send
  // ops have already been sent, and commit to that attempt.
  if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
                   chand_->per_rpc_retry_buffer_size_)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: exceeded retry buffer size, committing",
              chand_, this);
    }
    RetryCommit(call_attempt_.get());
  }
  return pending;
}

void RetryFilter::CallData::PendingBatchClear(PendingBatch* pending) {
  if (pending->batch->send_initial_metadata) {
    pending_send_initial_metadata_ = false;
  }
  if (pending->batch->send_message) {
    pending_send_message_ = false;
  }
  if (pending->batch->send_trailing_metadata) {
    pending_send_trailing_metadata_ = false;
  }
  pending->batch = nullptr;
}

void RetryFilter::CallData::MaybeClearPendingBatch(PendingBatch* pending) {
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch if all of its callbacks have been
  // scheduled and reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr) &&
      (!batch->recv_trailing_metadata ||
       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand_,
              this);
    }
    PendingBatchClear(pending);
  }
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryFilter::CallData::FailPendingBatchInCallCombiner(
    void* arg, grpc_error_handle error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  CallData* call = static_cast<CallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(batch, error,
                                                     call->call_combiner_);
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryFilter::CallData::PendingBatchesFail(grpc_error_handle error) {
  GPR_ASSERT(!error.ok());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            chand_, this, num_batches, StatusToString(error).c_str());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, error,
                   "PendingBatchesFail");
      PendingBatchClear(pending);
    }
  }
  closures.RunClosuresWithoutYielding(call_combiner_);
}

template <typename Predicate>
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchFind(
    const char* log_message, Predicate predicate) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr && predicate(batch)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR,
                chand_, this, log_message, i);
      }
      return pending;
    }
  }
  return nullptr;
}

//
// retry code
//

void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) {
  if (retry_committed_) return;
  retry_committed_ = true;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand_, this);
  }
  if (call_attempt != nullptr) {
    // If the call attempt's LB call has been committed, invoke the
    // call's on_commit callback.
    // Note: If call_attempt is null, this is happening before the first
    // retry attempt is started, in which case we'll just pass the real
    // on_commit callback down into the LB call, and it won't be our
    // problem anymore.
    if (call_attempt->lb_call_committed()) {
      auto* service_config_call_data =
          static_cast<ClientChannelServiceConfigCallData*>(
              call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
      service_config_call_data->Commit();
    }
    // Free cached send ops.
    call_attempt->FreeCachedSendOpDataAfterCommit();
  }
}

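// Schedules the next attempt after the backoff delay. If the server sent a
// pushback (grpc-retry-pushback-ms), that delay is used verbatim and the
// exponential backoff state is reset, so the attempt after next starts
// over from the initial backoff.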
void RetryFilter::CallData::StartRetryTimer(
    absl::optional<Duration> server_pushback) {
  // Reset call attempt.
  call_attempt_.reset(DEBUG_LOCATION, "StartRetryTimer");
  // Compute backoff delay.
  Duration next_attempt_timeout;
  if (server_pushback.has_value()) {
    GPR_ASSERT(*server_pushback >= Duration::Zero());
    next_attempt_timeout = *server_pushback;
    retry_backoff_.Reset();
  } else {
    next_attempt_timeout = retry_backoff_.NextAttemptTime() - Timestamp::Now();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms",
            chand_, this, next_attempt_timeout.millis());
  }
  // Schedule retry after computed delay.
  GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
  retry_timer_handle_ =
      chand_->event_engine_->RunAfter(next_attempt_timeout, [this] {
        ApplicationCallbackExecCtx callback_exec_ctx;
        ExecCtx exec_ctx;
        OnRetryTimer();
      });
}

void RetryFilter::CallData::OnRetryTimer() {
  GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimerLocked, this, nullptr);
  GRPC_CALL_COMBINER_START(call_combiner_, &retry_closure_, absl::OkStatus(),
                           "retry timer fired");
}

void RetryFilter::CallData::OnRetryTimerLocked(void* arg,
                                               grpc_error_handle /*error*/) {
  auto* calld = static_cast<CallData*>(arg);
  calld->retry_timer_handle_.reset();
  calld->CreateCallAttempt(/*is_transparent_retry=*/false);
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
}

void RetryFilter::CallData::AddClosureToStartTransparentRetry(
    CallCombinerClosureList* closures) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: scheduling transparent retry",
            chand_, this);
  }
  GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
  GRPC_CLOSURE_INIT(&retry_closure_, StartTransparentRetry, this, nullptr);
  closures->Add(&retry_closure_, absl::OkStatus(), "start transparent retry");
}

void RetryFilter::CallData::StartTransparentRetry(
    void* arg, grpc_error_handle /*error*/) {
  auto* calld = static_cast<CallData*>(arg);
  if (calld->cancelled_from_surface_.ok()) {
    calld->CreateCallAttempt(/*is_transparent_retry=*/true);
  } else {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "call cancelled before transparent retry");
  }
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
}

}  // namespace

const grpc_channel_filter kRetryFilterVtable = {
    RetryFilter::CallData::StartTransportStreamOpBatch,
    nullptr,
    RetryFilter::StartTransportOp,
    sizeof(RetryFilter::CallData),
    RetryFilter::CallData::Init,
    RetryFilter::CallData::SetPollent,
    RetryFilter::CallData::Destroy,
    sizeof(RetryFilter),
    RetryFilter::Init,
    grpc_channel_stack_no_post_init,
    RetryFilter::Destroy,
    RetryFilter::GetChannelInfo,
    "retry_filter",
};

}  // namespace grpc_core