1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
16 #include <grpc/support/port_platform.h>
17 
18 #ifdef GPR_WINDOWS
19 
#include <stdint.h>

#include <atomic>
#include <memory>

#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"

#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>

#include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/windows_endpoint.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/surface/init_internally.h"
39 
40 namespace grpc_event_engine {
41 namespace experimental {
42 
43 // TODO(ctiller): KeepsGrpcInitialized is an interim measure to ensure that
44 // EventEngine is shut down before we shut down iomgr.
45 class WindowsEventEngine : public EventEngine,
46                            public grpc_core::KeepsGrpcInitialized {
47  public:
48   class WindowsDNSResolver : public EventEngine::DNSResolver {
49    public:
50     ~WindowsDNSResolver() override;
51     LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
52                                     absl::string_view name,
53                                     absl::string_view default_port,
54                                     Duration timeout) override;
55     LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
56                                absl::string_view name,
57                                Duration timeout) override;
58     LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
59                                absl::string_view name,
60                                Duration timeout) override;
61     bool CancelLookup(LookupTaskHandle handle) override;
62   };
63 
64   WindowsEventEngine();
65   ~WindowsEventEngine() override;
66 
67   absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
68       Listener::AcceptCallback on_accept,
69       absl::AnyInvocable<void(absl::Status)> on_shutdown,
70       const EndpointConfig& config,
71       std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
72       override;
73 
74   ConnectionHandle Connect(OnConnectCallback on_connect,
75                            const ResolvedAddress& addr,
76                            const EndpointConfig& args,
77                            MemoryAllocator memory_allocator,
78                            Duration timeout) override;
79 
80   bool CancelConnect(ConnectionHandle handle) override;
81   bool IsWorkerThread() override;
82   std::unique_ptr<DNSResolver> GetDNSResolver(
83       const DNSResolver::ResolverOptions& options) override;
84   void Run(Closure* closure) override;
85   void Run(absl::AnyInvocable<void()> closure) override;
86   TaskHandle RunAfter(Duration when, Closure* closure) override;
87   TaskHandle RunAfter(Duration when,
88                       absl::AnyInvocable<void()> closure) override;
89   bool Cancel(TaskHandle handle) override;
90 
91   // Retrieve the base ThreadPool.
92   // This is public because most classes that know the concrete
93   // WindowsEventEngine type are effectively friends.
94   // Not intended for external use.
thread_pool()95   ThreadPool* thread_pool() { return thread_pool_.get(); }
poller()96   IOCP* poller() { return &iocp_; }
97 
98  private:
99   // State of an active connection.
100   // Managed by a shared_ptr, owned exclusively by the timeout callback and the
101   // OnConnectCompleted callback herein.
102   struct ConnectionState {
103     // everything is guarded by mu;
104     grpc_core::Mutex mu
105         ABSL_ACQUIRED_BEFORE(WindowsEventEngine::connection_mu_);
106     EventEngine::ConnectionHandle connection_handle ABSL_GUARDED_BY(mu);
107     EventEngine::TaskHandle timer_handle ABSL_GUARDED_BY(mu);
108     EventEngine::OnConnectCallback on_connected_user_callback
109         ABSL_GUARDED_BY(mu);
110     EventEngine::Closure* on_connected ABSL_GUARDED_BY(mu);
111     std::unique_ptr<WinSocket> socket ABSL_GUARDED_BY(mu);
112     EventEngine::ResolvedAddress address ABSL_GUARDED_BY(mu);
113     MemoryAllocator allocator ABSL_GUARDED_BY(mu);
114   };
115 
116   // A poll worker which schedules itself unless kicked
117   class IOCPWorkClosure : public EventEngine::Closure {
118    public:
119     explicit IOCPWorkClosure(ThreadPool* thread_pool, IOCP* iocp);
120     void Run() override;
121     void WaitForShutdown();
122 
123    private:
124     std::atomic<int> workers_{1};
125     grpc_core::Notification done_signal_;
126     ThreadPool* thread_pool_;
127     IOCP* iocp_;
128   };
129 
130   void OnConnectCompleted(std::shared_ptr<ConnectionState> state);
131 
132   // CancelConnect called from within the deadline timer.
133   // In this case, the connection_state->mu is already locked, and timer
134   // cancellation is not possible.
135   bool CancelConnectFromDeadlineTimer(ConnectionState* connection_state)
136       ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu);
137 
138   // Completes the connection cancellation logic after checking handle validity
139   // and optionally cancelling deadline timers.
140   bool CancelConnectInternalStateLocked(ConnectionState* connection_state)
141       ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu);
142 
143   struct TimerClosure;
144   EventEngine::TaskHandle RunAfterInternal(Duration when,
145                                            absl::AnyInvocable<void()> cb);
146   grpc_core::Mutex task_mu_;
147   TaskHandleSet known_handles_ ABSL_GUARDED_BY(task_mu_);
148   grpc_core::Mutex connection_mu_ ABSL_ACQUIRED_AFTER(ConnectionState::mu);
149   grpc_core::CondVar connection_cv_;
150   ConnectionHandleSet known_connection_handles_ ABSL_GUARDED_BY(connection_mu_);
151   std::atomic<intptr_t> aba_token_{0};
152 
153   std::shared_ptr<ThreadPool> thread_pool_;
154   IOCP iocp_;
155   TimerManager timer_manager_;
156   IOCPWorkClosure iocp_worker_;
157 };
158 
159 }  // namespace experimental
160 }  // namespace grpc_event_engine
161 
162 #endif
163 
164 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
165