1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <gtest/gtest.h>
20
21 #include "src/core/lib/address_utils/parse_address.h"
22 #include "src/core/lib/channel/channel_args.h"
23 #include "src/core/lib/gprpp/time.h"
24 #include "src/core/lib/iomgr/port.h"
25 #include "test/core/util/port.h"
26 #include "test/core/util/test_config.h"
27
28 // This test won't work except with posix sockets enabled
29 #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
30
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <netinet/in.h>
34 #include <poll.h>
35 #include <string.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 #include <grpc/grpc.h>
40 #include <grpc/support/alloc.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/time.h>
43
44 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
45 #include "src/core/lib/gprpp/crash.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/iomgr/pollset_set.h"
48 #include "src/core/lib/iomgr/socket_utils_posix.h"
49 #include "src/core/lib/iomgr/tcp_client.h"
50 #include "src/core/lib/iomgr/timer.h"
51 #include "src/core/lib/resource_quota/api.h"
52
53 static grpc_pollset_set* g_pollset_set;
54 static gpr_mu* g_mu;
55 static grpc_pollset* g_pollset;
56 static int g_connections_complete = 0;
57 static grpc_endpoint* g_connecting = nullptr;
58
test_deadline(void)59 static grpc_core::Timestamp test_deadline(void) {
60 return grpc_core::Timestamp::FromTimespecRoundUp(
61 grpc_timeout_seconds_to_deadline(10));
62 }
63
finish_connection()64 static void finish_connection() {
65 gpr_mu_lock(g_mu);
66 g_connections_complete++;
67 grpc_core::ExecCtx exec_ctx;
68 ASSERT_TRUE(
69 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
70
71 gpr_mu_unlock(g_mu);
72 }
73
must_succeed(void *,grpc_error_handle error)74 static void must_succeed(void* /*arg*/, grpc_error_handle error) {
75 ASSERT_NE(g_connecting, nullptr);
76 ASSERT_TRUE(error.ok());
77 grpc_endpoint_shutdown(g_connecting,
78 GRPC_ERROR_CREATE("must_succeed called"));
79 grpc_endpoint_destroy(g_connecting);
80 g_connecting = nullptr;
81 finish_connection();
82 }
83
must_fail(void *,grpc_error_handle error)84 static void must_fail(void* /*arg*/, grpc_error_handle error) {
85 ASSERT_EQ(g_connecting, nullptr);
86 ASSERT_FALSE(error.ok());
87 finish_connection();
88 }
89
test_succeeds(void)90 void test_succeeds(void) {
91 gpr_log(GPR_ERROR, "---- starting test_succeeds() ----");
92 grpc_resolved_address resolved_addr;
93 struct sockaddr_in* addr =
94 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
95 int svr_fd;
96 int r;
97 int connections_complete_before;
98 grpc_closure done;
99 grpc_core::ExecCtx exec_ctx;
100
101 memset(&resolved_addr, 0, sizeof(resolved_addr));
102 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
103 addr->sin_family = AF_INET;
104
105 // create a phony server
106 svr_fd = socket(AF_INET, SOCK_STREAM, 0);
107 ASSERT_GE(svr_fd, 0);
108 ASSERT_EQ(bind(svr_fd, (struct sockaddr*)addr, (socklen_t)resolved_addr.len),
109 0);
110 ASSERT_EQ(listen(svr_fd, 1), 0);
111
112 gpr_mu_lock(g_mu);
113 connections_complete_before = g_connections_complete;
114 gpr_mu_unlock(g_mu);
115
116 // connect to it
117 ASSERT_EQ(getsockname(svr_fd, (struct sockaddr*)addr,
118 (socklen_t*)&resolved_addr.len),
119 0);
120 GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
121 grpc_core::ChannelArgs args = grpc_core::CoreConfiguration::Get()
122 .channel_args_preconditioning()
123 .PreconditionChannelArgs(nullptr);
124 int64_t connection_handle = grpc_tcp_client_connect(
125 &done, &g_connecting, g_pollset_set,
126 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
127 &resolved_addr, grpc_core::Timestamp::InfFuture());
128 // await the connection
129 do {
130 resolved_addr.len = static_cast<socklen_t>(sizeof(addr));
131 r = accept(svr_fd, reinterpret_cast<struct sockaddr*>(addr),
132 reinterpret_cast<socklen_t*>(&resolved_addr.len));
133 } while (r == -1 && errno == EINTR);
134 ASSERT_GE(r, 0);
135 close(r);
136
137 gpr_mu_lock(g_mu);
138
139 while (g_connections_complete == connections_complete_before) {
140 grpc_pollset_worker* worker = nullptr;
141 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
142 "pollset_work",
143 grpc_pollset_work(g_pollset, &worker,
144 grpc_core::Timestamp::FromTimespecRoundUp(
145 grpc_timeout_seconds_to_deadline(5)))));
146 gpr_mu_unlock(g_mu);
147 grpc_core::ExecCtx::Get()->Flush();
148 gpr_mu_lock(g_mu);
149 }
150
151 gpr_mu_unlock(g_mu);
152
153 // A cancellation attempt should fail because connect already succeeded.
154 ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), false);
155
156 gpr_log(GPR_ERROR, "---- finished test_succeeds() ----");
157 }
158
test_fails(void)159 void test_fails(void) {
160 gpr_log(GPR_ERROR, "---- starting test_fails() ----");
161 grpc_resolved_address resolved_addr;
162 struct sockaddr_in* addr =
163 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
164 int connections_complete_before;
165 grpc_closure done;
166 grpc_core::ExecCtx exec_ctx;
167
168 memset(&resolved_addr, 0, sizeof(resolved_addr));
169 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
170 addr->sin_family = AF_INET;
171
172 gpr_mu_lock(g_mu);
173 connections_complete_before = g_connections_complete;
174 gpr_mu_unlock(g_mu);
175
176 // connect to a broken address
177 GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
178 int64_t connection_handle = grpc_tcp_client_connect(
179 &done, &g_connecting, g_pollset_set,
180 grpc_event_engine::experimental::ChannelArgsEndpointConfig(),
181 &resolved_addr, grpc_core::Timestamp::InfFuture());
182 gpr_mu_lock(g_mu);
183
184 // wait for the connection callback to finish
185 while (g_connections_complete == connections_complete_before) {
186 grpc_pollset_worker* worker = nullptr;
187 grpc_core::Timestamp polling_deadline = test_deadline();
188 switch (grpc_timer_check(&polling_deadline)) {
189 case GRPC_TIMERS_FIRED:
190 break;
191 case GRPC_TIMERS_NOT_CHECKED:
192 polling_deadline = grpc_core::Timestamp::ProcessEpoch();
193 ABSL_FALLTHROUGH_INTENDED;
194 case GRPC_TIMERS_CHECKED_AND_EMPTY:
195 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
196 "pollset_work",
197 grpc_pollset_work(g_pollset, &worker, polling_deadline)));
198 break;
199 }
200 gpr_mu_unlock(g_mu);
201 grpc_core::ExecCtx::Get()->Flush();
202 gpr_mu_lock(g_mu);
203 }
204
205 gpr_mu_unlock(g_mu);
206
207 // A cancellation attempt should fail because connect already failed.
208 ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), false);
209
210 gpr_log(GPR_ERROR, "---- finished test_fails() ----");
211 }
212
test_connect_cancellation_succeeds(void)213 void test_connect_cancellation_succeeds(void) {
214 gpr_log(GPR_ERROR, "---- starting test_connect_cancellation_succeeds() ----");
215 auto target_ipv6_addr_uri = *grpc_core::URI::Parse(absl::StrCat(
216 "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())));
217 auto target_ipv4_addr_uri = *grpc_core::URI::Parse(absl::StrCat(
218 "ipv4:127.0.0.1:", std::to_string(grpc_pick_unused_port_or_die())));
219 grpc_resolved_address resolved_addr;
220 int svr_fd;
221 grpc_closure done;
222 grpc_core::ExecCtx exec_ctx;
223 bool tried_ipv4 = false;
224 ASSERT_TRUE(grpc_parse_uri(target_ipv6_addr_uri, &resolved_addr));
225 auto try_bind = [&](int sock) {
226 return (sock >= 0 &&
227 bind(sock, reinterpret_cast<sockaddr*>(resolved_addr.addr),
228 resolved_addr.len) == 0);
229 };
230 // create a phony server
231 svr_fd = socket(AF_INET6, SOCK_STREAM, 0);
232 // Try ipv6
233 if (!try_bind(svr_fd)) {
234 if (svr_fd >= 0) {
235 close(svr_fd);
236 }
237 // Failed to bind ipv6. Try ipv4
238 ASSERT_TRUE(grpc_parse_uri(target_ipv4_addr_uri, &resolved_addr));
239 svr_fd = socket(AF_INET, SOCK_STREAM, 0);
240 tried_ipv4 = true;
241 if (!try_bind(svr_fd)) {
242 if (svr_fd >= 0) {
243 close(svr_fd);
244 }
245 gpr_log(GPR_ERROR,
246 "Skipping test. Failed to create a phony server bound to ipv6 or "
247 "ipv4 address");
248 return;
249 }
250 }
251
252 ASSERT_EQ(listen(svr_fd, 1), 0);
253
254 std::vector<int> client_sockets;
255 bool create_more_client_connections = true;
256 // Create and connect client sockets until the connection attempt times out.
257 // Even if the backlog specified to listen is 1, the kernel continues to
258 // accept a certain number of SYN packets before dropping them. This loop
259 // attempts to identify the number of new connection attempts that will
260 // be allowed by the kernel before any subsequent connection attempts
261 // become pending indefinitely.
262 while (create_more_client_connections) {
263 const int kOne = 1;
264 int client_socket = socket(tried_ipv4 ? AF_INET : AF_INET6, SOCK_STREAM, 0);
265 ASSERT_GE(client_socket, 0);
266 setsockopt(client_socket, SOL_SOCKET, SO_REUSEADDR, &kOne, sizeof(kOne));
267 // Make fd non-blocking.
268 int flags = fcntl(client_socket, F_GETFL, 0);
269 ASSERT_EQ(fcntl(client_socket, F_SETFL, flags | O_NONBLOCK), 0);
270
271 if (connect(client_socket, reinterpret_cast<sockaddr*>(resolved_addr.addr),
272 resolved_addr.len) == -1) {
273 if (errno == EINPROGRESS) {
274 struct pollfd pfd;
275 pfd.fd = client_socket;
276 pfd.events = POLLOUT;
277 pfd.revents = 0;
278 int ret = poll(&pfd, 1, 1000);
279 if (ret == -1) {
280 FAIL() << "poll() failed during connect; errno=" << errno;
281 } else if (ret == 0) {
282 // current connection attempt timed out. It indicates that the
283 // kernel will cause any subsequent connection attempts to
284 // become pending indefinitely.
285 create_more_client_connections = false;
286 }
287 } else {
288 FAIL() << "Failed to connect to the server. errno=%d" << errno;
289 }
290 }
291 client_sockets.push_back(client_socket);
292 }
293
294 // connect to it. accept() is not called on the bind socket. So the connection
295 // should appear to be stuck giving ample time to try to cancel it.
296 ASSERT_EQ(getsockname(svr_fd, reinterpret_cast<sockaddr*>(resolved_addr.addr),
297 (socklen_t*)&resolved_addr.len),
298 0);
299 GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
300 grpc_core::ChannelArgs args = grpc_core::CoreConfiguration::Get()
301 .channel_args_preconditioning()
302 .PreconditionChannelArgs(nullptr);
303 int64_t connection_handle = grpc_tcp_client_connect(
304 &done, &g_connecting, g_pollset_set,
305 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
306 &resolved_addr, grpc_core::Timestamp::InfFuture());
307 ASSERT_GT(connection_handle, 0);
308 ASSERT_EQ(grpc_tcp_client_cancel_connect(connection_handle), true);
309 for (auto sock : client_sockets) {
310 close(sock);
311 }
312 close(svr_fd);
313 gpr_log(GPR_ERROR, "---- finished test_connect_cancellation_succeeds() ----");
314 }
315
test_fails_bad_addr_no_leak(void)316 void test_fails_bad_addr_no_leak(void) {
317 gpr_log(GPR_ERROR, "---- starting test_fails_bad_addr_no_leak() ----");
318 grpc_resolved_address resolved_addr;
319 struct sockaddr_in* addr =
320 reinterpret_cast<struct sockaddr_in*>(resolved_addr.addr);
321 int connections_complete_before;
322 grpc_closure done;
323 grpc_core::ExecCtx exec_ctx;
324 memset(&resolved_addr, 0, sizeof(resolved_addr));
325 resolved_addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_in));
326 // force `grpc_tcp_client_prepare_fd` to fail. contrived, but effective.
327 addr->sin_family = AF_IPX;
328 gpr_mu_lock(g_mu);
329 connections_complete_before = g_connections_complete;
330 gpr_mu_unlock(g_mu);
331 // connect to an invalid address.
332 GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
333 grpc_tcp_client_connect(
334 &done, &g_connecting, g_pollset_set,
335 grpc_event_engine::experimental::ChannelArgsEndpointConfig(),
336 &resolved_addr, grpc_core::Timestamp::InfFuture());
337 gpr_mu_lock(g_mu);
338 while (g_connections_complete == connections_complete_before) {
339 grpc_pollset_worker* worker = nullptr;
340 grpc_core::Timestamp polling_deadline = test_deadline();
341 switch (grpc_timer_check(&polling_deadline)) {
342 case GRPC_TIMERS_FIRED:
343 break;
344 case GRPC_TIMERS_NOT_CHECKED:
345 polling_deadline = grpc_core::Timestamp::ProcessEpoch();
346 ABSL_FALLTHROUGH_INTENDED;
347 case GRPC_TIMERS_CHECKED_AND_EMPTY:
348 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
349 "pollset_work",
350 grpc_pollset_work(g_pollset, &worker, polling_deadline)));
351 break;
352 }
353 gpr_mu_unlock(g_mu);
354 grpc_core::ExecCtx::Get()->Flush();
355 gpr_mu_lock(g_mu);
356 }
357 gpr_mu_unlock(g_mu);
358 gpr_log(GPR_ERROR, "---- finished test_fails_bad_addr_no_leak() ----");
359 }
360
destroy_pollset(void * p,grpc_error_handle)361 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
362 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
363 }
364
// Runs every scenario in this file, in order, against one shared pollset.
//
// The scenarios are plain void functions that use ASSERT_*; a fatal failure in
// one of them only returns from that function. Because they all share the
// file-level globals, the remaining scenarios are skipped once a fatal failure
// has been recorded. Teardown runs in every case.
TEST(TcpClientPosixTest, MainTest) {
  // Declared outside the ExecCtx scope below so that it is still alive when
  // that scope ends. NOTE(review): presumably the shutdown closure may run as
  // late as the ExecCtx destructor — confirm.
  grpc_closure destroyed;
  grpc_init();

  {
    grpc_core::ExecCtx exec_ctx;
    g_pollset_set = grpc_pollset_set_create();
    g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
    grpc_pollset_init(g_pollset, &g_mu);
    grpc_pollset_set_add_pollset(g_pollset_set, g_pollset);

    using Scenario = void (*)();
    const Scenario scenarios[] = {
        test_succeeds,
        test_connect_cancellation_succeeds,
        test_fails,
        test_fails_bad_addr_no_leak,
    };
    for (const Scenario run_scenario : scenarios) {
      run_scenario();
      if (HasFatalFailure()) {
        break;
      }
    }

    grpc_pollset_set_destroy(g_pollset_set);
    GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
                      grpc_schedule_on_exec_ctx);
    grpc_pollset_shutdown(g_pollset, &destroyed);
  }

  grpc_shutdown();
  gpr_free(g_pollset);
}
389
#endif  // GRPC_POSIX_SOCKET_TCP_CLIENT
391
main(int argc,char ** argv)392 int main(int argc, char** argv) {
393 grpc::testing::TestEnvironment env(&argc, argv);
394 ::testing::InitGoogleTest(&argc, argv);
395 return RUN_ALL_TESTS();
396 }
397