xref: /aosp_15_r20/external/grpc-grpc/test/core/end2end/tests/retry_recv_message_replay.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <memory>
20 #include <new>
21 
22 #include "absl/status/status.h"
23 #include "absl/types/optional.h"
24 #include "gtest/gtest.h"
25 
26 #include <grpc/impl/channel_arg_names.h>
27 #include <grpc/status.h>
28 
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/channel/channel_fwd.h"
31 #include "src/core/lib/channel/channel_stack.h"
32 #include "src/core/lib/config/core_configuration.h"
33 #include "src/core/lib/gprpp/status_helper.h"
34 #include "src/core/lib/gprpp/time.h"
35 #include "src/core/lib/iomgr/call_combiner.h"
36 #include "src/core/lib/iomgr/closure.h"
37 #include "src/core/lib/iomgr/error.h"
38 #include "src/core/lib/surface/channel_stack_type.h"
39 #include "src/core/lib/transport/transport.h"
40 #include "test/core/end2end/end2end_tests.h"
41 
42 namespace grpc_core {
43 namespace {
44 
45 // A filter that, for the first call it sees, will fail the batch
46 // containing send_initial_metadata and then fail the call with status
47 // ABORTED.  All subsequent calls are allowed through without failures.
48 class FailFirstSendOpFilter {
49  public:
50   static grpc_channel_filter kFilterVtable;
51 
52   class CallData {
53    public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)54     static grpc_error_handle Init(grpc_call_element* elem,
55                                   const grpc_call_element_args* args) {
56       new (elem->call_data) CallData(args);
57       return absl::OkStatus();
58     }
59 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)60     static void Destroy(grpc_call_element* elem,
61                         const grpc_call_final_info* /*final_info*/,
62                         grpc_closure* /*ignored*/) {
63       auto* calld = static_cast<CallData*>(elem->call_data);
64       calld->~CallData();
65     }
66 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)67     static void StartTransportStreamOpBatch(
68         grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
69       auto* chand = static_cast<FailFirstSendOpFilter*>(elem->channel_data);
70       auto* calld = static_cast<CallData*>(elem->call_data);
71       if (!chand->seen_first_) {
72         chand->seen_first_ = true;
73         calld->fail_ = true;
74       }
75       if (calld->fail_ && !batch->cancel_stream) {
76         grpc_transport_stream_op_batch_finish_with_failure(
77             batch,
78             grpc_error_set_int(
79                 GRPC_ERROR_CREATE("FailFirstSendOpFilter failing batch"),
80                 StatusIntProperty::kRpcStatus, GRPC_STATUS_ABORTED),
81             calld->call_combiner_);
82         return;
83       }
84       grpc_call_next_op(elem, batch);
85     }
86 
87    private:
CallData(const grpc_call_element_args * args)88     explicit CallData(const grpc_call_element_args* args)
89         : call_combiner_(args->call_combiner) {}
90 
91     CallCombiner* call_combiner_;
92     bool fail_ = false;
93   };
94 
Init(grpc_channel_element * elem,grpc_channel_element_args *)95   static grpc_error_handle Init(grpc_channel_element* elem,
96                                 grpc_channel_element_args* /*args*/) {
97     new (elem->channel_data) FailFirstSendOpFilter();
98     return absl::OkStatus();
99   }
100 
Destroy(grpc_channel_element * elem)101   static void Destroy(grpc_channel_element* elem) {
102     auto* chand = static_cast<FailFirstSendOpFilter*>(elem->channel_data);
103     chand->~FailFirstSendOpFilter();
104   }
105 
106   bool seen_first_ = false;
107 };
108 
109 grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = {
110     CallData::StartTransportStreamOpBatch,
111     nullptr,
112     nullptr,
113     grpc_channel_next_op,
114     sizeof(CallData),
115     CallData::Init,
116     grpc_call_stack_ignore_set_pollset_or_pollset_set,
117     CallData::Destroy,
118     sizeof(FailFirstSendOpFilter),
119     Init,
120     grpc_channel_stack_no_post_init,
121     Destroy,
122     grpc_channel_next_get_info,
123     "FailFirstSendOpFilter",
124 };
125 
126 // Tests the fix for a bug found in real-world code where recv_message
127 // was incorrectly replayed on a call attempt that it was already sent
128 // to when the recv_message completion had already been returned but was
129 // deferred at the point where recv_trailing_metadata was started from
130 // the surface.  This resulted in ASAN failures caused by not unreffing
131 // a grpc_error.
CORE_END2END_TEST(RetryTest,RetryRecvMessageReplay)132 CORE_END2END_TEST(RetryTest, RetryRecvMessageReplay) {
133   CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
134     builder->channel_init()
135         ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
136                          &FailFirstSendOpFilter::kFilterVtable)
137         // Skip on proxy (which explicitly disables retries).
138         .IfChannelArg(GRPC_ARG_ENABLE_RETRIES, true);
139   });
140   InitServer(ChannelArgs());
141   InitClient(ChannelArgs().Set(
142       GRPC_ARG_SERVICE_CONFIG,
143       "{\n"
144       "  \"methodConfig\": [ {\n"
145       "    \"name\": [\n"
146       "      { \"service\": \"service\", \"method\": \"method\" }\n"
147       "    ],\n"
148       "    \"retryPolicy\": {\n"
149       "      \"maxAttempts\": 2,\n"
150       "      \"initialBackoff\": \"1s\",\n"
151       "      \"maxBackoff\": \"120s\",\n"
152       "      \"backoffMultiplier\": 1.6,\n"
153       "      \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
154       "    }\n"
155       "  } ]\n"
156       "}"));
157   auto c =
158       NewClientCall("/service/method").Timeout(Duration::Seconds(5)).Create();
159   EXPECT_NE(c.GetPeer(), absl::nullopt);
160   // Start a batch containing send_initial_metadata and recv_initial_metadata.
161   IncomingMetadata server_initial_metadata;
162   c.NewBatch(1).SendInitialMetadata({}).RecvInitialMetadata(
163       server_initial_metadata);
164   // Start a batch containing recv_message.
165   IncomingMessage server_message;
166   c.NewBatch(2).RecvMessage(server_message);
167   // Start a batch containing recv_trailing_metadata.
168   IncomingStatusOnClient server_status;
169   c.NewBatch(3).RecvStatusOnClient(server_status);
170   // Server should get a call.
171   auto s = RequestCall(101);
172   Expect(101, true);
173   Step();
174   // Server fails with status ABORTED.
175   IncomingCloseOnServer client_close;
176   s.NewBatch(102)
177       .SendInitialMetadata({})
178       .SendStatusFromServer(GRPC_STATUS_ABORTED, "xyz", {})
179       .RecvCloseOnServer(client_close);
180   // In principle, the server batch should complete before the client
181   // batches, but in the proxy fixtures, there are multiple threads
182   // involved, so the completion order tends to be a little racy.
183   Expect(102, true);
184   Expect(1, true);
185   Expect(2, true);
186   Expect(3, true);
187   Step();
188   EXPECT_EQ(server_status.status(), GRPC_STATUS_ABORTED);
189   EXPECT_EQ(server_status.message(), "xyz");
190   EXPECT_EQ(s.method(), "/service/method");
191   EXPECT_FALSE(client_close.was_cancelled());
192 }
193 
194 }  // namespace
195 }  // namespace grpc_core
196