1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GPR_WINDOWS
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
18
19 #include <gmock/gmock.h>
20 #include <gtest/gtest.h>
21
22 #include "absl/status/status.h"
23 #include "absl/time/time.h"
24 #include "absl/types/variant.h"
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/log_windows.h>
28
29 #include "src/core/lib/event_engine/common_closures.h"
30 #include "src/core/lib/event_engine/poller.h"
31 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
32 #include "src/core/lib/event_engine/windows/iocp.h"
33 #include "src/core/lib/event_engine/windows/win_socket.h"
34 #include "src/core/lib/gprpp/notification.h"
35 #include "src/core/lib/iomgr/error.h"
36 #include "test/core/event_engine/windows/create_sockpair.h"
37
38 namespace {
39 using ::grpc_event_engine::experimental::AnyInvocableClosure;
40 using ::grpc_event_engine::experimental::CreateSockpair;
41 using ::grpc_event_engine::experimental::EventEngine;
42 using ::grpc_event_engine::experimental::IOCP;
43 using ::grpc_event_engine::experimental::Poller;
44 using ::grpc_event_engine::experimental::SelfDeletingClosure;
45 using ::grpc_event_engine::experimental::ThreadPool;
46 using ::grpc_event_engine::experimental::WinSocket;
47
48 // TODO(hork): replace with logging mechanism that plays nicely with:
49 // `ASSERT_OK(...) << GetErrorMessage(error, context);`
LogErrorMessage(int messageid,absl::string_view context)50 void LogErrorMessage(int messageid, absl::string_view context) {
51 char* utf8_message = gpr_format_message(messageid);
52 gpr_log(GPR_ERROR, "Error in %s: %s", context, utf8_message);
53 gpr_free(utf8_message);
54 }
55 } // namespace
56
57 class IOCPTest : public testing::Test {};
58
// Round-trips one message across a watched socket pair: the client posts an
// overlapped WSARecv, the server posts an overlapped WSASend, and two
// IOCP::Work calls are expected to deliver the write and read notifications.
TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  SOCKET sockpair[2];
  CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
  auto wrapped_client_socket = iocp.Watch(sockpair[0]);
  auto wrapped_server_socket = iocp.Watch(sockpair[1]);
  grpc_core::Notification read_called;
  grpc_core::Notification write_called;
  DWORD flags = 0;
  // Everything handed to the overlapped WSARecv/WSASend calls, or captured by
  // reference in the closures below, is declared at function scope. The
  // operations complete (and the closures run) only once IOCP::Work is called
  // further down, so these objects must not live in a nested block that has
  // already been exited by then.
  char read_char_buffer[2048];
  WSABUF read_wsabuf;
  read_wsabuf.len = sizeof(read_char_buffer);
  read_wsabuf.buf = read_char_buffer;
  DWORD bytes_rcvd = 0;
  char write_char_buffer[2048] = "hello!";
  WSABUF write_wsabuf;
  write_wsabuf.len = sizeof(write_char_buffer);
  write_wsabuf.buf = write_char_buffer;
  DWORD bytes_sent = 0;
  // The closures are plain locals rather than new/delete pairs so that they
  // are released even when an ASSERT_* below returns early.
  // When the client gets some data, ensure it matches what we expect.
  AnyInvocableClosure on_read([win_socket = wrapped_client_socket.get(),
                               &read_called, &read_wsabuf]() {
    gpr_log(GPR_DEBUG, "Notified on read");
    EXPECT_GE(win_socket->read_info()->result().bytes_transferred, 10u);
    EXPECT_STREQ(read_wsabuf.buf, "hello!");
    read_called.Notify();
  });
  AnyInvocableClosure on_write([&write_called] {
    gpr_log(GPR_DEBUG, "Notified on write");
    write_called.Notify();
  });
  {
    // Set the client to receive asynchronously.
    wrapped_client_socket->NotifyOnRead(&on_read);
    int status = WSARecv(
        wrapped_client_socket->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
        &flags, wrapped_client_socket->read_info()->overlapped(), nullptr);
    // Expecting error 997, WSA_IO_PENDING
    EXPECT_EQ(status, SOCKET_ERROR);
    int last_error = WSAGetLastError();
    EXPECT_EQ(last_error, WSA_IO_PENDING);
    if (last_error != WSA_IO_PENDING) {
      LogErrorMessage(last_error, "WSARecv");
    }
  }
  {
    wrapped_server_socket->NotifyOnWrite(&on_write);
    // Have the server send a message to the client
    int status = WSASend(
        wrapped_server_socket->raw_socket(), &write_wsabuf, 1, &bytes_sent, 0,
        wrapped_server_socket->write_info()->overlapped(), nullptr);
    EXPECT_EQ(status, 0);
    if (status != 0) {
      LogErrorMessage(WSAGetLastError(), "WSASend");
    }
  }
  // Doing work for WSASend
  bool cb_invoked = false;
  auto work_result = iocp.Work(std::chrono::seconds(10),
                               [&cb_invoked]() { cb_invoked = true; });
  ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
  ASSERT_TRUE(cb_invoked);
  // Doing work for WSARecv
  cb_invoked = false;
  work_result = iocp.Work(std::chrono::seconds(10),
                          [&cb_invoked]() { cb_invoked = true; });
  ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
  ASSERT_TRUE(cb_invoked);
  // wait for the callbacks to run
  read_called.WaitForNotification();
  write_called.WaitForNotification();

  wrapped_client_socket->Shutdown();
  wrapped_server_socket->Shutdown();
  iocp.Shutdown();
  thread_pool->Quiesce();
}
140
// Only the client socket is watched by the IOCP. The client registers a read
// notification and posts an overlapped WSARecv; the peer then sends directly
// on the raw socket with its own OVERLAPPED. A single IOCP::Work call is
// expected to return kOk and the read notification is expected to fire.
TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  SOCKET sockpair[2];
  CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
  auto wrapped_client_socket = iocp.Watch(sockpair[0]);
  grpc_core::Notification read_called;
  DWORD flags = 0;
  // Buffers, WSABUFs and the send OVERLAPPED are declared at function scope:
  // the overlapped operations, and the read closure that captures
  // `read_wsabuf` by reference, may still touch them after the setup blocks
  // below have been exited.
  char read_char_buffer[2048];
  WSABUF read_wsabuf;
  read_wsabuf.len = sizeof(read_char_buffer);
  read_wsabuf.buf = read_char_buffer;
  DWORD bytes_rcvd = 0;
  char write_char_buffer[2048] = "hello!";
  WSABUF write_wsabuf;
  write_wsabuf.len = sizeof(write_char_buffer);
  write_wsabuf.buf = write_char_buffer;
  DWORD bytes_sent = 0;
  OVERLAPPED write_overlapped = {};
  {
    // Register the read notification callback, then set the client to
    // receive asynchronously.
    wrapped_client_socket->NotifyOnRead(
        SelfDeletingClosure::Create([win_socket = wrapped_client_socket.get(),
                                     &read_called, &read_wsabuf]() {
          gpr_log(GPR_DEBUG, "Notified on read");
          EXPECT_GE(win_socket->read_info()->result().bytes_transferred, 10u);
          EXPECT_STREQ(read_wsabuf.buf, "hello!");
          read_called.Notify();
        }));
    int status = WSARecv(
        wrapped_client_socket->raw_socket(), &read_wsabuf, 1, &bytes_rcvd,
        &flags, wrapped_client_socket->read_info()->overlapped(), nullptr);
    // Expecting error 997, WSA_IO_PENDING
    EXPECT_EQ(status, SOCKET_ERROR);
    int last_error = WSAGetLastError();
    EXPECT_EQ(last_error, WSA_IO_PENDING);
    if (last_error != WSA_IO_PENDING) {
      LogErrorMessage(last_error, "WSARecv");
    }
  }
  {
    // Have the server send a message to the client. No need to track via IOCP
    int status = WSASend(sockpair[1], &write_wsabuf, 1, &bytes_sent, 0,
                         &write_overlapped, nullptr);
    EXPECT_EQ(status, 0);
    if (status != 0) {
      LogErrorMessage(WSAGetLastError(), "WSASend");
    }
  }
  // One Work call is expected to return Ok and invoke the scheduling callback.
  bool cb_invoked = false;
  auto work_result = iocp.Work(std::chrono::seconds(2),
                               [&cb_invoked]() { cb_invoked = true; });
  ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
  ASSERT_TRUE(cb_invoked);
  // wait for the callbacks to run
  read_called.WaitForNotification();
  wrapped_client_socket->Shutdown();
  iocp.Shutdown();
  thread_pool->Quiesce();
}
204
// Runs IOCP::Work on a pool thread with a long timeout, kicks the poller from
// a second pool thread, and checks that Work reports kKicked without invoking
// the scheduling callback.
TEST_F(IOCPTest, KickWorks) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  grpc_core::Notification kicked;
  thread_pool->Run([&iocp, &kicked] {
    bool cb_invoked = false;
    Poller::WorkResult result = iocp.Work(
        std::chrono::seconds(30), [&cb_invoked]() { cb_invoked = true; });
    // EXPECT_* rather than ASSERT_*: a fatal assertion would return from this
    // closure before `kicked.Notify()`, leaving the test body blocked forever
    // in WaitForNotification() instead of reporting the failure.
    EXPECT_TRUE(result == Poller::WorkResult::kKicked);
    EXPECT_FALSE(cb_invoked);
    kicked.Notify();
  });
  thread_pool->Run([&iocp] {
    // give the worker thread a chance to start
    absl::SleepFor(absl::Milliseconds(42));
    iocp.Kick();
  });
  // wait for the callbacks to run
  kicked.WaitForNotification();
  thread_pool->Quiesce();
}
226
// Two kicks issued before any Work call are expected to surface as two
// consecutive kKicked results, after which Work times out normally.
TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
  // TODO(hork): evaluate if a kick count is going to be useful.
  // This documents the existing poller's behavior of maintaining a kick count,
  // but it's unclear if it's going to be needed.
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  // kick twice
  for (int kick = 0; kick < 2; ++kick) {
    iocp.Kick();
  }
  bool cb_invoked = false;
  // Assert the next two WorkResults are kicks
  for (int attempt = 0; attempt < 2; ++attempt) {
    const auto result = iocp.Work(std::chrono::milliseconds(1),
                                  [&cb_invoked]() { cb_invoked = true; });
    ASSERT_TRUE(result == Poller::WorkResult::kKicked);
    ASSERT_FALSE(cb_invoked);
  }
  // followed by a DeadlineExceeded
  const auto result = iocp.Work(std::chrono::milliseconds(1),
                                [&cb_invoked]() { cb_invoked = true; });
  ASSERT_TRUE(result == Poller::WorkResult::kDeadlineExceeded);
  ASSERT_FALSE(cb_invoked);
  thread_pool->Quiesce();
}
253
// Asking the IOCP to watch a socket handle that has already been closed is
// expected to terminate the process.
TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) {
  auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  SOCKET sockpair[2];
  CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
  // Make sure the precondition of the death test really holds.
  ASSERT_EQ(closesocket(sockpair[0]), 0);
  ASSERT_DEATH({ auto wrapped_client_socket = iocp.Watch(sockpair[0]); }, "");
  // The peer socket is never handed to the IOCP, so nothing else closes it.
  closesocket(sockpair[1]);
  thread_pool->Quiesce();
}
263
// Stress test: many threads, each with a private thread pool and IOCP, push a
// message across many socket pairs and count the read/write notifications.
TEST_F(IOCPTest, StressTestThousandsOfSockets) {
  // Start 10 threads, each with their own IOCP
  // On each thread, create 50 socket pairs (100 sockets) and have them exchange
  // a message before shutting down.
  int thread_count = 10;
  int sockets_per_thread = 50;
  std::atomic<int> read_count{0};
  std::atomic<int> write_count{0};
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (int thread_n = 0; thread_n < thread_count; thread_n++) {
    threads.emplace_back([sockets_per_thread, &read_count, &write_count] {
      // Per-socket-pair storage for everything passed by pointer to the
      // overlapped WSARecv/WSASend calls. It is declared before the thread
      // pool and the IOCP so that it is destroyed after them: the operations
      // may still be pending when a loop iteration ends, so per-iteration
      // stack buffers would be written to after going out of scope.
      struct IoState {
        char read_buffer[20] = {};
        char write_buffer[20] = "hello!";
        DWORD bytes_rcvd = 0;
        DWORD bytes_sent = 0;
        DWORD flags = 0;
      };
      std::vector<IoState> io_states(sockets_per_thread);
      auto thread_pool = grpc_event_engine::experimental::MakeThreadPool(8);
      IOCP iocp(thread_pool.get());
      // Start a looping worker thread with a moderate timeout
      std::thread iocp_worker([&iocp] {
        Poller::WorkResult result;
        do {
          result = iocp.Work(std::chrono::seconds(1), []() {});
        } while (result != Poller::WorkResult::kDeadlineExceeded);
      });
      for (int i = 0; i < sockets_per_thread; i++) {
        IoState& io = io_states[i];
        SOCKET sockpair[2];
        CreateSockpair(sockpair, iocp.GetDefaultSocketFlags());
        auto wrapped_client_socket = iocp.Watch(sockpair[0]);
        auto wrapped_server_socket = iocp.Watch(sockpair[1]);
        // Ownership of each wrapped socket moves into its closure; the raw
        // pointers are only used to start the I/O below.
        auto* pclient = wrapped_client_socket.get();
        pclient->NotifyOnRead(SelfDeletingClosure::Create(
            [&read_count,
             win_socket = std::move(wrapped_client_socket)]() mutable {
              read_count.fetch_add(1);
              win_socket->Shutdown();
            }));
        auto* pserver = wrapped_server_socket.get();
        pserver->NotifyOnWrite(SelfDeletingClosure::Create(
            [&write_count,
             win_socket = std::move(wrapped_server_socket)]() mutable {
              write_count.fetch_add(1);
              win_socket->Shutdown();
            }));
        {
          // Set the client to receive
          WSABUF read_wsabuf;
          read_wsabuf.len = sizeof(io.read_buffer);
          read_wsabuf.buf = io.read_buffer;
          int status = WSARecv(pclient->raw_socket(), &read_wsabuf, 1,
                               &io.bytes_rcvd, &io.flags,
                               pclient->read_info()->overlapped(), nullptr);
          // Expecting error 997, WSA_IO_PENDING
          EXPECT_EQ(status, SOCKET_ERROR);
          int last_error = WSAGetLastError();
          EXPECT_EQ(last_error, WSA_IO_PENDING);
          if (last_error != WSA_IO_PENDING) {
            LogErrorMessage(last_error, "WSARecv");
          }
        }
        {
          // Have the server send a message to the client.
          WSABUF write_wsabuf;
          write_wsabuf.len = sizeof(io.write_buffer);
          write_wsabuf.buf = io.write_buffer;
          int status = WSASend(pserver->raw_socket(), &write_wsabuf, 1,
                               &io.bytes_sent, 0,
                               pserver->write_info()->overlapped(), nullptr);
          if (status != 0) {
            int wsa_error = WSAGetLastError();
            if (wsa_error != WSA_IO_PENDING) {
              LogErrorMessage(wsa_error, "WSASend");
              // Record the failure without returning: FAIL() would leave this
              // lambda with `iocp_worker` still joinable, and destroying a
              // joinable std::thread calls std::terminate.
              ADD_FAILURE() << "Error in WSASend. See logs";
            }
          }
        }
      }
      iocp_worker.join();
      thread_pool->Quiesce();
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  absl::Time deadline = absl::Now() + absl::Seconds(30);
  while (read_count.load() != thread_count * sockets_per_thread ||
         write_count.load() != thread_count * sockets_per_thread) {
    absl::SleepFor(absl::Milliseconds(50));
    if (deadline < absl::Now()) {
      FAIL() << "Deadline exceeded with " << read_count.load() << " reads and "
             << write_count.load() << " writes";
    }
  }
  ASSERT_EQ(read_count.load(), thread_count * sockets_per_thread);
  ASSERT_EQ(write_count.load(), thread_count * sockets_per_thread);
}
361
main(int argc,char ** argv)362 int main(int argc, char** argv) {
363 ::testing::InitGoogleTest(&argc, argv);
364 grpc_init();
365 int status = RUN_ALL_TESTS();
366 grpc_shutdown();
367 return status;
368 }
369 #else // not GPR_WINDOWS
// Built when GPR_WINDOWS is not defined: there is nothing to test, so the
// binary simply reports success.
int main(int /* argc */, char** /* argv */) {
  return 0;
}
371 #endif
372