xref: /aosp_15_r20/external/grpc-grpc/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <memory>
18 #include <new>
19 
20 #include "absl/status/status.h"
21 #include "absl/types/optional.h"
22 #include "gtest/gtest.h"
23 
24 #include <grpc/impl/channel_arg_names.h>
25 #include <grpc/status.h>
26 
27 #include "src/core/lib/channel/channel_args.h"
28 #include "src/core/lib/channel/channel_fwd.h"
29 #include "src/core/lib/channel/channel_stack.h"
30 #include "src/core/lib/config/core_configuration.h"
31 #include "src/core/lib/gprpp/debug_location.h"
32 #include "src/core/lib/gprpp/status_helper.h"
33 #include "src/core/lib/gprpp/time.h"
34 #include "src/core/lib/iomgr/closure.h"
35 #include "src/core/lib/iomgr/error.h"
36 #include "src/core/lib/surface/channel_stack_type.h"
37 #include "src/core/lib/transport/transport.h"
38 #include "test/core/end2end/end2end_tests.h"
39 
40 namespace grpc_core {
41 namespace {
42 
43 // A filter that returns recv_trailing_metadata_ready with an error.
44 class InjectStatusFilter {
45  public:
46   static grpc_channel_filter kFilterVtable;
47 
48  private:
49   class CallData {
50    public:
Init(grpc_call_element * elem,const grpc_call_element_args *)51     static grpc_error_handle Init(grpc_call_element* elem,
52                                   const grpc_call_element_args* /*args*/) {
53       new (elem->call_data) CallData();
54       return absl::OkStatus();
55     }
56 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)57     static void Destroy(grpc_call_element* elem,
58                         const grpc_call_final_info* /*final_info*/,
59                         grpc_closure* /*ignored*/) {
60       auto* calld = static_cast<CallData*>(elem->call_data);
61       calld->~CallData();
62     }
63 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)64     static void StartTransportStreamOpBatch(
65         grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
66       auto* calld = static_cast<CallData*>(elem->call_data);
67       if (batch->recv_trailing_metadata) {
68         calld->original_recv_trailing_metadata_ready_ =
69             batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
70         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
71             &calld->recv_trailing_metadata_ready_;
72       }
73       grpc_call_next_op(elem, batch);
74     }
75 
76    private:
CallData()77     CallData() {
78       GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
79                         RecvTrailingMetadataReady, this, nullptr);
80     }
81 
RecvTrailingMetadataReady(void * arg,grpc_error_handle)82     static void RecvTrailingMetadataReady(void* arg,
83                                           grpc_error_handle /*error*/) {
84       auto* calld = static_cast<CallData*>(arg);
85       Closure::Run(DEBUG_LOCATION,
86                    calld->original_recv_trailing_metadata_ready_,
87                    grpc_error_set_int(GRPC_ERROR_CREATE("injected error"),
88                                       StatusIntProperty::kRpcStatus,
89                                       GRPC_STATUS_INVALID_ARGUMENT));
90     }
91 
92     grpc_closure recv_trailing_metadata_ready_;
93     grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
94   };
95 
Init(grpc_channel_element *,grpc_channel_element_args *)96   static grpc_error_handle Init(grpc_channel_element* /*elem*/,
97                                 grpc_channel_element_args* /*args*/) {
98     return absl::OkStatus();
99   }
100 
Destroy(grpc_channel_element *)101   static void Destroy(grpc_channel_element* /*elem*/) {}
102 };
103 
104 grpc_channel_filter InjectStatusFilter::kFilterVtable = {
105     CallData::StartTransportStreamOpBatch,
106     nullptr,
107     nullptr,
108     grpc_channel_next_op,
109     sizeof(CallData),
110     CallData::Init,
111     grpc_call_stack_ignore_set_pollset_or_pollset_set,
112     CallData::Destroy,
113     0,
114     Init,
115     grpc_channel_stack_no_post_init,
116     Destroy,
117     grpc_channel_next_get_info,
118     "InjectStatusFilter",
119 };
120 
121 // Tests that we honor the error passed to recv_trailing_metadata_ready
122 // when determining the call's status, even if the op completion runs before
123 // the recv_trailing_metadata op is started from the surface.
124 // - 1 retry allowed for ABORTED status
125 // - server returns ABORTED, but filter overwrites to INVALID_ARGUMENT,
126 //   so no retry is done
CORE_END2END_TEST(RetryTest,RetryRecvTrailingMetadataError)127 CORE_END2END_TEST(RetryTest, RetryRecvTrailingMetadataError) {
128   CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
129     builder->channel_init()
130         ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
131                          &InjectStatusFilter::kFilterVtable)
132         // Skip on proxy (which explicitly disables retries).
133         .IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
134   });
135   InitServer(ChannelArgs());
136   InitClient(ChannelArgs().Set(
137       GRPC_ARG_SERVICE_CONFIG,
138       "{\n"
139       "  \"methodConfig\": [ {\n"
140       "    \"name\": [\n"
141       "      { \"service\": \"service\", \"method\": \"method\" }\n"
142       "    ],\n"
143       "    \"retryPolicy\": {\n"
144       "      \"maxAttempts\": 2,\n"
145       "      \"initialBackoff\": \"1s\",\n"
146       "      \"maxBackoff\": \"120s\",\n"
147       "      \"backoffMultiplier\": 1.6,\n"
148       "      \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
149       "    }\n"
150       "  } ]\n"
151       "}"));
152   auto c =
153       NewClientCall("/service/method").Timeout(Duration::Seconds(5)).Create();
154   EXPECT_NE(c.GetPeer(), absl::nullopt);
155   IncomingMessage server_message;
156   IncomingMetadata server_initial_metadata;
157   c.NewBatch(1)
158       .SendInitialMetadata({})
159       .SendMessage("foo")
160       .RecvMessage(server_message)
161       .SendCloseFromClient()
162       .RecvInitialMetadata(server_initial_metadata);
163   auto s = RequestCall(101);
164   Expect(101, true);
165   Step();
166   EXPECT_NE(s.GetPeer(), absl::nullopt);
167   EXPECT_NE(c.GetPeer(), absl::nullopt);
168   IncomingCloseOnServer client_close;
169   s.NewBatch(102)
170       .SendInitialMetadata({})
171       .SendStatusFromServer(GRPC_STATUS_ABORTED, "xyz", {})
172       .RecvCloseOnServer(client_close);
173   Expect(102, true);
174   Expect(1, true);
175   Step();
176   IncomingStatusOnClient server_status;
177   c.NewBatch(2).RecvStatusOnClient(server_status);
178   Expect(2, true);
179   Step();
180   EXPECT_EQ(server_status.status(), GRPC_STATUS_INVALID_ARGUMENT);
181   EXPECT_EQ(server_status.message(), "injected error");
182   EXPECT_EQ(s.method(), "/service/method");
183   EXPECT_FALSE(client_close.was_cancelled());
184 }
185 
186 }  // namespace
187 }  // namespace grpc_core
188