xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/event_engine/windows/windows_engine.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 
18 #include <memory>
19 
20 #include "absl/status/status.h"
21 #include "absl/status/statusor.h"
22 #include "absl/strings/string_view.h"
23 
24 #include <grpc/event_engine/endpoint_config.h>
25 #include <grpc/event_engine/event_engine.h>
26 #include <grpc/event_engine/memory_allocator.h>
27 #include <grpc/event_engine/slice_buffer.h>
28 #include <grpc/support/cpu.h>
29 
30 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
31 #include "src/core/lib/event_engine/common_closures.h"
32 #include "src/core/lib/event_engine/handle_containers.h"
33 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
34 #include "src/core/lib/event_engine/tcp_socket_utils.h"
35 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
36 #include "src/core/lib/event_engine/trace.h"
37 #include "src/core/lib/event_engine/utils.h"
38 #include "src/core/lib/event_engine/windows/grpc_polled_fd_windows.h"
39 #include "src/core/lib/event_engine/windows/iocp.h"
40 #include "src/core/lib/event_engine/windows/native_windows_dns_resolver.h"
41 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
42 #include "src/core/lib/event_engine/windows/windows_engine.h"
43 #include "src/core/lib/event_engine/windows/windows_listener.h"
44 #include "src/core/lib/gprpp/crash.h"
45 #include "src/core/lib/gprpp/sync.h"
46 #include "src/core/lib/gprpp/time.h"
47 #include "src/core/lib/iomgr/error.h"
48 
49 namespace grpc_event_engine {
50 namespace experimental {
51 
52 namespace {
CreateCrashingOnConnectCallback()53 EventEngine::OnConnectCallback CreateCrashingOnConnectCallback() {
54   return [](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>>) {
55     grpc_core::Crash("Internal Error: OnConnect callback called when unset");
56   };
57 }
58 }  // namespace
59 // ---- IOCPWorkClosure ----
60 
IOCPWorkClosure(ThreadPool * thread_pool,IOCP * iocp)61 WindowsEventEngine::IOCPWorkClosure::IOCPWorkClosure(ThreadPool* thread_pool,
62                                                      IOCP* iocp)
63     : thread_pool_(thread_pool), iocp_(iocp) {
64   thread_pool_->Run(this);
65 }
66 
Run()67 void WindowsEventEngine::IOCPWorkClosure::Run() {
68   auto result = iocp_->Work(std::chrono::seconds(60), [this] {
69     workers_.fetch_add(1);
70     thread_pool_->Run(this);
71   });
72   if (result == Poller::WorkResult::kDeadlineExceeded) {
73     // iocp received no messages. restart the worker
74     workers_.fetch_add(1);
75     thread_pool_->Run(this);
76   }
77   if (workers_.fetch_sub(1) == 1) done_signal_.Notify();
78 }
79 
WaitForShutdown()80 void WindowsEventEngine::IOCPWorkClosure::WaitForShutdown() {
81   done_signal_.WaitForNotification();
82 }
83 
84 // ---- WindowsEventEngine ----
85 
86 // TODO(hork): The iomgr timer and execution engine can be reused. It should
87 // be separated out from the posix_engine and instantiated as components. It is
88 // effectively copied below.
89 
90 struct WindowsEventEngine::TimerClosure final : public EventEngine::Closure {
91   absl::AnyInvocable<void()> cb;
92   Timer timer;
93   WindowsEventEngine* engine;
94   EventEngine::TaskHandle handle;
95 
Rungrpc_event_engine::experimental::WindowsEventEngine::TimerClosure96   void Run() override {
97     GRPC_EVENT_ENGINE_TRACE(
98         "WindowsEventEngine:%p executing callback:%s", engine,
99         HandleToString<EventEngine::TaskHandle>(handle).c_str());
100     {
101       grpc_core::MutexLock lock(&engine->task_mu_);
102       engine->known_handles_.erase(handle);
103     }
104     cb();
105     delete this;
106   }
107 };
108 
WindowsEventEngine()109 WindowsEventEngine::WindowsEventEngine()
110     : thread_pool_(
111           MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 4u, 16u))),
112       iocp_(thread_pool_.get()),
113       timer_manager_(thread_pool_),
114       iocp_worker_(thread_pool_.get(), &iocp_) {
115   WSADATA wsaData;
116   int status = WSAStartup(MAKEWORD(2, 0), &wsaData);
117   GPR_ASSERT(status == 0);
118 }
119 
~WindowsEventEngine()120 WindowsEventEngine::~WindowsEventEngine() {
121   GRPC_EVENT_ENGINE_TRACE("~WindowsEventEngine::%p", this);
122   {
123     task_mu_.Lock();
124     if (!known_handles_.empty()) {
125       if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
126         for (auto handle : known_handles_) {
127           gpr_log(GPR_ERROR,
128                   "WindowsEventEngine:%p uncleared TaskHandle at shutdown:%s",
129                   this,
130                   HandleToString<EventEngine::TaskHandle>(handle).c_str());
131         }
132       }
133       // Allow a small grace period for timers to be run before shutting down.
134       auto deadline =
135           timer_manager_.Now() + grpc_core::Duration::FromSecondsAsDouble(10);
136       while (!known_handles_.empty() && timer_manager_.Now() < deadline) {
137         if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
138           GRPC_LOG_EVERY_N_SEC(1, GPR_DEBUG, "Waiting for timers. %d remaining",
139                                known_handles_.size());
140         }
141         task_mu_.Unlock();
142         absl::SleepFor(absl::Milliseconds(200));
143         task_mu_.Lock();
144       }
145     }
146     GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
147     task_mu_.Unlock();
148   }
149   iocp_.Kick();
150   iocp_worker_.WaitForShutdown();
151   iocp_.Shutdown();
152   GPR_ASSERT(WSACleanup() == 0);
153   timer_manager_.Shutdown();
154   thread_pool_->Quiesce();
155 }
156 
Cancel(EventEngine::TaskHandle handle)157 bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) {
158   grpc_core::MutexLock lock(&task_mu_);
159   if (!known_handles_.contains(handle)) return false;
160   GRPC_EVENT_ENGINE_TRACE(
161       "WindowsEventEngine::%p cancelling %s", this,
162       HandleToString<EventEngine::TaskHandle>(handle).c_str());
163   auto* cd = reinterpret_cast<TimerClosure*>(handle.keys[0]);
164   bool r = timer_manager_.TimerCancel(&cd->timer);
165   known_handles_.erase(handle);
166   if (r) delete cd;
167   return r;
168 }
169 
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)170 EventEngine::TaskHandle WindowsEventEngine::RunAfter(
171     Duration when, absl::AnyInvocable<void()> closure) {
172   return RunAfterInternal(when, std::move(closure));
173 }
174 
RunAfter(Duration when,EventEngine::Closure * closure)175 EventEngine::TaskHandle WindowsEventEngine::RunAfter(
176     Duration when, EventEngine::Closure* closure) {
177   return RunAfterInternal(when, [closure]() { closure->Run(); });
178 }
179 
Run(absl::AnyInvocable<void ()> closure)180 void WindowsEventEngine::Run(absl::AnyInvocable<void()> closure) {
181   thread_pool_->Run(std::move(closure));
182 }
183 
Run(EventEngine::Closure * closure)184 void WindowsEventEngine::Run(EventEngine::Closure* closure) {
185   thread_pool_->Run(closure);
186 }
187 
RunAfterInternal(Duration when,absl::AnyInvocable<void ()> cb)188 EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal(
189     Duration when, absl::AnyInvocable<void()> cb) {
190   auto when_ts = ToTimestamp(timer_manager_.Now(), when);
191   auto* cd = new TimerClosure;
192   cd->cb = std::move(cb);
193   cd->engine = this;
194   EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
195                                  aba_token_.fetch_add(1)};
196   grpc_core::MutexLock lock(&task_mu_);
197   known_handles_.insert(handle);
198   cd->handle = handle;
199   GRPC_EVENT_ENGINE_TRACE(
200       "WindowsEventEngine:%p scheduling callback:%s", this,
201       HandleToString<EventEngine::TaskHandle>(handle).c_str());
202   timer_manager_.TimerInit(&cd->timer, when_ts, cd);
203   return handle;
204 }
205 
#if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)

// Adopts an already-constructed c-ares resolver. Every lookup method below is
// a thin pass-through to it.
WindowsEventEngine::WindowsDNSResolver::WindowsDNSResolver(
    grpc_core::OrphanablePtr<AresResolver> ares_resolver)
    : ares_resolver_{std::move(ares_resolver)} {}

// Forwards a hostname lookup (with optional default port) to c-ares.
void WindowsEventEngine::WindowsDNSResolver::LookupHostname(
    LookupHostnameCallback on_resolve, absl::string_view name,
    absl::string_view default_port) {
  AresResolver& resolver = *ares_resolver_;
  resolver.LookupHostname(std::move(on_resolve), name, default_port);
}

// Forwards an SRV record lookup to c-ares.
void WindowsEventEngine::WindowsDNSResolver::LookupSRV(
    LookupSRVCallback on_resolve, absl::string_view name) {
  AresResolver& resolver = *ares_resolver_;
  resolver.LookupSRV(std::move(on_resolve), name);
}

// Forwards a TXT record lookup to c-ares.
void WindowsEventEngine::WindowsDNSResolver::LookupTXT(
    LookupTXTCallback on_resolve, absl::string_view name) {
  AresResolver& resolver = *ares_resolver_;
  resolver.LookupTXT(std::move(on_resolve), name);
}

#endif  // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
229 
230 absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
GetDNSResolver(EventEngine::DNSResolver::ResolverOptions const & options)231 WindowsEventEngine::GetDNSResolver(
232     EventEngine::DNSResolver::ResolverOptions const& options) {
233 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
234   auto ares_resolver = AresResolver::CreateAresResolver(
235       options.dns_server,
236       std::make_unique<GrpcPolledFdFactoryWindows>(poller()),
237       shared_from_this());
238   if (!ares_resolver.ok()) {
239     return ares_resolver.status();
240   }
241   return std::make_unique<WindowsEventEngine::WindowsDNSResolver>(
242       std::move(*ares_resolver));
243 #else   // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
244   GRPC_EVENT_ENGINE_DNS_TRACE(
245       "WindowsEventEngine:%p creating NativeWindowsDNSResolver", this);
246   return std::make_unique<NativeWindowsDNSResolver>(shared_from_this());
247 #endif  // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
248 }
249 
IsWorkerThread()250 bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
251 
OnConnectCompleted(std::shared_ptr<ConnectionState> state)252 void WindowsEventEngine::OnConnectCompleted(
253     std::shared_ptr<ConnectionState> state) {
254   absl::StatusOr<std::unique_ptr<WindowsEndpoint>> endpoint;
255   EventEngine::OnConnectCallback cb;
256   {
257     // Connection attempt complete!
258     grpc_core::MutexLock lock(&state->mu);
259     cb = std::move(state->on_connected_user_callback);
260     state->on_connected_user_callback = CreateCrashingOnConnectCallback();
261     state->on_connected = nullptr;
262     {
263       grpc_core::MutexLock handle_lock(&connection_mu_);
264       known_connection_handles_.erase(state->connection_handle);
265     }
266     const auto& overlapped_result = state->socket->write_info()->result();
267     // return early if we cannot cancel the connection timeout timer.
268     if (!Cancel(state->timer_handle)) return;
269     if (!overlapped_result.error_status.ok()) {
270       state->socket->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
271       endpoint = overlapped_result.error_status;
272     } else if (overlapped_result.wsa_error != 0) {
273       state->socket->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
274       endpoint = GRPC_WSA_ERROR(overlapped_result.wsa_error, "ConnectEx");
275     } else {
276       ChannelArgsEndpointConfig cfg;
277       endpoint = std::make_unique<WindowsEndpoint>(
278           state->address, std::move(state->socket), std::move(state->allocator),
279           cfg, thread_pool_.get(), shared_from_this());
280     }
281   }
282   // This code should be running in a thread pool thread already, so the
283   // callback can be run directly.
284   cb(std::move(endpoint));
285 }
286 
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator memory_allocator,Duration timeout)287 EventEngine::ConnectionHandle WindowsEventEngine::Connect(
288     OnConnectCallback on_connect, const ResolvedAddress& addr,
289     const EndpointConfig& /* args */, MemoryAllocator memory_allocator,
290     Duration timeout) {
291   // TODO(hork): utilize the endpoint config
292   absl::Status status;
293   int istatus;
294   auto uri = ResolvedAddressToURI(addr);
295   if (!uri.ok()) {
296     Run([on_connect = std::move(on_connect), status = uri.status()]() mutable {
297       on_connect(status);
298     });
299     return EventEngine::ConnectionHandle::kInvalid;
300   }
301   GRPC_EVENT_ENGINE_TRACE("EventEngine::%p connecting to %s", this,
302                           uri->c_str());
303   // Use dualstack sockets where available.
304   ResolvedAddress address = addr;
305   ResolvedAddress addr6_v4mapped;
306   if (ResolvedAddressToV4Mapped(addr, &addr6_v4mapped)) {
307     address = addr6_v4mapped;
308   }
309   const int addr_family =
310       (address.address()->sa_family == AF_UNIX) ? AF_UNIX : AF_INET6;
311   const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
312   SOCKET sock = WSASocket(addr_family, SOCK_STREAM, protocol, nullptr, 0,
313                           IOCP::GetDefaultSocketFlags());
314   if (sock == INVALID_SOCKET) {
315     Run([on_connect = std::move(on_connect),
316          status = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket")]() mutable {
317       on_connect(status);
318     });
319     return EventEngine::ConnectionHandle::kInvalid;
320   }
321   if (addr_family == AF_UNIX) {
322     status = SetSocketNonBlock(sock);
323   } else {
324     status = PrepareSocket(sock);
325   }
326   if (!status.ok()) {
327     Run([on_connect = std::move(on_connect), status]() mutable {
328       on_connect(status);
329     });
330     return EventEngine::ConnectionHandle::kInvalid;
331   }
332   // Grab the function pointer for ConnectEx for that specific socket It may
333   // change depending on the interface.
334   LPFN_CONNECTEX ConnectEx;
335   GUID guid = WSAID_CONNECTEX;
336   DWORD ioctl_num_bytes;
337   istatus = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
338                      sizeof(guid), &ConnectEx, sizeof(ConnectEx),
339                      &ioctl_num_bytes, nullptr, nullptr);
340   if (istatus != 0) {
341     Run([on_connect = std::move(on_connect),
342          status = GRPC_WSA_ERROR(
343              WSAGetLastError(),
344              "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)")]() mutable {
345       on_connect(status);
346     });
347     return EventEngine::ConnectionHandle::kInvalid;
348   }
349   // bind the local address
350   ResolvedAddress local_address;
351   if (addr_family == AF_UNIX) {
352     // For ConnectEx() to work for AF_UNIX, the sock needs to be bound to
353     // the local address of an unnamed socket.
354     sockaddr addr = {};
355     addr.sa_family = AF_UNIX;
356     local_address = ResolvedAddress(&addr, sizeof(addr));
357   } else {
358     local_address = ResolvedAddressMakeWild6(0);
359   }
360   istatus = bind(sock, local_address.address(), local_address.size());
361   if (istatus != 0) {
362     Run([on_connect = std::move(on_connect),
363          status = GRPC_WSA_ERROR(WSAGetLastError(), "bind")]() mutable {
364       on_connect(status);
365     });
366     return EventEngine::ConnectionHandle::kInvalid;
367   }
368   // Prepare the socket to receive a connection
369   auto connection_state = std::make_shared<ConnectionState>();
370   grpc_core::MutexLock lock(&connection_state->mu);
371   connection_state->socket = iocp_.Watch(sock);
372   GPR_ASSERT(connection_state->socket != nullptr);
373   auto* info = connection_state->socket->write_info();
374   connection_state->address = address;
375   connection_state->allocator = std::move(memory_allocator);
376   connection_state->on_connected_user_callback = std::move(on_connect);
377   connection_state->on_connected =
378       SelfDeletingClosure::Create([this, connection_state]() mutable {
379         OnConnectCompleted(std::move(connection_state));
380       });
381   connection_state->timer_handle =
382       RunAfter(timeout, [this, connection_state]() {
383         grpc_core::ReleasableMutexLock lock(&connection_state->mu);
384         if (CancelConnectFromDeadlineTimer(connection_state.get())) {
385           auto cb = std::move(connection_state->on_connected_user_callback);
386           connection_state->on_connected_user_callback =
387               CreateCrashingOnConnectCallback();
388           lock.Release();
389           cb(absl::DeadlineExceededError("Connection timed out"));
390         }
391         // else: The connection attempt could not be canceled. We can assume
392         // the connection callback will be called.
393       });
394   // Connect
395   connection_state->socket->NotifyOnWrite(connection_state->on_connected);
396   bool success =
397       ConnectEx(connection_state->socket->raw_socket(), address.address(),
398                 address.size(), nullptr, 0, nullptr, info->overlapped());
399   // It wouldn't be unusual to get a success immediately. But we'll still get an
400   // IOCP notification, so let's ignore it.
401   if (!success) {
402     int last_error = WSAGetLastError();
403     if (last_error != ERROR_IO_PENDING) {
404       if (!Cancel(connection_state->timer_handle)) {
405         return EventEngine::ConnectionHandle::kInvalid;
406       }
407       connection_state->socket->Shutdown(DEBUG_LOCATION, "ConnectEx");
408       Run([connection_state = std::move(connection_state),
409            status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable {
410         EventEngine::OnConnectCallback cb;
411         {
412           grpc_core::MutexLock lock(&connection_state->mu);
413           cb = std::move(connection_state->on_connected_user_callback);
414           connection_state->on_connected_user_callback =
415               CreateCrashingOnConnectCallback();
416         }
417         cb(status);
418       });
419       return EventEngine::ConnectionHandle::kInvalid;
420     }
421   }
422   connection_state->connection_handle =
423       ConnectionHandle{reinterpret_cast<intptr_t>(connection_state.get()),
424                        aba_token_.fetch_add(1)};
425   grpc_core::MutexLock connection_handle_lock(&connection_mu_);
426   known_connection_handles_.insert(connection_state->connection_handle);
427   return connection_state->connection_handle;
428 }
429 
CancelConnect(EventEngine::ConnectionHandle handle)430 bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
431   if (handle == EventEngine::ConnectionHandle::kInvalid) {
432     GRPC_EVENT_ENGINE_TRACE("%s",
433                             "Attempted to cancel an invalid connection handle");
434     return false;
435   }
436   // Erase the connection handle, which may be unknown
437   {
438     grpc_core::MutexLock lock(&connection_mu_);
439     if (!known_connection_handles_.contains(handle)) {
440       GRPC_EVENT_ENGINE_TRACE(
441           "Unknown connection handle: %s",
442           HandleToString<EventEngine::ConnectionHandle>(handle).c_str());
443       return false;
444     }
445     known_connection_handles_.erase(handle);
446   }
447   auto* connection_state = reinterpret_cast<ConnectionState*>(handle.keys[0]);
448   grpc_core::MutexLock state_lock(&connection_state->mu);
449   if (!Cancel(connection_state->timer_handle)) return false;
450   return CancelConnectInternalStateLocked(connection_state);
451 }
452 
CancelConnectFromDeadlineTimer(ConnectionState * connection_state)453 bool WindowsEventEngine::CancelConnectFromDeadlineTimer(
454     ConnectionState* connection_state) {
455   // Erase the connection handle, which is guaranteed to exist.
456   {
457     grpc_core::MutexLock lock(&connection_mu_);
458     GPR_ASSERT(known_connection_handles_.erase(
459                    connection_state->connection_handle) == 1);
460   }
461   return CancelConnectInternalStateLocked(connection_state);
462 }
463 
CancelConnectInternalStateLocked(ConnectionState * connection_state)464 bool WindowsEventEngine::CancelConnectInternalStateLocked(
465     ConnectionState* connection_state) {
466   connection_state->socket->Shutdown(DEBUG_LOCATION, "CancelConnect");
467   // Release the connection_state shared_ptr owned by the connected callback.
468   delete connection_state->on_connected;
469   GRPC_EVENT_ENGINE_TRACE("Successfully cancelled connection %s",
470                           HandleToString<EventEngine::ConnectionHandle>(
471                               connection_state->connection_handle)
472                               .c_str());
473   return true;
474 }
475 
476 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig & config,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)477 WindowsEventEngine::CreateListener(
478     Listener::AcceptCallback on_accept,
479     absl::AnyInvocable<void(absl::Status)> on_shutdown,
480     const EndpointConfig& config,
481     std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
482   return std::make_unique<WindowsEventEngineListener>(
483       &iocp_, std::move(on_accept), std::move(on_shutdown),
484       std::move(memory_allocator_factory), shared_from_this(),
485       thread_pool_.get(), config);
486 }
487 }  // namespace experimental
488 }  // namespace grpc_event_engine
489 
490 #endif  // GPR_WINDOWS
491