xref: /aosp_15_r20/external/grpc-grpc/test/core/transport/chttp2/settings_timeout_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <inttypes.h>
20 
21 #include <functional>
22 #include <memory>
23 #include <string>
24 #include <thread>
25 #include <vector>
26 
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 #include "absl/strings/str_cat.h"
30 #include "gtest/gtest.h"
31 
32 #include <grpc/grpc.h>
33 #include <grpc/grpc_security.h>
34 #include <grpc/impl/channel_arg_names.h>
35 #include <grpc/slice.h>
36 #include <grpc/slice_buffer.h>
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/atm.h>
39 #include <grpc/support/log.h>
40 #include <grpc/support/sync.h>
41 #include <grpc/support/time.h>
42 
43 #include "src/core/lib/channel/channel_args_preconditioning.h"
44 #include "src/core/lib/config/core_configuration.h"
45 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
46 #include "src/core/lib/gprpp/status_helper.h"
47 #include "src/core/lib/gprpp/time.h"
48 #include "src/core/lib/iomgr/closure.h"
49 #include "src/core/lib/iomgr/endpoint.h"
50 #include "src/core/lib/iomgr/error.h"
51 #include "src/core/lib/iomgr/exec_ctx.h"
52 #include "src/core/lib/iomgr/iomgr_fwd.h"
53 #include "src/core/lib/iomgr/pollset.h"
54 #include "src/core/lib/iomgr/pollset_set.h"
55 #include "src/core/lib/iomgr/resolve_address.h"
56 #include "src/core/lib/iomgr/resolved_address.h"
57 #include "src/core/lib/iomgr/tcp_client.h"
58 #include "src/core/lib/resource_quota/api.h"
59 #include "test/core/util/port.h"
60 #include "test/core/util/test_config.h"
61 
62 namespace grpc_core {
63 namespace test {
64 namespace {
65 
66 // A gRPC server, running in its own thread.
67 class ServerThread {
68  public:
ServerThread(const char * address)69   explicit ServerThread(const char* address) : address_(address) {}
70 
Start()71   void Start() {
72     // Start server with 1-second handshake timeout.
73     grpc_arg a[2];
74     a[0].type = GRPC_ARG_INTEGER;
75     a[0].key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
76     a[0].value.integer = 1000;
77     a[1].key = const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA);
78     a[1].type = GRPC_ARG_POINTER;
79     a[1].value.pointer.p = grpc_resource_quota_create("test");
80     a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
81     grpc_channel_args args = {2, a};
82     server_ = grpc_server_create(&args, nullptr);
83     grpc_server_credentials* server_creds =
84         grpc_insecure_server_credentials_create();
85     ASSERT_TRUE(grpc_server_add_http2_port(server_, address_, server_creds));
86     grpc_server_credentials_release(server_creds);
87     cq_ = grpc_completion_queue_create_for_next(nullptr);
88     grpc_server_register_completion_queue(server_, cq_, nullptr);
89     grpc_server_start(server_);
90     thread_ =
91         std::make_unique<std::thread>(std::bind(&ServerThread::Serve, this));
92     grpc_resource_quota_unref(
93         static_cast<grpc_resource_quota*>(a[1].value.pointer.p));
94   }
95 
Shutdown()96   void Shutdown() {
97     grpc_completion_queue* shutdown_cq =
98         grpc_completion_queue_create_for_pluck(nullptr);
99     grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr);
100     GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, nullptr,
101                                            grpc_timeout_seconds_to_deadline(1),
102                                            nullptr)
103                    .type == GRPC_OP_COMPLETE);
104     grpc_completion_queue_destroy(shutdown_cq);
105     grpc_server_destroy(server_);
106     grpc_completion_queue_destroy(cq_);
107     thread_->join();
108   }
109 
110  private:
Serve()111   void Serve() {
112     // The completion queue should not return anything other than shutdown.
113     grpc_event ev = grpc_completion_queue_next(
114         cq_, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
115     ASSERT_EQ(GRPC_QUEUE_SHUTDOWN, ev.type);
116   }
117 
118   const char* address_;  // Do not own.
119   grpc_server* server_ = nullptr;
120   grpc_completion_queue* cq_ = nullptr;
121   std::unique_ptr<std::thread> thread_;
122 };
123 
124 // A TCP client that connects to the server, reads data until the server
125 // closes, and then terminates.
126 class Client {
127  public:
Client(const char * server_address)128   explicit Client(const char* server_address)
129       : server_address_(server_address) {}
130 
Connect()131   void Connect() {
132     ExecCtx exec_ctx;
133     absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or =
134         GetDNSResolver()->LookupHostnameBlocking(server_address_, "80");
135     ASSERT_EQ(absl::OkStatus(), addresses_or.status())
136         << addresses_or.status().ToString();
137     ASSERT_GE(addresses_or->size(), 1UL);
138     pollset_ = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
139     grpc_pollset_init(pollset_, &mu_);
140     grpc_pollset_set* pollset_set = grpc_pollset_set_create();
141     grpc_pollset_set_add_pollset(pollset_set, pollset_);
142     EventState state;
143     auto args = CoreConfiguration::Get()
144                     .channel_args_preconditioning()
145                     .PreconditionChannelArgs(nullptr);
146     grpc_tcp_client_connect(
147         state.closure(), &endpoint_, pollset_set,
148         grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
149         addresses_or->data(), Timestamp::Now() + Duration::Seconds(1));
150     ASSERT_TRUE(PollUntilDone(&state, Timestamp::InfFuture()));
151     ASSERT_EQ(absl::OkStatus(), state.error());
152     grpc_pollset_set_destroy(pollset_set);
153     grpc_endpoint_add_to_pollset(endpoint_, pollset_);
154   }
155 
156   // Reads until an error is returned.
157   // Returns true if an error was encountered before the deadline.
ReadUntilError()158   bool ReadUntilError() {
159     ExecCtx exec_ctx;
160     grpc_slice_buffer read_buffer;
161     grpc_slice_buffer_init(&read_buffer);
162     bool retval = true;
163     // Use a deadline of 3 seconds, which is a lot more than we should
164     // need for a 1-second timeout, but this helps avoid flakes.
165     Timestamp deadline = Timestamp::Now() + Duration::Seconds(3);
166     while (true) {
167       EventState state;
168       grpc_endpoint_read(endpoint_, &read_buffer, state.closure(),
169                          /*urgent=*/true, /*min_progress_size=*/1);
170       if (!PollUntilDone(&state, deadline)) {
171         retval = false;
172         break;
173       }
174       if (state.error() != absl::OkStatus()) break;
175       gpr_log(GPR_INFO, "client read %" PRIuPTR " bytes", read_buffer.length);
176       grpc_slice_buffer_reset_and_unref(&read_buffer);
177     }
178     grpc_endpoint_shutdown(endpoint_, GRPC_ERROR_CREATE("shutdown"));
179     grpc_slice_buffer_destroy(&read_buffer);
180     return retval;
181   }
182 
Shutdown()183   void Shutdown() {
184     ExecCtx exec_ctx;
185     grpc_endpoint_destroy(endpoint_);
186     grpc_pollset_shutdown(pollset_,
187                           GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_,
188                                               grpc_schedule_on_exec_ctx));
189   }
190 
191  private:
192   // State used to wait for an I/O event.
193   class EventState {
194    public:
EventState()195     EventState() {
196       GRPC_CLOSURE_INIT(&closure_, &EventState::OnEventDone, this,
197                         grpc_schedule_on_exec_ctx);
198     }
199 
~EventState()200     ~EventState() {}
201 
closure()202     grpc_closure* closure() { return &closure_; }
203 
done() const204     bool done() const { return gpr_atm_acq_load(&done_atm_) != 0; }
205 
206     // Caller does NOT take ownership of the error.
error() const207     grpc_error_handle error() const { return error_; }
208 
209    private:
OnEventDone(void * arg,grpc_error_handle error)210     static void OnEventDone(void* arg, grpc_error_handle error) {
211       gpr_log(GPR_INFO, "OnEventDone(): %s", StatusToString(error).c_str());
212       EventState* state = static_cast<EventState*>(arg);
213       state->error_ = error;
214       gpr_atm_rel_store(&state->done_atm_, 1);
215     }
216 
217     grpc_closure closure_;
218     gpr_atm done_atm_ = 0;
219     grpc_error_handle error_;
220   };
221 
222   // Returns true if done, or false if deadline exceeded.
PollUntilDone(EventState * state,Timestamp deadline)223   bool PollUntilDone(EventState* state, Timestamp deadline) {
224     while (true) {
225       grpc_pollset_worker* worker = nullptr;
226       gpr_mu_lock(mu_);
227       GRPC_LOG_IF_ERROR(
228           "grpc_pollset_work",
229           grpc_pollset_work(pollset_, &worker,
230                             Timestamp::Now() + Duration::Milliseconds(100)));
231       // Flushes any work scheduled before or during polling.
232       ExecCtx::Get()->Flush();
233       gpr_mu_unlock(mu_);
234       if (state != nullptr && state->done()) return true;
235       if (Timestamp::Now() >= deadline) return false;
236     }
237   }
238 
PollsetDestroy(void * arg,grpc_error_handle)239   static void PollsetDestroy(void* arg, grpc_error_handle /*error*/) {
240     grpc_pollset* pollset = static_cast<grpc_pollset*>(arg);
241     grpc_pollset_destroy(pollset);
242     gpr_free(pollset);
243   }
244 
245   const char* server_address_;  // Do not own.
246   grpc_endpoint* endpoint_;
247   gpr_mu* mu_;
248   grpc_pollset* pollset_;
249 };
250 
TEST(SettingsTimeout,Basic)251 TEST(SettingsTimeout, Basic) {
252   // Construct server address string.
253   const int server_port = grpc_pick_unused_port_or_die();
254   std::string server_address_string = absl::StrCat("localhost:", server_port);
255   // Start server.
256   gpr_log(GPR_INFO, "starting server on %s", server_address_string.c_str());
257   ServerThread server_thread(server_address_string.c_str());
258   server_thread.Start();
259   // Create client and connect to server.
260   gpr_log(GPR_INFO, "starting client connect");
261   Client client(server_address_string.c_str());
262   client.Connect();
263   // Client read.  Should fail due to server dropping connection.
264   gpr_log(GPR_INFO, "starting client read");
265   EXPECT_TRUE(client.ReadUntilError());
266   // Shut down client.
267   gpr_log(GPR_INFO, "shutting down client");
268   client.Shutdown();
269   // Shut down server.
270   gpr_log(GPR_INFO, "shutting down server");
271   server_thread.Shutdown();
272   // Clean up.
273 }
274 
275 }  // namespace
276 }  // namespace test
277 }  // namespace grpc_core
278 
main(int argc,char ** argv)279 int main(int argc, char** argv) {
280   ::testing::InitGoogleTest(&argc, argv);
281   grpc::testing::TestEnvironment env(&argc, argv);
282   grpc_init();
283   int result = RUN_ALL_TESTS();
284   grpc_shutdown();
285   return result;
286 }
287