1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <memory>
18 #include <new>
19
20 #include "absl/status/status.h"
21 #include "absl/types/optional.h"
22 #include "gtest/gtest.h"
23
24 #include <grpc/impl/channel_arg_names.h>
25 #include <grpc/status.h>
26
27 #include "src/core/lib/channel/channel_args.h"
28 #include "src/core/lib/channel/channel_fwd.h"
29 #include "src/core/lib/channel/channel_stack.h"
30 #include "src/core/lib/config/core_configuration.h"
31 #include "src/core/lib/gprpp/debug_location.h"
32 #include "src/core/lib/gprpp/status_helper.h"
33 #include "src/core/lib/gprpp/time.h"
34 #include "src/core/lib/iomgr/closure.h"
35 #include "src/core/lib/iomgr/error.h"
36 #include "src/core/lib/surface/channel_stack_type.h"
37 #include "src/core/lib/transport/transport.h"
38 #include "test/core/end2end/end2end_tests.h"
39
40 namespace grpc_core {
41 namespace {
42
43 // A filter that returns recv_trailing_metadata_ready with an error.
44 class InjectStatusFilter {
45 public:
46 static grpc_channel_filter kFilterVtable;
47
48 private:
49 class CallData {
50 public:
Init(grpc_call_element * elem,const grpc_call_element_args *)51 static grpc_error_handle Init(grpc_call_element* elem,
52 const grpc_call_element_args* /*args*/) {
53 new (elem->call_data) CallData();
54 return absl::OkStatus();
55 }
56
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)57 static void Destroy(grpc_call_element* elem,
58 const grpc_call_final_info* /*final_info*/,
59 grpc_closure* /*ignored*/) {
60 auto* calld = static_cast<CallData*>(elem->call_data);
61 calld->~CallData();
62 }
63
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)64 static void StartTransportStreamOpBatch(
65 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
66 auto* calld = static_cast<CallData*>(elem->call_data);
67 if (batch->recv_trailing_metadata) {
68 calld->original_recv_trailing_metadata_ready_ =
69 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
70 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
71 &calld->recv_trailing_metadata_ready_;
72 }
73 grpc_call_next_op(elem, batch);
74 }
75
76 private:
CallData()77 CallData() {
78 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
79 RecvTrailingMetadataReady, this, nullptr);
80 }
81
RecvTrailingMetadataReady(void * arg,grpc_error_handle)82 static void RecvTrailingMetadataReady(void* arg,
83 grpc_error_handle /*error*/) {
84 auto* calld = static_cast<CallData*>(arg);
85 Closure::Run(DEBUG_LOCATION,
86 calld->original_recv_trailing_metadata_ready_,
87 grpc_error_set_int(GRPC_ERROR_CREATE("injected error"),
88 StatusIntProperty::kRpcStatus,
89 GRPC_STATUS_INVALID_ARGUMENT));
90 }
91
92 grpc_closure recv_trailing_metadata_ready_;
93 grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
94 };
95
Init(grpc_channel_element *,grpc_channel_element_args *)96 static grpc_error_handle Init(grpc_channel_element* /*elem*/,
97 grpc_channel_element_args* /*args*/) {
98 return absl::OkStatus();
99 }
100
Destroy(grpc_channel_element *)101 static void Destroy(grpc_channel_element* /*elem*/) {}
102 };
103
104 grpc_channel_filter InjectStatusFilter::kFilterVtable = {
105 CallData::StartTransportStreamOpBatch,
106 nullptr,
107 nullptr,
108 grpc_channel_next_op,
109 sizeof(CallData),
110 CallData::Init,
111 grpc_call_stack_ignore_set_pollset_or_pollset_set,
112 CallData::Destroy,
113 0,
114 Init,
115 grpc_channel_stack_no_post_init,
116 Destroy,
117 grpc_channel_next_get_info,
118 "InjectStatusFilter",
119 };
120
121 // Tests that we honor the error passed to recv_trailing_metadata_ready
122 // when determining the call's status, even if the op completion runs before
123 // the recv_trailing_metadata op is started from the surface.
124 // - 1 retry allowed for ABORTED status
125 // - server returns ABORTED, but filter overwrites to INVALID_ARGUMENT,
126 // so no retry is done
CORE_END2END_TEST(RetryTest,RetryRecvTrailingMetadataError)127 CORE_END2END_TEST(RetryTest, RetryRecvTrailingMetadataError) {
128 CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
129 builder->channel_init()
130 ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
131 &InjectStatusFilter::kFilterVtable)
132 // Skip on proxy (which explicitly disables retries).
133 .IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
134 });
135 InitServer(ChannelArgs());
136 InitClient(ChannelArgs().Set(
137 GRPC_ARG_SERVICE_CONFIG,
138 "{\n"
139 " \"methodConfig\": [ {\n"
140 " \"name\": [\n"
141 " { \"service\": \"service\", \"method\": \"method\" }\n"
142 " ],\n"
143 " \"retryPolicy\": {\n"
144 " \"maxAttempts\": 2,\n"
145 " \"initialBackoff\": \"1s\",\n"
146 " \"maxBackoff\": \"120s\",\n"
147 " \"backoffMultiplier\": 1.6,\n"
148 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
149 " }\n"
150 " } ]\n"
151 "}"));
152 auto c =
153 NewClientCall("/service/method").Timeout(Duration::Seconds(5)).Create();
154 EXPECT_NE(c.GetPeer(), absl::nullopt);
155 IncomingMessage server_message;
156 IncomingMetadata server_initial_metadata;
157 c.NewBatch(1)
158 .SendInitialMetadata({})
159 .SendMessage("foo")
160 .RecvMessage(server_message)
161 .SendCloseFromClient()
162 .RecvInitialMetadata(server_initial_metadata);
163 auto s = RequestCall(101);
164 Expect(101, true);
165 Step();
166 EXPECT_NE(s.GetPeer(), absl::nullopt);
167 EXPECT_NE(c.GetPeer(), absl::nullopt);
168 IncomingCloseOnServer client_close;
169 s.NewBatch(102)
170 .SendInitialMetadata({})
171 .SendStatusFromServer(GRPC_STATUS_ABORTED, "xyz", {})
172 .RecvCloseOnServer(client_close);
173 Expect(102, true);
174 Expect(1, true);
175 Step();
176 IncomingStatusOnClient server_status;
177 c.NewBatch(2).RecvStatusOnClient(server_status);
178 Expect(2, true);
179 Step();
180 EXPECT_EQ(server_status.status(), GRPC_STATUS_INVALID_ARGUMENT);
181 EXPECT_EQ(server_status.message(), "injected error");
182 EXPECT_EQ(s.method(), "/service/method");
183 EXPECT_FALSE(client_close.was_cancelled());
184 }
185
186 } // namespace
187 } // namespace grpc_core
188