// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <stdint.h>
#include <sys/select.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <memory>
#include <vector>

#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "gtest/gtest.h"

#include <grpc/grpc.h>

#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/port.h"

// IWYU pragma: no_include <arpa/inet.h>
// IWYU pragma: no_include <ratio>

// This test won't work except with posix sockets enabled
#ifdef GRPC_POSIX_SOCKET_EV

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <unistd.h>

#include "absl/status/status.h"

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/strerror.h"
#include "test/core/event_engine/posix/posix_engine_test_utils.h"
#include "test/core/util/port.h"

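// g_mu guards the test-completion state shared between the polling thread and
// the test body; g_event_poller is the poller under test, shared by all tests
// in this file.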
static gpr_mu g_mu;
static std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
    g_event_poller;

// Buffer size used to send and receive data.
// 1024 is the minimal value for setting the TCP send and receive buffers.
#define BUF_SIZE 1024
// Max number of connections pending to be accepted by listen().
#define MAX_NUM_FD 1024
// Client write buffer size.
#define CLIENT_WRITE_BUF_SIZE 10
// Total number of times that the client fills up the write buffer.
#define CLIENT_TOTAL_WRITE_CNT 3

namespace grpc_event_engine {
namespace experimental {

using namespace std::chrono_literals;

namespace {

absl::Status SetSocketSendBuf(int fd, int buffer_size_bytes) {
  return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
                         sizeof(buffer_size_bytes))
             ? absl::OkStatus()
             : absl::Status(absl::StatusCode::kInternal,
                            grpc_core::StrError(errno).c_str());
}

// Create a test socket with the right properties for testing.
// port is the TCP port to listen or connect to.
// Returns a socket FD and a sockaddr_in6.
void CreateTestSocket(int port, int* socket_fd, struct sockaddr_in6* sin) {
  int fd;
  int one = 1;
  int buffer_size_bytes = BUF_SIZE;
  int flags;

  fd = socket(AF_INET6, SOCK_STREAM, 0);
  setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
  // Reset the size of the socket send buffer to the minimal value so that it
  // fills up quickly and triggers notify_on_write.
  EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok());
  EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok());
  // Make fd non-blocking.
  flags = fcntl(fd, F_GETFL, 0);
  EXPECT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0);
  *socket_fd = fd;

  // Use the local address (::1) for the test.
  memset(sin, 0, sizeof(struct sockaddr_in6));
  sin->sin6_family = AF_INET6;
  (reinterpret_cast<char*>(&sin->sin6_addr))[15] = 1;
  EXPECT_TRUE(port >= 0 && port < 65536);
  sin->sin6_port = htons(static_cast<uint16_t>(port));
}

//  =======An upload server to test notify_on_read===========
//    The server simply reads and counts a stream of bytes.

// An upload server.
typedef struct {
  EventHandle* em_fd;        // listening fd
  ssize_t read_bytes_total;  // total number of received bytes
  int done;                  // set to 1 when a server finishes serving
  PosixEngineClosure* listen_closure;
} server;

void ServerInit(server* sv) {
  sv->read_bytes_total = 0;
  sv->done = 0;
}

// An upload session.
// Created when a new upload request arrives at the server.
typedef struct {
  server* sv;               // not owned by a single session
  EventHandle* em_fd;       // fd to read upload bytes
  char read_buf[BUF_SIZE];  // buffer to store upload bytes
  PosixEngineClosure* session_read_closure;
} session;

// Called when an upload session can safely be shut down.
// Closes the session FD and starts shutting down the listen FD.
void SessionShutdownCb(session* se, bool /*success*/) {
  server* sv = se->sv;
  se->em_fd->OrphanHandle(nullptr, nullptr, "a");
  gpr_free(se);
  // Start to shut down the listen fd.
  sv->em_fd->ShutdownHandle(
      absl::Status(absl::StatusCode::kUnknown, "SessionShutdownCb"));
}

// Called when data becomes readable in a session.
void SessionReadCb(session* se, absl::Status status) {
  int fd = se->em_fd->WrappedFd();

  ssize_t read_once = 0;
  ssize_t read_total = 0;

  if (!status.ok()) {
    SessionShutdownCb(se, true);
    return;
  }

  do {
    read_once = read(fd, se->read_buf, BUF_SIZE);
    if (read_once > 0) read_total += read_once;
  } while (read_once > 0);
  se->sv->read_bytes_total += read_total;

  // read() returns 0 to indicate that the client closed the TCP connection.
  // (read(fd, read_buf, 0) would also return 0, but read() is never invoked
  // with a zero length here.) It is also possible to read nothing because of
  // a spurious edge event or because the data has already been drained; in
  // that case read() returns -1 and sets errno to EAGAIN.
  if (read_once == 0) {
    SessionShutdownCb(se, true);
  } else if (read_once == -1) {
    EXPECT_EQ(errno, EAGAIN);
    // An edge triggered event is cached in the kernel until the next poll.
    // In the current single threaded implementation, SessionReadCb is called
    // in the polling thread, so polling only happens after this callback
    // returns and will catch the read edge event if data becomes available
    // again before notify_on_read.
    se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
        [se](absl::Status status) { SessionReadCb(se, status); });
    se->em_fd->NotifyOnRead(se->session_read_closure);
  }
}

// Called when the listen FD can safely be shut down. Closes the listen FD
// and signals that the server can be shut down.
void ListenShutdownCb(server* sv) {
  sv->em_fd->OrphanHandle(nullptr, nullptr, "b");
  gpr_mu_lock(&g_mu);
  sv->done = 1;
  g_event_poller->Kick();
  gpr_mu_unlock(&g_mu);
}

// Called when a new TCP connection request arrives on the listening port.
void ListenCb(server* sv, absl::Status status) {
  int fd;
  int flags;
  session* se;
  struct sockaddr_storage ss;
  socklen_t slen = sizeof(ss);
  EventHandle* listen_em_fd = sv->em_fd;

  if (!status.ok()) {
    ListenShutdownCb(sv);
    return;
  }

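  // Accept a pending connection, retrying on EINTR. If accept() fails with
  // EAGAIN there is no pending connection, so re-register interest in the
  // listening fd and wait for the next request.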
  do {
    fd = accept(listen_em_fd->WrappedFd(),
                reinterpret_cast<struct sockaddr*>(&ss), &slen);
  } while (fd < 0 && errno == EINTR);
  if (fd < 0 && errno == EAGAIN) {
    sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
        [sv](absl::Status status) { ListenCb(sv, status); });
    listen_em_fd->NotifyOnRead(sv->listen_closure);
    return;
  } else if (fd < 0) {
    gpr_log(GPR_ERROR, "Failed to accept a connection, returned error: %s",
            grpc_core::StrError(errno).c_str());
  }
  EXPECT_GE(fd, 0);
  EXPECT_LT(fd, FD_SETSIZE);
  flags = fcntl(fd, F_GETFL, 0);
  fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  se = static_cast<session*>(gpr_malloc(sizeof(*se)));
  se->sv = sv;
  se->em_fd = g_event_poller->CreateHandle(fd, "listener", false);
  se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
      [se](absl::Status status) { SessionReadCb(se, status); });
  se->em_fd->NotifyOnRead(se->session_read_closure);
  sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
      [sv](absl::Status status) { ListenCb(sv, status); });
  listen_em_fd->NotifyOnRead(sv->listen_closure);
}

// Start a test server; return the TCP listening port bound to listen_fd.
// ListenCb() is registered to be interested in reading from listen_fd.
// When a connection request arrives, ListenCb() is called to accept it.
int ServerStart(server* sv) {
  int port = grpc_pick_unused_port_or_die();
  int fd;
  struct sockaddr_in6 sin;
  socklen_t addr_len;

  CreateTestSocket(port, &fd, &sin);
  addr_len = sizeof(sin);
  EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0);
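  // Read back the bound address so that `port` reflects the port actually in
  // use.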
  EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0);
  port = ntohs(sin.sin6_port);
  EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);

  sv->em_fd = g_event_poller->CreateHandle(fd, "server", false);
  sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
      [sv](absl::Status status) { ListenCb(sv, status); });
  sv->em_fd->NotifyOnRead(sv->listen_closure);
  return port;
}

// ===An upload client to test notify_on_write===

// An upload client.
typedef struct {
  EventHandle* em_fd;
  char write_buf[CLIENT_WRITE_BUF_SIZE];
  ssize_t write_bytes_total;
  // Number of times that the client fills up the write buffer and calls
  // notify_on_write to schedule another write.
  int client_write_cnt;
  int done;
  PosixEngineClosure* write_closure;
} client;

void ClientInit(client* cl) {
  memset(cl->write_buf, 0, sizeof(cl->write_buf));
  cl->write_bytes_total = 0;
  cl->client_write_cnt = 0;
  cl->done = 0;
}

// Called when a client upload session is ready to shut down.
void ClientSessionShutdownCb(client* cl) {
  cl->em_fd->OrphanHandle(nullptr, nullptr, "c");
  gpr_mu_lock(&g_mu);
  cl->done = 1;
  g_event_poller->Kick();
  gpr_mu_unlock(&g_mu);
}

// Write as much as possible, then register notify_on_write.
void ClientSessionWrite(client* cl, absl::Status status) {
  int fd = cl->em_fd->WrappedFd();
  ssize_t write_once = 0;

  if (!status.ok()) {
    ClientSessionShutdownCb(cl);
    return;
  }

  do {
    write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
    if (write_once > 0) cl->write_bytes_total += write_once;
  } while (write_once > 0);

  EXPECT_EQ(errno, EAGAIN);
  gpr_mu_lock(&g_mu);
  if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
    cl->write_closure = PosixEngineClosure::TestOnlyToClosure(
        [cl](absl::Status status) { ClientSessionWrite(cl, status); });
    cl->client_write_cnt++;
    gpr_mu_unlock(&g_mu);
    cl->em_fd->NotifyOnWrite(cl->write_closure);
  } else {
    gpr_mu_unlock(&g_mu);
    ClientSessionShutdownCb(cl);
  }
}

// Start a client to send a stream of bytes.
void ClientStart(client* cl, int port) {
  int fd;
  struct sockaddr_in6 sin;
  CreateTestSocket(port, &fd, &sin);
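  // The socket is non-blocking, so connect() may return -1 with EINPROGRESS;
  // in that case wait for the fd to become writable before proceeding.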
  if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
      -1) {
    if (errno == EINPROGRESS) {
      struct pollfd pfd;
      pfd.fd = fd;
      pfd.events = POLLOUT;
      pfd.revents = 0;
      if (poll(&pfd, 1, -1) == -1) {
        gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
        abort();
      }
    } else {
      grpc_core::Crash(
          absl::StrFormat("Failed to connect to the server (errno=%d)", errno));
    }
  }

  cl->em_fd = g_event_poller->CreateHandle(fd, "client", false);
  ClientSessionWrite(cl, absl::OkStatus());
}

// Wait for the signal to shut down the client and server.
void WaitAndShutdown(server* sv, client* cl) {
  Poller::WorkResult result;
  gpr_mu_lock(&g_mu);
  while (!sv->done || !cl->done) {
    gpr_mu_unlock(&g_mu);
    result = g_event_poller->Work(24h, []() {});
    ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
    gpr_mu_lock(&g_mu);
  }
  gpr_mu_unlock(&g_mu);
}

class EventPollerTest : public ::testing::Test {
  void SetUp() override {
    engine_ =
        std::make_unique<grpc_event_engine::experimental::PosixEventEngine>();
    EXPECT_NE(engine_, nullptr);
    scheduler_ =
        std::make_unique<grpc_event_engine::experimental::TestScheduler>(
            engine_.get());
    EXPECT_NE(scheduler_, nullptr);
    g_event_poller = MakeDefaultPoller(scheduler_.get());
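    // Bootstrapping order: a plain engine is needed to construct the
    // TestScheduler, which in turn is needed to construct the poller. Once
    // the poller exists, replace the engine with a test-only engine that
    // wraps the poller and point the scheduler at the new engine.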
    engine_ = PosixEventEngine::MakeTestOnlyPosixEventEngine(g_event_poller);
    EXPECT_NE(engine_, nullptr);
    scheduler_->ChangeCurrentEventEngine(engine_.get());
    if (g_event_poller != nullptr) {
      gpr_log(GPR_INFO, "Using poller: %s", g_event_poller->Name().c_str());
    }
  }

  void TearDown() override {
    if (g_event_poller != nullptr) {
      g_event_poller->Shutdown();
    }
  }

 public:
  TestScheduler* Scheduler() { return scheduler_.get(); }

 private:
  std::shared_ptr<grpc_event_engine::experimental::PosixEventEngine> engine_;
  std::unique_ptr<grpc_event_engine::experimental::TestScheduler> scheduler_;
};

// Test grpc_fd. Start an upload server and client, upload a stream of bytes
// from the client to the server, and verify that the total number of sent
// bytes is equal to the total number of received bytes.
TEST_F(EventPollerTest, TestEventPollerHandle) {
  server sv;
  client cl;
  int port;
  if (g_event_poller == nullptr) {
    return;
  }
  ServerInit(&sv);
  port = ServerStart(&sv);
  ClientInit(&cl);
  ClientStart(&cl, port);

  WaitAndShutdown(&sv, &cl);
  EXPECT_EQ(sv.read_bytes_total, cl.write_bytes_total);
}

typedef struct FdChangeData {
  void (*cb_that_ran)(struct FdChangeData*, absl::Status);
} FdChangeData;

void InitChangeData(FdChangeData* fdc) { fdc->cb_that_ran = nullptr; }

void DestroyChangeData(FdChangeData* /*fdc*/) {}

void FirstReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
  gpr_mu_lock(&g_mu);
  fdc->cb_that_ran = FirstReadCallback;
  g_event_poller->Kick();
  gpr_mu_unlock(&g_mu);
}

void SecondReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
  gpr_mu_lock(&g_mu);
  fdc->cb_that_ran = SecondReadCallback;
  g_event_poller->Kick();
  gpr_mu_unlock(&g_mu);
}

// Test that changing the callback we use for notify_on_read actually works.
// Note that we have two different but almost identical callbacks above -- the
// point is to have two different function pointers and two different data
// pointers and make sure that changing both really works.
TEST_F(EventPollerTest, TestEventPollerHandleChange) {
  EventHandle* em_fd;
  FdChangeData a, b;
  int flags;
  int sv[2];
  char data;
  ssize_t result;
  if (g_event_poller == nullptr) {
    return;
  }
  PosixEngineClosure* first_closure = PosixEngineClosure::TestOnlyToClosure(
      [a = &a](absl::Status status) { FirstReadCallback(a, status); });
  PosixEngineClosure* second_closure = PosixEngineClosure::TestOnlyToClosure(
      [b = &b](absl::Status status) { SecondReadCallback(b, status); });
  InitChangeData(&a);
  InitChangeData(&b);

  EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
  flags = fcntl(sv[0], F_GETFL, 0);
  EXPECT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0);
  flags = fcntl(sv[1], F_GETFL, 0);
  EXPECT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0);

  em_fd =
      g_event_poller->CreateHandle(sv[0], "TestEventPollerHandleChange", false);
  EXPECT_NE(em_fd, nullptr);
  // Register the first callback, then make its FD readable
  em_fd->NotifyOnRead(first_closure);
  data = 0;
  result = write(sv[1], &data, 1);
  EXPECT_EQ(result, 1);

  // And now wait for it to run.
  auto poller_work = [](FdChangeData* fdc) {
    Poller::WorkResult result;
    gpr_mu_lock(&g_mu);
    while (fdc->cb_that_ran == nullptr) {
      gpr_mu_unlock(&g_mu);
      result = g_event_poller->Work(24h, []() {});
      ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
      gpr_mu_lock(&g_mu);
    }
  };
  poller_work(&a);
  EXPECT_EQ(a.cb_that_ran, FirstReadCallback);
  gpr_mu_unlock(&g_mu);

  // And drain the socket so we can generate a new read edge
  result = read(sv[0], &data, 1);
  EXPECT_EQ(result, 1);

  // Now register a second callback with distinct change data, and do the same
  // thing again.
  em_fd->NotifyOnRead(second_closure);
  data = 0;
  result = write(sv[1], &data, 1);
  EXPECT_EQ(result, 1);

  // And now wait for it to run.
  poller_work(&b);
  // Except now we verify that SecondReadCallback ran instead.
  EXPECT_EQ(b.cb_that_ran, SecondReadCallback);
  gpr_mu_unlock(&g_mu);

  em_fd->OrphanHandle(nullptr, nullptr, "d");
  DestroyChangeData(&a);
  DestroyChangeData(&b);
  close(sv[1]);
}

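// Number of WakeupFdHandle objects that have not yet orphaned themselves.
// The last handle to orphan itself kicks the poller (see Orphaned() below).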
std::atomic<int> kTotalActiveWakeupFdHandles{0};

// A helper class representing one file descriptor. It is implemented using
// a WakeupFd. It registers itself with the poller and waits to be notified
// of read events. Upon receiving a read event, (1) it processes it,
// (2) it registers to be notified of the next read event and (3) it schedules
// generation of the next read event. The Fd orphans itself after processing
// a specified number of read events.
class WakeupFdHandle : public grpc_core::DualRefCounted<WakeupFdHandle> {
 public:
  WakeupFdHandle(int num_wakeups, Scheduler* scheduler,
                 PosixEventPoller* poller)
      : num_wakeups_(num_wakeups),
        scheduler_(scheduler),
        poller_(poller),
        on_read_(
            PosixEngineClosure::ToPermanentClosure([this](absl::Status status) {
              EXPECT_TRUE(status.ok());
              status = ReadPipe();
              if (!status.ok()) {
                // Rarely the epoll1 poller may generate an EPOLLHUP, which is
                // a spurious wakeup. The poll based poller may also generate
                // many spurious wakeups because of the level triggered nature
                // of poll. In such cases, do not change the number of wakeups
                // received.
                EXPECT_EQ(status, absl::InternalError("Spurious Wakeup"));
                handle_->NotifyOnRead(on_read_);
                return;
              }
              if (--num_wakeups_ == 0) {
                // This should invoke the registered NotifyOnRead callbacks with
                // the shutdown error. When those callbacks call Unref(), the
                // WakeupFdHandle should call OrphanHandle in the Unref() method
                // implementation.
                handle_->ShutdownHandle(absl::InternalError("Shutting down"));
                Unref();
              } else {
                handle_->NotifyOnRead(on_read_);
                Ref().release();
                // Schedule the next wakeup to trigger the registered
                // NotifyOnRead callback.
                scheduler_->Run(SelfDeletingClosure::Create([this]() {
                  // Send the next wakeup.
                  EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
                  Unref();
                }));
              }
            })) {
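    // Hold a weak ref for the lifetime of this handle; it is released in the
    // OrphanHandle callback (see Orphaned()).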
    WeakRef().release();
    ++kTotalActiveWakeupFdHandles;
    EXPECT_GT(num_wakeups_, 0);
    EXPECT_NE(scheduler_, nullptr);
    EXPECT_NE(poller_, nullptr);
    wakeup_fd_ = *PipeWakeupFd::CreatePipeWakeupFd();
    handle_ = poller_->CreateHandle(wakeup_fd_->ReadFd(), "test", false);
    EXPECT_NE(handle_, nullptr);
    handle_->NotifyOnRead(on_read_);
    // Send a wakeup initially.
    EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
  }

  ~WakeupFdHandle() override { delete on_read_; }

  void Orphaned() override {
    // Once the handle has orphaned itself, decrement
    // kTotalActiveWakeupFdHandles. Once all handles have orphaned themselves,
    // send a Kick to the poller.
    handle_->OrphanHandle(
        PosixEngineClosure::TestOnlyToClosure(
            [poller = poller_, wakeupfd_handle = this](absl::Status status) {
              EXPECT_TRUE(status.ok());
              if (--kTotalActiveWakeupFdHandles == 0) {
                poller->Kick();
              }
              wakeupfd_handle->WeakUnref();
            }),
        nullptr, "");
  }

 private:
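  // Drain all bytes currently available in the wakeup pipe. Returns OkStatus
  // if any bytes were read (or the pipe reached EOF), and
  // InternalError("Spurious Wakeup") if the pipe was already empty.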
  absl::Status ReadPipe() {
    char buf[128];
    ssize_t r;
    int total_bytes_read = 0;
    for (;;) {
      r = read(wakeup_fd_->ReadFd(), buf, sizeof(buf));
      if (r > 0) {
        total_bytes_read += r;
        continue;
      }
      if (r == 0) return absl::OkStatus();
      switch (errno) {
        case EAGAIN:
          return total_bytes_read > 0 ? absl::OkStatus()
                                      : absl::InternalError("Spurious Wakeup");
        case EINTR:
          continue;
        default:
          return absl::Status(
              absl::StatusCode::kInternal,
              absl::StrCat("read: ", grpc_core::StrError(errno)));
      }
    }
  }
  int num_wakeups_;
  Scheduler* scheduler_;
  PosixEventPoller* poller_;
  PosixEngineClosure* on_read_;
  std::unique_ptr<WakeupFd> wakeup_fd_;
  EventHandle* handle_;
};

// A helper class that creates Fds and drives the polling for them. It
// repeatedly calls the Work(..) method on the poller to get the pending
// events, then schedules another parallel Work(..) instantiation and
// processes these pending events. This continues until all Fds have orphaned
// themselves.
class Worker : public grpc_core::DualRefCounted<Worker> {
 public:
  Worker(Scheduler* scheduler, PosixEventPoller* poller, int num_handles,
         int num_wakeups_per_handle)
      : scheduler_(scheduler), poller_(poller) {
    handles_.reserve(num_handles);
    for (int i = 0; i < num_handles; i++) {
      handles_.push_back(
          new WakeupFdHandle(num_wakeups_per_handle, scheduler_, poller_));
    }
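    // Hold a weak ref until Orphaned() fires the notification; it is released
    // in Wait().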
    WeakRef().release();
  }
  void Orphaned() override { signal.Notify(); }
  void Start() {
    // Start executing Work(..).
    scheduler_->Run([this]() { Work(); });
  }

  void Wait() {
    signal.WaitForNotification();
    WeakUnref();
  }

 private:
  void Work() {
    auto result = g_event_poller->Work(24h, [this]() {
      // Schedule next work instantiation immediately and take a Ref for
      // the next instantiation.
      Ref().release();
      scheduler_->Run([this]() { Work(); });
    });
    ASSERT_TRUE(result == Poller::WorkResult::kOk ||
                result == Poller::WorkResult::kKicked);
    // Corresponds to the Ref taken for the current instantiation. If the
    // result was Poller::WorkResult::kKicked, then the next work instantiation
    // would not have been scheduled and the poll_again callback should have
    // been deleted.
    Unref();
  }
  Scheduler* scheduler_;
  PosixEventPoller* poller_;
  grpc_core::Notification signal;
  std::vector<WakeupFdHandle*> handles_;
};

// This test creates kNumHandles file descriptors and delivers
// kNumWakeupsPerHandle separate read events to each created Fd. The Fds use
// the NotifyOnRead API to wait for a read event; upon receiving one they
// process it immediately and schedule the wait for the next read event. A new
// read event is also generated for each fd in parallel after the previous one
// is processed.
TEST_F(EventPollerTest, TestMultipleHandles) {
  static constexpr int kNumHandles = 100;
  static constexpr int kNumWakeupsPerHandle = 100;
  if (g_event_poller == nullptr) {
    return;
  }
  Worker* worker = new Worker(Scheduler(), g_event_poller.get(), kNumHandles,
                              kNumWakeupsPerHandle);
  worker->Start();
  worker->Wait();
}

}  // namespace
}  // namespace experimental
}  // namespace grpc_event_engine

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  gpr_mu_init(&g_mu);
  auto poll_strategy = grpc_core::ConfigVars::Get().PollStrategy();
  auto strings = absl::StrSplit(poll_strategy, ',');
  if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
    // Skip the test entirely if poll strategy is none.
    return 0;
  }
  // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
  // until we clear out the iomgr shutdown code.
  grpc_init();
  int r = RUN_ALL_TESTS();
  grpc_shutdown();
  return r;
}

#else  // GRPC_POSIX_SOCKET_EV

int main(int argc, char** argv) { return 1; }

#endif  // GRPC_POSIX_SOCKET_EV