1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <memory>
20 #include <new>
21
22 #include "absl/status/status.h"
23 #include "absl/types/optional.h"
24 #include "gtest/gtest.h"
25
26 #include <grpc/impl/channel_arg_names.h>
27 #include <grpc/status.h>
28
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/channel/channel_fwd.h"
31 #include "src/core/lib/channel/channel_stack.h"
32 #include "src/core/lib/config/core_configuration.h"
33 #include "src/core/lib/gprpp/status_helper.h"
34 #include "src/core/lib/gprpp/time.h"
35 #include "src/core/lib/iomgr/call_combiner.h"
36 #include "src/core/lib/iomgr/closure.h"
37 #include "src/core/lib/iomgr/error.h"
38 #include "src/core/lib/surface/channel_stack_type.h"
39 #include "src/core/lib/transport/transport.h"
40 #include "test/core/end2end/end2end_tests.h"
41
42 namespace grpc_core {
43 namespace {
44
45 // A filter that, for the first call it sees, will fail all batches except
46 // for cancellations, so that the call fails with status ABORTED.
47 // All subsequent calls are allowed through without failures.
48 class FailFirstCallFilter {
49 public:
50 static grpc_channel_filter kFilterVtable;
51
52 private:
53 class CallData {
54 public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)55 static grpc_error_handle Init(grpc_call_element* elem,
56 const grpc_call_element_args* args) {
57 new (elem->call_data) CallData(args);
58 return absl::OkStatus();
59 }
60
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)61 static void Destroy(grpc_call_element* elem,
62 const grpc_call_final_info* /*final_info*/,
63 grpc_closure* /*ignored*/) {
64 auto* calld = static_cast<CallData*>(elem->call_data);
65 calld->~CallData();
66 }
67
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)68 static void StartTransportStreamOpBatch(
69 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
70 auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
71 auto* calld = static_cast<CallData*>(elem->call_data);
72 if (!chand->seen_first_) {
73 chand->seen_first_ = true;
74 calld->fail_ = true;
75 }
76 if (calld->fail_ && !batch->cancel_stream) {
77 grpc_transport_stream_op_batch_finish_with_failure(
78 batch,
79 grpc_error_set_int(
80 GRPC_ERROR_CREATE("FailFirstCallFilter failing batch"),
81 StatusIntProperty::kRpcStatus, GRPC_STATUS_ABORTED),
82 calld->call_combiner_);
83 return;
84 }
85 grpc_call_next_op(elem, batch);
86 }
87
88 private:
CallData(const grpc_call_element_args * args)89 explicit CallData(const grpc_call_element_args* args)
90 : call_combiner_(args->call_combiner) {}
91
92 CallCombiner* call_combiner_;
93 bool fail_ = false;
94 };
95
Init(grpc_channel_element * elem,grpc_channel_element_args *)96 static grpc_error_handle Init(grpc_channel_element* elem,
97 grpc_channel_element_args* /*args*/) {
98 new (elem->channel_data) FailFirstCallFilter();
99 return absl::OkStatus();
100 }
101
Destroy(grpc_channel_element * elem)102 static void Destroy(grpc_channel_element* elem) {
103 auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
104 chand->~FailFirstCallFilter();
105 }
106
107 bool seen_first_ = false;
108 };
109
110 grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
111 CallData::StartTransportStreamOpBatch,
112 nullptr,
113 nullptr,
114 grpc_channel_next_op,
115 sizeof(CallData),
116 CallData::Init,
117 grpc_call_stack_ignore_set_pollset_or_pollset_set,
118 CallData::Destroy,
119 sizeof(FailFirstCallFilter),
120 Init,
121 grpc_channel_stack_no_post_init,
122 Destroy,
123 grpc_channel_next_get_info,
124 "FailFirstCallFilter",
125 };
126
127 // Tests failure on a send op batch:
128 // - 2 retries allowed for ABORTED status
129 // - on the first call attempt, the batch containing the
130 // send_initial_metadata op fails, and then the call returns ABORTED,
131 // all without ever going out on the wire
132 // - second attempt returns ABORTED but does not retry, because only 2
133 // attempts are allowed
CORE_END2END_TEST(RetryTest,RetrySendOpFails)134 CORE_END2END_TEST(RetryTest, RetrySendOpFails) {
135 CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
136 builder->channel_init()
137 ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
138 &FailFirstCallFilter::kFilterVtable)
139 // Skip on proxy (which explicitly disables retries).
140 .IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
141 });
142 InitServer(ChannelArgs());
143 InitClient(ChannelArgs().Set(
144 GRPC_ARG_SERVICE_CONFIG,
145 "{\n"
146 " \"methodConfig\": [ {\n"
147 " \"name\": [\n"
148 " { \"service\": \"service\", \"method\": \"method\" }\n"
149 " ],\n"
150 " \"retryPolicy\": {\n"
151 " \"maxAttempts\": 2,\n"
152 " \"initialBackoff\": \"1s\",\n"
153 " \"maxBackoff\": \"120s\",\n"
154 " \"backoffMultiplier\": 1.6,\n"
155 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
156 " }\n"
157 " } ]\n"
158 "}"));
159 auto c =
160 NewClientCall("/service/method").Timeout(Duration::Seconds(5)).Create();
161 EXPECT_NE(c.GetPeer(), absl::nullopt);
162 // Start a batch containing send ops.
163 c.NewBatch(1)
164 .SendInitialMetadata({})
165 .SendMessage("foo")
166 .SendCloseFromClient();
167 // Start a batch containing recv ops.
168 IncomingMessage server_message;
169 IncomingMetadata server_initial_metadata;
170 IncomingStatusOnClient server_status;
171 c.NewBatch(2)
172 .RecvInitialMetadata(server_initial_metadata)
173 .RecvMessage(server_message)
174 .RecvStatusOnClient(server_status);
175 // Client send ops should now complete.
176 Expect(1, true);
177 Step();
178 // Server should get a call.
179 auto s = RequestCall(101);
180 Expect(101, true);
181 Step();
182 // Server fails with status ABORTED.
183 IncomingCloseOnServer client_close;
184 s.NewBatch(102)
185 .SendInitialMetadata({})
186 .SendStatusFromServer(GRPC_STATUS_ABORTED, "xyz", {})
187 .RecvCloseOnServer(client_close);
188 // In principle, the server batch should complete before the client
189 // recv ops batch, but in the proxy fixtures, there are multiple threads
190 // involved, so the completion order tends to be a little racy.
191 Expect(102, true);
192 Expect(2, true);
193 Step();
194 EXPECT_EQ(server_status.status(), GRPC_STATUS_ABORTED);
195 EXPECT_EQ(server_status.message(), "xyz");
196 EXPECT_EQ(s.method(), "/service/method");
197 EXPECT_FALSE(client_close.was_cancelled());
198 // Make sure the "grpc-previous-rpc-attempts" header was sent in the retry.
199 EXPECT_EQ(s.GetInitialMetadata("grpc-previous-rpc-attempts"), "1");
200 }
201
202 } // namespace
203 } // namespace grpc_core
204