xref: /aosp_15_r20/external/grpc-grpc/test/core/event_engine/windows/iocp_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 #include <thread>
18 
19 #include <gmock/gmock.h>
20 #include <gtest/gtest.h>
21 
22 #include "absl/status/status.h"
23 #include "absl/time/time.h"
24 #include "absl/types/variant.h"
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/log_windows.h>
28 
29 #include "src/core/lib/event_engine/common_closures.h"
30 #include "src/core/lib/event_engine/poller.h"
31 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
32 #include "src/core/lib/event_engine/windows/iocp.h"
33 #include "src/core/lib/event_engine/windows/win_socket.h"
34 #include "src/core/lib/gprpp/notification.h"
35 #include "src/core/lib/iomgr/error.h"
36 #include "test/core/event_engine/windows/create_sockpair.h"
37 
38 namespace {
39 using ::grpc_event_engine::experimental::AnyInvocableClosure;
40 using ::grpc_event_engine::experimental::CreateSockpair;
41 using ::grpc_event_engine::experimental::EventEngine;
42 using ::grpc_event_engine::experimental::IOCP;
43 using ::grpc_event_engine::experimental::Poller;
44 using ::grpc_event_engine::experimental::SelfDeletingClosure;
45 using ::grpc_event_engine::experimental::ThreadPool;
46 using ::grpc_event_engine::experimental::WinSocket;
47 
48 // TODO(hork): replace with logging mechanism that plays nicely with:
49 //   `ASSERT_OK(...) << GetErrorMessage(error, context);`
LogErrorMessage(int messageid,absl::string_view context)50 void LogErrorMessage(int messageid, absl::string_view context) {
51   char* utf8_message = gpr_format_message(messageid);
52   gpr_log(GPR_ERROR, "Error in %s: %s", context, utf8_message);
53   gpr_free(utf8_message);
54 }
55 }  // namespace
56 
57 class IOCPTest : public testing::Test {};
58 
TEST_F(IOCPTest,ClientReceivesNotificationOfServerSend)59 TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
60   auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
61   IOCP iocp(thread_pool.get());
62   SOCKET sockpair[2];
63   CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
64   auto wrapped_client_socket = iocp.Watch(sockpair[0]);
65   auto wrapped_server_socket = iocp.Watch(sockpair[1]);
66   grpc_core::Notification read_called;
67   grpc_core::Notification write_called;
68   DWORD flags = 0;
69   AnyInvocableClosure* on_read;
70   AnyInvocableClosure* on_write;
71   {
72     // When the client gets some data, ensure it matches what we expect.
73     WSABUF read_wsabuf;
74     read_wsabuf.len = 2048;
75     char read_char_buffer[2048];
76     read_wsabuf.buf = read_char_buffer;
77     DWORD bytes_rcvd;
78     on_read = new AnyInvocableClosure([win_socket = wrapped_client_socket.get(),
79                                        &read_called, &read_wsabuf]() {
80       gpr_log(GPR_DEBUG, "Notified on read");
81       EXPECT_GE(win_socket->read_info()->result().bytes_transferred, 10u);
82       EXPECT_STREQ(read_wsabuf.buf, "hello!");
83       read_called.Notify();
84     });
85     wrapped_client_socket->NotifyOnRead(on_read);
86     int status = WSARecv(
87         wrapped_client_socket->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
88         &flags, wrapped_client_socket->read_info()->overlapped(), NULL);
89     // Expecting error 997, WSA_IO_PENDING
90     EXPECT_EQ(status, -1);
91     int last_error = WSAGetLastError();
92     EXPECT_EQ(last_error, WSA_IO_PENDING);
93     if (last_error != WSA_IO_PENDING) {
94       LogErrorMessage(last_error, "WSARecv");
95     }
96   }
97   {
98     on_write = new AnyInvocableClosure([&write_called] {
99       gpr_log(GPR_DEBUG, "Notified on write");
100       write_called.Notify();
101     });
102     wrapped_server_socket->NotifyOnWrite(on_write);
103     // Have the server send a message to the client
104     WSABUF write_wsabuf;
105     char write_char_buffer[2048] = "hello!";
106     write_wsabuf.len = 2048;
107     write_wsabuf.buf = write_char_buffer;
108     DWORD bytes_sent;
109     int status = WSASend(
110         wrapped_server_socket->raw_socket(), &write_wsabuf, 1, &bytes_sent, 0,
111         wrapped_server_socket->write_info()->overlapped(), NULL);
112     EXPECT_EQ(status, 0);
113     if (status != 0) {
114       LogErrorMessage(WSAGetLastError(), "WSASend");
115     }
116   }
117   // Doing work for WSASend
118   bool cb_invoked = false;
119   auto work_result = iocp.Work(std::chrono::seconds(10),
120                                [&cb_invoked]() { cb_invoked = true; });
121   ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
122   ASSERT_TRUE(cb_invoked);
123   // Doing work for WSARecv
124   cb_invoked = false;
125   work_result = iocp.Work(std::chrono::seconds(10),
126                           [&cb_invoked]() { cb_invoked = true; });
127   ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
128   ASSERT_TRUE(cb_invoked);
129   // wait for the callbacks to run
130   read_called.WaitForNotification();
131   write_called.WaitForNotification();
132 
133   delete on_read;
134   delete on_write;
135   wrapped_client_socket->Shutdown();
136   wrapped_server_socket->Shutdown();
137   iocp.Shutdown();
138   thread_pool->Quiesce();
139 }
140 
TEST_F(IOCPTest,IocpWorkTimeoutDueToNoNotificationRegistered)141 TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
142   auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
143   IOCP iocp(thread_pool.get());
144   SOCKET sockpair[2];
145   CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
146   auto wrapped_client_socket = iocp.Watch(sockpair[0]);
147   grpc_core::Notification read_called;
148   DWORD flags = 0;
149   {
150     // Set the client to receive asynchronously
151     // Prepare a notification callback, but don't register it yet.
152     WSABUF read_wsabuf;
153     wrapped_client_socket->NotifyOnRead(
154         SelfDeletingClosure::Create([win_socket = wrapped_client_socket.get(),
155                                      &read_called, &read_wsabuf]() {
156           gpr_log(GPR_DEBUG, "Notified on read");
157           EXPECT_GE(win_socket->read_info()->result().bytes_transferred, 10u);
158           EXPECT_STREQ(read_wsabuf.buf, "hello!");
159           read_called.Notify();
160         }));
161     read_wsabuf.len = 2048;
162     char read_char_buffer[2048];
163     read_wsabuf.buf = read_char_buffer;
164     DWORD bytes_rcvd;
165     int status = WSARecv(
166         wrapped_client_socket->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
167         &flags, wrapped_client_socket->read_info()->overlapped(), NULL);
168     // Expecting error 997, WSA_IO_PENDING
169     EXPECT_EQ(status, -1);
170     int last_error = WSAGetLastError();
171     EXPECT_EQ(last_error, WSA_IO_PENDING);
172     if (last_error != WSA_IO_PENDING) {
173       LogErrorMessage(last_error, "WSARecv");
174     }
175   }
176   {
177     // Have the server send a message to the client. No need to track via IOCP
178     WSABUF write_wsabuf;
179     char write_char_buffer[2048] = "hello!";
180     write_wsabuf.len = 2048;
181     write_wsabuf.buf = write_char_buffer;
182     DWORD bytes_sent;
183     OVERLAPPED write_overlapped;
184     memset(&write_overlapped, 0, sizeof(OVERLAPPED));
185     int status = WSASend(sockpair[1], &write_wsabuf, 1, &bytes_sent, 0,
186                          &write_overlapped, NULL);
187     EXPECT_EQ(status, 0);
188     if (status != 0) {
189       LogErrorMessage(WSAGetLastError(), "WSASend");
190     }
191   }
192   // IOCP::Work without any notification callbacks should still return Ok.
193   bool cb_invoked = false;
194   auto work_result = iocp.Work(std::chrono::seconds(2),
195                                [&cb_invoked]() { cb_invoked = true; });
196   ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
197   ASSERT_TRUE(cb_invoked);
198   // wait for the callbacks to run
199   read_called.WaitForNotification();
200   wrapped_client_socket->Shutdown();
201   iocp.Shutdown();
202   thread_pool->Quiesce();
203 }
204 
// Runs IOCP::Work on one pool thread and IOCP::Kick on another, and checks
// that Work reports kKicked without invoking its callback.
TEST_F(IOCPTest, KickWorks) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  grpc_core::Notification kicked;
  thread_pool->Run([&iocp, &kicked] {
    bool cb_invoked = false;
    Poller::WorkResult result = iocp.Work(
        std::chrono::seconds(30), [&cb_invoked]() { cb_invoked = true; });
    // EXPECT_* rather than ASSERT_*: a fatal assertion would return from this
    // lambda before kicked.Notify(), leaving the test body blocked forever in
    // WaitForNotification() instead of reporting the failure.
    EXPECT_TRUE(result == Poller::WorkResult::kKicked);
    EXPECT_FALSE(cb_invoked);
    kicked.Notify();
  });
  thread_pool->Run([&iocp] {
    // give the worker thread a chance to start
    absl::SleepFor(absl::Milliseconds(42));
    iocp.Kick();
  });
  // wait for the callbacks to run
  kicked.WaitForNotification();
  thread_pool->Quiesce();
}
226 
// Kicks the poller twice before any worker is polling and checks that the
// next two Work calls each report kKicked, after which a third Work call
// reports kDeadlineExceeded. The Work callback must never run.
TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
  // TODO(hork): evaluate if a kick count is going to be useful.
  // This documents the existing poller's behavior of maintaining a kick count,
  // but it's unclear if it's going to be needed.
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  // Queue up two kicks with no worker running.
  for (int kick = 0; kick < 2; ++kick) {
    iocp.Kick();
  }
  bool callback_ran = false;
  auto mark_callback_ran = [&callback_ran]() { callback_ran = true; };
  const auto work_timeout = std::chrono::milliseconds(1);
  // Each queued kick is consumed by exactly one Work call.
  for (int attempt = 0; attempt < 2; ++attempt) {
    const Poller::WorkResult kick_result =
        iocp.Work(work_timeout, mark_callback_ran);
    ASSERT_TRUE(kick_result == Poller::WorkResult::kKicked);
    ASSERT_FALSE(callback_ran);
  }
  // With the kicks drained and nothing else pending, Work runs out its
  // deadline.
  const Poller::WorkResult final_result =
      iocp.Work(work_timeout, mark_callback_ran);
  ASSERT_TRUE(final_result == Poller::WorkResult::kDeadlineExceeded);
  ASSERT_FALSE(callback_ran);
  thread_pool->Quiesce();
}
253 
// Checks that IOCP::Watch terminates the process when it is handed a socket
// that has already been closed.
TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  SOCKET sockpair[2];
  CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
  closesocket(sockpair[0]);
  // EXPECT_DEATH rather than ASSERT_DEATH so that the cleanup below still runs
  // if the death expectation is not met.
  EXPECT_DEATH({ auto wrapped_client_socket = iocp.Watch(sockpair[0]); }, "");
  // The peer end was never handed to the IOCP, so close it here to avoid
  // leaking the handle.
  closesocket(sockpair[1]);
  thread_pool->Quiesce();
}
263 
TEST_F(IOCPTest,StressTestThousandsOfSockets)264 TEST_F(IOCPTest, StressTestThousandsOfSockets) {
265   // Start 10 threads, each with their own IOCP
266   // On each thread, create 50 socket pairs (100 sockets) and have them exchange
267   // a message before shutting down.
268   int thread_count = 10;
269   int sockets_per_thread = 50;
270   std::atomic<int> read_count{0};
271   std::atomic<int> write_count{0};
272   std::vector<std::thread> threads;
273   threads.reserve(thread_count);
274   for (int thread_n = 0; thread_n < thread_count; thread_n++) {
275     threads.emplace_back([sockets_per_thread, &read_count, &write_count] {
276       auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
277       IOCP iocp(thread_pool.get());
278       // Start a looping worker thread with a moderate timeout
279       std::thread iocp_worker([&iocp] {
280         Poller::WorkResult result;
281         do {
282           result = iocp.Work(std::chrono::seconds(1), []() {});
283         } while (result != Poller::WorkResult::kDeadlineExceeded);
284       });
285       for (int i = 0; i < sockets_per_thread; i++) {
286         SOCKET sockpair[2];
287         CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
288         auto wrapped_client_socket = iocp.Watch(sockpair[0]);
289         auto wrapped_server_socket = iocp.Watch(sockpair[1]);
290         auto* pclient = wrapped_client_socket.get();
291         pclient->NotifyOnRead(SelfDeletingClosure::Create(
292             [&read_count,
293              win_socket = std::move(wrapped_client_socket)]() mutable {
294               read_count.fetch_add(1);
295               win_socket->Shutdown();
296             }));
297         auto* pserver = wrapped_server_socket.get();
298         pserver->NotifyOnWrite(SelfDeletingClosure::Create(
299             [&write_count,
300              win_socket = std::move(wrapped_server_socket)]() mutable {
301               write_count.fetch_add(1);
302               win_socket->Shutdown();
303             }));
304         {
305           // Set the client to receive
306           WSABUF read_wsabuf;
307           read_wsabuf.len = 20;
308           char read_char_buffer[20];
309           read_wsabuf.buf = read_char_buffer;
310           DWORD bytes_rcvd;
311           DWORD flags = 0;
312           int status =
313               WSARecv(pclient->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
314                       &flags, pclient->read_info()->overlapped(), NULL);
315           // Expecting error 997, WSA_IO_PENDING
316           EXPECT_EQ(status, -1);
317           int last_error = WSAGetLastError();
318           EXPECT_EQ(last_error, WSA_IO_PENDING);
319           if (last_error != WSA_IO_PENDING) {
320             LogErrorMessage(last_error, "WSARecv");
321           }
322         }
323         {
324           // Have the server send a message to the client.
325           WSABUF write_wsabuf;
326           char write_char_buffer[20] = "hello!";
327           write_wsabuf.len = 20;
328           write_wsabuf.buf = write_char_buffer;
329           DWORD bytes_sent;
330           int status =
331               WSASend(pserver->raw_socket(), &write_wsabuf, 1, &bytes_sent, 0,
332                       pserver->write_info()->overlapped(), NULL);
333           if (status != 0) {
334             int wsa_error = WSAGetLastError();
335             if (wsa_error != WSA_IO_PENDING) {
336               LogErrorMessage(wsa_error, "WSASend");
337               FAIL() << "Error in WSASend. See logs";
338             }
339           }
340         }
341       }
342       iocp_worker.join();
343       thread_pool->Quiesce();
344     });
345   }
346   for (auto& t : threads) {
347     t.join();
348   }
349   absl::Time deadline = absl::Now() + absl::Seconds(30);
350   while (read_count.load() != thread_count * sockets_per_thread ||
351          write_count.load() != thread_count * sockets_per_thread) {
352     absl::SleepFor(absl::Milliseconds(50));
353     if (deadline < absl::Now()) {
354       FAIL() << "Deadline exceeded with " << read_count.load() << " reads and "
355              << write_count.load() << " writes";
356     }
357   }
358   ASSERT_EQ(read_count.load(), thread_count * sockets_per_thread);
359   ASSERT_EQ(write_count.load(), thread_count * sockets_per_thread);
360 }
361 
main(int argc,char ** argv)362 int main(int argc, char** argv) {
363   ::testing::InitGoogleTest(&argc, argv);
364   grpc_init();
365   int status = RUN_ALL_TESTS();
366   grpc_shutdown();
367   return status;
368 }
369 #else  // not GPR_WINDOWS
// When GPR_WINDOWS is not defined there is nothing to test; exit successfully.
int main(int /* argc */, char** /* argv */) {
  return 0;
}
371 #endif
372