1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <memory>
20 #include <new>
21
22 #include "absl/status/status.h"
23 #include "absl/types/optional.h"
24 #include "gtest/gtest.h"
25
26 #include <grpc/impl/channel_arg_names.h>
27 #include <grpc/status.h>
28
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/channel/channel_fwd.h"
31 #include "src/core/lib/channel/channel_stack.h"
32 #include "src/core/lib/config/core_configuration.h"
33 #include "src/core/lib/gprpp/status_helper.h"
34 #include "src/core/lib/gprpp/time.h"
35 #include "src/core/lib/iomgr/call_combiner.h"
36 #include "src/core/lib/iomgr/closure.h"
37 #include "src/core/lib/iomgr/error.h"
38 #include "src/core/lib/surface/channel_stack_type.h"
39 #include "src/core/lib/transport/transport.h"
40 #include "test/core/end2end/end2end_tests.h"
41
42 namespace grpc_core {
43 namespace {
44
45 // A filter that, for the first call it sees, will fail the batch
46 // containing send_initial_metadata and then fail the call with status
47 // ABORTED. All subsequent calls are allowed through without failures.
48 class FailFirstSendOpFilter {
49 public:
50 static grpc_channel_filter kFilterVtable;
51
52 class CallData {
53 public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)54 static grpc_error_handle Init(grpc_call_element* elem,
55 const grpc_call_element_args* args) {
56 new (elem->call_data) CallData(args);
57 return absl::OkStatus();
58 }
59
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)60 static void Destroy(grpc_call_element* elem,
61 const grpc_call_final_info* /*final_info*/,
62 grpc_closure* /*ignored*/) {
63 auto* calld = static_cast<CallData*>(elem->call_data);
64 calld->~CallData();
65 }
66
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)67 static void StartTransportStreamOpBatch(
68 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
69 auto* chand = static_cast<FailFirstSendOpFilter*>(elem->channel_data);
70 auto* calld = static_cast<CallData*>(elem->call_data);
71 if (!chand->seen_first_) {
72 chand->seen_first_ = true;
73 calld->fail_ = true;
74 }
75 if (calld->fail_ && !batch->cancel_stream) {
76 grpc_transport_stream_op_batch_finish_with_failure(
77 batch,
78 grpc_error_set_int(
79 GRPC_ERROR_CREATE("FailFirstSendOpFilter failing batch"),
80 StatusIntProperty::kRpcStatus, GRPC_STATUS_ABORTED),
81 calld->call_combiner_);
82 return;
83 }
84 grpc_call_next_op(elem, batch);
85 }
86
87 private:
CallData(const grpc_call_element_args * args)88 explicit CallData(const grpc_call_element_args* args)
89 : call_combiner_(args->call_combiner) {}
90
91 CallCombiner* call_combiner_;
92 bool fail_ = false;
93 };
94
Init(grpc_channel_element * elem,grpc_channel_element_args *)95 static grpc_error_handle Init(grpc_channel_element* elem,
96 grpc_channel_element_args* /*args*/) {
97 new (elem->channel_data) FailFirstSendOpFilter();
98 return absl::OkStatus();
99 }
100
Destroy(grpc_channel_element * elem)101 static void Destroy(grpc_channel_element* elem) {
102 auto* chand = static_cast<FailFirstSendOpFilter*>(elem->channel_data);
103 chand->~FailFirstSendOpFilter();
104 }
105
106 bool seen_first_ = false;
107 };
108
109 grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = {
110 CallData::StartTransportStreamOpBatch,
111 nullptr,
112 nullptr,
113 grpc_channel_next_op,
114 sizeof(CallData),
115 CallData::Init,
116 grpc_call_stack_ignore_set_pollset_or_pollset_set,
117 CallData::Destroy,
118 sizeof(FailFirstSendOpFilter),
119 Init,
120 grpc_channel_stack_no_post_init,
121 Destroy,
122 grpc_channel_next_get_info,
123 "FailFirstSendOpFilter",
124 };
125
126 // Tests the fix for a bug found in real-world code where recv_message
127 // was incorrectly replayed on a call attempt that it was already sent
128 // to when the recv_message completion had already been returned but was
129 // deferred at the point where recv_trailing_metadata was started from
130 // the surface. This resulted in ASAN failures caused by not unreffing
131 // a grpc_error.
CORE_END2END_TEST(RetryTest,RetryRecvMessageReplay)132 CORE_END2END_TEST(RetryTest, RetryRecvMessageReplay) {
133 CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
134 builder->channel_init()
135 ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
136 &FailFirstSendOpFilter::kFilterVtable)
137 // Skip on proxy (which explicitly disables retries).
138 .IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
139 });
140 InitServer(ChannelArgs());
141 InitClient(ChannelArgs().Set(
142 GRPC_ARG_SERVICE_CONFIG,
143 "{\n"
144 " \"methodConfig\": [ {\n"
145 " \"name\": [\n"
146 " { \"service\": \"service\", \"method\": \"method\" }\n"
147 " ],\n"
148 " \"retryPolicy\": {\n"
149 " \"maxAttempts\": 2,\n"
150 " \"initialBackoff\": \"1s\",\n"
151 " \"maxBackoff\": \"120s\",\n"
152 " \"backoffMultiplier\": 1.6,\n"
153 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
154 " }\n"
155 " } ]\n"
156 "}"));
157 auto c =
158 NewClientCall("/service/method").Timeout(Duration::Seconds(5)).Create();
159 EXPECT_NE(c.GetPeer(), absl::nullopt);
160 // Start a batch containing send_initial_metadata and recv_initial_metadata.
161 IncomingMetadata server_initial_metadata;
162 c.NewBatch(1).SendInitialMetadata({}).RecvInitialMetadata(
163 server_initial_metadata);
164 // Start a batch containing recv_message.
165 IncomingMessage server_message;
166 c.NewBatch(2).RecvMessage(server_message);
167 // Start a batch containing recv_trailing_metadata.
168 IncomingStatusOnClient server_status;
169 c.NewBatch(3).RecvStatusOnClient(server_status);
170 // Server should get a call.
171 auto s = RequestCall(101);
172 Expect(101, true);
173 Step();
174 // Server fails with status ABORTED.
175 IncomingCloseOnServer client_close;
176 s.NewBatch(102)
177 .SendInitialMetadata({})
178 .SendStatusFromServer(GRPC_STATUS_ABORTED, "xyz", {})
179 .RecvCloseOnServer(client_close);
180 // In principle, the server batch should complete before the client
181 // batches, but in the proxy fixtures, there are multiple threads
182 // involved, so the completion order tends to be a little racy.
183 Expect(102, true);
184 Expect(1, true);
185 Expect(2, true);
186 Expect(3, true);
187 Step();
188 EXPECT_EQ(server_status.status(), GRPC_STATUS_ABORTED);
189 EXPECT_EQ(server_status.message(), "xyz");
190 EXPECT_EQ(s.method(), "/service/method");
191 EXPECT_FALSE(client_close.was_cancelled());
192 }
193
194 } // namespace
195 } // namespace grpc_core
196