1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <gtest/gtest.h>
20
21 #include "src/core/lib/iomgr/port.h"
22 #include "test/core/util/test_config.h"
23
24 // This test won't work except with posix sockets enabled
25 #ifdef GRPC_POSIX_SOCKET_EV
26
27 #include <ctype.h>
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <netinet/in.h>
31 #include <poll.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <sys/socket.h>
36 #include <sys/time.h>
37 #include <unistd.h>
38
39 #include "absl/strings/str_format.h"
40
41 #include <grpc/grpc.h>
42 #include <grpc/support/alloc.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/sync.h>
45 #include <grpc/support/time.h>
46
47 #include "src/core/lib/gprpp/crash.h"
48 #include "src/core/lib/gprpp/strerror.h"
49 #include "src/core/lib/iomgr/ev_posix.h"
50 #include "src/core/lib/iomgr/iomgr.h"
51 #include "src/core/lib/iomgr/socket_utils_posix.h"
52
53 static gpr_mu* g_mu;
54 static grpc_pollset* g_pollset;
55
56 // buffer size used to send and receive data.
57 // 1024 is the minimal value to set TCP send and receive buffer.
58 #define BUF_SIZE 1024
59
60 // Create a test socket with the right properties for testing.
61 // port is the TCP port to listen or connect to.
62 // Return a socket FD and sockaddr_in.
create_test_socket(int port,int * socket_fd,struct sockaddr_in * sin)63 static void create_test_socket(int port, int* socket_fd,
64 struct sockaddr_in* sin) {
65 int fd;
66 int one = 1;
67 int buffer_size_bytes = BUF_SIZE;
68 int flags;
69
70 fd = socket(AF_INET, SOCK_STREAM, 0);
71 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
72 // Reset the size of socket send buffer to the minimal value to facilitate
73 // buffer filling up and triggering notify_on_write
74 ASSERT_EQ(grpc_set_socket_sndbuf(fd, buffer_size_bytes), absl::OkStatus());
75 ASSERT_EQ(grpc_set_socket_rcvbuf(fd, buffer_size_bytes), absl::OkStatus());
76 // Make fd non-blocking
77 flags = fcntl(fd, F_GETFL, 0);
78 ASSERT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0);
79 *socket_fd = fd;
80
81 // Use local address for test
82 sin->sin_family = AF_INET;
83 sin->sin_addr.s_addr = htonl(0x7f000001);
84 ASSERT_GE(port, 0);
85 ASSERT_LT(port, 65536);
86 sin->sin_port = htons(static_cast<uint16_t>(port));
87 }
88
// Placeholder gRPC callback: takes an (arg, success) pair and deliberately
// ignores both.
void no_op_cb(void* /*arg*/, int /*success*/) {
  // Intentionally empty.
}
91
92 // =======An upload server to test notify_on_read===========
93 // The server simply reads and counts a stream of bytes.
94
95 // An upload server.
96 typedef struct {
97 grpc_fd* em_fd; // listening fd
98 ssize_t read_bytes_total; // total number of received bytes
99 int done; // set to 1 when a server finishes serving
100 grpc_closure listen_closure;
101 } server;
102
server_init(server * sv)103 static void server_init(server* sv) {
104 sv->read_bytes_total = 0;
105 sv->done = 0;
106 }
107
108 // An upload session.
109 // Created when a new upload request arrives in the server.
110 typedef struct {
111 server* sv; // not owned by a single session
112 grpc_fd* em_fd; // fd to read upload bytes
113 char read_buf[BUF_SIZE]; // buffer to store upload bytes
114 grpc_closure session_read_closure;
115 } session;
116
117 // Called when an upload session can be safely shutdown.
118 // Close session FD and start to shutdown listen FD.
session_shutdown_cb(void * arg,bool)119 static void session_shutdown_cb(void* arg, // session
120 bool /*success*/) {
121 session* se = static_cast<session*>(arg);
122 server* sv = se->sv;
123 grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
124 gpr_free(se);
125 // Start to shutdown listen fd.
126 grpc_fd_shutdown(sv->em_fd, GRPC_ERROR_CREATE("session_shutdown_cb"));
127 }
128
129 // Called when data become readable in a session.
session_read_cb(void * arg,grpc_error_handle error)130 static void session_read_cb(void* arg, // session
131 grpc_error_handle error) {
132 session* se = static_cast<session*>(arg);
133 int fd = grpc_fd_wrapped_fd(se->em_fd);
134
135 ssize_t read_once = 0;
136 ssize_t read_total = 0;
137
138 if (!error.ok()) {
139 session_shutdown_cb(arg, true);
140 return;
141 }
142
143 do {
144 read_once = read(fd, se->read_buf, BUF_SIZE);
145 if (read_once > 0) read_total += read_once;
146 } while (read_once > 0);
147 se->sv->read_bytes_total += read_total;
148
149 // read() returns 0 to indicate the TCP connection was closed by the client.
150 // read(fd, read_buf, 0) also returns 0 which should never be called as such.
151 // It is possible to read nothing due to spurious edge event or data has
152 // been drained, In such a case, read() returns -1 and set errno to EAGAIN.
153 if (read_once == 0) {
154 session_shutdown_cb(arg, true);
155 } else if (read_once == -1) {
156 if (errno == EAGAIN) {
157 // An edge triggered event is cached in the kernel until next poll.
158 // In the current single thread implementation, session_read_cb is called
159 // in the polling thread, such that polling only happens after this
160 // callback, and will catch read edge event if data is available again
161 // before notify_on_read.
162 // TODO(chenw): in multi-threaded version, callback and polling can be
163 // run in different threads. polling may catch a persist read edge event
164 // before notify_on_read is called.
165 grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
166 } else {
167 grpc_core::Crash(absl::StrFormat("Unhandled read error %s",
168 grpc_core::StrError(errno).c_str()));
169 }
170 }
171 }
172
173 // Called when the listen FD can be safely shutdown.
174 // Close listen FD and signal that server can be shutdown.
listen_shutdown_cb(void * arg,int)175 static void listen_shutdown_cb(void* arg /*server*/, int /*success*/) {
176 server* sv = static_cast<server*>(arg);
177
178 grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
179
180 gpr_mu_lock(g_mu);
181 sv->done = 1;
182 ASSERT_TRUE(
183 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
184 gpr_mu_unlock(g_mu);
185 }
186
187 // Called when a new TCP connection request arrives in the listening port.
listen_cb(void * arg,grpc_error_handle error)188 static void listen_cb(void* arg, //=sv_arg
189 grpc_error_handle error) {
190 server* sv = static_cast<server*>(arg);
191 int fd;
192 int flags;
193 session* se;
194 struct sockaddr_storage ss;
195 socklen_t slen = sizeof(ss);
196 grpc_fd* listen_em_fd = sv->em_fd;
197
198 if (!error.ok()) {
199 listen_shutdown_cb(arg, 1);
200 return;
201 }
202
203 fd = accept(grpc_fd_wrapped_fd(listen_em_fd),
204 reinterpret_cast<struct sockaddr*>(&ss), &slen);
205 ASSERT_GE(fd, 0);
206 ASSERT_LT(fd, FD_SETSIZE);
207 flags = fcntl(fd, F_GETFL, 0);
208 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
209 se = static_cast<session*>(gpr_malloc(sizeof(*se)));
210 se->sv = sv;
211 se->em_fd = grpc_fd_create(fd, "listener", false);
212 grpc_pollset_add_fd(g_pollset, se->em_fd);
213 GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
214 grpc_schedule_on_exec_ctx);
215 grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
216
217 grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
218 }
219
220 // Max number of connections pending to be accepted by listen().
221 #define MAX_NUM_FD 1024
222
223 // Start a test server, return the TCP listening port bound to listen_fd.
224 // listen_cb() is registered to be interested in reading from listen_fd.
225 // When connection request arrives, listen_cb() is called to accept the
226 // connection request.
server_start(server * sv)227 static int server_start(server* sv) {
228 int port = 0;
229 int fd;
230 struct sockaddr_in sin;
231 socklen_t addr_len;
232
233 create_test_socket(port, &fd, &sin);
234 addr_len = sizeof(sin);
235 EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0);
236 EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0);
237 port = ntohs(sin.sin_port);
238 EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);
239
240 sv->em_fd = grpc_fd_create(fd, "server", false);
241 grpc_pollset_add_fd(g_pollset, sv->em_fd);
242 // Register to be interested in reading from listen_fd.
243 GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
244 grpc_schedule_on_exec_ctx);
245 grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
246
247 return port;
248 }
249
250 // Wait and shutdown a sever.
server_wait_and_shutdown(server * sv)251 static void server_wait_and_shutdown(server* sv) {
252 gpr_mu_lock(g_mu);
253 while (!sv->done) {
254 grpc_core::ExecCtx exec_ctx;
255 grpc_pollset_worker* worker = nullptr;
256 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
257 "pollset_work", grpc_pollset_work(g_pollset, &worker,
258 grpc_core::Timestamp::InfFuture())));
259 gpr_mu_unlock(g_mu);
260
261 gpr_mu_lock(g_mu);
262 }
263 gpr_mu_unlock(g_mu);
264 }
265
266 // ===An upload client to test notify_on_write===
267
268 // Client write buffer size
269 #define CLIENT_WRITE_BUF_SIZE 10
270 // Total number of times that the client fills up the write buffer
271 #define CLIENT_TOTAL_WRITE_CNT 3
272
273 // An upload client.
274 typedef struct {
275 grpc_fd* em_fd;
276 char write_buf[CLIENT_WRITE_BUF_SIZE];
277 ssize_t write_bytes_total;
278 // Number of times that the client fills up the write buffer and calls
279 // notify_on_write to schedule another write.
280 int client_write_cnt;
281
282 int done; // set to 1 when a client finishes sending
283 grpc_closure write_closure;
284 } client;
285
client_init(client * cl)286 static void client_init(client* cl) {
287 memset(cl->write_buf, 0, sizeof(cl->write_buf));
288 cl->write_bytes_total = 0;
289 cl->client_write_cnt = 0;
290 cl->done = 0;
291 }
292
293 // Called when a client upload session is ready to shutdown.
client_session_shutdown_cb(void * arg,int)294 static void client_session_shutdown_cb(void* arg /*client*/, int /*success*/) {
295 client* cl = static_cast<client*>(arg);
296 grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
297 cl->done = 1;
298 ASSERT_TRUE(
299 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
300 }
301
302 // Write as much as possible, then register notify_on_write.
client_session_write(void * arg,grpc_error_handle error)303 static void client_session_write(void* arg, // client
304 grpc_error_handle error) {
305 client* cl = static_cast<client*>(arg);
306 int fd = grpc_fd_wrapped_fd(cl->em_fd);
307 ssize_t write_once = 0;
308
309 if (!error.ok()) {
310 gpr_mu_lock(g_mu);
311 client_session_shutdown_cb(arg, 1);
312 gpr_mu_unlock(g_mu);
313 return;
314 }
315
316 do {
317 write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
318 if (write_once > 0) cl->write_bytes_total += write_once;
319 } while (write_once > 0);
320
321 if (errno == EAGAIN) {
322 gpr_mu_lock(g_mu);
323 if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
324 GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
325 grpc_schedule_on_exec_ctx);
326 grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
327 cl->client_write_cnt++;
328 } else {
329 client_session_shutdown_cb(arg, 1);
330 }
331 gpr_mu_unlock(g_mu);
332 } else {
333 grpc_core::Crash(absl::StrFormat("unknown errno %s",
334 grpc_core::StrError(errno).c_str()));
335 }
336 }
337
338 // Start a client to send a stream of bytes.
client_start(client * cl,int port)339 static void client_start(client* cl, int port) {
340 int fd;
341 struct sockaddr_in sin;
342 create_test_socket(port, &fd, &sin);
343 if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
344 -1) {
345 if (errno == EINPROGRESS) {
346 struct pollfd pfd;
347 pfd.fd = fd;
348 pfd.events = POLLOUT;
349 pfd.revents = 0;
350 if (poll(&pfd, 1, -1) == -1) {
351 gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
352 abort();
353 }
354 } else {
355 grpc_core::Crash(
356 absl::StrFormat("Failed to connect to the server (errno=%d)", errno));
357 }
358 }
359
360 cl->em_fd = grpc_fd_create(fd, "client", false);
361 grpc_pollset_add_fd(g_pollset, cl->em_fd);
362
363 client_session_write(cl, absl::OkStatus());
364 }
365
366 // Wait for the signal to shutdown a client.
client_wait_and_shutdown(client * cl)367 static void client_wait_and_shutdown(client* cl) {
368 gpr_mu_lock(g_mu);
369 while (!cl->done) {
370 grpc_pollset_worker* worker = nullptr;
371 grpc_core::ExecCtx exec_ctx;
372 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
373 "pollset_work", grpc_pollset_work(g_pollset, &worker,
374 grpc_core::Timestamp::InfFuture())));
375 gpr_mu_unlock(g_mu);
376
377 gpr_mu_lock(g_mu);
378 }
379 gpr_mu_unlock(g_mu);
380 }
381
382 // Test grpc_fd. Start an upload server and client, upload a stream of
383 // bytes from the client to the server, and verify that the total number of
384 // sent bytes is equal to the total number of received bytes.
test_grpc_fd(void)385 static void test_grpc_fd(void) {
386 server sv;
387 client cl;
388 int port;
389 grpc_core::ExecCtx exec_ctx;
390
391 server_init(&sv);
392 port = server_start(&sv);
393 client_init(&cl);
394 client_start(&cl, port);
395
396 client_wait_and_shutdown(&cl);
397 server_wait_and_shutdown(&sv);
398 ASSERT_EQ(sv.read_bytes_total, cl.write_bytes_total);
399 gpr_log(GPR_INFO, "Total read bytes %" PRIdPTR, sv.read_bytes_total);
400 }
401
402 typedef struct fd_change_data {
403 grpc_iomgr_cb_func cb_that_ran;
404 } fd_change_data;
405
init_change_data(fd_change_data * fdc)406 void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
407
destroy_change_data(fd_change_data *)408 void destroy_change_data(fd_change_data* /*fdc*/) {}
409
first_read_callback(void * arg,grpc_error_handle)410 static void first_read_callback(void* arg /* fd_change_data */,
411 grpc_error_handle /*error*/) {
412 fd_change_data* fdc = static_cast<fd_change_data*>(arg);
413
414 gpr_mu_lock(g_mu);
415 fdc->cb_that_ran = first_read_callback;
416 ASSERT_TRUE(
417 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
418 gpr_mu_unlock(g_mu);
419 }
420
second_read_callback(void * arg,grpc_error_handle)421 static void second_read_callback(void* arg /* fd_change_data */,
422 grpc_error_handle /*error*/) {
423 fd_change_data* fdc = static_cast<fd_change_data*>(arg);
424
425 gpr_mu_lock(g_mu);
426 fdc->cb_that_ran = second_read_callback;
427 ASSERT_TRUE(
428 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
429 gpr_mu_unlock(g_mu);
430 }
431
432 // Test that changing the callback we use for notify_on_read actually works.
433 // Note that we have two different but almost identical callbacks above -- the
434 // point is to have two different function pointers and two different data
435 // pointers and make sure that changing both really works.
test_grpc_fd_change(void)436 static void test_grpc_fd_change(void) {
437 grpc_fd* em_fd;
438 fd_change_data a, b;
439 int flags;
440 int sv[2];
441 char data;
442 ssize_t result;
443 grpc_closure first_closure;
444 grpc_closure second_closure;
445 grpc_core::ExecCtx exec_ctx;
446
447 GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
448 grpc_schedule_on_exec_ctx);
449 GRPC_CLOSURE_INIT(&second_closure, second_read_callback, &b,
450 grpc_schedule_on_exec_ctx);
451
452 init_change_data(&a);
453 init_change_data(&b);
454
455 ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
456 flags = fcntl(sv[0], F_GETFL, 0);
457 ASSERT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0);
458 flags = fcntl(sv[1], F_GETFL, 0);
459 ASSERT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0);
460
461 em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
462 grpc_pollset_add_fd(g_pollset, em_fd);
463
464 // Register the first callback, then make its FD readable
465 grpc_fd_notify_on_read(em_fd, &first_closure);
466 data = 0;
467 result = write(sv[1], &data, 1);
468 ASSERT_EQ(result, 1);
469
470 // And now wait for it to run.
471 gpr_mu_lock(g_mu);
472 while (a.cb_that_ran == nullptr) {
473 grpc_pollset_worker* worker = nullptr;
474 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
475 "pollset_work", grpc_pollset_work(g_pollset, &worker,
476 grpc_core::Timestamp::InfFuture())));
477 gpr_mu_unlock(g_mu);
478
479 gpr_mu_lock(g_mu);
480 }
481 ASSERT_EQ(a.cb_that_ran, first_read_callback);
482 gpr_mu_unlock(g_mu);
483
484 // And drain the socket so we can generate a new read edge
485 result = read(sv[0], &data, 1);
486 ASSERT_EQ(result, 1);
487
488 // Now register a second callback with distinct change data, and do the same
489 // thing again.
490 grpc_fd_notify_on_read(em_fd, &second_closure);
491 data = 0;
492 result = write(sv[1], &data, 1);
493 ASSERT_EQ(result, 1);
494
495 gpr_mu_lock(g_mu);
496 while (b.cb_that_ran == nullptr) {
497 grpc_pollset_worker* worker = nullptr;
498 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
499 "pollset_work", grpc_pollset_work(g_pollset, &worker,
500 grpc_core::Timestamp::InfFuture())));
501 gpr_mu_unlock(g_mu);
502
503 gpr_mu_lock(g_mu);
504 }
505 // Except now we verify that second_read_callback ran instead
506 ASSERT_EQ(b.cb_that_ran, second_read_callback);
507 gpr_mu_unlock(g_mu);
508
509 grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
510
511 destroy_change_data(&a);
512 destroy_change_data(&b);
513 close(sv[1]);
514 }
515
destroy_pollset(void * p,grpc_error_handle)516 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
517 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
518 }
519
// Sets up gRPC and the shared pollset, runs both fd tests against it, then
// shuts the pollset down and releases everything again.
TEST(FdPosixTest, MainTest) {
  grpc_closure destroyed;
  grpc_init();
  {
    grpc_core::ExecCtx exec_ctx;

    // Allocate and initialize the pollset used by every test below;
    // grpc_pollset_init() also hands back the mutex stored in g_mu.
    void* const pollset_storage = gpr_zalloc(grpc_pollset_size());
    g_pollset = static_cast<grpc_pollset*>(pollset_storage);
    grpc_pollset_init(g_pollset, &g_mu);

    test_grpc_fd();
    test_grpc_fd_change();

    // Shut the pollset down and flush the ExecCtx before the memory backing
    // the pollset is freed.
    GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
                      grpc_schedule_on_exec_ctx);
    grpc_pollset_shutdown(g_pollset, &destroyed);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_free(g_pollset);
  }
  grpc_shutdown();
}
537
538 #endif // GRPC_POSIX_SOCKET_EV
539
main(int argc,char ** argv)540 int main(int argc, char** argv) {
541 grpc::testing::TestEnvironment env(&argc, argv);
542 ::testing::InitGoogleTest(&argc, argv);
543 return RUN_ALL_TESTS();
544 }
545