xref: /aosp_15_r20/external/grpc-grpc/test/core/event_engine/windows/windows_endpoint_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #ifdef GPR_WINDOWS
18 
19 #include <gtest/gtest.h>
20 
21 #include "absl/status/status.h"
22 
23 #include <grpc/event_engine/event_engine.h>
24 #include <grpc/grpc.h>
25 
26 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
27 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
28 #include "src/core/lib/event_engine/windows/iocp.h"
29 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
30 #include "src/core/lib/event_engine/windows/windows_engine.h"
31 #include "src/core/lib/gprpp/notification.h"
32 #include "src/core/lib/resource_quota/memory_quota.h"
33 #include "test/core/event_engine/windows/create_sockpair.h"
34 
35 namespace grpc_event_engine {
36 namespace experimental {
37 
38 using namespace std::chrono_literals;
39 
40 class WindowsEndpointTest : public testing::Test {};
41 
// Writes a single message on the client endpoint of a connected socket pair
// and verifies that a Read queued on the server endpoint yields exactly that
// message in a single slice.
TEST_F(WindowsEndpointTest, BasicCommunication) {
  // TODO(hork): deduplicate against winsocket and iocp tests
  // Setup
  auto thread_pool = MakeThreadPool(8);
  IOCP iocp(thread_pool.get());
  grpc_core::MemoryQuota quota("endpoint_test");
  SOCKET sockpair[2];
  CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
  auto wrapped_client_socket = iocp.Watch(sockpair[0]);
  auto wrapped_server_socket = iocp.Watch(sockpair[1]);
  sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
  auto engine = std::make_shared<WindowsEventEngine>();
  EventEngine::ResolvedAddress addr(
      reinterpret_cast<sockaddr*>(&loopback_addr), sizeof(loopback_addr));
  WindowsEndpoint client(addr, std::move(wrapped_client_socket),
                         quota.CreateMemoryAllocator("client"),
                         ChannelArgsEndpointConfig(), thread_pool.get(),
                         engine);
  WindowsEndpoint server(addr, std::move(wrapped_server_socket),
                         quota.CreateMemoryAllocator("server"),
                         ChannelArgsEndpointConfig(), thread_pool.get(),
                         engine);
  // Test
  std::string message = "0xDEADBEEF";
  grpc_core::Notification read_done;
  SliceBuffer read_buffer;
  // NOTE(review): a `false` return presumably means the operation did not
  // complete inline and the callback will be invoked later -- confirm against
  // the EventEngine::Endpoint documentation.
  EXPECT_FALSE(server.Read(
      [&read_done, &message, &read_buffer](absl::Status status) {
        // Only non-fatal checks are used here so that read_done is notified
        // on every path; a fatal assertion would return early and leave the
        // test body blocked in WaitForNotification() instead of failing.
        EXPECT_TRUE(status.ok()) << status.ToString();
        EXPECT_EQ(read_buffer.Count(), 1u);
        if (read_buffer.Count() > 0) {
          auto slice = read_buffer.TakeFirst();
          EXPECT_EQ(slice.as_string_view(), message);
        }
        read_done.Notify();
      },
      &read_buffer, nullptr));
  grpc_core::Notification write_done;
  SliceBuffer write_buffer;
  write_buffer.Append(Slice::FromCopiedString(message));
  EXPECT_FALSE(client.Write(
      [&write_done](absl::Status status) {
        EXPECT_TRUE(status.ok()) << status.ToString();
        write_done.Notify();
      },
      &write_buffer, nullptr));
  // Give the poller up to 5 seconds to process the pending I/O.
  iocp.Work(5s, []() {});
  // Cleanup
  write_done.WaitForNotification();
  read_done.WaitForNotification();
  thread_pool->Quiesce();
}
88 
TEST_F(WindowsEndpointTest,Conversation)89 TEST_F(WindowsEndpointTest, Conversation) {
90   // Setup
91   auto thread_pool = MakeThreadPool(8);
92   IOCP iocp(thread_pool.get());
93   grpc_core::MemoryQuota quota("endpoint_test");
94   SOCKET sockpair[2];
95   CreateSockpair(sockpair, IOCP::GetDefaultSocketFlags());
96   sockaddr_in loopback_addr = GetSomeIpv4LoopbackAddress();
97   EventEngine::ResolvedAddress addr((sockaddr*)&loopback_addr,
98                                     sizeof(loopback_addr));
99   // Test
100   struct AppState {
101     AppState(const EventEngine::ResolvedAddress& addr,
102              std::unique_ptr<WinSocket> client,
103              std::unique_ptr<WinSocket> server, grpc_core::MemoryQuota& quota,
104              ThreadPool* thread_pool, std::shared_ptr<EventEngine> engine)
105         : client(addr, std::move(client), quota.CreateMemoryAllocator("client"),
106                  ChannelArgsEndpointConfig(), thread_pool, engine),
107           server(addr, std::move(server), quota.CreateMemoryAllocator("server"),
108                  ChannelArgsEndpointConfig(), thread_pool, engine) {}
109     grpc_core::Notification done;
110     WindowsEndpoint client;
111     WindowsEndpoint server;
112     SliceBuffer read_buffer;
113     SliceBuffer write_buffer;
114     const std::vector<std::string> messages{
115         "Java is to Javascript what car is to carpet. -Heilmann",
116         "Make it work, make it right, make it fast. -Beck",
117         "First, solve the problem. Then write the code. -Johnson",
118         "It works on my machine."};
119     // incremented after a corresponding read of a previous write
120     // if exchange%2 == 0, client -> server
121     // if exchange%2 == 1, server -> client
122     // if exchange == messages.length, done
123     std::atomic<size_t> exchange{0};
124 
125     // Initiates a Write and corresponding Read on two endpoints.
126     void WriteAndQueueReader(WindowsEndpoint* writer, WindowsEndpoint* reader) {
127       write_buffer.Clear();
128       write_buffer.Append(Slice::FromCopiedString(messages[exchange]));
129       EXPECT_FALSE(
130           writer->Write([](absl::Status) {}, &write_buffer, /*args=*/nullptr));
131       auto cb = [this](absl::Status status) { ReadCB(status); };
132       read_buffer.Clear();
133       EXPECT_FALSE(reader->Read(cb, &read_buffer, /*args=*/nullptr));
134     }
135 
136     // Asserts that the received string matches, then queues the next Write/Read
137     // pair
138     void ReadCB(absl::Status) {
139       ASSERT_EQ(read_buffer.Count(), 1u);
140       ASSERT_EQ(read_buffer.TakeFirst().as_string_view(), messages[exchange]);
141       if (++exchange == messages.size()) {
142         done.Notify();
143         return;
144       }
145       if (exchange % 2 == 0) {
146         WriteAndQueueReader(/*writer=*/&client, /*reader=*/&server);
147       } else {
148         WriteAndQueueReader(/*writer=*/&server, /*reader=*/&client);
149       }
150     }
151   };
152   auto engine = std::make_shared<WindowsEventEngine>();
153   AppState state(addr, /*client=*/iocp.Watch(sockpair[0]),
154                  /*server=*/iocp.Watch(sockpair[1]), quota, thread_pool.get(),
155                  engine);
156   state.WriteAndQueueReader(/*writer=*/&state.client, /*reader=*/&state.server);
157   while (iocp.Work(100ms, []() {}) == Poller::WorkResult::kOk ||
158          !state.done.HasBeenNotified()) {
159   }
160   // Cleanup
161   state.done.WaitForNotification();
162   thread_pool->Quiesce();
163 }
164 
165 }  // namespace experimental
166 }  // namespace grpc_event_engine
167 
main(int argc,char ** argv)168 int main(int argc, char** argv) {
169   ::testing::InitGoogleTest(&argc, argv);
170   grpc_init();
171   int status = RUN_ALL_TESTS();
172   grpc_shutdown();
173   return status;
174 }
175 
176 #else  // not GPR_WINDOWS
// Stub entry point for builds without GPR_WINDOWS: there is nothing to test,
// so the binary reports success immediately.
int main(int /* argc */, char** /* argv */) { return 0; }
178 #endif
179