xref: /aosp_15_r20/external/grpc-grpc/test/core/end2end/tests/retry_send_op_fails.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <memory>
20 #include <new>
21 
22 #include "absl/status/status.h"
23 #include "absl/types/optional.h"
24 #include "gtest/gtest.h"
25 
26 #include <grpc/impl/channel_arg_names.h>
27 #include <grpc/status.h>
28 
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/channel/channel_fwd.h"
31 #include "src/core/lib/channel/channel_stack.h"
32 #include "src/core/lib/config/core_configuration.h"
33 #include "src/core/lib/gprpp/status_helper.h"
34 #include "src/core/lib/gprpp/time.h"
35 #include "src/core/lib/iomgr/call_combiner.h"
36 #include "src/core/lib/iomgr/closure.h"
37 #include "src/core/lib/iomgr/error.h"
38 #include "src/core/lib/surface/channel_stack_type.h"
39 #include "src/core/lib/transport/transport.h"
40 #include "test/core/end2end/end2end_tests.h"
41 
42 namespace grpc_core {
43 namespace {
44 
45 // A filter that, for the first call it sees, will fail all batches except
46 // for cancellations, so that the call fails with status ABORTED.
47 // All subsequent calls are allowed through without failures.
48 class FailFirstCallFilter {
49  public:
50   static grpc_channel_filter kFilterVtable;
51 
52  private:
53   class CallData {
54    public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)55     static grpc_error_handle Init(grpc_call_element* elem,
56                                   const grpc_call_element_args* args) {
57       new (elem->call_data) CallData(args);
58       return absl::OkStatus();
59     }
60 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)61     static void Destroy(grpc_call_element* elem,
62                         const grpc_call_final_info* /*final_info*/,
63                         grpc_closure* /*ignored*/) {
64       auto* calld = static_cast<CallData*>(elem->call_data);
65       calld->~CallData();
66     }
67 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)68     static void StartTransportStreamOpBatch(
69         grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
70       auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
71       auto* calld = static_cast<CallData*>(elem->call_data);
72       if (!chand->seen_first_) {
73         chand->seen_first_ = true;
74         calld->fail_ = true;
75       }
76       if (calld->fail_ && !batch->cancel_stream) {
77         grpc_transport_stream_op_batch_finish_with_failure(
78             batch,
79             grpc_error_set_int(
80                 GRPC_ERROR_CREATE("FailFirstCallFilter failing batch"),
81                 StatusIntProperty::kRpcStatus, GRPC_STATUS_ABORTED),
82             calld->call_combiner_);
83         return;
84       }
85       grpc_call_next_op(elem, batch);
86     }
87 
88    private:
CallData(const grpc_call_element_args * args)89     explicit CallData(const grpc_call_element_args* args)
90         : call_combiner_(args->call_combiner) {}
91 
92     CallCombiner* call_combiner_;
93     bool fail_ = false;
94   };
95 
Init(grpc_channel_element * elem,grpc_channel_element_args *)96   static grpc_error_handle Init(grpc_channel_element* elem,
97                                 grpc_channel_element_args* /*args*/) {
98     new (elem->channel_data) FailFirstCallFilter();
99     return absl::OkStatus();
100   }
101 
Destroy(grpc_channel_element * elem)102   static void Destroy(grpc_channel_element* elem) {
103     auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
104     chand->~FailFirstCallFilter();
105   }
106 
107   bool seen_first_ = false;
108 };
109 
110 grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
111     CallData::StartTransportStreamOpBatch,
112     nullptr,
113     nullptr,
114     grpc_channel_next_op,
115     sizeof(CallData),
116     CallData::Init,
117     grpc_call_stack_ignore_set_pollset_or_pollset_set,
118     CallData::Destroy,
119     sizeof(FailFirstCallFilter),
120     Init,
121     grpc_channel_stack_no_post_init,
122     Destroy,
123     grpc_channel_next_get_info,
124     "FailFirstCallFilter",
125 };
126 
127 // Tests failure on a send op batch:
// - 2 attempts (i.e. at most 1 retry) allowed for ABORTED status
129 // - on the first call attempt, the batch containing the
130 //   send_initial_metadata op fails, and then the call returns ABORTED,
131 //   all without ever going out on the wire
132 // - second attempt returns ABORTED but does not retry, because only 2
133 //   attempts are allowed
CORE_END2END_TEST(RetryTest,RetrySendOpFails)134 CORE_END2END_TEST(RetryTest, RetrySendOpFails) {
135   CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
136     builder->channel_init()
137         ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
138                          &FailFirstCallFilter::kFilterVtable)
139         // Skip on proxy (which explicitly disables retries).
140         .IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
141   });
142   InitServer(ChannelArgs());
143   InitClient(ChannelArgs().Set(
144       GRPC_ARG_SERVICE_CONFIG,
145       "{\n"
146       "  \"methodConfig\": [ {\n"
147       "    \"name\": [\n"
148       "      { \"service\": \"service\", \"method\": \"method\" }\n"
149       "    ],\n"
150       "    \"retryPolicy\": {\n"
151       "      \"maxAttempts\": 2,\n"
152       "      \"initialBackoff\": \"1s\",\n"
153       "      \"maxBackoff\": \"120s\",\n"
154       "      \"backoffMultiplier\": 1.6,\n"
155       "      \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
156       "    }\n"
157       "  } ]\n"
158       "}"));
159   auto c =
160       NewClientCall("/service/method").Timeout(Duration::Seconds(5)).Create();
161   EXPECT_NE(c.GetPeer(), absl::nullopt);
162   // Start a batch containing send ops.
163   c.NewBatch(1)
164       .SendInitialMetadata({})
165       .SendMessage("foo")
166       .SendCloseFromClient();
167   // Start a batch containing recv ops.
168   IncomingMessage server_message;
169   IncomingMetadata server_initial_metadata;
170   IncomingStatusOnClient server_status;
171   c.NewBatch(2)
172       .RecvInitialMetadata(server_initial_metadata)
173       .RecvMessage(server_message)
174       .RecvStatusOnClient(server_status);
175   // Client send ops should now complete.
176   Expect(1, true);
177   Step();
178   // Server should get a call.
179   auto s = RequestCall(101);
180   Expect(101, true);
181   Step();
182   // Server fails with status ABORTED.
183   IncomingCloseOnServer client_close;
184   s.NewBatch(102)
185       .SendInitialMetadata({})
186       .SendStatusFromServer(GRPC_STATUS_ABORTED, "xyz", {})
187       .RecvCloseOnServer(client_close);
188   // In principle, the server batch should complete before the client
189   // recv ops batch, but in the proxy fixtures, there are multiple threads
190   // involved, so the completion order tends to be a little racy.
191   Expect(102, true);
192   Expect(2, true);
193   Step();
194   EXPECT_EQ(server_status.status(), GRPC_STATUS_ABORTED);
195   EXPECT_EQ(server_status.message(), "xyz");
196   EXPECT_EQ(s.method(), "/service/method");
197   EXPECT_FALSE(client_close.was_cancelled());
198   // Make sure the "grpc-previous-rpc-attempts" header was sent in the retry.
199   EXPECT_EQ(s.GetInitialMetadata("grpc-previous-rpc-attempts"), "1");
200 }
201 
202 }  // namespace
203 }  // namespace grpc_core
204