1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 
18 #include <chrono>
19 
20 #include "absl/strings/str_format.h"
21 
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log_windows.h>
24 
25 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
26 #include "src/core/lib/event_engine/time_util.h"
27 #include "src/core/lib/event_engine/trace.h"
28 #include "src/core/lib/event_engine/windows/iocp.h"
29 #include "src/core/lib/event_engine/windows/win_socket.h"
30 #include "src/core/lib/gprpp/crash.h"
31 #include "src/core/lib/iomgr/error.h"
32 
33 namespace grpc_event_engine {
34 namespace experimental {
35 
IOCP(ThreadPool * thread_pool)36 IOCP::IOCP(ThreadPool* thread_pool) noexcept
37     : thread_pool_(thread_pool),
38       iocp_handle_(CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr,
39                                           (ULONG_PTR) nullptr, 0)) {
40   GPR_ASSERT(iocp_handle_);
41   WSASocketFlagsInit();
42 }
43 
44 // Shutdown must be called prior to deletion
~IOCP()45 IOCP::~IOCP() {}
46 
Watch(SOCKET socket)47 std::unique_ptr<WinSocket> IOCP::Watch(SOCKET socket) {
48   auto wrapped_socket = std::make_unique<WinSocket>(socket, thread_pool_);
49   HANDLE ret = CreateIoCompletionPort(
50       reinterpret_cast<HANDLE>(socket), iocp_handle_,
51       reinterpret_cast<uintptr_t>(wrapped_socket.get()), 0);
52   if (!ret) {
53     grpc_core::Crash(
54         GRPC_WSA_ERROR(WSAGetLastError(), "Unable to add socket to iocp")
55             .ToString());
56   }
57   GPR_ASSERT(ret == iocp_handle_);
58   return wrapped_socket;
59 }
60 
Shutdown()61 void IOCP::Shutdown() {
62   GRPC_EVENT_ENGINE_POLLER_TRACE(
63       "IOCP::%p shutting down. Outstanding kicks: %d", this,
64       outstanding_kicks_.load());
65   while (outstanding_kicks_.load() > 0) {
66     Work(std::chrono::hours(42), []() {});
67   }
68   GPR_ASSERT(CloseHandle(iocp_handle_));
69 }
70 
Work(EventEngine::Duration timeout,absl::FunctionRef<void ()> schedule_poll_again)71 Poller::WorkResult IOCP::Work(EventEngine::Duration timeout,
72                               absl::FunctionRef<void()> schedule_poll_again) {
73   DWORD bytes = 0;
74   ULONG_PTR completion_key;
75   LPOVERLAPPED overlapped;
76   GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p doing work", this);
77   BOOL success = GetQueuedCompletionStatus(
78       iocp_handle_, &bytes, &completion_key, &overlapped,
79       static_cast<DWORD>(Milliseconds(timeout)));
80   if (success == 0 && overlapped == nullptr) {
81     GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p deadline exceeded", this);
82     return Poller::WorkResult::kDeadlineExceeded;
83   }
84   GPR_ASSERT(completion_key && overlapped);
85   if (overlapped == &kick_overlap_) {
86     GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p kicked", this);
87     outstanding_kicks_.fetch_sub(1);
88     if (completion_key == (ULONG_PTR)&kick_token_) {
89       return Poller::WorkResult::kKicked;
90     }
91     grpc_core::Crash(
92         absl::StrFormat("Unknown custom completion key: %lu", completion_key));
93   }
94   GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p got event on OVERLAPPED::%p", this,
95                                  overlapped);
96   // Safety note: socket is guaranteed to exist when managed by a
97   // WindowsEndpoint. If an overlapped event came in, then either a read event
98   // handler is registered, which keeps the socket alive, or the WindowsEndpoint
99   // (which keeps the socket alive) has done an asynchronous WSARecv and is
100   // about to register for notification of an overlapped event.
101   auto* socket = reinterpret_cast<WinSocket*>(completion_key);
102   WinSocket::OpState* info = socket->GetOpInfoForOverlapped(overlapped);
103   GPR_ASSERT(info != nullptr);
104   info->GetOverlappedResult();
105   info->SetReady();
106   schedule_poll_again();
107   return Poller::WorkResult::kOk;
108 }
109 
Kick()110 void IOCP::Kick() {
111   outstanding_kicks_.fetch_add(1);
112   GPR_ASSERT(PostQueuedCompletionStatus(
113       iocp_handle_, 0, reinterpret_cast<ULONG_PTR>(&kick_token_),
114       &kick_overlap_));
115 }
116 
GetDefaultSocketFlags()117 DWORD IOCP::GetDefaultSocketFlags() {
118   static DWORD wsa_socket_flags = WSASocketFlagsInit();
119   return wsa_socket_flags;
120 }
121 
WSASocketFlagsInit()122 DWORD IOCP::WSASocketFlagsInit() {
123   DWORD wsa_socket_flags = WSA_FLAG_OVERLAPPED;
124   // WSA_FLAG_NO_HANDLE_INHERIT may be not supported on the older Windows
125   // versions, see
126   // https://msdn.microsoft.com/en-us/library/windows/desktop/ms742212(v=vs.85).aspx
127   // for details.
128   SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
129                           wsa_socket_flags | WSA_FLAG_NO_HANDLE_INHERIT);
130   if (sock != INVALID_SOCKET) {
131     // Windows 7, Windows 2008 R2 with SP1 or later
132     wsa_socket_flags |= WSA_FLAG_NO_HANDLE_INHERIT;
133     closesocket(sock);
134   }
135   return wsa_socket_flags;
136 }
137 
138 }  // namespace experimental
139 }  // namespace grpc_event_engine
140 
141 #endif  // GPR_WINDOWS
142