xref: /aosp_15_r20/external/grpc-grpc/test/core/iomgr/fd_posix_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <gtest/gtest.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 #include "test/core/util/test_config.h"
23 
24 // This test won't work except with posix sockets enabled
25 #ifdef GRPC_POSIX_SOCKET_EV
26 
27 #include <ctype.h>
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <netinet/in.h>
31 #include <poll.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <sys/socket.h>
36 #include <sys/time.h>
37 #include <unistd.h>
38 
39 #include "absl/strings/str_format.h"
40 
41 #include <grpc/grpc.h>
42 #include <grpc/support/alloc.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/sync.h>
45 #include <grpc/support/time.h>
46 
47 #include "src/core/lib/gprpp/crash.h"
48 #include "src/core/lib/gprpp/strerror.h"
49 #include "src/core/lib/iomgr/ev_posix.h"
50 #include "src/core/lib/iomgr/iomgr.h"
51 #include "src/core/lib/iomgr/socket_utils_posix.h"
52 
53 static gpr_mu* g_mu;
54 static grpc_pollset* g_pollset;
55 
56 // buffer size used to send and receive data.
57 // 1024 is the minimal value to set TCP send and receive buffer.
58 #define BUF_SIZE 1024
59 
60 // Create a test socket with the right properties for testing.
61 // port is the TCP port to listen or connect to.
62 // Return a socket FD and sockaddr_in.
create_test_socket(int port,int * socket_fd,struct sockaddr_in * sin)63 static void create_test_socket(int port, int* socket_fd,
64                                struct sockaddr_in* sin) {
65   int fd;
66   int one = 1;
67   int buffer_size_bytes = BUF_SIZE;
68   int flags;
69 
70   fd = socket(AF_INET, SOCK_STREAM, 0);
71   setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
72   // Reset the size of socket send buffer to the minimal value to facilitate
73   // buffer filling up and triggering notify_on_write
74   ASSERT_EQ(grpc_set_socket_sndbuf(fd, buffer_size_bytes), absl::OkStatus());
75   ASSERT_EQ(grpc_set_socket_rcvbuf(fd, buffer_size_bytes), absl::OkStatus());
76   // Make fd non-blocking
77   flags = fcntl(fd, F_GETFL, 0);
78   ASSERT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0);
79   *socket_fd = fd;
80 
81   // Use local address for test
82   sin->sin_family = AF_INET;
83   sin->sin_addr.s_addr = htonl(0x7f000001);
84   ASSERT_GE(port, 0);
85   ASSERT_LT(port, 65536);
86   sin->sin_port = htons(static_cast<uint16_t>(port));
87 }
88 
89 // Phony gRPC callback
no_op_cb(void *,int)90 void no_op_cb(void* /*arg*/, int /*success*/) {}
91 
92 // =======An upload server to test notify_on_read===========
93 // The server simply reads and counts a stream of bytes.
94 
95 // An upload server.
96 typedef struct {
97   grpc_fd* em_fd;            // listening fd
98   ssize_t read_bytes_total;  // total number of received bytes
99   int done;                  // set to 1 when a server finishes serving
100   grpc_closure listen_closure;
101 } server;
102 
server_init(server * sv)103 static void server_init(server* sv) {
104   sv->read_bytes_total = 0;
105   sv->done = 0;
106 }
107 
108 // An upload session.
109 // Created when a new upload request arrives in the server.
110 typedef struct {
111   server* sv;               // not owned by a single session
112   grpc_fd* em_fd;           // fd to read upload bytes
113   char read_buf[BUF_SIZE];  // buffer to store upload bytes
114   grpc_closure session_read_closure;
115 } session;
116 
117 // Called when an upload session can be safely shutdown.
118 // Close session FD and start to shutdown listen FD.
session_shutdown_cb(void * arg,bool)119 static void session_shutdown_cb(void* arg,  // session
120                                 bool /*success*/) {
121   session* se = static_cast<session*>(arg);
122   server* sv = se->sv;
123   grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
124   gpr_free(se);
125   // Start to shutdown listen fd.
126   grpc_fd_shutdown(sv->em_fd, GRPC_ERROR_CREATE("session_shutdown_cb"));
127 }
128 
129 // Called when data become readable in a session.
session_read_cb(void * arg,grpc_error_handle error)130 static void session_read_cb(void* arg,  // session
131                             grpc_error_handle error) {
132   session* se = static_cast<session*>(arg);
133   int fd = grpc_fd_wrapped_fd(se->em_fd);
134 
135   ssize_t read_once = 0;
136   ssize_t read_total = 0;
137 
138   if (!error.ok()) {
139     session_shutdown_cb(arg, true);
140     return;
141   }
142 
143   do {
144     read_once = read(fd, se->read_buf, BUF_SIZE);
145     if (read_once > 0) read_total += read_once;
146   } while (read_once > 0);
147   se->sv->read_bytes_total += read_total;
148 
149   // read() returns 0 to indicate the TCP connection was closed by the client.
150   // read(fd, read_buf, 0) also returns 0 which should never be called as such.
151   // It is possible to read nothing due to spurious edge event or data has
152   // been drained, In such a case, read() returns -1 and set errno to EAGAIN.
153   if (read_once == 0) {
154     session_shutdown_cb(arg, true);
155   } else if (read_once == -1) {
156     if (errno == EAGAIN) {
157       // An edge triggered event is cached in the kernel until next poll.
158       // In the current single thread implementation, session_read_cb is called
159       // in the polling thread, such that polling only happens after this
160       // callback, and will catch read edge event if data is available again
161       // before notify_on_read.
162       // TODO(chenw): in multi-threaded version, callback and polling can be
163       // run in different threads. polling may catch a persist read edge event
164       // before notify_on_read is called.
165       grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
166     } else {
167       grpc_core::Crash(absl::StrFormat("Unhandled read error %s",
168                                        grpc_core::StrError(errno).c_str()));
169     }
170   }
171 }
172 
173 // Called when the listen FD can be safely shutdown.
174 // Close listen FD and signal that server can be shutdown.
listen_shutdown_cb(void * arg,int)175 static void listen_shutdown_cb(void* arg /*server*/, int /*success*/) {
176   server* sv = static_cast<server*>(arg);
177 
178   grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
179 
180   gpr_mu_lock(g_mu);
181   sv->done = 1;
182   ASSERT_TRUE(
183       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
184   gpr_mu_unlock(g_mu);
185 }
186 
187 // Called when a new TCP connection request arrives in the listening port.
listen_cb(void * arg,grpc_error_handle error)188 static void listen_cb(void* arg,  //=sv_arg
189                       grpc_error_handle error) {
190   server* sv = static_cast<server*>(arg);
191   int fd;
192   int flags;
193   session* se;
194   struct sockaddr_storage ss;
195   socklen_t slen = sizeof(ss);
196   grpc_fd* listen_em_fd = sv->em_fd;
197 
198   if (!error.ok()) {
199     listen_shutdown_cb(arg, 1);
200     return;
201   }
202 
203   fd = accept(grpc_fd_wrapped_fd(listen_em_fd),
204               reinterpret_cast<struct sockaddr*>(&ss), &slen);
205   ASSERT_GE(fd, 0);
206   ASSERT_LT(fd, FD_SETSIZE);
207   flags = fcntl(fd, F_GETFL, 0);
208   fcntl(fd, F_SETFL, flags | O_NONBLOCK);
209   se = static_cast<session*>(gpr_malloc(sizeof(*se)));
210   se->sv = sv;
211   se->em_fd = grpc_fd_create(fd, "listener", false);
212   grpc_pollset_add_fd(g_pollset, se->em_fd);
213   GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
214                     grpc_schedule_on_exec_ctx);
215   grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
216 
217   grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
218 }
219 
220 // Max number of connections pending to be accepted by listen().
221 #define MAX_NUM_FD 1024
222 
223 // Start a test server, return the TCP listening port bound to listen_fd.
224 // listen_cb() is registered to be interested in reading from listen_fd.
225 // When connection request arrives, listen_cb() is called to accept the
226 // connection request.
server_start(server * sv)227 static int server_start(server* sv) {
228   int port = 0;
229   int fd;
230   struct sockaddr_in sin;
231   socklen_t addr_len;
232 
233   create_test_socket(port, &fd, &sin);
234   addr_len = sizeof(sin);
235   EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0);
236   EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0);
237   port = ntohs(sin.sin_port);
238   EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);
239 
240   sv->em_fd = grpc_fd_create(fd, "server", false);
241   grpc_pollset_add_fd(g_pollset, sv->em_fd);
242   // Register to be interested in reading from listen_fd.
243   GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
244                     grpc_schedule_on_exec_ctx);
245   grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
246 
247   return port;
248 }
249 
250 // Wait and shutdown a sever.
server_wait_and_shutdown(server * sv)251 static void server_wait_and_shutdown(server* sv) {
252   gpr_mu_lock(g_mu);
253   while (!sv->done) {
254     grpc_core::ExecCtx exec_ctx;
255     grpc_pollset_worker* worker = nullptr;
256     ASSERT_TRUE(GRPC_LOG_IF_ERROR(
257         "pollset_work", grpc_pollset_work(g_pollset, &worker,
258                                           grpc_core::Timestamp::InfFuture())));
259     gpr_mu_unlock(g_mu);
260 
261     gpr_mu_lock(g_mu);
262   }
263   gpr_mu_unlock(g_mu);
264 }
265 
266 // ===An upload client to test notify_on_write===
267 
268 // Client write buffer size
269 #define CLIENT_WRITE_BUF_SIZE 10
270 // Total number of times that the client fills up the write buffer
271 #define CLIENT_TOTAL_WRITE_CNT 3
272 
273 // An upload client.
274 typedef struct {
275   grpc_fd* em_fd;
276   char write_buf[CLIENT_WRITE_BUF_SIZE];
277   ssize_t write_bytes_total;
278   // Number of times that the client fills up the write buffer and calls
279   // notify_on_write to schedule another write.
280   int client_write_cnt;
281 
282   int done;  // set to 1 when a client finishes sending
283   grpc_closure write_closure;
284 } client;
285 
client_init(client * cl)286 static void client_init(client* cl) {
287   memset(cl->write_buf, 0, sizeof(cl->write_buf));
288   cl->write_bytes_total = 0;
289   cl->client_write_cnt = 0;
290   cl->done = 0;
291 }
292 
293 // Called when a client upload session is ready to shutdown.
client_session_shutdown_cb(void * arg,int)294 static void client_session_shutdown_cb(void* arg /*client*/, int /*success*/) {
295   client* cl = static_cast<client*>(arg);
296   grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
297   cl->done = 1;
298   ASSERT_TRUE(
299       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
300 }
301 
302 // Write as much as possible, then register notify_on_write.
client_session_write(void * arg,grpc_error_handle error)303 static void client_session_write(void* arg,  // client
304                                  grpc_error_handle error) {
305   client* cl = static_cast<client*>(arg);
306   int fd = grpc_fd_wrapped_fd(cl->em_fd);
307   ssize_t write_once = 0;
308 
309   if (!error.ok()) {
310     gpr_mu_lock(g_mu);
311     client_session_shutdown_cb(arg, 1);
312     gpr_mu_unlock(g_mu);
313     return;
314   }
315 
316   do {
317     write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
318     if (write_once > 0) cl->write_bytes_total += write_once;
319   } while (write_once > 0);
320 
321   if (errno == EAGAIN) {
322     gpr_mu_lock(g_mu);
323     if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
324       GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
325                         grpc_schedule_on_exec_ctx);
326       grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
327       cl->client_write_cnt++;
328     } else {
329       client_session_shutdown_cb(arg, 1);
330     }
331     gpr_mu_unlock(g_mu);
332   } else {
333     grpc_core::Crash(absl::StrFormat("unknown errno %s",
334                                      grpc_core::StrError(errno).c_str()));
335   }
336 }
337 
338 // Start a client to send a stream of bytes.
client_start(client * cl,int port)339 static void client_start(client* cl, int port) {
340   int fd;
341   struct sockaddr_in sin;
342   create_test_socket(port, &fd, &sin);
343   if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
344       -1) {
345     if (errno == EINPROGRESS) {
346       struct pollfd pfd;
347       pfd.fd = fd;
348       pfd.events = POLLOUT;
349       pfd.revents = 0;
350       if (poll(&pfd, 1, -1) == -1) {
351         gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
352         abort();
353       }
354     } else {
355       grpc_core::Crash(
356           absl::StrFormat("Failed to connect to the server (errno=%d)", errno));
357     }
358   }
359 
360   cl->em_fd = grpc_fd_create(fd, "client", false);
361   grpc_pollset_add_fd(g_pollset, cl->em_fd);
362 
363   client_session_write(cl, absl::OkStatus());
364 }
365 
366 // Wait for the signal to shutdown a client.
client_wait_and_shutdown(client * cl)367 static void client_wait_and_shutdown(client* cl) {
368   gpr_mu_lock(g_mu);
369   while (!cl->done) {
370     grpc_pollset_worker* worker = nullptr;
371     grpc_core::ExecCtx exec_ctx;
372     ASSERT_TRUE(GRPC_LOG_IF_ERROR(
373         "pollset_work", grpc_pollset_work(g_pollset, &worker,
374                                           grpc_core::Timestamp::InfFuture())));
375     gpr_mu_unlock(g_mu);
376 
377     gpr_mu_lock(g_mu);
378   }
379   gpr_mu_unlock(g_mu);
380 }
381 
382 // Test grpc_fd. Start an upload server and client, upload a stream of
383 // bytes from the client to the server, and verify that the total number of
384 // sent bytes is equal to the total number of received bytes.
test_grpc_fd(void)385 static void test_grpc_fd(void) {
386   server sv;
387   client cl;
388   int port;
389   grpc_core::ExecCtx exec_ctx;
390 
391   server_init(&sv);
392   port = server_start(&sv);
393   client_init(&cl);
394   client_start(&cl, port);
395 
396   client_wait_and_shutdown(&cl);
397   server_wait_and_shutdown(&sv);
398   ASSERT_EQ(sv.read_bytes_total, cl.write_bytes_total);
399   gpr_log(GPR_INFO, "Total read bytes %" PRIdPTR, sv.read_bytes_total);
400 }
401 
402 typedef struct fd_change_data {
403   grpc_iomgr_cb_func cb_that_ran;
404 } fd_change_data;
405 
init_change_data(fd_change_data * fdc)406 void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
407 
destroy_change_data(fd_change_data *)408 void destroy_change_data(fd_change_data* /*fdc*/) {}
409 
first_read_callback(void * arg,grpc_error_handle)410 static void first_read_callback(void* arg /* fd_change_data */,
411                                 grpc_error_handle /*error*/) {
412   fd_change_data* fdc = static_cast<fd_change_data*>(arg);
413 
414   gpr_mu_lock(g_mu);
415   fdc->cb_that_ran = first_read_callback;
416   ASSERT_TRUE(
417       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
418   gpr_mu_unlock(g_mu);
419 }
420 
second_read_callback(void * arg,grpc_error_handle)421 static void second_read_callback(void* arg /* fd_change_data */,
422                                  grpc_error_handle /*error*/) {
423   fd_change_data* fdc = static_cast<fd_change_data*>(arg);
424 
425   gpr_mu_lock(g_mu);
426   fdc->cb_that_ran = second_read_callback;
427   ASSERT_TRUE(
428       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
429   gpr_mu_unlock(g_mu);
430 }
431 
432 // Test that changing the callback we use for notify_on_read actually works.
433 // Note that we have two different but almost identical callbacks above -- the
434 // point is to have two different function pointers and two different data
435 // pointers and make sure that changing both really works.
test_grpc_fd_change(void)436 static void test_grpc_fd_change(void) {
437   grpc_fd* em_fd;
438   fd_change_data a, b;
439   int flags;
440   int sv[2];
441   char data;
442   ssize_t result;
443   grpc_closure first_closure;
444   grpc_closure second_closure;
445   grpc_core::ExecCtx exec_ctx;
446 
447   GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
448                     grpc_schedule_on_exec_ctx);
449   GRPC_CLOSURE_INIT(&second_closure, second_read_callback, &b,
450                     grpc_schedule_on_exec_ctx);
451 
452   init_change_data(&a);
453   init_change_data(&b);
454 
455   ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
456   flags = fcntl(sv[0], F_GETFL, 0);
457   ASSERT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0);
458   flags = fcntl(sv[1], F_GETFL, 0);
459   ASSERT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0);
460 
461   em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
462   grpc_pollset_add_fd(g_pollset, em_fd);
463 
464   // Register the first callback, then make its FD readable
465   grpc_fd_notify_on_read(em_fd, &first_closure);
466   data = 0;
467   result = write(sv[1], &data, 1);
468   ASSERT_EQ(result, 1);
469 
470   // And now wait for it to run.
471   gpr_mu_lock(g_mu);
472   while (a.cb_that_ran == nullptr) {
473     grpc_pollset_worker* worker = nullptr;
474     ASSERT_TRUE(GRPC_LOG_IF_ERROR(
475         "pollset_work", grpc_pollset_work(g_pollset, &worker,
476                                           grpc_core::Timestamp::InfFuture())));
477     gpr_mu_unlock(g_mu);
478 
479     gpr_mu_lock(g_mu);
480   }
481   ASSERT_EQ(a.cb_that_ran, first_read_callback);
482   gpr_mu_unlock(g_mu);
483 
484   // And drain the socket so we can generate a new read edge
485   result = read(sv[0], &data, 1);
486   ASSERT_EQ(result, 1);
487 
488   // Now register a second callback with distinct change data, and do the same
489   // thing again.
490   grpc_fd_notify_on_read(em_fd, &second_closure);
491   data = 0;
492   result = write(sv[1], &data, 1);
493   ASSERT_EQ(result, 1);
494 
495   gpr_mu_lock(g_mu);
496   while (b.cb_that_ran == nullptr) {
497     grpc_pollset_worker* worker = nullptr;
498     ASSERT_TRUE(GRPC_LOG_IF_ERROR(
499         "pollset_work", grpc_pollset_work(g_pollset, &worker,
500                                           grpc_core::Timestamp::InfFuture())));
501     gpr_mu_unlock(g_mu);
502 
503     gpr_mu_lock(g_mu);
504   }
505   // Except now we verify that second_read_callback ran instead
506   ASSERT_EQ(b.cb_that_ran, second_read_callback);
507   gpr_mu_unlock(g_mu);
508 
509   grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
510 
511   destroy_change_data(&a);
512   destroy_change_data(&b);
513   close(sv[1]);
514 }
515 
destroy_pollset(void * p,grpc_error_handle)516 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
517   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
518 }
519 
TEST(FdPosixTest,MainTest)520 TEST(FdPosixTest, MainTest) {
521   grpc_closure destroyed;
522   grpc_init();
523   {
524     grpc_core::ExecCtx exec_ctx;
525     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
526     grpc_pollset_init(g_pollset, &g_mu);
527     test_grpc_fd();
528     test_grpc_fd_change();
529     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
530                       grpc_schedule_on_exec_ctx);
531     grpc_pollset_shutdown(g_pollset, &destroyed);
532     grpc_core::ExecCtx::Get()->Flush();
533     gpr_free(g_pollset);
534   }
535   grpc_shutdown();
536 }
537 
538 #endif  // GRPC_POSIX_SOCKET_EV
539 
main(int argc,char ** argv)540 int main(int argc, char** argv) {
541   grpc::testing::TestEnvironment env(&argc, argv);
542   ::testing::InitGoogleTest(&argc, argv);
543   return RUN_ALL_TESTS();
544 }
545