1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #ifdef GPR_WINDOWS
18
19 #include <gtest/gtest.h>
20
21 #include "absl/status/status.h"
22
23 #include <grpc/event_engine/event_engine.h>
24 #include <grpc/grpc.h>
25
26 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
27 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
28 #include "src/core/lib/event_engine/windows/iocp.h"
29 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
30 #include "src/core/lib/event_engine/windows/windows_engine.h"
31 #include "src/core/lib/gprpp/notification.h"
32 #include "src/core/lib/resource_quota/memory_quota.h"
33 #include "test/core/event_engine/windows/create_sockpair.h"
34
35 namespace grpc_event_engine {
36 namespace experimental {
37
38 using namespace std::chrono_literals;
39
40 class WindowsEndpointTest : public testing::Test {};
41
// Writes one message through a client WindowsEndpoint and checks that a server
// WindowsEndpoint, wrapping the other socket of the pair, reads back the same
// bytes as a single slice.
TEST_F(WindowsEndpointTest, BasicCommunication) {
  // TODO(hork): deduplicate against winsocket and iocp tests
  // Setup
  auto thread_pool = MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  grpc_core::MemoryQuota quota("endpoint_test");
  SOCKET sockpair[2];
  CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
  auto wrapped_client_socket = iocp.Watch(sockpair[0]);
  auto wrapped_server_socket = iocp.Watch(sockpair[1]);
  sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
  auto engine = std::make_shared<WindowsEventEngine>();
  EventEngine::ResolvedAddress addr(
      reinterpret_cast<sockaddr*>(&loopback_addr), sizeof(loopback_addr));
  WindowsEndpoint client(addr, std::move(wrapped_client_socket),
                         quota.CreateMemoryAllocator("client"),
                         ChannelArgsEndpointConfig(), thread_pool.get(),
                         engine);
  WindowsEndpoint server(addr, std::move(wrapped_server_socket),
                         quota.CreateMemoryAllocator("server"),
                         ChannelArgsEndpointConfig(), thread_pool.get(),
                         engine);
  // Test
  std::string message = "0xDEADBEEF";
  grpc_core::Notification read_done;
  SliceBuffer read_buffer;
  // The read is queued before the write is issued, and is expected to report
  // `false` here (NOTE(review): presumably "will complete via the callback" -
  // confirm against the EventEngine::Endpoint contract).
  EXPECT_FALSE(server.Read(
      [&read_done, &message, &read_buffer](absl::Status status) {
        // Only non-fatal expectations are used so that read_done is always
        // notified: a fatal ASSERT_* would return from this lambda early and
        // leave the WaitForNotification() below blocked forever.
        EXPECT_TRUE(status.ok()) << status;
        EXPECT_EQ(read_buffer.Count(), 1u);
        if (read_buffer.Count() > 0) {
          auto slice = read_buffer.TakeFirst();
          EXPECT_EQ(slice.as_string_view(), message);
        }
        read_done.Notify();
      },
      &read_buffer, nullptr));
  grpc_core::Notification write_done;
  SliceBuffer write_buffer;
  write_buffer.Append(Slice::FromCopiedString(message));
  EXPECT_FALSE(client.Write(
      [&write_done](absl::Status status) {
        EXPECT_TRUE(status.ok()) << status;
        write_done.Notify();
      },
      &write_buffer, nullptr));
  iocp.Work(5s, []() {});
  // Cleanup
  write_done.WaitForNotification();
  read_done.WaitForNotification();
  thread_pool->Quiesce();
}
88
// Runs a short back-and-forth between two WindowsEndpoints: each message is
// written by one side and read by the other, and the direction alternates
// after every successfully verified read.
TEST_F(WindowsEndpointTest, Conversation) {
  // Setup
  auto thread_pool = MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  grpc_core::MemoryQuota quota("endpoint_test");
  SOCKET sockpair[2];
  CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
  sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
  EventEngine::ResolvedAddress addr(
      reinterpret_cast<sockaddr*>(&loopback_addr), sizeof(loopback_addr));
  // Test
  // Owns both endpoints, the buffers they share, and the conversation
  // progress counter.
  struct AppState {
    // The socket parameters are named distinctly from the `client` / `server`
    // members so the member initializers are unambiguous to the reader.
    AppState(const EventEngine::ResolvedAddress& addr,
             std::unique_ptr<WinSocket> client_socket,
             std::unique_ptr<WinSocket> server_socket,
             grpc_core::MemoryQuota& quota, ThreadPool* thread_pool,
             std::shared_ptr<EventEngine> engine)
        : client(addr, std::move(client_socket),
                 quota.CreateMemoryAllocator("client"),
                 ChannelArgsEndpointConfig(), thread_pool, engine),
          server(addr, std::move(server_socket),
                 quota.CreateMemoryAllocator("server"),
                 ChannelArgsEndpointConfig(), thread_pool, engine) {}
    // Notified when every message has been exchanged, or as soon as a read
    // fails verification.
    grpc_core::Notification done;
    WindowsEndpoint client;
    WindowsEndpoint server;
    SliceBuffer read_buffer;
    SliceBuffer write_buffer;
    const std::vector<std::string> messages{
        "Java is to Javascript what car is to carpet. -Heilmann",
        "Make it work, make it right, make it fast. -Beck",
        "First, solve the problem. Then write the code. -Johnson",
        "It works on my machine."};
    // incremented after a corresponding read of a previous write
    // if exchange%2 == 0, client -> server
    // if exchange%2 == 1, server -> client
    // if exchange == messages.size(), done
    std::atomic<size_t> exchange{0};

    // Initiates a Write and corresponding Read on two endpoints.
    void WriteAndQueueReader(WindowsEndpoint* writer, WindowsEndpoint* reader) {
      write_buffer.Clear();
      write_buffer.Append(Slice::FromCopiedString(messages[exchange]));
      EXPECT_FALSE(writer->Write(
          [](absl::Status status) { EXPECT_TRUE(status.ok()) << status; },
          &write_buffer, /*args=*/nullptr));
      auto cb = [this](absl::Status status) { ReadCB(status); };
      read_buffer.Clear();
      EXPECT_FALSE(reader->Read(cb, &read_buffer, /*args=*/nullptr));
    }

    // Checks that the read succeeded and produced exactly the message for the
    // current exchange. Records non-fatal failures and returns whether the
    // conversation may continue.
    bool ReceivedExpectedMessage(const absl::Status& status) {
      EXPECT_TRUE(status.ok()) << status;
      EXPECT_EQ(read_buffer.Count(), 1u);
      if (!status.ok() || read_buffer.Count() != 1u) return false;
      auto slice = read_buffer.TakeFirst();
      EXPECT_EQ(slice.as_string_view(), messages[exchange]);
      return slice.as_string_view() == messages[exchange];
    }

    // Verifies the received string, then queues the next Write/Read pair.
    // `done` is notified both on normal completion and on a verification
    // failure; otherwise the polling loop in the test body, which waits on
    // `done`, would never terminate after a failed check.
    void ReadCB(absl::Status status) {
      if (!ReceivedExpectedMessage(status) || ++exchange == messages.size()) {
        done.Notify();
        return;
      }
      if (exchange % 2 == 0) {
        WriteAndQueueReader(/*writer=*/&client, /*reader=*/&server);
      } else {
        WriteAndQueueReader(/*writer=*/&server, /*reader=*/&client);
      }
    }
  };
  auto engine = std::make_shared<WindowsEventEngine>();
  AppState state(addr, /*client_socket=*/iocp.Watch(sockpair[0]),
                 /*server_socket=*/iocp.Watch(sockpair[1]), quota,
                 thread_pool.get(), engine);
  state.WriteAndQueueReader(/*writer=*/&state.client, /*reader=*/&state.server);
  // Keep polling while Work() reports kOk or the conversation is unfinished;
  // the loop ends on the first non-kOk result after `done` has been notified.
  while (iocp.Work(100ms, []() {}) == Poller::WorkResult::kOk ||
         !state.done.HasBeenNotified()) {
  }
  // Cleanup
  state.done.WaitForNotification();
  thread_pool->Quiesce();
}
164
165 } // namespace experimental
166 } // namespace grpc_event_engine
167
main(int argc,char ** argv)168 int main(int argc, char** argv) {
169 ::testing::InitGoogleTest(&argc, argv);
170 grpc_init();
171 int status = RUN_ALL_TESTS();
172 grpc_shutdown();
173 return status;
174 }
175
176 #else // not GPR_WINDOWS
// Built when GPR_WINDOWS is not defined: there is nothing to test, so the
// binary simply exits successfully.
int main(int /* argc */, char** /* argv */) {
  return 0;
}
178 #endif
179