//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/retry_filter.h"

#include <inttypes.h>
#include <limits.h>
#include <stddef.h>

#include <memory>
#include <new>
#include <string>
#include <utility>

#include "absl/container/inlined_vector.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/retry_service_config.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"

//
// Retry filter
//

// This filter is intended to be used in the DynamicFilter stack in the
// client channel, which is situated between the name resolver and the
// LB policy.  Normally, the last filter in the DynamicFilter stack is
// the DynamicTerminationFilter (see client_channel.cc), which creates a
// LoadBalancedCall and delegates to it.  However, when retries are
// enabled, this filter is used instead of the DynamicTerminationFilter.
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on an LB call.  When the child batches return, we
// then decide which pending batches have been completed and schedule their
// callbacks accordingly.  If a call attempt fails and we want to retry it,
// we create a new LB call and start again, constructing new "child" batches
// for the new LB call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different LB call even after we have completed the original batches.
//
// The code is structured as follows:
// - In CallData (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - There is a CallData::CallAttempt object for each retry attempt.
//   This object contains the LB call for that attempt and state to indicate
//   which ops from the CallData object have already been sent down to that
//   LB call.
// - There is a CallData::CallAttempt::BatchData object for each "child"
//   batch sent on the LB call.
//
// When constructing the "child" batches, we compare the state in the
// CallAttempt object against the state in the CallData object to see
// which batches need to be sent on the LB call for a given attempt.
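//
// For reference, retries are configured via a retryPolicy in the service
// config's methodConfig (field names per the gRPC retry design, gRFC A6).
// An illustrative example (values made up):
//
//   "methodConfig": [{
//     "name": [{"service": "my.package.MyService"}],
//     "retryPolicy": {
//       "maxAttempts": 4,
//       "initialBackoff": "0.1s",
//       "maxBackoff": "1s",
//       "backoffMultiplier": 2,
//       "retryableStatusCodes": ["UNAVAILABLE"]
//     }
//   }]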

// TODO(roth): In subsequent PRs:
// - implement hedging

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
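// For example, with a 0.2 jitter factor, a computed backoff of 1000 ms
// yields an actual delay drawn from roughly [800 ms, 1200 ms].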

namespace grpc_core {

namespace {

using grpc_event_engine::experimental::EventEngine;
using internal::RetryGlobalConfig;
using internal::RetryMethodConfig;
using internal::RetryServiceConfigParser;
using internal::ServerRetryThrottleData;

TraceFlag grpc_retry_trace(false, "retry");

//
// RetryFilter
//

class RetryFilter {
 public:
  class CallData;

  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    GPR_ASSERT(args->is_last);
    GPR_ASSERT(elem->filter == &kRetryFilterVtable);
    grpc_error_handle error;
    new (elem->channel_data) RetryFilter(args->channel_args, &error);
    return error;
  }

  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<RetryFilter*>(elem->channel_data);
    chand->~RetryFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

 private:
  static size_t GetMaxPerRpcRetryBufferSize(const ChannelArgs& args) {
    return Clamp(args.GetInt(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE)
                     .value_or(DEFAULT_PER_RPC_RETRY_BUFFER_SIZE),
                 0, INT_MAX);
  }
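  // Note: Clients can override the default buffer size via the
  // GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel arg, e.g. (illustrative
  // C++ API usage):
  //   grpc::ChannelArguments args;
  //   args.SetInt(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE, 512 * 1024);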

  RetryFilter(const ChannelArgs& args, grpc_error_handle* error)
      : client_channel_(args.GetObject<ClientChannel>()),
        event_engine_(args.GetObject<EventEngine>()),
        per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
        service_config_parser_index_(
            internal::RetryServiceConfigParser::ParserIndex()) {
    // Get retry throttling parameters from service config.
    auto* service_config = args.GetObject<ServiceConfig>();
    if (service_config == nullptr) return;
    const auto* config = static_cast<const RetryGlobalConfig*>(
        service_config->GetGlobalParsedConfig(
            RetryServiceConfigParser::ParserIndex()));
    if (config == nullptr) return;
    // Get server name from target URI.
    auto server_uri = args.GetString(GRPC_ARG_SERVER_URI);
    if (!server_uri.has_value()) {
      *error = GRPC_ERROR_CREATE(
          "server URI channel arg missing or wrong type in client channel "
          "filter");
      return;
    }
    absl::StatusOr<URI> uri = URI::Parse(*server_uri);
    if (!uri.ok() || uri->path().empty()) {
      *error =
          GRPC_ERROR_CREATE("could not extract server name from target URI");
      return;
    }
    std::string server_name(absl::StripPrefix(uri->path(), "/"));
    // Get throttling config for server_name.
    retry_throttle_data_ =
        internal::ServerRetryThrottleMap::Get()->GetDataForServer(
            server_name, config->max_milli_tokens(),
            config->milli_token_ratio());
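    // (Throttling semantics follow the retry design, gRFC A6: the token
    // bucket starts full, each qualifying failure costs a token, each
    // success refunds token_ratio, and retries are disallowed while the
    // bucket is at or below half of max_tokens.  The milli-token values
    // here are just a fixed-point encoding of those parameters.)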
  }

  const RetryMethodConfig* GetRetryPolicy(
      const grpc_call_context_element* context);

  ClientChannel* client_channel_;
  EventEngine* const event_engine_;
  size_t per_rpc_retry_buffer_size_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  const size_t service_config_parser_index_;
};

//
// RetryFilter::CallData
//

class RetryFilter::CallData {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* /*final_info*/,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class CallStackDestructionBarrier;

  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch.  If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch = nullptr;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached = false;
  };

  // State associated with each call attempt.
  class CallAttempt : public RefCounted<CallAttempt> {
   public:
    CallAttempt(CallData* calld, bool is_transparent_retry);
    ~CallAttempt() override;

    bool lb_call_committed() const { return lb_call_committed_; }

    // Constructs and starts whatever batches are needed on this call
    // attempt.
    void StartRetriableBatches();

    // Frees cached send ops that have already been completed after
    // committing the call.
    void FreeCachedSendOpDataAfterCommit();

    // Cancels the call attempt.
    void CancelFromSurface(grpc_transport_stream_op_batch* cancel_batch);

   private:
    // State used for starting a retryable batch on the call attempt's LB call.
    // This provides its own grpc_transport_stream_op_batch and other data
    // structures needed to populate the ops in the batch.
    // We allocate one struct on the arena for each attempt at starting a
    // batch on a given LB call.
    class BatchData
        : public RefCounted<BatchData, PolymorphicRefCount, UnrefCallDtor> {
     public:
      BatchData(RefCountedPtr<CallAttempt> call_attempt, int refcount,
                bool set_on_complete);
      ~BatchData() override;

      grpc_transport_stream_op_batch* batch() { return &batch_; }

      // Adds retriable send_initial_metadata op.
      void AddRetriableSendInitialMetadataOp();
      // Adds retriable send_message op.
      void AddRetriableSendMessageOp();
      // Adds retriable send_trailing_metadata op.
      void AddRetriableSendTrailingMetadataOp();
      // Adds retriable recv_initial_metadata op.
      void AddRetriableRecvInitialMetadataOp();
      // Adds retriable recv_message op.
      void AddRetriableRecvMessageOp();
      // Adds retriable recv_trailing_metadata op.
      void AddRetriableRecvTrailingMetadataOp();
      // Adds cancel_stream op.
      void AddCancelStreamOp(grpc_error_handle error);

     private:
      // Frees cached send ops that were completed by the completed batch in
      // batch_data.  Used when batches are completed after the call is
      // committed.
      void FreeCachedSendOpDataForCompletedBatch();

      // If there is a pending recv_initial_metadata op, adds a closure
      // to closures for recv_initial_metadata_ready.
      void MaybeAddClosureForRecvInitialMetadataCallback(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Intercepts recv_initial_metadata_ready callback for retries.
      // Commits the call and returns the initial metadata up the stack.
      static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);

      // If there is a pending recv_message op, adds a closure to closures
      // for recv_message_ready.
      void MaybeAddClosureForRecvMessageCallback(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Intercepts recv_message_ready callback for retries.
      // Commits the call and returns the message up the stack.
      static void RecvMessageReady(void* arg, grpc_error_handle error);

      // If there is a pending recv_trailing_metadata op, adds a closure to
      // closures for recv_trailing_metadata_ready.
      void MaybeAddClosureForRecvTrailingMetadataReady(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Adds any necessary closures for deferred batch completion
      // callbacks to closures.
      void AddClosuresForDeferredCompletionCallbacks(
          CallCombinerClosureList* closures);
      // For any pending batch containing an op that has not yet been started,
      // adds the pending batch's completion closures to closures.
      void AddClosuresToFailUnstartedPendingBatches(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Runs necessary closures upon completion of a call attempt.
      void RunClosuresForCompletedCall(grpc_error_handle error);
      // Intercepts recv_trailing_metadata_ready callback for retries.
      // Commits the call and returns the trailing metadata up the stack.
      static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);

      // Adds the on_complete closure for the pending batch completed in
      // batch_data to closures.
      void AddClosuresForCompletedPendingBatch(
          grpc_error_handle error, CallCombinerClosureList* closures);

      // If there are any cached ops to replay or pending ops to start on the
      // LB call, adds them to closures.
      void AddClosuresForReplayOrPendingSendOps(
          CallCombinerClosureList* closures);

      // Callback used to intercept on_complete from LB calls.
      static void OnComplete(void* arg, grpc_error_handle error);

      // Callback used to handle on_complete for internally generated
      // cancel_stream op.
      static void OnCompleteForCancelOp(void* arg, grpc_error_handle error);

      // This DOES hold a ref, but it cannot be a RefCountedPtr<>, because
      // our dtor unrefs the owning call, which may delete the arena in
      // which we are allocated, which means that running the dtor of any
      // data members after that would cause a crash.
      CallAttempt* call_attempt_;
      // The batch to use in the LB call.
      // Its payload field points to CallAttempt::batch_payload_.
      grpc_transport_stream_op_batch batch_;
      // For intercepting on_complete.
      grpc_closure on_complete_;
    };

    // Creates a BatchData object on the call's arena with the
    // specified refcount.  If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    BatchData* CreateBatch(int refcount, bool set_on_complete) {
      return calld_->arena_->New<BatchData>(Ref(DEBUG_LOCATION, "CreateBatch"),
                                            refcount, set_on_complete);
    }

    // If there are any cached send ops that need to be replayed on this
    // call attempt, creates and returns a new batch to replay those ops.
    // Otherwise, returns nullptr.
    BatchData* MaybeCreateBatchForReplay();

    // Adds a closure to closures that will execute batch in the call combiner.
    void AddClosureForBatch(grpc_transport_stream_op_batch* batch,
                            const char* reason,
                            CallCombinerClosureList* closures);

    // Helper function used to start a recv_trailing_metadata batch.  This
    // is used in the case where a recv_initial_metadata or recv_message
    // op fails in a way that we know the call is over but when the application
    // has not yet started its own recv_trailing_metadata op.
    void AddBatchForInternalRecvTrailingMetadata(
        CallCombinerClosureList* closures);

    // Adds a batch to closures to cancel this call attempt, if
    // cancellation has not already been sent on the LB call.
    void MaybeAddBatchForCancelOp(grpc_error_handle error,
                                  CallCombinerClosureList* closures);

    // Adds batches for pending batches to closures.
    void AddBatchesForPendingBatches(CallCombinerClosureList* closures);

    // Adds whatever batches are needed on this attempt to closures.
    void AddRetriableBatches(CallCombinerClosureList* closures);

    // Returns true if any send op in the batch was not yet started on this
    // attempt.
    bool PendingBatchContainsUnstartedSendOps(PendingBatch* pending);

    // Returns true if there are cached send ops to replay.
    bool HaveSendOpsToReplay();

    // If our retry state is no longer needed, switch to fast path by moving
    // our LB call into calld_->committed_call_ and having calld_ drop
    // its ref to us.
    void MaybeSwitchToFastPath();

    // Returns true if the call should be retried.
    bool ShouldRetry(absl::optional<grpc_status_code> status,
                     absl::optional<Duration> server_pushback);

    // Abandons the call attempt.  Unrefs any deferred batches.
    void Abandon();

    void OnPerAttemptRecvTimer();
    static void OnPerAttemptRecvTimerLocked(void* arg, grpc_error_handle error);
    void MaybeCancelPerAttemptRecvTimer();

    CallData* calld_;
    OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> lb_call_;
    bool lb_call_committed_ = false;

    grpc_closure on_per_attempt_recv_timer_;
    absl::optional<EventEngine::TaskHandle> per_attempt_recv_timer_handle_;

    // BatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload_;
    // For send_initial_metadata.
    grpc_metadata_batch send_initial_metadata_{calld_->arena_};
    // For send_trailing_metadata.
    grpc_metadata_batch send_trailing_metadata_{calld_->arena_};
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata_{calld_->arena_};
    grpc_closure recv_initial_metadata_ready_;
    bool trailing_metadata_available_ = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready_;
    absl::optional<SliceBuffer> recv_message_;
    uint32_t recv_message_flags_;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata_{calld_->arena_};
    grpc_transport_stream_stats collect_stats_;
    grpc_closure recv_trailing_metadata_ready_;
    // These fields indicate which ops have been started and completed on
    // this call attempt.
    size_t started_send_message_count_ = 0;
    size_t completed_send_message_count_ = 0;
    size_t started_recv_message_count_ = 0;
    size_t completed_recv_message_count_ = 0;
    bool started_send_initial_metadata_ : 1;
    bool completed_send_initial_metadata_ : 1;
    bool started_send_trailing_metadata_ : 1;
    bool completed_send_trailing_metadata_ : 1;
    bool started_recv_initial_metadata_ : 1;
    bool completed_recv_initial_metadata_ : 1;
    bool started_recv_trailing_metadata_ : 1;
    bool completed_recv_trailing_metadata_ : 1;
    bool sent_cancel_stream_ : 1;
    // State for callback processing.
    RefCountedPtr<BatchData> recv_initial_metadata_ready_deferred_batch_;
    grpc_error_handle recv_initial_metadata_error_;
    RefCountedPtr<BatchData> recv_message_ready_deferred_batch_;
    grpc_error_handle recv_message_error_;
    struct OnCompleteDeferredBatch {
      OnCompleteDeferredBatch(RefCountedPtr<BatchData> batch,
                              grpc_error_handle error)
          : batch(std::move(batch)), error(error) {}
      RefCountedPtr<BatchData> batch;
      grpc_error_handle error;
    };
    // There cannot be more than 3 pending send op batches at a time.
    absl::InlinedVector<OnCompleteDeferredBatch, 3>
        on_complete_deferred_batches_;
    RefCountedPtr<BatchData> recv_trailing_metadata_internal_batch_;
    grpc_error_handle recv_trailing_metadata_error_;
    bool seen_recv_trailing_metadata_from_surface_ : 1;
    // NOTE: Do not move this next to the metadata bitfields above.  That
    //       would save space but would also result in a data race, because
    //       the compiler will generate a 2-byte store that overwrites the
    //       metadata fields upon setting this field.
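    //       (Adjacent bit-fields form a single memory location in C++, so
    //       an unsynchronized write to one can clobber its neighbors.)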
    bool abandoned_ : 1;
  };

  CallData(RetryFilter* chand, const grpc_call_element_args& args);
  ~CallData();

  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  PendingBatch* PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // Fails all pending batches.  Does NOT yield call combiner.
  void PendingBatchesFail(grpc_error_handle error);
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata();
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(size_t idx);
  void FreeCachedSendTrailingMetadata();
  void FreeAllCachedSendOpData();

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(CallAttempt* call_attempt);

  // Starts a timer to retry after appropriate back-off.
  // If server_pushback is nullopt, retry_backoff_ is used.
  void StartRetryTimer(absl::optional<Duration> server_pushback);

  void OnRetryTimer();
  static void OnRetryTimerLocked(void* arg, grpc_error_handle /*error*/);

  // Adds a closure to closures to start a transparent retry.
  void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
  static void StartTransparentRetry(void* arg, grpc_error_handle error);

  OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
  CreateLoadBalancedCall(absl::AnyInvocable<void()> on_commit,
                         bool is_transparent_retry);

  void CreateCallAttempt(bool is_transparent_retry);

  RetryFilter* chand_;
  grpc_polling_entity* pollent_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  const RetryMethodConfig* retry_policy_ = nullptr;
  BackOff retry_backoff_;

  grpc_slice path_;  // Request path.
  Timestamp deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  grpc_error_handle cancelled_from_surface_;

  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_;

  // TODO(roth): As part of implementing hedging, we will need to maintain a
  // list of all pending attempts, so that we can cancel them all if the call
  // gets cancelled.
  RefCountedPtr<CallAttempt> call_attempt_;

  // LB call used when we've committed to a call attempt and the retry
  // state for that attempt is no longer needed.  This provides a fast
  // path for long-running streaming calls that minimizes overhead.
  OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> committed_call_;

  // When we are not yet fully committed to a particular call (i.e.,
  // either we might still retry or we have committed to the call but
  // there are still some cached ops to be replayed on the call),
  // batches received from above will be added to this list, and they
  // will not be removed until we have invoked their completion callbacks.
  size_t bytes_buffered_for_retry_ = 0;
  PendingBatch pending_batches_[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Retry state.
  bool retry_committed_ : 1;
  bool retry_codepath_started_ : 1;
  bool sent_transparent_retry_not_seen_by_server_ : 1;
  int num_attempts_completed_ = 0;
  absl::optional<EventEngine::TaskHandle> retry_timer_handle_;
  grpc_closure retry_closure_;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_metadata_batch send_initial_metadata_{arena_};
  // send_message
  // When we get a send_message op, we cache its slices in a local
  // buffer so that the message can be replayed on later attempts even
  // after the original batch has completed.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  struct CachedSendMessage {
    SliceBuffer* slices;
    uint32_t flags;
  };
  absl::InlinedVector<CachedSendMessage, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_metadata_batch send_trailing_metadata_{arena_};
};

//
// RetryFilter::CallData::CallStackDestructionBarrier
//

// A class to track the existence of LoadBalancedCall call stacks that
// we've created.  We wait until all such call stacks have been
// destroyed before we return the on_call_stack_destruction closure up
// to the surface.
//
// The parent RetryFilter::CallData object holds a ref to this object.
// When it is destroyed, it will store the on_call_stack_destruction
// closure from the surface in this object and then release its ref.
// We also take a ref to this object for each LB call we create, and
// those refs are not released until the LB call stack is destroyed.
// When this object is destroyed, it will invoke the
// on_call_stack_destruction closure from the surface.
class RetryFilter::CallData::CallStackDestructionBarrier
    : public RefCounted<CallStackDestructionBarrier, PolymorphicRefCount,
                        UnrefCallDtor> {
 public:
  CallStackDestructionBarrier() {}

  ~CallStackDestructionBarrier() override {
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, on_call_stack_destruction_, absl::OkStatus());
  }

  // Set the closure from the surface.  This closure will be invoked
  // when this object is destroyed.
  void set_on_call_stack_destruction(grpc_closure* on_call_stack_destruction) {
    on_call_stack_destruction_ = on_call_stack_destruction;
  }

  // Invoked to get an on_call_stack_destruction closure for a new LB call.
  grpc_closure* MakeLbCallDestructionClosure(CallData* calld) {
    Ref().release();  // Ref held by callback.
    grpc_closure* on_lb_call_destruction_complete =
        calld->arena_->New<grpc_closure>();
    GRPC_CLOSURE_INIT(on_lb_call_destruction_complete,
                      OnLbCallDestructionComplete, this, nullptr);
    return on_lb_call_destruction_complete;
  }

 private:
  static void OnLbCallDestructionComplete(void* arg,
                                          grpc_error_handle /*error*/) {
    auto* self = static_cast<CallStackDestructionBarrier*>(arg);
    self->Unref();
  }

  grpc_closure* on_call_stack_destruction_ = nullptr;
};

//
// RetryFilter::CallData::CallAttempt
//

RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
                                                bool is_transparent_retry)
    : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "CallAttempt"
                                                           : nullptr),
      calld_(calld),
      batch_payload_(calld->call_context_),
      started_send_initial_metadata_(false),
      completed_send_initial_metadata_(false),
      started_send_trailing_metadata_(false),
      completed_send_trailing_metadata_(false),
      started_recv_initial_metadata_(false),
      completed_recv_initial_metadata_(false),
      started_recv_trailing_metadata_(false),
      completed_recv_trailing_metadata_(false),
      sent_cancel_stream_(false),
      seen_recv_trailing_metadata_from_surface_(false),
      abandoned_(false) {
  lb_call_ = calld->CreateLoadBalancedCall(
      [this]() {
        lb_call_committed_ = true;
        if (calld_->retry_committed_) {
          auto* service_config_call_data =
              static_cast<ClientChannelServiceConfigCallData*>(
                  calld_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA]
                      .value);
          service_config_call_data->Commit();
        }
      },
      is_transparent_retry);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: created attempt, lb_call=%p",
            calld->chand_, calld, this, lb_call_.get());
  }
  // If per_attempt_recv_timeout is set, start a timer.
  if (calld->retry_policy_ != nullptr &&
      calld->retry_policy_->per_attempt_recv_timeout().has_value()) {
    const Duration per_attempt_recv_timeout =
        *calld->retry_policy_->per_attempt_recv_timeout();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: per-attempt timeout in %" PRId64
              " ms",
              calld->chand_, calld, this, per_attempt_recv_timeout.millis());
    }
    // Schedule a timer to fire after the per-attempt timeout.
    GRPC_CALL_STACK_REF(calld->owning_call_, "OnPerAttemptRecvTimer");
    Ref(DEBUG_LOCATION, "OnPerAttemptRecvTimer").release();
    per_attempt_recv_timer_handle_ = calld_->chand_->event_engine_->RunAfter(
        per_attempt_recv_timeout, [this] {
          ApplicationCallbackExecCtx callback_exec_ctx;
          ExecCtx exec_ctx;
          OnPerAttemptRecvTimer();
        });
  }
}

RetryFilter::CallData::CallAttempt::~CallAttempt() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: destroying call attempt",
            calld_->chand_, calld_, this);
  }
}

void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() {
  // TODO(roth): When we implement hedging, this logic will need to get
  // a bit more complex, because there may be other (now abandoned) call
  // attempts still using this data.  We may need to do some sort of
  // ref-counting instead.
  if (completed_send_initial_metadata_) {
    calld_->FreeCachedSendInitialMetadata();
  }
  for (size_t i = 0; i < completed_send_message_count_; ++i) {
    calld_->FreeCachedSendMessage(i);
  }
  if (completed_send_trailing_metadata_) {
    calld_->FreeCachedSendTrailingMetadata();
  }
}

bool RetryFilter::CallData::CallAttempt::PendingBatchContainsUnstartedSendOps(
    PendingBatch* pending) {
  if (pending->batch->on_complete == nullptr) return false;
  if (pending->batch->send_initial_metadata &&
      !started_send_initial_metadata_) {
    return true;
  }
  if (pending->batch->send_message &&
      started_send_message_count_ < calld_->send_messages_.size()) {
    return true;
  }
  if (pending->batch->send_trailing_metadata &&
      !started_send_trailing_metadata_) {
    return true;
  }
  return false;
}

bool RetryFilter::CallData::CallAttempt::HaveSendOpsToReplay() {
  // We don't check send_initial_metadata here, because that op will always
  // be started as soon as it is received from the surface, so it will
  // never need to be started at this point.
  return started_send_message_count_ < calld_->send_messages_.size() ||
         (calld_->seen_send_trailing_metadata_ &&
          !started_send_trailing_metadata_);
}

void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {
  // If we're not yet committed, we can't switch yet.
  // TODO(roth): As part of implementing hedging, this logic needs to
  // check that *this* call attempt is the one that we've committed to.
  // Might need to replace abandoned_ with an enum indicating whether we're
  // in flight, abandoned, or the winning call attempt.
  if (!calld_->retry_committed_) return;
  // If we've already switched to fast path, there's nothing to do here.
  if (calld_->committed_call_ != nullptr) return;
  // If the perAttemptRecvTimeout timer is pending, we can't switch yet.
  if (per_attempt_recv_timer_handle_.has_value()) return;
  // If there are still send ops to replay, we can't switch yet.
  if (HaveSendOpsToReplay()) return;
  // If we started an internal batch for recv_trailing_metadata but have not
  // yet seen that op from the surface, we can't switch yet.
  if (recv_trailing_metadata_internal_batch_ != nullptr) return;
  // Switch to fast path.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: retry state no longer needed; "
            "moving LB call to parent and unreffing the call attempt",
            calld_->chand_, calld_, this);
  }
  calld_->committed_call_ = std::move(lb_call_);
  calld_->call_attempt_.reset(DEBUG_LOCATION, "MaybeSwitchToFastPath");
}

// If there are any cached send ops that need to be replayed on the
// current call attempt, creates and returns a new batch to replay those ops.
// Otherwise, returns nullptr.
RetryFilter::CallData::CallAttempt::BatchData*
RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() {
  BatchData* replay_batch_data = nullptr;
  // send_initial_metadata.
  if (calld_->seen_send_initial_metadata_ && !started_send_initial_metadata_ &&
      !calld_->pending_send_initial_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: replaying previously completed "
              "send_initial_metadata op",
              calld_->chand_, calld_, this);
    }
    replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    replay_batch_data->AddRetriableSendInitialMetadataOp();
  }
  // send_message.
  // Note that we can only have one send_message op in flight at a time.
  if (started_send_message_count_ < calld_->send_messages_.size() &&
      started_send_message_count_ == completed_send_message_count_ &&
      !calld_->pending_send_message_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: replaying previously completed "
              "send_message op",
              calld_->chand_, calld_, this);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    }
    replay_batch_data->AddRetriableSendMessageOp();
  }
  // send_trailing_metadata.
  // Note that we only add this op if we have no more send_message ops
  // to start, since we can't send down any more send_message ops after
  // send_trailing_metadata.
  if (calld_->seen_send_trailing_metadata_ &&
      started_send_message_count_ == calld_->send_messages_.size() &&
      !started_send_trailing_metadata_ &&
      !calld_->pending_send_trailing_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: replaying previously completed "
              "send_trailing_metadata op",
              calld_->chand_, calld_, this);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    }
    replay_batch_data->AddRetriableSendTrailingMetadataOp();
  }
  return replay_batch_data;
}

namespace {

void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* lb_call = static_cast<ClientChannel::FilterBasedLoadBalancedCall*>(
      batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  lb_call->StartTransportStreamOpBatch(batch);
}

}  // namespace

void RetryFilter::CallData::CallAttempt::AddClosureForBatch(
    grpc_transport_stream_op_batch* batch, const char* reason,
    CallCombinerClosureList* closures) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: adding batch (%s): %s",
            calld_->chand_, calld_, this, reason,
            grpc_transport_stream_op_batch_string(batch, false).c_str());
  }
  batch->handler_private.extra_arg = lb_call_.get();
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  closures->Add(&batch->handler_private.closure, absl::OkStatus(), reason);
}

void RetryFilter::CallData::CallAttempt::
    AddBatchForInternalRecvTrailingMetadata(CallCombinerClosureList* closures) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: call failed but "
            "recv_trailing_metadata not started; starting it internally",
            calld_->chand_, calld_, this);
  }
  // Create batch_data with 2 refs, since this batch will be unreffed twice:
  // once for the recv_trailing_metadata_ready callback when the batch
  // completes, and again when we actually get a recv_trailing_metadata
  // op from the surface.
  BatchData* batch_data = CreateBatch(2, false /* set_on_complete */);
  batch_data->AddRetriableRecvTrailingMetadataOp();
  recv_trailing_metadata_internal_batch_.reset(batch_data);
  AddClosureForBatch(batch_data->batch(),
                     "starting internal recv_trailing_metadata", closures);
}

void RetryFilter::CallData::CallAttempt::MaybeAddBatchForCancelOp(
    grpc_error_handle error, CallCombinerClosureList* closures) {
  if (sent_cancel_stream_) {
    return;
  }
  sent_cancel_stream_ = true;
  BatchData* cancel_batch_data = CreateBatch(1, /*set_on_complete=*/true);
  cancel_batch_data->AddCancelStreamOp(error);
  AddClosureForBatch(cancel_batch_data->batch(),
                     "start cancellation batch on call attempt", closures);
}

void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
    CallCombinerClosureList* closures) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld_->pending_batches_); ++i) {
    PendingBatch* pending = &calld_->pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    bool has_send_ops = false;
    // Skip any batch that either (a) has already been started on this
    // call attempt or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch.  This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op.  If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata) {
      if (started_send_initial_metadata_) continue;
      has_send_ops = true;
    }
    if (batch->send_message) {
      // Cases where we can't start this send_message op:
      // - We are currently replaying a previous cached send_message op.
      // - We have already replayed all send_message ops, including this
      //   one.  (This can happen if a send_message op is in the same
      //   batch as a recv op, the send_message op has already completed
      //   but the recv op hasn't, and then a subsequent batch with another
      //   recv op is started from the surface.)
      if (completed_send_message_count_ < started_send_message_count_ ||
          completed_send_message_count_ ==
              (calld_->send_messages_.size() + !pending->send_ops_cached)) {
        continue;
      }
      has_send_ops = true;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      if (started_send_message_count_ + batch->send_message <
              calld_->send_messages_.size() ||
          started_send_trailing_metadata_) {
        continue;
      }
      has_send_ops = true;
    }
    int num_callbacks = has_send_ops;  // All send ops share one callback.
    if (batch->recv_initial_metadata) {
      if (started_recv_initial_metadata_) continue;
      ++num_callbacks;
    }
    if (batch->recv_message) {
      // Skip if the op is already in flight, or if it has already completed
      // but the completion has not yet been sent to the surface.
      if (completed_recv_message_count_ < started_recv_message_count_ ||
          recv_message_ready_deferred_batch_ != nullptr) {
        continue;
      }
      ++num_callbacks;
    }
    if (batch->recv_trailing_metadata) {
      if (started_recv_trailing_metadata_) {
        seen_recv_trailing_metadata_from_surface_ = true;
        // If we previously completed a recv_trailing_metadata op
        // initiated by AddBatchForInternalRecvTrailingMetadata(), use the
        // result of that instead of trying to re-start this op.
        if (GPR_UNLIKELY(recv_trailing_metadata_internal_batch_ != nullptr)) {
          // If the batch completed, then trigger the completion callback
          // directly, so that we return the previously returned results to
          // the application.  Otherwise, just unref the internally started
          // batch, since we'll propagate the completion when it completes.
          if (completed_recv_trailing_metadata_) {
            closures->Add(
                &recv_trailing_metadata_ready_, recv_trailing_metadata_error_,
                "re-executing recv_trailing_metadata_ready to propagate "
                "internally triggered result");
            // Ref will be released by callback.
            recv_trailing_metadata_internal_batch_.release();
          } else {
            recv_trailing_metadata_internal_batch_.reset(
                DEBUG_LOCATION,
                "internally started recv_trailing_metadata batch pending and "
                "recv_trailing_metadata started from surface");
          }
          recv_trailing_metadata_error_ = absl::OkStatus();
        }
        // We don't want the fact that we've already started this op internally
        // to prevent us from adding a batch that may contain other ops.
        // Instead, we'll just skip adding this op below.
        if (num_callbacks == 0) continue;
      } else {
        ++num_callbacks;
      }
    }
    // If we're already committed and the following conditions are met,
    // just send the batch down as-is:
    // - The batch contains no cached send ops.  (If it does, we need
    //   the logic below to use the cached payloads.)
    // - The batch does not contain recv_trailing_metadata when we have
    //   already started an internal recv_trailing_metadata batch.  (If
    //   we've already started an internal recv_trailing_metadata batch,
    //   then we need the logic below to send all ops in the batch
    //   *except* the recv_trailing_metadata op.)
    if (calld_->retry_committed_ && !pending->send_ops_cached &&
        (!batch->recv_trailing_metadata || !started_recv_trailing_metadata_)) {
      AddClosureForBatch(
          batch,
          "start non-replayable pending batch on call attempt after commit",
          closures);
      calld_->PendingBatchClear(pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    BatchData* batch_data =
        CreateBatch(num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed.
    calld_->MaybeCacheSendOpsForBatch(pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      batch_data->AddRetriableSendInitialMetadataOp();
    }
    // send_message.
    if (batch->send_message) {
      batch_data->AddRetriableSendMessageOp();
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      batch_data->AddRetriableSendTrailingMetadataOp();
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      batch_data->AddRetriableRecvInitialMetadataOp();
    }
    // recv_message.
    if (batch->recv_message) {
      batch_data->AddRetriableRecvMessageOp();
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata && !started_recv_trailing_metadata_) {
      batch_data->AddRetriableRecvTrailingMetadataOp();
    }
    AddClosureForBatch(batch_data->batch(),
                       "start replayable pending batch on call attempt",
                       closures);
  }
}

void RetryFilter::CallData::CallAttempt::AddRetriableBatches(
    CallCombinerClosureList* closures) {
  // Replay previously-returned send_* ops if needed.
  BatchData* replay_batch_data = MaybeCreateBatchForReplay();
  if (replay_batch_data != nullptr) {
    AddClosureForBatch(replay_batch_data->batch(),
                       "start replay batch on call attempt", closures);
  }
  // Now add pending batches.
  AddBatchesForPendingBatches(closures);
}

void RetryFilter::CallData::CallAttempt::StartRetriableBatches() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: constructing retriable batches",
            calld_->chand_, calld_, this);
  }
  // Construct list of closures to execute, one for each pending batch.
  CallCombinerClosureList closures;
  AddRetriableBatches(&closures);
  // Note: This will yield the call combiner.
  // Start batches on LB call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: starting %" PRIuPTR
            " retriable batches on lb_call=%p",
            calld_->chand_, calld_, this, closures.size(), lb_call_.get());
  }
  closures.RunClosures(calld_->call_combiner_);
}

void RetryFilter::CallData::CallAttempt::CancelFromSurface(
    grpc_transport_stream_op_batch* cancel_batch) {
  MaybeCancelPerAttemptRecvTimer();
  Abandon();
  // Propagate cancellation to LB call.
  lb_call_->StartTransportStreamOpBatch(cancel_batch);
}

bool RetryFilter::CallData::CallAttempt::ShouldRetry(
    absl::optional<grpc_status_code> status,
    absl::optional<Duration> server_pushback) {
  // If no retry policy, don't retry.
  if (calld_->retry_policy_ == nullptr) return false;
  // Check status.
  if (status.has_value()) {
    if (GPR_LIKELY(*status == GRPC_STATUS_OK)) {
      if (calld_->retry_throttle_data_ != nullptr) {
        calld_->retry_throttle_data_->RecordSuccess();
      }
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: call succeeded",
                calld_->chand_, calld_, this);
      }
      return false;
    }
    // Status is not OK.  Check whether the status is retryable.
    if (!calld_->retry_policy_->retryable_status_codes().Contains(*status)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p attempt=%p: status %s not configured as "
                "retryable",
                calld_->chand_, calld_, this,
                grpc_status_code_to_string(*status));
      }
      return false;
    }
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (calld_->retry_throttle_data_ != nullptr &&
      !calld_->retry_throttle_data_->RecordFailure()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: retries throttled",
              calld_->chand_, calld_, this);
    }
    return false;
  }
  // Check whether the call is committed.
  if (calld_->retry_committed_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: retries already committed",
              calld_->chand_, calld_, this);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++calld_->num_attempts_completed_;
  if (calld_->num_attempts_completed_ >=
      calld_->retry_policy_->max_attempts()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(
          GPR_INFO, "chand=%p calld=%p attempt=%p: exceeded %d retry attempts",
          calld_->chand_, calld_, this, calld_->retry_policy_->max_attempts());
    }
    return false;
  }
  // Check server push-back.
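  // (Per the retry design, gRFC A6, server push-back arrives in the
  // "grpc-retry-pushback-ms" trailing metadata entry; a negative or
  // unparseable value tells the client not to retry at all.)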
1165   if (server_pushback.has_value()) {
1166     if (*server_pushback < Duration::Zero()) {
1167       if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1168         gpr_log(GPR_INFO,
1169                 "chand=%p calld=%p attempt=%p: not retrying due to server "
1170                 "push-back",
1171                 calld_->chand_, calld_, this);
1172       }
1173       return false;
1174     } else {
1175       if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1176         gpr_log(
1177             GPR_INFO,
1178             "chand=%p calld=%p attempt=%p: server push-back: retry in %" PRIu64
1179             " ms",
1180             calld_->chand_, calld_, this, server_pushback->millis());
1181       }
1182     }
1183   }
1184   // We should retry.
1185   return true;
1186 }
1187 
void RetryFilter::CallData::CallAttempt::Abandon() {
  abandoned_ = true;
  // Unref batches for deferred completion callbacks that will now never
  // be invoked.
  if (started_recv_trailing_metadata_ &&
      !seen_recv_trailing_metadata_from_surface_) {
    recv_trailing_metadata_internal_batch_.reset(
        DEBUG_LOCATION,
        "unref internal recv_trailing_metadata_ready batch; attempt abandoned");
  }
  recv_trailing_metadata_error_ = absl::OkStatus();
  recv_initial_metadata_ready_deferred_batch_.reset(
      DEBUG_LOCATION,
      "unref deferred recv_initial_metadata_ready batch; attempt abandoned");
  recv_initial_metadata_error_ = absl::OkStatus();
  recv_message_ready_deferred_batch_.reset(
      DEBUG_LOCATION,
      "unref deferred recv_message_ready batch; attempt abandoned");
  recv_message_error_ = absl::OkStatus();
  for (auto& on_complete_deferred_batch : on_complete_deferred_batches_) {
    on_complete_deferred_batch.batch.reset(
        DEBUG_LOCATION, "unref deferred on_complete batch; attempt abandoned");
  }
  on_complete_deferred_batches_.clear();
}

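// Note: The perAttemptRecvTimeout timer fires on an EventEngine thread,
// outside of the call combiner, so OnPerAttemptRecvTimer() does nothing
// but re-enter the combiner; all state mutation happens in
// OnPerAttemptRecvTimerLocked(), which runs under the call combiner.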
void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimer() {
  GRPC_CLOSURE_INIT(&on_per_attempt_recv_timer_, OnPerAttemptRecvTimerLocked,
                    this, nullptr);
  GRPC_CALL_COMBINER_START(calld_->call_combiner_, &on_per_attempt_recv_timer_,
                           absl::OkStatus(), "per-attempt timer fired");
}

void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
    void* arg, grpc_error_handle error) {
  auto* call_attempt = static_cast<CallAttempt*>(arg);
  auto* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: perAttemptRecvTimeout timer fired: "
            "error=%s, per_attempt_recv_timer_handle_.has_value()=%d",
            calld->chand_, calld, call_attempt, StatusToString(error).c_str(),
            call_attempt->per_attempt_recv_timer_handle_.has_value());
  }
  CallCombinerClosureList closures;
  call_attempt->per_attempt_recv_timer_handle_.reset();
  // Cancel this attempt.
  // TODO(roth): When implementing hedging, we should not cancel the
  // current attempt.
  call_attempt->MaybeAddBatchForCancelOp(
      grpc_error_set_int(
          GRPC_ERROR_CREATE("retry perAttemptRecvTimeout exceeded"),
          StatusIntProperty::kRpcStatus, GRPC_STATUS_CANCELLED),
      &closures);
  // Check whether we should retry.
  if (call_attempt->ShouldRetry(/*status=*/absl::nullopt,
                                /*server_pushback_ms=*/absl::nullopt)) {
    // Mark current attempt as abandoned.
    call_attempt->Abandon();
    // We are retrying.  Start backoff timer.
    calld->StartRetryTimer(/*server_pushback=*/absl::nullopt);
  } else {
    // Not retrying, so commit the call.
    calld->RetryCommit(call_attempt);
    // If retry state is no longer needed, switch to fast path for
    // subsequent batches.
    call_attempt->MaybeSwitchToFastPath();
  }
  closures.RunClosures(calld->call_combiner_);
  call_attempt->Unref(DEBUG_LOCATION, "OnPerAttemptRecvTimer");
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnPerAttemptRecvTimer");
}

void RetryFilter::CallData::CallAttempt::MaybeCancelPerAttemptRecvTimer() {
  if (per_attempt_recv_timer_handle_.has_value()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: cancelling "
              "perAttemptRecvTimeout timer",
              calld_->chand_, calld_, this);
    }
    if (calld_->chand_->event_engine_->Cancel(
            *per_attempt_recv_timer_handle_)) {
      Unref(DEBUG_LOCATION, "OnPerAttemptRecvTimer");
      GRPC_CALL_STACK_UNREF(calld_->owning_call_, "OnPerAttemptRecvTimer");
    }
    per_attempt_recv_timer_handle_.reset();
  }
}

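// Note: The Unref() and GRPC_CALL_STACK_UNREF() calls above release the
// refs taken when the timer was scheduled.  They run only when Cancel()
// succeeds, because in that case the timer callback, which would
// otherwise release them, is guaranteed never to be invoked.
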
//
// RetryFilter::CallData::CallAttempt::BatchData
//

RetryFilter::CallData::CallAttempt::BatchData::BatchData(
    RefCountedPtr<CallAttempt> attempt, int refcount, bool set_on_complete)
    : RefCounted(
          GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "BatchData" : nullptr,
          refcount),
      call_attempt_(attempt.release()) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: creating batch %p",
            call_attempt_->calld_->chand_, call_attempt_->calld_, call_attempt_,
            this);
  }
  // We hold a ref to the call stack for every batch sent on a call attempt.
  // This is because some batches on the call attempt may not complete
  // until after all of the batches are completed at the surface (because
  // each batch that is pending at the surface holds a ref).  This
  // can happen for replayed send ops, and it can happen for
  // recv_initial_metadata and recv_message ops on a call attempt that has
  // been abandoned.
  GRPC_CALL_STACK_REF(call_attempt_->calld_->owning_call_, "Retry BatchData");
  batch_.payload = &call_attempt_->batch_payload_;
  if (set_on_complete) {
    GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this, nullptr);
    batch_.on_complete = &on_complete_;
  }
}

RetryFilter::CallData::CallAttempt::BatchData::~BatchData() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: destroying batch %p",
            call_attempt_->calld_->chand_, call_attempt_->calld_, call_attempt_,
            this);
  }
  CallAttempt* call_attempt = std::exchange(call_attempt_, nullptr);
  grpc_call_stack* owning_call = call_attempt->calld_->owning_call_;
  call_attempt->Unref(DEBUG_LOCATION, "~BatchData");
  GRPC_CALL_STACK_UNREF(owning_call, "Retry BatchData");
}

void RetryFilter::CallData::CallAttempt::BatchData::
    FreeCachedSendOpDataForCompletedBatch() {
  auto* calld = call_attempt_->calld_;
  // TODO(roth): When we implement hedging, this logic will need to get
  // a bit more complex, because there may be other (now abandoned) call
  // attempts still using this data.  We may need to do some sort of
  // ref-counting instead.
  if (batch_.send_initial_metadata) {
    calld->FreeCachedSendInitialMetadata();
  }
  if (batch_.send_message) {
    calld->FreeCachedSendMessage(call_attempt_->completed_send_message_count_ -
                                 1);
  }
  if (batch_.send_trailing_metadata) {
    calld->FreeCachedSendTrailingMetadata();
  }
}

//
// recv_initial_metadata callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvInitialMetadataCallback(
        grpc_error_handle error, CallCombinerClosureList* closures) {
  // Find pending batch.
  PendingBatch* pending = call_attempt_->calld_->PendingBatchFind(
      "invoking recv_initial_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_initial_metadata &&
               batch->payload->recv_initial_metadata
                       .recv_initial_metadata_ready != nullptr;
      });
  if (pending == nullptr) {
    return;
  }
  // Return metadata.
  *pending->batch->payload->recv_initial_metadata.recv_initial_metadata =
      std::move(call_attempt_->recv_initial_metadata_);
  // Propagate trailing_metadata_available.
  *pending->batch->payload->recv_initial_metadata.trailing_metadata_available =
      call_attempt_->trailing_metadata_available_;
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  call_attempt_->calld_->MaybeClearPendingBatch(pending);
  // Add callback to closures.
  closures->Add(recv_initial_metadata_ready, error,
                "recv_initial_metadata_ready for pending batch");
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_;
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got recv_initial_metadata_ready, error=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            StatusToString(error).c_str());
  }
  call_attempt->completed_recv_initial_metadata_ = true;
  // If this attempt has been abandoned, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing.
  if (call_attempt->abandoned_) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner_,
        "recv_initial_metadata_ready for abandoned attempt");
    return;
  }
  // Cancel per-attempt recv timer, if any.
  call_attempt->MaybeCancelPerAttemptRecvTimer();
  // If we're not committed, check the response to see if we need to commit.
  if (!calld->retry_committed_) {
    // If we got an error or a Trailers-Only response and have not yet gotten
    // the recv_trailing_metadata_ready callback, then defer propagating this
    // callback back to the surface.  We can evaluate whether to retry when
    // recv_trailing_metadata comes back.
    if (GPR_UNLIKELY(
            (call_attempt->trailing_metadata_available_ || !error.ok()) &&
            !call_attempt->completed_recv_trailing_metadata_)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p attempt=%p: deferring "
                "recv_initial_metadata_ready (Trailers-Only)",
                calld->chand_, calld, call_attempt);
      }
      call_attempt->recv_initial_metadata_ready_deferred_batch_ =
          std::move(batch_data);
      call_attempt->recv_initial_metadata_error_ = error;
      CallCombinerClosureList closures;
      if (!error.ok()) {
        call_attempt->MaybeAddBatchForCancelOp(error, &closures);
      }
      if (!call_attempt->started_recv_trailing_metadata_) {
        // recv_trailing_metadata not yet started by application; start it
        // ourselves to get status.
        call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
      }
      closures.RunClosures(calld->call_combiner_);
      return;
    }
    // Received valid initial metadata, so commit the call.
    calld->RetryCommit(call_attempt);
    // If retry state is no longer needed, switch to fast path for
    // subsequent batches.
    call_attempt->MaybeSwitchToFastPath();
  }
  // Invoke the callback to return the result to the surface.
  CallCombinerClosureList closures;
  batch_data->MaybeAddClosureForRecvInitialMetadataCallback(error, &closures);
  closures.RunClosures(calld->call_combiner_);
}

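// Note: A Trailers-Only response (per the gRPC over HTTP/2 spec) delivers
// the call status with no initial metadata or messages, which is how a
// server typically reports an immediate failure such as UNAVAILABLE.
// That is why trailing_metadata_available_ is treated as a potential
// failure above: the retry decision has to wait for the status carried
// in recv_trailing_metadata.
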
//
// recv_message callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvMessageCallback(grpc_error_handle error,
                                          CallCombinerClosureList* closures) {
  // Find pending op.
  PendingBatch* pending = call_attempt_->calld_->PendingBatchFind(
      "invoking recv_message_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_message &&
               batch->payload->recv_message.recv_message_ready != nullptr;
      });
  if (pending == nullptr) {
    return;
  }
  // Return payload.
  *pending->batch->payload->recv_message.recv_message =
      std::move(call_attempt_->recv_message_);
  *pending->batch->payload->recv_message.flags =
      call_attempt_->recv_message_flags_;
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  call_attempt_->calld_->MaybeClearPendingBatch(pending);
  // Add callback to closures.
  closures->Add(recv_message_ready, error,
                "recv_message_ready for pending batch");
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_;
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got recv_message_ready, error=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            StatusToString(error).c_str());
  }
  ++call_attempt->completed_recv_message_count_;
  // If this attempt has been abandoned, then we're not going to use the
  // result of this recv_message op, so do nothing.
  if (call_attempt->abandoned_) {
    // The transport will not invoke recv_trailing_metadata_ready until the byte
    // stream for any recv_message op is orphaned, so we do that here to ensure
    // that any pending recv_trailing_metadata op can complete.
    call_attempt->recv_message_.reset();
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "recv_message_ready for abandoned attempt");
    return;
  }
  // Cancel per-attempt recv timer, if any.
  call_attempt->MaybeCancelPerAttemptRecvTimer();
  // If we're not committed, check the response to see if we need to commit.
  if (!calld->retry_committed_) {
    // If we got an error or the payload was nullptr and we have not yet gotten
    // the recv_trailing_metadata_ready callback, then defer propagating this
    // callback back to the surface.  We can evaluate whether to retry when
    // recv_trailing_metadata comes back.
    if (GPR_UNLIKELY(
            (!call_attempt->recv_message_.has_value() || !error.ok()) &&
            !call_attempt->completed_recv_trailing_metadata_)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p attempt=%p: deferring recv_message_ready "
                "(nullptr message and recv_trailing_metadata pending)",
                calld->chand_, calld, call_attempt);
      }
      call_attempt->recv_message_ready_deferred_batch_ = std::move(batch_data);
      call_attempt->recv_message_error_ = error;
      CallCombinerClosureList closures;
      if (!error.ok()) {
        call_attempt->MaybeAddBatchForCancelOp(error, &closures);
      }
      if (!call_attempt->started_recv_trailing_metadata_) {
        // recv_trailing_metadata not yet started by application; start it
        // ourselves to get status.
        call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
      }
      closures.RunClosures(calld->call_combiner_);
      return;
    }
    // Received a valid message, so commit the call.
    calld->RetryCommit(call_attempt);
    // If retry state is no longer needed, switch to fast path for
    // subsequent batches.
    call_attempt->MaybeSwitchToFastPath();
  }
  // Invoke the callback to return the result to the surface.
  CallCombinerClosureList closures;
  batch_data->MaybeAddClosureForRecvMessageCallback(error, &closures);
  closures.RunClosures(calld->call_combiner_);
}

//
// recv_trailing_metadata handling
//

namespace {

// Sets *status, *server_pushback, *is_lb_drop, and *stream_network_state
// based on md_batch and error.
void GetCallStatus(
    Timestamp deadline, grpc_metadata_batch* md_batch, grpc_error_handle error,
    grpc_status_code* status, absl::optional<Duration>* server_pushback,
    bool* is_lb_drop,
    absl::optional<GrpcStreamNetworkState::ValueType>* stream_network_state) {
  if (!error.ok()) {
    grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
    intptr_t value = 0;
    if (grpc_error_get_int(error, StatusIntProperty::kLbPolicyDrop, &value) &&
        value != 0) {
      *is_lb_drop = true;
    }
  } else {
    *status = *md_batch->get(GrpcStatusMetadata());
  }
  *server_pushback = md_batch->get(GrpcRetryPushbackMsMetadata());
  *stream_network_state = md_batch->get(GrpcStreamNetworkState());
}

}  // namespace

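// Note: GrpcRetryPushbackMsMetadata corresponds to the
// "grpc-retry-pushback-ms" trailing metadata entry.  A non-negative
// value asks the client to delay the next retry by that many
// milliseconds; a negative value (checked in ShouldRetry() above) asks
// the client not to retry at all.
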
void RetryFilter::CallData::CallAttempt::BatchData::
    MaybeAddClosureForRecvTrailingMetadataReady(
        grpc_error_handle error, CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  // Find pending batch.
  PendingBatch* pending = calld->PendingBatchFind(
      "invoking recv_trailing_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_trailing_metadata &&
               batch->payload->recv_trailing_metadata
                       .recv_trailing_metadata_ready != nullptr;
      });
  // If we generated the recv_trailing_metadata op internally via
  // AddBatchForInternalRecvTrailingMetadata(), then there will be no
  // pending batch.
  if (pending == nullptr) {
    call_attempt_->recv_trailing_metadata_error_ = error;
    return;
  }
  // Copy transport stats to be delivered up to the surface.
  grpc_transport_move_stats(
      &call_attempt_->collect_stats_,
      pending->batch->payload->recv_trailing_metadata.collect_stats);
  // Return metadata.
  *pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata =
      std::move(call_attempt_->recv_trailing_metadata_);
  // Add closure.
  closures->Add(pending->batch->payload->recv_trailing_metadata
                    .recv_trailing_metadata_ready,
                error, "recv_trailing_metadata_ready for pending batch");
  // Update bookkeeping.
  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      nullptr;
  calld->MaybeClearPendingBatch(pending);
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForDeferredCompletionCallbacks(
        CallCombinerClosureList* closures) {
  // Add closure for deferred recv_initial_metadata_ready.
  if (GPR_UNLIKELY(call_attempt_->recv_initial_metadata_ready_deferred_batch_ !=
                   nullptr)) {
    MaybeAddClosureForRecvInitialMetadataCallback(
        call_attempt_->recv_initial_metadata_error_, closures);
    call_attempt_->recv_initial_metadata_ready_deferred_batch_.reset(
        DEBUG_LOCATION, "resuming deferred recv_initial_metadata_ready");
    call_attempt_->recv_initial_metadata_error_ = absl::OkStatus();
  }
  // Add closure for deferred recv_message_ready.
  if (GPR_UNLIKELY(call_attempt_->recv_message_ready_deferred_batch_ !=
                   nullptr)) {
    MaybeAddClosureForRecvMessageCallback(call_attempt_->recv_message_error_,
                                          closures);
    call_attempt_->recv_message_ready_deferred_batch_.reset(
        DEBUG_LOCATION, "resuming deferred recv_message_ready");
    call_attempt_->recv_message_error_ = absl::OkStatus();
  }
  // Add closures for deferred on_complete callbacks.
  for (auto& on_complete_deferred_batch :
       call_attempt_->on_complete_deferred_batches_) {
    closures->Add(&on_complete_deferred_batch.batch->on_complete_,
                  on_complete_deferred_batch.error, "resuming on_complete");
    on_complete_deferred_batch.batch.release();
  }
  call_attempt_->on_complete_deferred_batches_.clear();
}

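// Note: The release() in the loop above intentionally transfers the
// BatchData ref into the on_complete_ closure: when that closure runs,
// OnComplete() re-adopts the ref into a RefCountedPtr, which releases it
// when it goes out of scope.
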
void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresToFailUnstartedPendingBatches(
        grpc_error_handle error, CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
    PendingBatch* pending = &calld->pending_batches_[i];
    if (pending->batch == nullptr) continue;
    if (call_attempt_->PendingBatchContainsUnstartedSendOps(pending)) {
      closures->Add(pending->batch->on_complete, error,
                    "failing on_complete for pending batch");
      pending->batch->on_complete = nullptr;
      calld->MaybeClearPendingBatch(pending);
    }
  }
}

void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall(
    grpc_error_handle error) {
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // First, add closure for recv_trailing_metadata_ready.
  MaybeAddClosureForRecvTrailingMetadataReady(error, &closures);
  // If there are deferred batch completion callbacks, add them to closures.
  AddClosuresForDeferredCompletionCallbacks(&closures);
  // Add closures to fail any pending batches that have not yet been started.
  AddClosuresToFailUnstartedPendingBatches(error, &closures);
  // Schedule all of the closures identified above.
  // Note: This will release the call combiner.
  closures.RunClosures(call_attempt_->calld_->call_combiner_);
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_;
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got recv_trailing_metadata_ready, error=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            StatusToString(error).c_str());
  }
  call_attempt->completed_recv_trailing_metadata_ = true;
  // If this attempt has been abandoned, then we're not going to use the
  // result of this recv_trailing_metadata op, so do nothing.
  if (call_attempt->abandoned_) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner_,
        "recv_trailing_metadata_ready for abandoned attempt");
    return;
  }
  // Cancel per-attempt recv timer, if any.
  call_attempt->MaybeCancelPerAttemptRecvTimer();
  // Get the call's status and check for server pushback metadata.
  grpc_status_code status = GRPC_STATUS_OK;
  absl::optional<Duration> server_pushback;
  bool is_lb_drop = false;
  absl::optional<GrpcStreamNetworkState::ValueType> stream_network_state;
  grpc_metadata_batch* md_batch =
      batch_data->batch_.payload->recv_trailing_metadata.recv_trailing_metadata;
  GetCallStatus(calld->deadline_, md_batch, error, &status, &server_pushback,
                &is_lb_drop, &stream_network_state);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: call finished, status=%s "
            "server_pushback=%s is_lb_drop=%d stream_network_state=%s",
            calld->chand_, calld, call_attempt,
            grpc_status_code_to_string(status),
            server_pushback.has_value() ? server_pushback->ToString().c_str()
                                        : "N/A",
            is_lb_drop,
            stream_network_state.has_value()
                ? absl::StrCat(*stream_network_state).c_str()
                : "N/A");
  }
  // Check if we should retry.
  if (!is_lb_drop) {  // Never retry on LB drops.
    enum { kNoRetry, kTransparentRetry, kConfigurableRetry } retry = kNoRetry;
    // Handle transparent retries.
    if (stream_network_state.has_value() && !calld->retry_committed_) {
      // If not sent on wire, then always retry.
      // If sent on wire but not seen by server, retry exactly once.
      if (*stream_network_state == GrpcStreamNetworkState::kNotSentOnWire) {
        retry = kTransparentRetry;
      } else if (*stream_network_state ==
                     GrpcStreamNetworkState::kNotSeenByServer &&
                 !calld->sent_transparent_retry_not_seen_by_server_) {
        calld->sent_transparent_retry_not_seen_by_server_ = true;
        retry = kTransparentRetry;
      }
    }
    // If not transparently retrying, check for configurable retry.
    if (retry == kNoRetry &&
        call_attempt->ShouldRetry(status, server_pushback)) {
      retry = kConfigurableRetry;
    }
    // If we're retrying, do so.
    if (retry != kNoRetry) {
      CallCombinerClosureList closures;
      // Cancel call attempt.
      call_attempt->MaybeAddBatchForCancelOp(
          error.ok() ? grpc_error_set_int(
                           GRPC_ERROR_CREATE("call attempt failed"),
                           StatusIntProperty::kRpcStatus, GRPC_STATUS_CANCELLED)
                     : error,
          &closures);
      // For transparent retries, add a closure to immediately start a new
      // call attempt.
      // For configurable retries, start retry timer.
      if (retry == kTransparentRetry) {
        calld->AddClosureToStartTransparentRetry(&closures);
      } else {
        calld->StartRetryTimer(server_pushback);
      }
      // Record that this attempt has been abandoned.
      call_attempt->Abandon();
      // Yields call combiner.
      closures.RunClosures(calld->call_combiner_);
      return;
    }
  }
  // Not retrying, so commit the call.
  calld->RetryCommit(call_attempt);
  // If retry state is no longer needed, switch to fast path for
  // subsequent batches.
  call_attempt->MaybeSwitchToFastPath();
  // Run any necessary closures.
  batch_data->RunClosuresForCompletedCall(error);
}

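// Note: Unlike configurable retries, transparent retries are not gated
// on ShouldRetry(), so they do not increment num_attempts_completed_,
// do not wait for the backoff timer, and are performed even when no
// retryPolicy is configured.  They apply only when the transport
// indicates that the previous attempt was never seen by the server
// application.
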
//
// on_complete callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForCompletedPendingBatch(grpc_error_handle error,
                                        CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  PendingBatch* pending = calld->PendingBatchFind(
      "completed", [this](grpc_transport_stream_op_batch* batch) {
        // Match the pending batch with the same set of send ops as the
        // batch we've just completed.
        return batch->on_complete != nullptr &&
               batch_.send_initial_metadata == batch->send_initial_metadata &&
               batch_.send_message == batch->send_message &&
               batch_.send_trailing_metadata == batch->send_trailing_metadata;
      });
  // If batch_data is a replay batch, then there will be no pending
  // batch to complete.
  if (pending == nullptr) {
    return;
  }
  // Propagate payload.
  if (batch_.send_message) {
    pending->batch->payload->send_message.stream_write_closed =
        batch_.payload->send_message.stream_write_closed;
  }
  // Add closure.
  closures->Add(pending->batch->on_complete, error,
                "on_complete for pending batch");
  pending->batch->on_complete = nullptr;
  calld->MaybeClearPendingBatch(pending);
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  bool have_pending_send_ops = call_attempt_->HaveSendOpsToReplay();
  // We don't check send_initial_metadata here, because that op will always
  // be started as soon as it is received from the surface, so it will
  // never need to be started at this point.
  if (!have_pending_send_ops) {
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
      PendingBatch* pending = &calld->pending_batches_[i];
      grpc_transport_stream_op_batch* batch = pending->batch;
      if (batch == nullptr || pending->send_ops_cached) continue;
      if (batch->send_message || batch->send_trailing_metadata) {
        have_pending_send_ops = true;
        break;
      }
    }
  }
  if (have_pending_send_ops) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: starting next batch for pending "
              "send op(s)",
              calld->chand_, calld, call_attempt_);
    }
    call_attempt_->AddRetriableBatches(closures);
  }
}

void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_;
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got on_complete, error=%s, batch=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            StatusToString(error).c_str(),
            grpc_transport_stream_op_batch_string(&batch_data->batch_, false)
                .c_str());
  }
  // If this attempt has been abandoned, then we're not going to propagate
  // the completion of this batch, so do nothing.
  if (call_attempt->abandoned_) {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "on_complete for abandoned attempt");
    return;
  }
  // If we got an error and have not yet gotten the
  // recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface.  We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY(!calld->retry_committed_ && !error.ok() &&
                   !call_attempt->completed_recv_trailing_metadata_)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: deferring on_complete",
              calld->chand_, calld, call_attempt);
    }
    call_attempt->on_complete_deferred_batches_.emplace_back(
        std::move(batch_data), error);
    CallCombinerClosureList closures;
    call_attempt->MaybeAddBatchForCancelOp(error, &closures);
    if (!call_attempt->started_recv_trailing_metadata_) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      call_attempt->AddBatchForInternalRecvTrailingMetadata(&closures);
    }
    closures.RunClosures(calld->call_combiner_);
    return;
  }
  // Update bookkeeping in call_attempt.
  if (batch_data->batch_.send_initial_metadata) {
    call_attempt->completed_send_initial_metadata_ = true;
  }
  if (batch_data->batch_.send_message) {
    ++call_attempt->completed_send_message_count_;
  }
  if (batch_data->batch_.send_trailing_metadata) {
    call_attempt->completed_send_trailing_metadata_ = true;
  }
  // If the call is committed, free cached data for send ops that we've just
  // completed.
  if (calld->retry_committed_) {
    batch_data->FreeCachedSendOpDataForCompletedBatch();
  }
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // Add closure for the completed pending batch, if any.
  batch_data->AddClosuresForCompletedPendingBatch(error, &closures);
  // If needed, add a callback to start any replay or pending send ops on
  // the LB call.
  if (!call_attempt->completed_recv_trailing_metadata_) {
    batch_data->AddClosuresForReplayOrPendingSendOps(&closures);
  }
  // If retry state is no longer needed (i.e., we're committed and there
  // are no more send ops to replay), switch to fast path for subsequent
  // batches.
  call_attempt->MaybeSwitchToFastPath();
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(calld->call_combiner_);
}

void RetryFilter::CallData::CallAttempt::BatchData::OnCompleteForCancelOp(
    void* arg, grpc_error_handle error) {
  RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
  CallAttempt* call_attempt = batch_data->call_attempt_;
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p batch_data=%p: "
            "got on_complete for cancel_stream batch, error=%s, batch=%s",
            calld->chand_, calld, call_attempt, batch_data.get(),
            StatusToString(error).c_str(),
            grpc_transport_stream_op_batch_string(&batch_data->batch_, false)
                .c_str());
  }
  GRPC_CALL_COMBINER_STOP(
      calld->call_combiner_,
      "on_complete for internally generated cancel_stream op");
}

//
// retriable batch construction
//

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendInitialMetadataOp() {
  auto* calld = call_attempt_->calld_;
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // If we've already completed one or more attempts, add the
  // grpc-previous-rpc-attempts header.
  call_attempt_->send_initial_metadata_ = calld->send_initial_metadata_.Copy();
  if (GPR_UNLIKELY(calld->num_attempts_completed_ > 0)) {
    call_attempt_->send_initial_metadata_.Set(GrpcPreviousRpcAttemptsMetadata(),
                                              calld->num_attempts_completed_);
  } else {
    call_attempt_->send_initial_metadata_.Remove(
        GrpcPreviousRpcAttemptsMetadata());
  }
  call_attempt_->started_send_initial_metadata_ = true;
  batch_.send_initial_metadata = true;
  batch_.payload->send_initial_metadata.send_initial_metadata =
      &call_attempt_->send_initial_metadata_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendMessageOp() {
  auto* calld = call_attempt_->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(
        GPR_INFO,
        "chand=%p calld=%p attempt=%p: starting calld->send_messages[%" PRIuPTR
        "]",
        calld->chand_, calld, call_attempt_,
        call_attempt_->started_send_message_count_);
  }
  CachedSendMessage cache =
      calld->send_messages_[call_attempt_->started_send_message_count_];
  ++call_attempt_->started_send_message_count_;
  batch_.send_message = true;
  batch_.payload->send_message.send_message = cache.slices;
  batch_.payload->send_message.flags = cache.flags;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendTrailingMetadataOp() {
  auto* calld = call_attempt_->calld_;
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  call_attempt_->send_trailing_metadata_ =
      calld->send_trailing_metadata_.Copy();
  call_attempt_->started_send_trailing_metadata_ = true;
  batch_.send_trailing_metadata = true;
  batch_.payload->send_trailing_metadata.send_trailing_metadata =
      &call_attempt_->send_trailing_metadata_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvInitialMetadataOp() {
  call_attempt_->started_recv_initial_metadata_ = true;
  batch_.recv_initial_metadata = true;
  call_attempt_->recv_initial_metadata_.Clear();
  batch_.payload->recv_initial_metadata.recv_initial_metadata =
      &call_attempt_->recv_initial_metadata_;
  batch_.payload->recv_initial_metadata.trailing_metadata_available =
      &call_attempt_->trailing_metadata_available_;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_initial_metadata_ready_,
                    RecvInitialMetadataReady, this, grpc_schedule_on_exec_ctx);
  batch_.payload->recv_initial_metadata.recv_initial_metadata_ready =
      &call_attempt_->recv_initial_metadata_ready_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvMessageOp() {
  ++call_attempt_->started_recv_message_count_;
  batch_.recv_message = true;
  batch_.payload->recv_message.recv_message = &call_attempt_->recv_message_;
  batch_.payload->recv_message.flags = &call_attempt_->recv_message_flags_;
  batch_.payload->recv_message.call_failed_before_recv_message = nullptr;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_, RecvMessageReady, this,
                    grpc_schedule_on_exec_ctx);
  batch_.payload->recv_message.recv_message_ready =
      &call_attempt_->recv_message_ready_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvTrailingMetadataOp() {
  call_attempt_->started_recv_trailing_metadata_ = true;
  batch_.recv_trailing_metadata = true;
  call_attempt_->recv_trailing_metadata_.Clear();
  batch_.payload->recv_trailing_metadata.recv_trailing_metadata =
      &call_attempt_->recv_trailing_metadata_;
  batch_.payload->recv_trailing_metadata.collect_stats =
      &call_attempt_->collect_stats_;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_trailing_metadata_ready_,
                    RecvTrailingMetadataReady, this, grpc_schedule_on_exec_ctx);
  batch_.payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &call_attempt_->recv_trailing_metadata_ready_;
}

void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp(
    grpc_error_handle error) {
  batch_.cancel_stream = true;
  batch_.payload->cancel_stream.cancel_error = error;
  // Override on_complete callback.
  GRPC_CLOSURE_INIT(&on_complete_, OnCompleteForCancelOp, this, nullptr);
}

//
// CallData vtable functions
//

grpc_error_handle RetryFilter::CallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  auto* chand = static_cast<RetryFilter*>(elem->channel_data);
  new (elem->call_data) CallData(chand, *args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand,
            elem->call_data);
  }
  return absl::OkStatus();
}

void RetryFilter::CallData::Destroy(grpc_call_element* elem,
                                    const grpc_call_final_info* /*final_info*/,
                                    grpc_closure* then_schedule_closure) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  // Save our ref to the CallStackDestructionBarrier until after our
  // dtor is invoked.
  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier =
      std::move(calld->call_stack_destruction_barrier_);
  calld->~CallData();
  // Now set the callback in the CallStackDestructionBarrier object,
  // right before we release our ref to it (implicitly upon returning).
  // The callback will be invoked when the CallStackDestructionBarrier
  // is destroyed.
  call_stack_destruction_barrier->set_on_call_stack_destruction(
      then_schedule_closure);
}

void RetryFilter::CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->StartTransportStreamOpBatch(batch);
}

void RetryFilter::CallData::SetPollent(grpc_call_element* elem,
                                       grpc_polling_entity* pollent) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->pollent_ = pollent;
}

//
// CallData implementation
//

const RetryMethodConfig* RetryFilter::GetRetryPolicy(
    const grpc_call_context_element* context) {
  if (context == nullptr) return nullptr;
  auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
      context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
  if (svc_cfg_call_data == nullptr) return nullptr;
  return static_cast<const RetryMethodConfig*>(
      svc_cfg_call_data->GetMethodParsedConfig(service_config_parser_index_));
}

RetryFilter::CallData::CallData(RetryFilter* chand,
                                const grpc_call_element_args& args)
    : chand_(chand),
      retry_throttle_data_(chand->retry_throttle_data_),
      retry_policy_(chand->GetRetryPolicy(args.context)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(retry_policy_ == nullptr
                                       ? Duration::Zero()
                                       : retry_policy_->initial_backoff())
              .set_multiplier(retry_policy_ == nullptr
                                  ? 0
                                  : retry_policy_->backoff_multiplier())
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(retry_policy_ == nullptr
                                   ? Duration::Zero()
                                   : retry_policy_->max_backoff())),
      path_(CSliceRef(args.path)),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      call_stack_destruction_barrier_(
          arena_->New<CallStackDestructionBarrier>()),
      pending_send_initial_metadata_(false),
      pending_send_message_(false),
      pending_send_trailing_metadata_(false),
      retry_committed_(false),
      retry_codepath_started_(false),
      sent_transparent_retry_not_seen_by_server_(false) {}

RetryFilter::CallData::~CallData() {
  FreeAllCachedSendOpData();
  CSliceUnref(path_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i].batch == nullptr);
  }
}

void RetryFilter::CallData::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) &&
      !GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from surface: %s",
            chand_, this,
            grpc_transport_stream_op_batch_string(batch, false).c_str());
  }
  // If we have an LB call, delegate to the LB call.
  if (committed_call_ != nullptr) {
    // Note: This will release the call combiner.
    committed_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // If we were previously cancelled from the surface, fail this
  // batch immediately.
  if (!cancelled_from_surface_.ok()) {
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, cancelled_from_surface_, call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Save cancel_error in case subsequent batches are started.
    cancelled_from_surface_ = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: cancelled from surface: %s", chand_,
              this, StatusToString(cancelled_from_surface_).c_str());
    }
    // Fail any pending batches.
    PendingBatchesFail(cancelled_from_surface_);
    // If we have a current call attempt, commit the call, then send
    // the cancellation down to that attempt.  When the call fails, it
    // will not be retried, because we have committed it here.
    if (call_attempt_ != nullptr) {
      RetryCommit(call_attempt_.get());
      // TODO(roth): When implementing hedging, this will get more
      // complex, because instead of just passing the batch down to a
      // single call attempt, we'll need to cancel multiple call
      // attempts and wait for the cancellation on_complete from each call
      // attempt before we propagate the on_complete from this batch
      // back to the surface.
      // Note: This will release the call combiner.
      call_attempt_->CancelFromSurface(batch);
      return;
    }
    // Cancel retry timer if needed.
    if (retry_timer_handle_.has_value()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling retry timer", chand_,
                this);
      }
      if (chand_->event_engine_->Cancel(*retry_timer_handle_)) {
        GRPC_CALL_STACK_UNREF(owning_call_, "OnRetryTimer");
      }
      retry_timer_handle_.reset();
      FreeAllCachedSendOpData();
    }
    // We have no call attempt, so there's nowhere to send the cancellation
    // batch.  Return it back to the surface immediately.
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, cancelled_from_surface_, call_combiner_);
    return;
  }
  // Add the batch to the pending list.
  PendingBatch* pending = PendingBatchesAdd(batch);
  // If the timer is pending, yield the call combiner and wait for it to
  // run, since we don't want to start another call attempt until it does.
  if (retry_timer_handle_.has_value()) {
    GRPC_CALL_COMBINER_STOP(call_combiner_,
                            "added pending batch while retry timer pending");
    return;
  }
  // If we do not yet have a call attempt, create one.
  if (call_attempt_ == nullptr) {
    // If this is the first batch and retries are already committed
    // (e.g., if this batch put the call above the buffer size limit), then
    // immediately create an LB call and delegate the batch to it.  This
    // avoids the overhead of unnecessarily allocating a CallAttempt
    // object or caching any of the send op data.
    // Note that we would ideally like to do this also on subsequent
    // attempts (e.g., if a batch puts the call above the buffer size
    // limit since the last attempt was complete), but in practice that's
    // not really worthwhile, because we will almost always have cached and
    // completed at least the send_initial_metadata op on the previous
    // attempt, which means that we'd need special logic to replay the
    // batch anyway, which is exactly what the CallAttempt object provides.
    // We also skip this optimization if perAttemptRecvTimeout is set in the
    // retry policy, because we need the code in CallAttempt to handle
    // the associated timer.
    if (!retry_codepath_started_ && retry_committed_ &&
        (retry_policy_ == nullptr ||
         !retry_policy_->per_attempt_recv_timeout().has_value())) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: retry committed before first attempt; "
                "creating LB call",
                chand_, this);
      }
      PendingBatchClear(pending);
      auto* service_config_call_data =
          static_cast<ClientChannelServiceConfigCallData*>(
              call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
      committed_call_ = CreateLoadBalancedCall(
          [service_config_call_data]() { service_config_call_data->Commit(); },
          /*is_transparent_retry=*/false);
      committed_call_->StartTransportStreamOpBatch(batch);
      return;
    }
    // Otherwise, create a call attempt.
    // The attempt will automatically start any necessary replays or
    // pending batches.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_,
              this);
    }
    retry_codepath_started_ = true;
    CreateCallAttempt(/*is_transparent_retry=*/false);
    return;
  }
  // Send batches to call attempt.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on attempt=%p", chand_,
            this, call_attempt_.get());
  }
  call_attempt_->StartRetriableBatches();
}

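// Note: The closure passed down by CreateLoadBalancedCall() below holds
// a ref to the CallStackDestructionBarrier, which defers destruction of
// this filter's call stack until the LB call itself has been destroyed.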
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall(
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) {
  grpc_call_element_args args = {owning_call_, nullptr,          call_context_,
                                 path_,        /*start_time=*/0, deadline_,
                                 arena_,       call_combiner_};
  return chand_->client_channel_->CreateLoadBalancedCall(
      args, pollent_,
      // This callback holds a ref to the CallStackDestructionBarrier
      // object until the LB call is destroyed.
      call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this),
      std::move(on_commit), is_transparent_retry);
}

void RetryFilter::CallData::CreateCallAttempt(bool is_transparent_retry) {
  call_attempt_ = MakeRefCounted<CallAttempt>(this, is_transparent_retry);
  call_attempt_->StartRetriableBatches();
}

//
// send op data caching
//

void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    seen_send_initial_metadata_ = true;
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    send_initial_metadata_ = send_initial_metadata->Copy();
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    SliceBuffer* cache = arena_->New<SliceBuffer>(std::move(
        *std::exchange(batch->payload->send_message.send_message, nullptr)));
    send_messages_.push_back({cache, batch->payload->send_message.flags});
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    seen_send_trailing_metadata_ = true;
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    send_trailing_metadata_ = send_trailing_metadata->Copy();
  }
}

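// Note: For send_message ops, MaybeCacheSendOpsForBatch() above takes
// ownership of the payload by moving it into an arena-allocated
// SliceBuffer and nulling out the pointer in the surface batch, so the
// message remains available for replay on later attempts even after the
// original batch completes.
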
FreeCachedSendInitialMetadata()2317 void RetryFilter::CallData::FreeCachedSendInitialMetadata() {
2318   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2319     gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_initial_metadata",
2320             chand_, this);
2321   }
2322   send_initial_metadata_.Clear();
2323 }
2324 
FreeCachedSendMessage(size_t idx)2325 void RetryFilter::CallData::FreeCachedSendMessage(size_t idx) {
2326   if (send_messages_[idx].slices != nullptr) {
2327     if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2328       gpr_log(GPR_INFO,
2329               "chand=%p calld=%p: destroying send_messages[%" PRIuPTR "]",
2330               chand_, this, idx);
2331     }
2332     Destruct(std::exchange(send_messages_[idx].slices, nullptr));
2333   }
2334 }
2335 
FreeCachedSendTrailingMetadata()2336 void RetryFilter::CallData::FreeCachedSendTrailingMetadata() {
2337   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
2338     gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_trailing_metadata",
2339             chand_, this);
2340   }
2341   send_trailing_metadata_.Clear();
2342 }
2343 
FreeAllCachedSendOpData()2344 void RetryFilter::CallData::FreeAllCachedSendOpData() {
2345   if (seen_send_initial_metadata_) {
2346     FreeCachedSendInitialMetadata();
2347   }
2348   for (size_t i = 0; i < send_messages_.size(); ++i) {
2349     FreeCachedSendMessage(i);
2350   }
2351   if (seen_send_trailing_metadata_) {
2352     FreeCachedSendTrailingMetadata();
2353   }
2354 }
2355 
2356 //
2357 // pending_batches management
2358 //
2359 
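// Returns the index into pending_batches_ to use for a given batch.
// There can be only one in-flight batch containing each op type at any
// given time, so keying on the first op present yields a unique slot
// per in-flight batch (enforced by the assert in PendingBatchesAdd()).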
size_t RetryFilter::CallData::GetBatchIndex(
    grpc_transport_stream_op_batch* batch) {
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}

// This is called via the call combiner, so access to calld is synchronized.
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
            chand_, this, idx);
  }
  PendingBatch* pending = &pending_batches_[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  // Update state in calld about pending batches.
  // Also check if the batch takes us over the retry buffer limit.
  // Note: We don't check the size of trailing metadata here, because
  // gRPC clients do not send trailing metadata.
  if (batch->send_initial_metadata) {
    pending_send_initial_metadata_ = true;
    bytes_buffered_for_retry_ += batch->payload->send_initial_metadata
                                     .send_initial_metadata->TransportSize();
  }
  if (batch->send_message) {
    pending_send_message_ = true;
    bytes_buffered_for_retry_ +=
        batch->payload->send_message.send_message->Length();
  }
  if (batch->send_trailing_metadata) {
    pending_send_trailing_metadata_ = true;
  }
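  // The buffer limit comes from the GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE
  // channel arg.  Once the data cached for replay exceeds it, we commit
  // to the current attempt and stop buffering.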
  // TODO(roth): When we implement hedging, if there are currently attempts
  // in flight, we will need to pick the one on which the most send ops
  // have already been sent and commit to that attempt.
  if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
                   chand_->per_rpc_retry_buffer_size_)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: exceeded retry buffer size, committing",
              chand_, this);
    }
    RetryCommit(call_attempt_.get());
  }
  return pending;
}

void RetryFilter::CallData::PendingBatchClear(PendingBatch* pending) {
  if (pending->batch->send_initial_metadata) {
    pending_send_initial_metadata_ = false;
  }
  if (pending->batch->send_message) {
    pending_send_message_ = false;
  }
  if (pending->batch->send_trailing_metadata) {
    pending_send_trailing_metadata_ = false;
  }
  pending->batch = nullptr;
}

void RetryFilter::CallData::MaybeClearPendingBatch(PendingBatch* pending) {
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch if all of its callbacks have been
  // scheduled and reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr) &&
      (!batch->recv_trailing_metadata ||
       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand_,
              this);
    }
    PendingBatchClear(pending);
  }
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryFilter::CallData::FailPendingBatchInCallCombiner(
    void* arg, grpc_error_handle error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  CallData* call = static_cast<CallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(batch, error,
                                                     call->call_combiner_);
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryFilter::CallData::PendingBatchesFail(grpc_error_handle error) {
  GPR_ASSERT(!error.ok());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            chand_, this, num_batches, StatusToString(error).c_str());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, error,
                   "PendingBatchesFail");
      PendingBatchClear(pending);
    }
  }
  closures.RunClosuresWithoutYielding(call_combiner_);
}

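// Returns the first pending batch for which predicate(batch) is true,
// or null if no pending batch matches.  log_message is used only for
// tracing.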
template <typename Predicate>
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchFind(
    const char* log_message, Predicate predicate) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr && predicate(batch)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR,
                chand_, this, log_message, i);
      }
      return pending;
    }
  }
  return nullptr;
}

//
// retry code
//

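// Commits the call to the given attempt; no further retries will be
// attempted.  Idempotent.  If there is a call attempt, frees cached
// send op data that the attempt no longer needs and, if its LB call has
// already been committed, commits the service config call data as well.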
void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) {
  if (retry_committed_) return;
  retry_committed_ = true;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand_, this);
  }
  if (call_attempt != nullptr) {
    // If the call attempt's LB call has been committed, invoke the
    // call's on_commit callback.
    // Note: If call_attempt is null, this is happening before the first
    // retry attempt is started, in which case we'll just pass the real
    // on_commit callback down into the LB call, and it won't be our
    // problem anymore.
    if (call_attempt->lb_call_committed()) {
      auto* service_config_call_data =
          static_cast<ClientChannelServiceConfigCallData*>(
              call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
      service_config_call_data->Commit();
    }
    // Free cached send ops.
    call_attempt->FreeCachedSendOpDataAfterCommit();
  }
}

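// Schedules the next retry attempt.  If the server sent pushback via
// the "grpc-retry-pushback-ms" metadata, that value is used as the
// delay and the backoff state is reset; otherwise, the delay comes from
// the exponential backoff configured in the retry policy.  For example
// (illustrative values only), with
//   "retryPolicy": {
//     "maxAttempts": 4,
//     "initialBackoff": "0.1s",
//     "maxBackoff": "1s",
//     "backoffMultiplier": 2,
//     "retryableStatusCodes": ["UNAVAILABLE"]
//   }
// in the service config, the backoff ceiling for successive attempts
// grows as 0.1s, 0.2s, 0.4s, ..., capped at 1s, with jitter applied.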
void RetryFilter::CallData::StartRetryTimer(
    absl::optional<Duration> server_pushback) {
  // Reset call attempt.
  call_attempt_.reset(DEBUG_LOCATION, "StartRetryTimer");
  // Compute backoff delay.
  Duration next_attempt_timeout;
  if (server_pushback.has_value()) {
    GPR_ASSERT(*server_pushback >= Duration::Zero());
    next_attempt_timeout = *server_pushback;
    retry_backoff_.Reset();
  } else {
    next_attempt_timeout = retry_backoff_.NextAttemptTime() - Timestamp::Now();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand_,
            this, next_attempt_timeout.millis());
  }
  // Schedule retry after computed delay.
  GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
  retry_timer_handle_ =
      chand_->event_engine_->RunAfter(next_attempt_timeout, [this] {
        ApplicationCallbackExecCtx callback_exec_ctx;
        ExecCtx exec_ctx;
        OnRetryTimer();
      });
}

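// Invoked by the EventEngine when the retry timer fires.  Hops into the
// call combiner so that OnRetryTimerLocked() runs with access to calld
// synchronized.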
void RetryFilter::CallData::OnRetryTimer() {
  GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimerLocked, this, nullptr);
  GRPC_CALL_COMBINER_START(call_combiner_, &retry_closure_, absl::OkStatus(),
                           "retry timer fired");
}

void RetryFilter::CallData::OnRetryTimerLocked(void* arg,
                                               grpc_error_handle /*error*/) {
  auto* calld = static_cast<CallData*>(arg);
  calld->retry_timer_handle_.reset();
  calld->CreateCallAttempt(/*is_transparent_retry=*/false);
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
}

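// Adds a closure to the given list that will start a transparent retry
// -- a retry that does not count against the configured retry policy,
// used when we know the RPC was never seen by the server application.
// Takes a ref on the call stack, which StartTransparentRetry() releases.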
void RetryFilter::CallData::AddClosureToStartTransparentRetry(
    CallCombinerClosureList* closures) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: scheduling transparent retry", chand_,
            this);
  }
  GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
  GRPC_CLOSURE_INIT(&retry_closure_, StartTransparentRetry, this, nullptr);
  closures->Add(&retry_closure_, absl::OkStatus(), "start transparent retry");
}

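// Starts the transparent retry, unless the call has since been
// cancelled from the surface, in which case the call combiner is simply
// released.  Releases the call stack ref taken in
// AddClosureToStartTransparentRetry().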
void RetryFilter::CallData::StartTransparentRetry(void* arg,
                                                  grpc_error_handle /*error*/) {
  auto* calld = static_cast<CallData*>(arg);
  if (calld->cancelled_from_surface_.ok()) {
    calld->CreateCallAttempt(/*is_transparent_retry=*/true);
  } else {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "call cancelled before transparent retry");
  }
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
}

}  // namespace

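// Filter vtable registered with the channel stack.  The null second
// entry is the promise-based make_call_promise hook, which this filter
// does not implement.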
const grpc_channel_filter kRetryFilterVtable = {
    RetryFilter::CallData::StartTransportStreamOpBatch,
    nullptr,
    RetryFilter::StartTransportOp,
    sizeof(RetryFilter::CallData),
    RetryFilter::CallData::Init,
    RetryFilter::CallData::SetPollent,
    RetryFilter::CallData::Destroy,
    sizeof(RetryFilter),
    RetryFilter::Init,
    grpc_channel_stack_no_post_init,
    RetryFilter::Destroy,
    RetryFilter::GetChannelInfo,
    "retry_filter",
};

}  // namespace grpc_core