1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GPR_WINDOWS
17
18 #include <chrono>
19
20 #include "absl/strings/str_format.h"
21
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log_windows.h>
24
25 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
26 #include "src/core/lib/event_engine/time_util.h"
27 #include "src/core/lib/event_engine/trace.h"
28 #include "src/core/lib/event_engine/windows/iocp.h"
29 #include "src/core/lib/event_engine/windows/win_socket.h"
30 #include "src/core/lib/gprpp/crash.h"
31 #include "src/core/lib/iomgr/error.h"
32
33 namespace grpc_event_engine {
34 namespace experimental {
35
IOCP(ThreadPool * thread_pool)36 IOCP::IOCP(ThreadPool* thread_pool) noexcept
37 : thread_pool_(thread_pool),
38 iocp_handle_(CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr,
39 (ULONG_PTR) nullptr, 0)) {
40 GPR_ASSERT(iocp_handle_);
41 WSASocketFlagsInit();
42 }
43
44 // Shutdown must be called prior to deletion
~IOCP()45 IOCP::~IOCP() {}
46
Watch(SOCKET socket)47 std::unique_ptr<WinSocket> IOCP::Watch(SOCKET socket) {
48 auto wrapped_socket = std::make_unique<WinSocket>(socket, thread_pool_);
49 HANDLE ret = CreateIoCompletionPort(
50 reinterpret_cast<HANDLE>(socket), iocp_handle_,
51 reinterpret_cast<uintptr_t>(wrapped_socket.get()), 0);
52 if (!ret) {
53 grpc_core::Crash(
54 GRPC_WSA_ERROR(WSAGetLastError(), "Unable to add socket to iocp")
55 .ToString());
56 }
57 GPR_ASSERT(ret == iocp_handle_);
58 return wrapped_socket;
59 }
60
Shutdown()61 void IOCP::Shutdown() {
62 GRPC_EVENT_ENGINE_POLLER_TRACE(
63 "IOCP::%p shutting down. Outstanding kicks: %d", this,
64 outstanding_kicks_.load());
65 while (outstanding_kicks_.load() > 0) {
66 Work(std::chrono::hours(42), []() {});
67 }
68 GPR_ASSERT(CloseHandle(iocp_handle_));
69 }
70
Work(EventEngine::Duration timeout,absl::FunctionRef<void ()> schedule_poll_again)71 Poller::WorkResult IOCP::Work(EventEngine::Duration timeout,
72 absl::FunctionRef<void()> schedule_poll_again) {
73 DWORD bytes = 0;
74 ULONG_PTR completion_key;
75 LPOVERLAPPED overlapped;
76 GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p doing work", this);
77 BOOL success = GetQueuedCompletionStatus(
78 iocp_handle_, &bytes, &completion_key, &overlapped,
79 static_cast<DWORD>(Milliseconds(timeout)));
80 if (success == 0 && overlapped == nullptr) {
81 GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p deadline exceeded", this);
82 return Poller::WorkResult::kDeadlineExceeded;
83 }
84 GPR_ASSERT(completion_key && overlapped);
85 if (overlapped == &kick_overlap_) {
86 GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p kicked", this);
87 outstanding_kicks_.fetch_sub(1);
88 if (completion_key == (ULONG_PTR)&kick_token_) {
89 return Poller::WorkResult::kKicked;
90 }
91 grpc_core::Crash(
92 absl::StrFormat("Unknown custom completion key: %lu", completion_key));
93 }
94 GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p got event on OVERLAPPED::%p", this,
95 overlapped);
96 // Safety note: socket is guaranteed to exist when managed by a
97 // WindowsEndpoint. If an overlapped event came in, then either a read event
98 // handler is registered, which keeps the socket alive, or the WindowsEndpoint
99 // (which keeps the socket alive) has done an asynchronous WSARecv and is
100 // about to register for notification of an overlapped event.
101 auto* socket = reinterpret_cast<WinSocket*>(completion_key);
102 WinSocket::OpState* info = socket->GetOpInfoForOverlapped(overlapped);
103 GPR_ASSERT(info != nullptr);
104 info->GetOverlappedResult();
105 info->SetReady();
106 schedule_poll_again();
107 return Poller::WorkResult::kOk;
108 }
109
Kick()110 void IOCP::Kick() {
111 outstanding_kicks_.fetch_add(1);
112 GPR_ASSERT(PostQueuedCompletionStatus(
113 iocp_handle_, 0, reinterpret_cast<ULONG_PTR>(&kick_token_),
114 &kick_overlap_));
115 }
116
GetDefaultSocketFlags()117 DWORD IOCP::GetDefaultSocketFlags() {
118 static DWORD wsa_socket_flags = WSASocketFlagsInit();
119 return wsa_socket_flags;
120 }
121
WSASocketFlagsInit()122 DWORD IOCP::WSASocketFlagsInit() {
123 DWORD wsa_socket_flags = WSA_FLAG_OVERLAPPED;
124 // WSA_FLAG_NO_HANDLE_INHERIT may be not supported on the older Windows
125 // versions, see
126 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms742212(v=vs.85).aspx
127 // for details.
128 SOCKET sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, nullptr, 0,
129 wsa_socket_flags | WSA_FLAG_NO_HANDLE_INHERIT);
130 if (sock != INVALID_SOCKET) {
131 // Windows 7, Windows 2008 R2 with SP1 or later
132 wsa_socket_flags |= WSA_FLAG_NO_HANDLE_INHERIT;
133 closesocket(sock);
134 }
135 return wsa_socket_flags;
136 }
137
138 } // namespace experimental
139 } // namespace grpc_event_engine
140
141 #endif // GPR_WINDOWS
142