1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <stdint.h>
16 #include <sys/select.h>
17
18 #include <algorithm>
19 #include <atomic>
20 #include <chrono>
21 #include <cstring>
22 #include <memory>
23 #include <vector>
24
25 #include "absl/status/statusor.h"
26 #include "absl/strings/str_cat.h"
27 #include "absl/strings/str_format.h"
28 #include "absl/strings/str_split.h"
29 #include "absl/strings/string_view.h"
30 #include "gtest/gtest.h"
31
32 #include <grpc/grpc.h>
33
34 #include "src/core/lib/config/config_vars.h"
35 #include "src/core/lib/event_engine/poller.h"
36 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
37 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
38 #include "src/core/lib/gprpp/ref_counted_ptr.h"
39 #include "src/core/lib/iomgr/port.h"
40
41 // IWYU pragma: no_include <arpa/inet.h>
42 // IWYU pragma: no_include <ratio>
43
44 // This test won't work except with posix sockets enabled
45 #ifdef GRPC_POSIX_SOCKET_EV
46
47 #include <errno.h>
48 #include <fcntl.h>
49 #include <netinet/in.h>
50 #include <poll.h>
51 #include <stdlib.h>
52 #include <sys/socket.h>
53 #include <unistd.h>
54
55 #include "absl/status/status.h"
56
57 #include <grpc/support/alloc.h>
58 #include <grpc/support/log.h>
59 #include <grpc/support/sync.h>
60
61 #include "src/core/lib/event_engine/common_closures.h"
62 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
63 #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
64 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
65 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
66 #include "src/core/lib/gprpp/crash.h"
67 #include "src/core/lib/gprpp/dual_ref_counted.h"
68 #include "src/core/lib/gprpp/notification.h"
69 #include "src/core/lib/gprpp/strerror.h"
70 #include "test/core/event_engine/posix/posix_engine_test_utils.h"
71 #include "test/core/util/port.h"
72
73 static gpr_mu g_mu;
74 static std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
75 g_event_poller;
76
// Tuning constants for the upload test. They are typed constexpr values
// rather than macros so that they obey scoping and type checking; the names
// are unchanged so every existing use keeps compiling.

// Buffer size used to send and receive data.
// 1024 is the minimal value to set TCP send and receive buffer.
constexpr int BUF_SIZE = 1024;
// Backlog passed to listen(): max number of connections pending to be
// accepted.
constexpr int MAX_NUM_FD = 1024;
// Client write buffer size.
constexpr int CLIENT_WRITE_BUF_SIZE = 10;
// Total number of times that the client fills up the write buffer.
constexpr int CLIENT_TOTAL_WRITE_CNT = 3;
86
87 namespace grpc_event_engine {
88 namespace experimental {
89
90 using namespace std::chrono_literals;
91
92 namespace {
93
SetSocketSendBuf(int fd,int buffer_size_bytes)94 absl::Status SetSocketSendBuf(int fd, int buffer_size_bytes) {
95 return 0 == setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
96 sizeof(buffer_size_bytes))
97 ? absl::OkStatus()
98 : absl::Status(absl::StatusCode::kInternal,
99 grpc_core::StrError(errno).c_str());
100 }
101
102 // Create a test socket with the right properties for testing.
103 // port is the TCP port to listen or connect to.
104 // Return a socket FD and sockaddr_in.
CreateTestSocket(int port,int * socket_fd,struct sockaddr_in6 * sin)105 void CreateTestSocket(int port, int* socket_fd, struct sockaddr_in6* sin) {
106 int fd;
107 int one = 1;
108 int buffer_size_bytes = BUF_SIZE;
109 int flags;
110
111 fd = socket(AF_INET6, SOCK_STREAM, 0);
112 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
113 // Reset the size of socket send buffer to the minimal value to facilitate
114 // buffer filling up and triggering notify_on_write
115 EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok());
116 EXPECT_TRUE(SetSocketSendBuf(fd, buffer_size_bytes).ok());
117 // Make fd non-blocking.
118 flags = fcntl(fd, F_GETFL, 0);
119 EXPECT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0);
120 *socket_fd = fd;
121
122 // Use local address for test.
123 memset(sin, 0, sizeof(struct sockaddr_in6));
124 sin->sin6_family = AF_INET6;
125 (reinterpret_cast<char*>(&sin->sin6_addr))[15] = 1;
126 EXPECT_TRUE(port >= 0 && port < 65536);
127 sin->sin6_port = htons(static_cast<uint16_t>(port));
128 }
129
130 // =======An upload server to test notify_on_read===========
131 // The server simply reads and counts a stream of bytes.
132
133 // An upload server.
134 typedef struct {
135 EventHandle* em_fd; // listening fd
136 ssize_t read_bytes_total; // total number of received bytes
137 int done; // set to 1 when a server finishes serving
138 PosixEngineClosure* listen_closure;
139 } server;
140
ServerInit(server * sv)141 void ServerInit(server* sv) {
142 sv->read_bytes_total = 0;
143 sv->done = 0;
144 }
145
146 // An upload session.
147 // Created when a new upload request arrives in the server.
148 typedef struct {
149 server* sv; // not owned by a single session
150 EventHandle* em_fd; // fd to read upload bytes
151 char read_buf[BUF_SIZE]; // buffer to store upload bytes
152 PosixEngineClosure* session_read_closure;
153 } session;
154
155 // Called when an upload session can be safely shutdown.
156 // Close session FD and start to shutdown listen FD.
SessionShutdownCb(session * se,bool)157 void SessionShutdownCb(session* se, bool /*success*/) {
158 server* sv = se->sv;
159 se->em_fd->OrphanHandle(nullptr, nullptr, "a");
160 gpr_free(se);
161 // Start to shutdown listen fd.
162 sv->em_fd->ShutdownHandle(
163 absl::Status(absl::StatusCode::kUnknown, "SessionShutdownCb"));
164 }
165
166 // Called when data become readable in a session.
SessionReadCb(session * se,absl::Status status)167 void SessionReadCb(session* se, absl::Status status) {
168 int fd = se->em_fd->WrappedFd();
169
170 ssize_t read_once = 0;
171 ssize_t read_total = 0;
172
173 if (!status.ok()) {
174 SessionShutdownCb(se, true);
175 return;
176 }
177
178 do {
179 read_once = read(fd, se->read_buf, BUF_SIZE);
180 if (read_once > 0) read_total += read_once;
181 } while (read_once > 0);
182 se->sv->read_bytes_total += read_total;
183
184 // read() returns 0 to indicate the TCP connection was closed by the
185 // client read(fd, read_buf, 0) also returns 0 which should never be called as
186 // such. It is possible to read nothing due to spurious edge event or data has
187 // been drained, In such a case, read() returns -1 and set errno to
188 // EAGAIN.
189 if (read_once == 0) {
190 SessionShutdownCb(se, true);
191 } else if (read_once == -1) {
192 EXPECT_EQ(errno, EAGAIN);
193 // An edge triggered event is cached in the kernel until next poll.
194 // In the current single thread implementation, SessionReadCb is called
195 // in the polling thread, such that polling only happens after this
196 // callback, and will catch read edge event if data is available again
197 // before notify_on_read.
198 se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
199 [se](absl::Status status) { SessionReadCb(se, status); });
200 se->em_fd->NotifyOnRead(se->session_read_closure);
201 }
202 }
203
204 // Called when the listen FD can be safely shutdown. Close listen FD and
205 // signal that server can be shutdown.
ListenShutdownCb(server * sv)206 void ListenShutdownCb(server* sv) {
207 sv->em_fd->OrphanHandle(nullptr, nullptr, "b");
208 gpr_mu_lock(&g_mu);
209 sv->done = 1;
210 g_event_poller->Kick();
211 gpr_mu_unlock(&g_mu);
212 }
213
214 // Called when a new TCP connection request arrives in the listening port.
ListenCb(server * sv,absl::Status status)215 void ListenCb(server* sv, absl::Status status) {
216 int fd;
217 int flags;
218 session* se;
219 struct sockaddr_storage ss;
220 socklen_t slen = sizeof(ss);
221 EventHandle* listen_em_fd = sv->em_fd;
222
223 if (!status.ok()) {
224 ListenShutdownCb(sv);
225 return;
226 }
227
228 do {
229 fd = accept(listen_em_fd->WrappedFd(),
230 reinterpret_cast<struct sockaddr*>(&ss), &slen);
231 } while (fd < 0 && errno == EINTR);
232 if (fd < 0 && errno == EAGAIN) {
233 sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
234 [sv](absl::Status status) { ListenCb(sv, status); });
235 listen_em_fd->NotifyOnRead(sv->listen_closure);
236 return;
237 } else if (fd < 0) {
238 gpr_log(GPR_ERROR, "Failed to acceot a connection, returned error: %s",
239 grpc_core::StrError(errno).c_str());
240 }
241 EXPECT_GE(fd, 0);
242 EXPECT_LT(fd, FD_SETSIZE);
243 flags = fcntl(fd, F_GETFL, 0);
244 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
245 se = static_cast<session*>(gpr_malloc(sizeof(*se)));
246 se->sv = sv;
247 se->em_fd = g_event_poller->CreateHandle(fd, "listener", false);
248 se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
249 [se](absl::Status status) { SessionReadCb(se, status); });
250 se->em_fd->NotifyOnRead(se->session_read_closure);
251 sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
252 [sv](absl::Status status) { ListenCb(sv, status); });
253 listen_em_fd->NotifyOnRead(sv->listen_closure);
254 }
255
256 // Start a test server, return the TCP listening port bound to listen_fd.
257 // ListenCb() is registered to be interested in reading from listen_fd.
258 // When connection request arrives, ListenCb() is called to accept the
259 // connection request.
ServerStart(server * sv)260 int ServerStart(server* sv) {
261 int port = grpc_pick_unused_port_or_die();
262 int fd;
263 struct sockaddr_in6 sin;
264 socklen_t addr_len;
265
266 CreateTestSocket(port, &fd, &sin);
267 addr_len = sizeof(sin);
268 EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0);
269 EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0);
270 port = ntohs(sin.sin6_port);
271 EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);
272
273 sv->em_fd = g_event_poller->CreateHandle(fd, "server", false);
274 sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
275 [sv](absl::Status status) { ListenCb(sv, status); });
276 sv->em_fd->NotifyOnRead(sv->listen_closure);
277 return port;
278 }
279
280 // ===An upload client to test notify_on_write===
281
282 // An upload client.
283 typedef struct {
284 EventHandle* em_fd;
285 char write_buf[CLIENT_WRITE_BUF_SIZE];
286 ssize_t write_bytes_total;
287 // Number of times that the client fills up the write buffer and calls
288 // notify_on_write to schedule another write.
289 int client_write_cnt;
290 int done;
291 PosixEngineClosure* write_closure;
292 } client;
293
ClientInit(client * cl)294 void ClientInit(client* cl) {
295 memset(cl->write_buf, 0, sizeof(cl->write_buf));
296 cl->write_bytes_total = 0;
297 cl->client_write_cnt = 0;
298 cl->done = 0;
299 }
300
301 // Called when a client upload session is ready to shutdown.
ClientSessionShutdownCb(client * cl)302 void ClientSessionShutdownCb(client* cl) {
303 cl->em_fd->OrphanHandle(nullptr, nullptr, "c");
304 gpr_mu_lock(&g_mu);
305 cl->done = 1;
306 g_event_poller->Kick();
307 gpr_mu_unlock(&g_mu);
308 }
309
310 // Write as much as possible, then register notify_on_write.
ClientSessionWrite(client * cl,absl::Status status)311 void ClientSessionWrite(client* cl, absl::Status status) {
312 int fd = cl->em_fd->WrappedFd();
313 ssize_t write_once = 0;
314
315 if (!status.ok()) {
316 ClientSessionShutdownCb(cl);
317 return;
318 }
319
320 do {
321 write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
322 if (write_once > 0) cl->write_bytes_total += write_once;
323 } while (write_once > 0);
324
325 EXPECT_EQ(errno, EAGAIN);
326 gpr_mu_lock(&g_mu);
327 if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
328 cl->write_closure = PosixEngineClosure::TestOnlyToClosure(
329 [cl](absl::Status status) { ClientSessionWrite(cl, status); });
330 cl->client_write_cnt++;
331 gpr_mu_unlock(&g_mu);
332 cl->em_fd->NotifyOnWrite(cl->write_closure);
333 } else {
334 gpr_mu_unlock(&g_mu);
335 ClientSessionShutdownCb(cl);
336 }
337 }
338
339 // Start a client to send a stream of bytes.
ClientStart(client * cl,int port)340 void ClientStart(client* cl, int port) {
341 int fd;
342 struct sockaddr_in6 sin;
343 CreateTestSocket(port, &fd, &sin);
344 if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
345 -1) {
346 if (errno == EINPROGRESS) {
347 struct pollfd pfd;
348 pfd.fd = fd;
349 pfd.events = POLLOUT;
350 pfd.revents = 0;
351 if (poll(&pfd, 1, -1) == -1) {
352 gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
353 abort();
354 }
355 } else {
356 grpc_core::Crash(
357 absl::StrFormat("Failed to connect to the server (errno=%d)", errno));
358 }
359 }
360
361 cl->em_fd = g_event_poller->CreateHandle(fd, "client", false);
362 ClientSessionWrite(cl, absl::OkStatus());
363 }
364
365 // Wait for the signal to shutdown client and server.
WaitAndShutdown(server * sv,client * cl)366 void WaitAndShutdown(server* sv, client* cl) {
367 Poller::WorkResult result;
368 gpr_mu_lock(&g_mu);
369 while (!sv->done || !cl->done) {
370 gpr_mu_unlock(&g_mu);
371 result = g_event_poller->Work(24h, []() {});
372 ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
373 gpr_mu_lock(&g_mu);
374 }
375 gpr_mu_unlock(&g_mu);
376 }
377
378 class EventPollerTest : public ::testing::Test {
SetUp()379 void SetUp() override {
380 engine_ =
381 std::make_unique<grpc_event_engine::experimental::PosixEventEngine>();
382 EXPECT_NE(engine_, nullptr);
383 scheduler_ =
384 std::make_unique<grpc_event_engine::experimental::TestScheduler>(
385 engine_.get());
386 EXPECT_NE(scheduler_, nullptr);
387 g_event_poller = MakeDefaultPoller(scheduler_.get());
388 engine_ = PosixEventEngine::MakeTestOnlyPosixEventEngine(g_event_poller);
389 EXPECT_NE(engine_, nullptr);
390 scheduler_->ChangeCurrentEventEngine(engine_.get());
391 if (g_event_poller != nullptr) {
392 gpr_log(GPR_INFO, "Using poller: %s", g_event_poller->Name().c_str());
393 }
394 }
395
TearDown()396 void TearDown() override {
397 if (g_event_poller != nullptr) {
398 g_event_poller->Shutdown();
399 }
400 }
401
402 public:
Scheduler()403 TestScheduler* Scheduler() { return scheduler_.get(); }
404
405 private:
406 std::shared_ptr<grpc_event_engine::experimental::PosixEventEngine> engine_;
407 std::unique_ptr<grpc_event_engine::experimental::TestScheduler> scheduler_;
408 };
409
410 // Test grpc_fd. Start an upload server and client, upload a stream of bytes
411 // from the client to the server, and verify that the total number of sent
412 // bytes is equal to the total number of received bytes.
TEST_F(EventPollerTest,TestEventPollerHandle)413 TEST_F(EventPollerTest, TestEventPollerHandle) {
414 server sv;
415 client cl;
416 int port;
417 if (g_event_poller == nullptr) {
418 return;
419 }
420 ServerInit(&sv);
421 port = ServerStart(&sv);
422 ClientInit(&cl);
423 ClientStart(&cl, port);
424
425 WaitAndShutdown(&sv, &cl);
426 EXPECT_EQ(sv.read_bytes_total, cl.write_bytes_total);
427 }
428
429 typedef struct FdChangeData {
430 void (*cb_that_ran)(struct FdChangeData*, absl::Status);
431 } FdChangeData;
432
InitChangeData(FdChangeData * fdc)433 void InitChangeData(FdChangeData* fdc) { fdc->cb_that_ran = nullptr; }
434
DestroyChangeData(FdChangeData *)435 void DestroyChangeData(FdChangeData* /*fdc*/) {}
436
FirstReadCallback(FdChangeData * fdc,absl::Status)437 void FirstReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
438 gpr_mu_lock(&g_mu);
439 fdc->cb_that_ran = FirstReadCallback;
440 g_event_poller->Kick();
441 gpr_mu_unlock(&g_mu);
442 }
443
SecondReadCallback(FdChangeData * fdc,absl::Status)444 void SecondReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
445 gpr_mu_lock(&g_mu);
446 fdc->cb_that_ran = SecondReadCallback;
447 g_event_poller->Kick();
448 gpr_mu_unlock(&g_mu);
449 }
450
451 // Test that changing the callback we use for notify_on_read actually works.
452 // Note that we have two different but almost identical callbacks above -- the
453 // point is to have two different function pointers and two different data
454 // pointers and make sure that changing both really works.
TEST_F(EventPollerTest,TestEventPollerHandleChange)455 TEST_F(EventPollerTest, TestEventPollerHandleChange) {
456 EventHandle* em_fd;
457 FdChangeData a, b;
458 int flags;
459 int sv[2];
460 char data;
461 ssize_t result;
462 if (g_event_poller == nullptr) {
463 return;
464 }
465 PosixEngineClosure* first_closure = PosixEngineClosure::TestOnlyToClosure(
466 [a = &a](absl::Status status) { FirstReadCallback(a, status); });
467 PosixEngineClosure* second_closure = PosixEngineClosure::TestOnlyToClosure(
468 [b = &b](absl::Status status) { SecondReadCallback(b, status); });
469 InitChangeData(&a);
470 InitChangeData(&b);
471
472 EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
473 flags = fcntl(sv[0], F_GETFL, 0);
474 EXPECT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0);
475 flags = fcntl(sv[1], F_GETFL, 0);
476 EXPECT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0);
477
478 em_fd =
479 g_event_poller->CreateHandle(sv[0], "TestEventPollerHandleChange", false);
480 EXPECT_NE(em_fd, nullptr);
481 // Register the first callback, then make its FD readable
482 em_fd->NotifyOnRead(first_closure);
483 data = 0;
484 result = write(sv[1], &data, 1);
485 EXPECT_EQ(result, 1);
486
487 // And now wait for it to run.
488 auto poller_work = [](FdChangeData* fdc) {
489 Poller::WorkResult result;
490 gpr_mu_lock(&g_mu);
491 while (fdc->cb_that_ran == nullptr) {
492 gpr_mu_unlock(&g_mu);
493 result = g_event_poller->Work(24h, []() {});
494 ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
495 gpr_mu_lock(&g_mu);
496 }
497 };
498 poller_work(&a);
499 EXPECT_EQ(a.cb_that_ran, FirstReadCallback);
500 gpr_mu_unlock(&g_mu);
501
502 // And drain the socket so we can generate a new read edge
503 result = read(sv[0], &data, 1);
504 EXPECT_EQ(result, 1);
505
506 // Now register a second callback with distinct change data, and do the same
507 // thing again.
508 em_fd->NotifyOnRead(second_closure);
509 data = 0;
510 result = write(sv[1], &data, 1);
511 EXPECT_EQ(result, 1);
512
513 // And now wait for it to run.
514 poller_work(&b);
515 // Except now we verify that SecondReadCallback ran instead.
516 EXPECT_EQ(b.cb_that_ran, SecondReadCallback);
517 gpr_mu_unlock(&g_mu);
518
519 em_fd->OrphanHandle(nullptr, nullptr, "d");
520 DestroyChangeData(&a);
521 DestroyChangeData(&b);
522 close(sv[1]);
523 }
524
// Number of WakeupFdHandle objects that have been created and whose orphan
// callback has not run yet.
std::atomic<int> kTotalActiveWakeupFdHandles(0);
526
527 // A helper class representing one file descriptor. Its implemented using
528 // a WakeupFd. It registers itself with the poller and waits to be notified
529 // of read events. Upon receiving a read event, (1) it processes it,
530 // (2) registes to be notified of the next read event and (3) schedules
531 // generation of the next read event. The Fd orphanes itself after processing
532 // a specified number of read events.
533 class WakeupFdHandle : public grpc_core::DualRefCounted<WakeupFdHandle> {
534 public:
WakeupFdHandle(int num_wakeups,Scheduler * scheduler,PosixEventPoller * poller)535 WakeupFdHandle(int num_wakeups, Scheduler* scheduler,
536 PosixEventPoller* poller)
537 : num_wakeups_(num_wakeups),
538 scheduler_(scheduler),
539 poller_(poller),
540 on_read_(
541 PosixEngineClosure::ToPermanentClosure([this](absl::Status status) {
542 EXPECT_TRUE(status.ok());
543 status = ReadPipe();
544 if (!status.ok()) {
545 // Rarely epoll1 poller may generate an EPOLLHUP - which is a
546 // spurious wakeup. Poll based poller may also likely generate a
547 // lot of spurious wakeups because of the level triggered nature
548 // of poll In such cases do not bother changing the number of
549 // wakeups received.
550 EXPECT_EQ(status, absl::InternalError("Spurious Wakeup"));
551 handle_->NotifyOnRead(on_read_);
552 return;
553 }
554 if (--num_wakeups_ == 0) {
555 // This should invoke the registered NotifyOnRead callbacks with
556 // the shutdown error. When those callbacks call Unref(), the
557 // WakeupFdHandle should call OrphanHandle in the Unref() method
558 // implementation.
559 handle_->ShutdownHandle(absl::InternalError("Shutting down"));
560 Unref();
561 } else {
562 handle_->NotifyOnRead(on_read_);
563 Ref().release();
564 // Schedule next wakeup to trigger the registered NotifyOnRead
565 // callback.
566 scheduler_->Run(SelfDeletingClosure::Create([this]() {
567 // Send next wakeup.
568 EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
569 Unref();
570 }));
571 }
572 })) {
573 WeakRef().release();
574 ++kTotalActiveWakeupFdHandles;
575 EXPECT_GT(num_wakeups_, 0);
576 EXPECT_NE(scheduler_, nullptr);
577 EXPECT_NE(poller_, nullptr);
578 wakeup_fd_ = *PipeWakeupFd::CreatePipeWakeupFd();
579 handle_ = poller_->CreateHandle(wakeup_fd_->ReadFd(), "test", false);
580 EXPECT_NE(handle_, nullptr);
581 handle_->NotifyOnRead(on_read_);
582 // Send a wakeup initially.
583 EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
584 }
585
~WakeupFdHandle()586 ~WakeupFdHandle() override { delete on_read_; }
587
Orphaned()588 void Orphaned() override {
589 // Once the handle has orphaned itself, decrement
590 // kTotalActiveWakeupFdHandles. Once all handles have orphaned themselves,
591 // send a Kick to the poller.
592 handle_->OrphanHandle(
593 PosixEngineClosure::TestOnlyToClosure(
594 [poller = poller_, wakeupfd_handle = this](absl::Status status) {
595 EXPECT_TRUE(status.ok());
596 if (--kTotalActiveWakeupFdHandles == 0) {
597 poller->Kick();
598 }
599 wakeupfd_handle->WeakUnref();
600 }),
601 nullptr, "");
602 }
603
604 private:
ReadPipe()605 absl::Status ReadPipe() {
606 char buf[128];
607 ssize_t r;
608 int total_bytes_read = 0;
609 for (;;) {
610 r = read(wakeup_fd_->ReadFd(), buf, sizeof(buf));
611 if (r > 0) {
612 total_bytes_read += r;
613 continue;
614 }
615 if (r == 0) return absl::OkStatus();
616 switch (errno) {
617 case EAGAIN:
618 return total_bytes_read > 0 ? absl::OkStatus()
619 : absl::InternalError("Spurious Wakeup");
620 case EINTR:
621 continue;
622 default:
623 return absl::Status(
624 absl::StatusCode::kInternal,
625 absl::StrCat("read: ", grpc_core::StrError(errno)));
626 }
627 }
628 }
629 int num_wakeups_;
630 Scheduler* scheduler_;
631 PosixEventPoller* poller_;
632 PosixEngineClosure* on_read_;
633 std::unique_ptr<WakeupFd> wakeup_fd_;
634 EventHandle* handle_;
635 };
636
637 // A helper class to create Fds and drive the polling for these Fds. It
638 // repeatedly calls the Work(..) method on the poller to get pet pending events,
639 // then schedules another parallel Work(..) instantiation and processes these
640 // pending events. This continues until all Fds have orphaned themselves.
641 class Worker : public grpc_core::DualRefCounted<Worker> {
642 public:
Worker(Scheduler * scheduler,PosixEventPoller * poller,int num_handles,int num_wakeups_per_handle)643 Worker(Scheduler* scheduler, PosixEventPoller* poller, int num_handles,
644 int num_wakeups_per_handle)
645 : scheduler_(scheduler), poller_(poller) {
646 handles_.reserve(num_handles);
647 for (int i = 0; i < num_handles; i++) {
648 handles_.push_back(
649 new WakeupFdHandle(num_wakeups_per_handle, scheduler_, poller_));
650 }
651 WeakRef().release();
652 }
Orphaned()653 void Orphaned() override { signal.Notify(); }
Start()654 void Start() {
655 // Start executing Work(..).
656 scheduler_->Run([this]() { Work(); });
657 }
658
Wait()659 void Wait() {
660 signal.WaitForNotification();
661 WeakUnref();
662 }
663
664 private:
Work()665 void Work() {
666 auto result = g_event_poller->Work(24h, [this]() {
667 // Schedule next work instantiation immediately and take a Ref for
668 // the next instantiation.
669 Ref().release();
670 scheduler_->Run([this]() { Work(); });
671 });
672 ASSERT_TRUE(result == Poller::WorkResult::kOk ||
673 result == Poller::WorkResult::kKicked);
674 // Corresponds to the Ref taken for the current instantiation. If the
675 // result was Poller::WorkResult::kKicked, then the next work instantiation
676 // would not have been scheduled and the poll_again callback should have
677 // been deleted.
678 Unref();
679 }
680 Scheduler* scheduler_;
681 PosixEventPoller* poller_;
682 grpc_core::Notification signal;
683 std::vector<WakeupFdHandle*> handles_;
684 };
685
686 // This test creates kNumHandles file descriptors and kNumWakeupsPerHandle
687 // separate read events to the created Fds. The Fds use the NotifyOnRead API to
688 // wait for a read event, upon receiving a read event they process it
689 // immediately and schedule the wait for the next read event. A new read event
690 // is also generated for each fd in parallel after the previous one is
691 // processed.
TEST_F(EventPollerTest,TestMultipleHandles)692 TEST_F(EventPollerTest, TestMultipleHandles) {
693 static constexpr int kNumHandles = 100;
694 static constexpr int kNumWakeupsPerHandle = 100;
695 if (g_event_poller == nullptr) {
696 return;
697 }
698 Worker* worker = new Worker(Scheduler(), g_event_poller.get(), kNumHandles,
699 kNumWakeupsPerHandle);
700 worker->Start();
701 worker->Wait();
702 }
703
704 } // namespace
705 } // namespace experimental
706 } // namespace grpc_event_engine
707
main(int argc,char ** argv)708 int main(int argc, char** argv) {
709 ::testing::InitGoogleTest(&argc, argv);
710 gpr_mu_init(&g_mu);
711 auto poll_strategy = grpc_core::ConfigVars::Get().PollStrategy();
712 auto strings = absl::StrSplit(poll_strategy, ',');
713 if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
714 // Skip the test entirely if poll strategy is none.
715 return 0;
716 }
717 // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
718 // until we clear out the iomgr shutdown code.
719 grpc_init();
720 int r = RUN_ALL_TESTS();
721 grpc_shutdown();
722 return r;
723 }
724
725 #else // GRPC_POSIX_SOCKET_EV
726
// Built when GRPC_POSIX_SOCKET_EV is not defined: the test cannot run, so
// the binary just exits with status 1.
int main(int /*argc*/, char** /*argv*/) {
  return 1;
}
728
729 #endif // GRPC_POSIX_SOCKET_EV
730