1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 
18 #include <memory>
19 
20 #include "absl/status/status.h"
21 #include "absl/status/statusor.h"
22 #include "absl/strings/string_view.h"
23 
24 #include <grpc/event_engine/endpoint_config.h>
25 #include <grpc/event_engine/event_engine.h>
26 #include <grpc/event_engine/memory_allocator.h>
27 #include <grpc/event_engine/slice_buffer.h>
28 #include <grpc/support/cpu.h>
29 
30 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
31 #include "src/core/lib/event_engine/common_closures.h"
32 #include "src/core/lib/event_engine/handle_containers.h"
33 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
34 #include "src/core/lib/event_engine/tcp_socket_utils.h"
35 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
36 #include "src/core/lib/event_engine/trace.h"
37 #include "src/core/lib/event_engine/utils.h"
38 #include "src/core/lib/event_engine/windows/iocp.h"
39 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
40 #include "src/core/lib/event_engine/windows/windows_engine.h"
41 #include "src/core/lib/event_engine/windows/windows_listener.h"
42 #include "src/core/lib/gprpp/crash.h"
43 #include "src/core/lib/gprpp/sync.h"
44 #include "src/core/lib/gprpp/time.h"
45 #include "src/core/lib/iomgr/error.h"
46 
47 namespace grpc_event_engine {
48 namespace experimental {
49 
50 // ---- IOCPWorkClosure ----
51 
IOCPWorkClosure(ThreadPool * thread_pool,IOCP * iocp)52 WindowsEventEngine::IOCPWorkClosure::IOCPWorkClosure(ThreadPool* thread_pool,
53                                                      IOCP* iocp)
54     : thread_pool_(thread_pool), iocp_(iocp) {
55   thread_pool_->Run(this);
56 }
57 
Run()58 void WindowsEventEngine::IOCPWorkClosure::Run() {
59   auto result = iocp_->Work(std::chrono::seconds(60), [this] {
60     workers_.fetch_add(1);
61     thread_pool_->Run(this);
62   });
63   if (result == Poller::WorkResult::kDeadlineExceeded) {
64     // iocp received no messages. restart the worker
65     workers_.fetch_add(1);
66     thread_pool_->Run(this);
67   }
68   if (workers_.fetch_sub(1) == 1) done_signal_.Notify();
69 }
70 
WaitForShutdown()71 void WindowsEventEngine::IOCPWorkClosure::WaitForShutdown() {
72   done_signal_.WaitForNotification();
73 }
74 
75 // ---- WindowsEventEngine ----
76 
// TODO(hork): The iomgr timer and execution engine can be reused. They should
// be separated out from the posix_engine and instantiated as components. They
// are effectively copied below.
80 
81 struct WindowsEventEngine::TimerClosure final : public EventEngine::Closure {
82   absl::AnyInvocable<void()> cb;
83   Timer timer;
84   WindowsEventEngine* engine;
85   EventEngine::TaskHandle handle;
86 
Rungrpc_event_engine::experimental::WindowsEventEngine::TimerClosure87   void Run() override {
88     GRPC_EVENT_ENGINE_TRACE(
89         "WindowsEventEngine:%p executing callback:%s", engine,
90         HandleToString<EventEngine::TaskHandle>(handle).c_str());
91     {
92       grpc_core::MutexLock lock(&engine->task_mu_);
93       engine->known_handles_.erase(handle);
94     }
95     cb();
96     delete this;
97   }
98 };
99 
WindowsEventEngine()100 WindowsEventEngine::WindowsEventEngine()
101     : thread_pool_(
102           MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u))),
103       iocp_(thread_pool_.get()),
104       timer_manager_(thread_pool_),
105       iocp_worker_(thread_pool_.get(), &iocp_) {
106   WSADATA wsaData;
107   int status = WSAStartup(MAKEWORD(2, 0), &wsaData);
108   GPR_ASSERT(status == 0);
109 }
110 
~WindowsEventEngine()111 WindowsEventEngine::~WindowsEventEngine() {
112   GRPC_EVENT_ENGINE_TRACE("~WindowsEventEngine::%p", this);
113   {
114     task_mu_.Lock();
115     if (!known_handles_.empty()) {
116       if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
117         for (auto handle : known_handles_) {
118           gpr_log(GPR_ERROR,
119                   "WindowsEventEngine:%p uncleared TaskHandle at shutdown:%s",
120                   this,
121                   HandleToString<EventEngine::TaskHandle>(handle).c_str());
122         }
123       }
124       // Allow a small grace period for timers to be run before shutting down.
125       auto deadline =
126           timer_manager_.Now() + grpc_core::Duration::FromSecondsAsDouble(10);
127       while (!known_handles_.empty() && timer_manager_.Now() < deadline) {
128         if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
129           GRPC_LOG_EVERY_N_SEC(1, GPR_DEBUG, "Waiting for timers. %d remaining",
130                                known_handles_.size());
131         }
132         task_mu_.Unlock();
133         absl::SleepFor(absl::Milliseconds(200));
134         task_mu_.Lock();
135       }
136     }
137     GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
138     task_mu_.Unlock();
139   }
140   iocp_.Kick();
141   iocp_worker_.WaitForShutdown();
142   iocp_.Shutdown();
143   GPR_ASSERT(WSACleanup() == 0);
144   timer_manager_.Shutdown();
145   thread_pool_->Quiesce();
146 }
147 
Cancel(EventEngine::TaskHandle handle)148 bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) {
149   grpc_core::MutexLock lock(&task_mu_);
150   if (!known_handles_.contains(handle)) return false;
151   GRPC_EVENT_ENGINE_TRACE(
152       "WindowsEventEngine::%p cancelling %s", this,
153       HandleToString<EventEngine::TaskHandle>(handle).c_str());
154   auto* cd = reinterpret_cast<TimerClosure*>(handle.keys[0]);
155   bool r = timer_manager_.TimerCancel(&cd->timer);
156   known_handles_.erase(handle);
157   if (r) delete cd;
158   return r;
159 }
160 
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)161 EventEngine::TaskHandle WindowsEventEngine::RunAfter(
162     Duration when, absl::AnyInvocable<void()> closure) {
163   return RunAfterInternal(when, std::move(closure));
164 }
165 
RunAfter(Duration when,EventEngine::Closure * closure)166 EventEngine::TaskHandle WindowsEventEngine::RunAfter(
167     Duration when, EventEngine::Closure* closure) {
168   return RunAfterInternal(when, [closure]() { closure->Run(); });
169 }
170 
Run(absl::AnyInvocable<void ()> closure)171 void WindowsEventEngine::Run(absl::AnyInvocable<void()> closure) {
172   thread_pool_->Run(std::move(closure));
173 }
174 
Run(EventEngine::Closure * closure)175 void WindowsEventEngine::Run(EventEngine::Closure* closure) {
176   thread_pool_->Run(closure);
177 }
178 
RunAfterInternal(Duration when,absl::AnyInvocable<void ()> cb)179 EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal(
180     Duration when, absl::AnyInvocable<void()> cb) {
181   auto when_ts = ToTimestamp(timer_manager_.Now(), when);
182   auto* cd = new TimerClosure;
183   cd->cb = std::move(cb);
184   cd->engine = this;
185   EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
186                                  aba_token_.fetch_add(1)};
187   grpc_core::MutexLock lock(&task_mu_);
188   known_handles_.insert(handle);
189   cd->handle = handle;
190   GRPC_EVENT_ENGINE_TRACE(
191       "WindowsEventEngine:%p scheduling callback:%s", this,
192       HandleToString<EventEngine::TaskHandle>(handle).c_str());
193   timer_manager_.TimerInit(&cd->timer, when_ts, cd);
194   return handle;
195 }
196 
GetDNSResolver(EventEngine::DNSResolver::ResolverOptions const &)197 std::unique_ptr<EventEngine::DNSResolver> WindowsEventEngine::GetDNSResolver(
198     EventEngine::DNSResolver::ResolverOptions const& /*options*/) {
199   grpc_core::Crash("unimplemented");
200 }
201 
IsWorkerThread()202 bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
203 
OnConnectCompleted(std::shared_ptr<ConnectionState> state)204 void WindowsEventEngine::OnConnectCompleted(
205     std::shared_ptr<ConnectionState> state) {
206   absl::StatusOr<std::unique_ptr<WindowsEndpoint>> endpoint;
207   EventEngine::OnConnectCallback cb;
208   {
209     // Connection attempt complete!
210     grpc_core::MutexLock lock(&state->mu);
211     cb = std::move(state->on_connected_user_callback);
212     state->on_connected = nullptr;
213     {
214       grpc_core::MutexLock handle_lock(&connection_mu_);
215       known_connection_handles_.erase(state->connection_handle);
216     }
217     const auto& overlapped_result = state->socket->write_info()->result();
218     // return early if we cannot cancel the connection timeout timer.
219     if (!Cancel(state->timer_handle)) return;
220     if (overlapped_result.wsa_error != 0) {
221       state->socket->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
222       endpoint = GRPC_WSA_ERROR(overlapped_result.wsa_error, "ConnectEx");
223     } else {
224       // This code should be running in a thread pool thread already, so the
225       // callback can be run directly.
226       ChannelArgsEndpointConfig cfg;
227       endpoint = std::make_unique<WindowsEndpoint>(
228           state->address, std::move(state->socket), std::move(state->allocator),
229           cfg, thread_pool_.get(), shared_from_this());
230     }
231   }
232   cb(std::move(endpoint));
233 }
234 
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator memory_allocator,Duration timeout)235 EventEngine::ConnectionHandle WindowsEventEngine::Connect(
236     OnConnectCallback on_connect, const ResolvedAddress& addr,
237     const EndpointConfig& /* args */, MemoryAllocator memory_allocator,
238     Duration timeout) {
239   // TODO(hork): utilize the endpoint config
240   absl::Status status;
241   int istatus;
242   auto uri = ResolvedAddressToURI(addr);
243   if (!uri.ok()) {
244     Run([on_connect = std::move(on_connect), status = uri.status()]() mutable {
245       on_connect(status);
246     });
247     return EventEngine::ConnectionHandle::kInvalid;
248   }
249   GRPC_EVENT_ENGINE_TRACE("EventEngine::%p connecting to %s", this,
250                           uri->c_str());
251   // Use dualstack sockets where available.
252   ResolvedAddress address = addr;
253   ResolvedAddress addr6_v4mapped;
254   if (ResolvedAddressToV4Mapped(addr, &addr6_v4mapped)) {
255     address = addr6_v4mapped;
256   }
257   SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
258                           IOCP::GetDefaultSocketFlags());
259   if (sock == INVALID_SOCKET) {
260     Run([on_connect = std::move(on_connect),
261          status = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket")]() mutable {
262       on_connect(status);
263     });
264     return EventEngine::ConnectionHandle::kInvalid;
265   }
266   status = PrepareSocket(sock);
267   if (!status.ok()) {
268     Run([on_connect = std::move(on_connect), status]() mutable {
269       on_connect(status);
270     });
271     return EventEngine::ConnectionHandle::kInvalid;
272   }
273   // Grab the function pointer for ConnectEx for that specific socket It may
274   // change depending on the interface.
275   LPFN_CONNECTEX ConnectEx;
276   GUID guid = WSAID_CONNECTEX;
277   DWORD ioctl_num_bytes;
278   istatus = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
279                      sizeof(guid), &ConnectEx, sizeof(ConnectEx),
280                      &ioctl_num_bytes, nullptr, nullptr);
281   if (istatus != 0) {
282     Run([on_connect = std::move(on_connect),
283          status = GRPC_WSA_ERROR(
284              WSAGetLastError(),
285              "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)")]() mutable {
286       on_connect(status);
287     });
288     return EventEngine::ConnectionHandle::kInvalid;
289   }
290   // bind the local address
291   auto local_address = ResolvedAddressMakeWild6(0);
292   istatus = bind(sock, local_address.address(), local_address.size());
293   if (istatus != 0) {
294     Run([on_connect = std::move(on_connect),
295          status = GRPC_WSA_ERROR(WSAGetLastError(), "bind")]() mutable {
296       on_connect(status);
297     });
298     return EventEngine::ConnectionHandle::kInvalid;
299   }
300   // Connect
301   auto watched_socket = iocp_.Watch(sock);
302   auto* info = watched_socket->write_info();
303   bool success =
304       ConnectEx(watched_socket->raw_socket(), address.address(), address.size(),
305                 nullptr, 0, nullptr, info->overlapped());
306   // It wouldn't be unusual to get a success immediately. But we'll still get an
307   // IOCP notification, so let's ignore it.
308   if (!success) {
309     int last_error = WSAGetLastError();
310     if (last_error != ERROR_IO_PENDING) {
311       Run([on_connect = std::move(on_connect),
312            status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable {
313         on_connect(status);
314       });
315       watched_socket->Shutdown(DEBUG_LOCATION, "ConnectEx");
316       return EventEngine::ConnectionHandle::kInvalid;
317     }
318   }
319   GPR_ASSERT(watched_socket != nullptr);
320   auto connection_state = std::make_shared<ConnectionState>();
321   grpc_core::MutexLock lock(&connection_state->mu);
322   connection_state->address = address;
323   connection_state->socket = std::move(watched_socket);
324   connection_state->on_connected_user_callback = std::move(on_connect);
325   connection_state->allocator = std::move(memory_allocator);
326   connection_state->on_connected =
327       SelfDeletingClosure::Create([this, connection_state]() mutable {
328         OnConnectCompleted(std::move(connection_state));
329       });
330   {
331     grpc_core::MutexLock conn_lock(&connection_mu_);
332     connection_state->connection_handle =
333         ConnectionHandle{reinterpret_cast<intptr_t>(connection_state.get()),
334                          aba_token_.fetch_add(1)};
335     known_connection_handles_.insert(connection_state->connection_handle);
336   }
337   connection_state->timer_handle =
338       RunAfter(timeout, [this, connection_state]() {
339         grpc_core::MutexLock lock(&connection_state->mu);
340         if (CancelConnectFromDeadlineTimer(connection_state.get())) {
341           connection_state->on_connected_user_callback(
342               absl::DeadlineExceededError("Connection timed out"));
343         }
344         // else: The connection attempt could not be canceled. We can assume the
345         // connection callback will be called.
346       });
347   connection_state->socket->NotifyOnWrite(connection_state->on_connected);
348   return connection_state->connection_handle;
349 }
350 
CancelConnect(EventEngine::ConnectionHandle handle)351 bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
352   if (handle == EventEngine::ConnectionHandle::kInvalid) {
353     GRPC_EVENT_ENGINE_TRACE("%s",
354                             "Attempted to cancel an invalid connection handle");
355     return false;
356   }
357   // Erase the connection handle, which may be unknown
358   {
359     grpc_core::MutexLock lock(&connection_mu_);
360     if (!known_connection_handles_.contains(handle)) {
361       GRPC_EVENT_ENGINE_TRACE(
362           "Unknown connection handle: %s",
363           HandleToString<EventEngine::ConnectionHandle>(handle).c_str());
364       return false;
365     }
366     known_connection_handles_.erase(handle);
367   }
368   auto* connection_state = reinterpret_cast<ConnectionState*>(handle.keys[0]);
369   grpc_core::MutexLock state_lock(&connection_state->mu);
370   if (!Cancel(connection_state->timer_handle)) return false;
371   return CancelConnectInternalStateLocked(connection_state);
372 }
373 
CancelConnectFromDeadlineTimer(ConnectionState * connection_state)374 bool WindowsEventEngine::CancelConnectFromDeadlineTimer(
375     ConnectionState* connection_state) {
376   // Erase the connection handle, which is guaranteed to exist.
377   {
378     grpc_core::MutexLock lock(&connection_mu_);
379     GPR_ASSERT(known_connection_handles_.erase(
380                    connection_state->connection_handle) == 1);
381   }
382   return CancelConnectInternalStateLocked(connection_state);
383 }
384 
CancelConnectInternalStateLocked(ConnectionState * connection_state)385 bool WindowsEventEngine::CancelConnectInternalStateLocked(
386     ConnectionState* connection_state) {
387   connection_state->socket->Shutdown(DEBUG_LOCATION, "CancelConnect");
388   // Release the connection_state shared_ptr. connection_state is now invalid.
389   delete connection_state->on_connected;
390   GRPC_EVENT_ENGINE_TRACE("Successfully cancelled connection %s",
391                           HandleToString<EventEngine::ConnectionHandle>(
392                               connection_state->connection_handle)
393                               .c_str());
394   return true;
395 }
396 
397 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig & config,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)398 WindowsEventEngine::CreateListener(
399     Listener::AcceptCallback on_accept,
400     absl::AnyInvocable<void(absl::Status)> on_shutdown,
401     const EndpointConfig& config,
402     std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
403   return std::make_unique<WindowsEventEngineListener>(
404       &iocp_, std::move(on_accept), std::move(on_shutdown),
405       std::move(memory_allocator_factory), shared_from_this(),
406       thread_pool_.get(), config);
407 }
408 }  // namespace experimental
409 }  // namespace grpc_event_engine
410 
411 #endif  // GPR_WINDOWS
412