1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GPR_WINDOWS
17
18 #include "absl/cleanup/cleanup.h"
19 #include "absl/functional/any_invocable.h"
20 #include "absl/status/status.h"
21 #include "absl/strings/str_format.h"
22
23 #include <grpc/event_engine/memory_allocator.h>
24 #include <grpc/support/log_windows.h>
25
26 #include "src/core/lib/event_engine/tcp_socket_utils.h"
27 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
28 #include "src/core/lib/event_engine/trace.h"
29 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
30 #include "src/core/lib/gprpp/debug_location.h"
31 #include "src/core/lib/gprpp/status_helper.h"
32 #include "src/core/lib/iomgr/error.h"
33
34 namespace grpc_event_engine {
35 namespace experimental {
36
37 namespace {
38 constexpr size_t kDefaultTargetReadSize = 8192;
39 constexpr int kMaxWSABUFCount = 16;
40
DumpSliceBuffer(SliceBuffer * buffer,absl::string_view context_string)41 void DumpSliceBuffer(SliceBuffer* buffer, absl::string_view context_string) {
42 for (size_t i = 0; i < buffer->Count(); i++) {
43 auto str = buffer->MutableSliceAt(i).as_string_view();
44 gpr_log(GPR_INFO, "%s: %.*s", context_string.data(), str.length(),
45 str.data());
46 }
47 }
48
49 } // namespace
50
WindowsEndpoint(const EventEngine::ResolvedAddress & peer_address,std::unique_ptr<WinSocket> socket,MemoryAllocator && allocator,const EndpointConfig &,ThreadPool * thread_pool,std::shared_ptr<EventEngine> engine)51 WindowsEndpoint::WindowsEndpoint(
52 const EventEngine::ResolvedAddress& peer_address,
53 std::unique_ptr<WinSocket> socket, MemoryAllocator&& allocator,
54 const EndpointConfig& /* config */, ThreadPool* thread_pool,
55 std::shared_ptr<EventEngine> engine)
56 : peer_address_(peer_address),
57 allocator_(std::move(allocator)),
58 thread_pool_(thread_pool),
59 io_state_(std::make_shared<AsyncIOState>(this, std::move(socket),
60 std::move(engine))) {
61 char addr[EventEngine::ResolvedAddress::MAX_SIZE_BYTES];
62 int addr_len = sizeof(addr);
63 if (getsockname(io_state_->socket->raw_socket(),
64 reinterpret_cast<sockaddr*>(addr), &addr_len) < 0) {
65 grpc_core::Crash(absl::StrFormat(
66 "Unrecoverable error: Failed to get local socket name. %s",
67 GRPC_WSA_ERROR(WSAGetLastError(), "getsockname").ToString().c_str()));
68 }
69 local_address_ =
70 EventEngine::ResolvedAddress(reinterpret_cast<sockaddr*>(addr), addr_len);
71 local_address_string_ = *ResolvedAddressToURI(local_address_);
72 peer_address_string_ = *ResolvedAddressToURI(peer_address_);
73 }
74
~WindowsEndpoint()75 WindowsEndpoint::~WindowsEndpoint() {
76 io_state_->socket->Shutdown(DEBUG_LOCATION, "~WindowsEndpoint");
77 GRPC_EVENT_ENGINE_ENDPOINT_TRACE("~WindowsEndpoint::%p", this);
78 }
79
DoTcpRead(SliceBuffer * buffer)80 absl::Status WindowsEndpoint::DoTcpRead(SliceBuffer* buffer) {
81 GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p reading", this);
82 if (io_state_->socket->IsShutdown()) {
83 return absl::UnavailableError("Socket is shutting down.");
84 }
85 // Prepare the WSABUF struct
86 GPR_ASSERT(buffer->Count() <= kMaxWSABUFCount);
87 WSABUF wsa_buffers[kMaxWSABUFCount];
88 for (size_t i = 0; i < buffer->Count(); i++) {
89 auto& slice = buffer->MutableSliceAt(i);
90 wsa_buffers[i].buf = (char*)slice.begin();
91 wsa_buffers[i].len = slice.size();
92 }
93 DWORD bytes_read = 0;
94 DWORD flags = 0;
95 // First try a synchronous, non-blocking read.
96 int status =
97 WSARecv(io_state_->socket->raw_socket(), wsa_buffers,
98 (DWORD)buffer->Count(), &bytes_read, &flags, nullptr, nullptr);
99 int wsa_error = status == 0 ? 0 : WSAGetLastError();
100 if (wsa_error != WSAEWOULDBLOCK) {
101 // Data or some error was returned immediately.
102 io_state_->socket->read_info()->SetResult(
103 {/*wsa_error=*/wsa_error, /*bytes_read=*/bytes_read});
104 thread_pool_->Run(&io_state_->handle_read_event);
105 return absl::OkStatus();
106 }
107 // If the endpoint has already received some data, and the next call would
108 // block, return the data in case that is all the data the reader expects.
109 if (io_state_->handle_read_event.MaybeFinishIfDataHasAlreadyBeenRead()) {
110 return absl::OkStatus();
111 }
112 // Otherwise, let's retry, by queuing a read.
113 status = WSARecv(io_state_->socket->raw_socket(), wsa_buffers,
114 (DWORD)buffer->Count(), &bytes_read, &flags,
115 io_state_->socket->read_info()->overlapped(), nullptr);
116 wsa_error = status == 0 ? 0 : WSAGetLastError();
117 if (wsa_error != 0 && wsa_error != WSA_IO_PENDING) {
118 // Async read returned immediately with an error
119 return GRPC_WSA_ERROR(
120 wsa_error,
121 absl::StrFormat("WindowsEndpont::%p Read failed", this).c_str());
122 }
123 io_state_->socket->NotifyOnRead(&io_state_->handle_read_event);
124 return absl::OkStatus();
125 }
126
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)127 bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
128 SliceBuffer* buffer, const ReadArgs* /* args */) {
129 if (io_state_->socket->IsShutdown()) {
130 thread_pool_->Run([on_read = std::move(on_read)]() mutable {
131 on_read(absl::UnavailableError("Socket is shutting down."));
132 });
133 return false;
134 }
135 buffer->Clear();
136 io_state_->handle_read_event.DonateSpareSlices(buffer);
137 // TODO(hork): sometimes args->read_hint_bytes is 1, which is not useful.
138 // Choose an appropriate size.
139 size_t min_read_size = kDefaultTargetReadSize;
140 if (buffer->Length() < min_read_size && buffer->Count() < kMaxWSABUFCount) {
141 buffer->AppendIndexed(Slice(allocator_.MakeSlice(min_read_size)));
142 }
143 io_state_->handle_read_event.Prime(io_state_, buffer, std::move(on_read));
144 auto status = DoTcpRead(buffer);
145 if (!status.ok()) {
146 // The read could not be completed.
147 io_state_->endpoint->thread_pool_->Run([this, status]() {
148 io_state_->handle_read_event.ExecuteCallbackAndReset(status);
149 });
150 }
151 return false;
152 }
153
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)154 bool WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
155 SliceBuffer* data, const WriteArgs* /* args */) {
156 GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p writing", this);
157 if (io_state_->socket->IsShutdown()) {
158 thread_pool_->Run([on_writable = std::move(on_writable)]() mutable {
159 on_writable(absl::UnavailableError("Socket is shutting down."));
160 });
161 return false;
162 }
163 if (grpc_event_engine_endpoint_data_trace.enabled()) {
164 for (size_t i = 0; i < data->Count(); i++) {
165 auto str = data->RefSlice(i).as_string_view();
166 gpr_log(GPR_INFO, "WindowsEndpoint::%p WRITE (peer=%s): %.*s", this,
167 peer_address_string_.c_str(), str.length(), str.data());
168 }
169 }
170 GPR_ASSERT(data->Count() <= UINT_MAX);
171 absl::InlinedVector<WSABUF, kMaxWSABUFCount> buffers(data->Count());
172 for (size_t i = 0; i < data->Count(); i++) {
173 auto& slice = data->MutableSliceAt(i);
174 GPR_ASSERT(slice.size() <= ULONG_MAX);
175 buffers[i].len = slice.size();
176 buffers[i].buf = (char*)slice.begin();
177 }
178 // First, let's try a synchronous, non-blocking write.
179 DWORD bytes_sent;
180 int status = WSASend(io_state_->socket->raw_socket(), buffers.data(),
181 (DWORD)buffers.size(), &bytes_sent, 0, nullptr, nullptr);
182 size_t async_buffers_offset = 0;
183 if (status == 0) {
184 if (bytes_sent == data->Length()) {
185 // Write completed, exiting early
186 thread_pool_->Run(
187 [cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); });
188 return false;
189 }
190 // The data was not completely delivered, we should send the rest of it by
191 // doing an async write operation.
192 for (size_t i = 0; i < data->Count(); i++) {
193 if (buffers[i].len > bytes_sent) {
194 buffers[i].buf += bytes_sent;
195 buffers[i].len -= bytes_sent;
196 break;
197 }
198 bytes_sent -= buffers[i].len;
199 async_buffers_offset++;
200 }
201 } else {
202 // We would kind of expect to get a WSAEWOULDBLOCK here, especially on a
203 // busy connection that has its send queue filled up. But if we don't,
204 // then we can avoid doing an async write operation at all.
205 int wsa_error = WSAGetLastError();
206 if (wsa_error != WSAEWOULDBLOCK) {
207 thread_pool_->Run([cb = std::move(on_writable), wsa_error]() mutable {
208 cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
209 });
210 return false;
211 }
212 }
213 auto write_info = io_state_->socket->write_info();
214 status =
215 WSASend(io_state_->socket->raw_socket(), &buffers[async_buffers_offset],
216 (DWORD)(data->Count() - async_buffers_offset), nullptr, 0,
217 write_info->overlapped(), nullptr);
218 if (status != 0) {
219 int wsa_error = WSAGetLastError();
220 if (wsa_error != WSA_IO_PENDING) {
221 thread_pool_->Run([cb = std::move(on_writable), wsa_error]() mutable {
222 cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
223 });
224 return false;
225 }
226 }
227 // As all is now setup, we can now ask for the IOCP notification. It may
228 // trigger the callback immediately however, but no matter.
229 io_state_->handle_write_event.Prime(io_state_, data, std::move(on_writable));
230 io_state_->socket->NotifyOnWrite(&io_state_->handle_write_event);
231 return false;
232 }
GetPeerAddress() const233 const EventEngine::ResolvedAddress& WindowsEndpoint::GetPeerAddress() const {
234 return peer_address_;
235 }
GetLocalAddress() const236 const EventEngine::ResolvedAddress& WindowsEndpoint::GetLocalAddress() const {
237 return local_address_;
238 }
239
240 // ---- Handle{Read|Write}Closure ----
241 namespace {
AbortOnEvent(absl::Status)242 void AbortOnEvent(absl::Status) {
243 grpc_core::Crash(
244 "INTERNAL ERROR: Asked to handle read/write event with an invalid "
245 "callback");
246 }
247 } // namespace
248
Reset()249 void WindowsEndpoint::HandleReadClosure::Reset() {
250 io_state_.reset();
251 cb_ = &AbortOnEvent;
252 buffer_ = nullptr;
253 }
254
Reset()255 void WindowsEndpoint::HandleWriteClosure::Reset() {
256 io_state_.reset();
257 cb_ = &AbortOnEvent;
258 buffer_ = nullptr;
259 }
260
Prime(std::shared_ptr<AsyncIOState> io_state,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> cb)261 void WindowsEndpoint::HandleReadClosure::Prime(
262 std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
263 absl::AnyInvocable<void(absl::Status)> cb) {
264 io_state_ = std::move(io_state);
265 cb_ = std::move(cb);
266 buffer_ = buffer;
267 }
268
Prime(std::shared_ptr<AsyncIOState> io_state,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> cb)269 void WindowsEndpoint::HandleWriteClosure::Prime(
270 std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
271 absl::AnyInvocable<void(absl::Status)> cb) {
272 io_state_ = std::move(io_state);
273 cb_ = std::move(cb);
274 buffer_ = buffer;
275 }
276
Run()277 void WindowsEndpoint::HandleReadClosure::Run() {
278 // Deletes the shared_ptr when this closure returns
279 auto io_state = std::move(io_state_);
280 GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Read Event",
281 io_state->endpoint);
282 absl::Status status;
283 const auto result = io_state->socket->read_info()->result();
284 if (result.wsa_error != 0) {
285 status = GRPC_WSA_ERROR(result.wsa_error, "Async Read Error");
286 buffer_->Clear();
287 return ExecuteCallbackAndReset(status);
288 }
289 if (result.bytes_transferred == 0) {
290 // Either the endpoint is shut down or we've seen the end of the stream
291 if (grpc_event_engine_endpoint_data_trace.enabled()) {
292 DumpSliceBuffer(
293 buffer_, absl::StrFormat("WindowsEndpoint::%p READ (peer=%s)",
294 io_state->endpoint,
295 io_state->endpoint->peer_address_string_));
296 }
297 status = absl::UnavailableError("End of TCP stream");
298 grpc_core::StatusSetInt(&status, grpc_core::StatusIntProperty::kRpcStatus,
299 GRPC_STATUS_UNAVAILABLE);
300 buffer_->Swap(last_read_buffer_);
301 return ExecuteCallbackAndReset(status);
302 }
303 GPR_DEBUG_ASSERT(result.bytes_transferred > 0);
304 GPR_DEBUG_ASSERT(result.bytes_transferred <= buffer_->Length());
305 buffer_->MoveFirstNBytesIntoSliceBuffer(result.bytes_transferred,
306 last_read_buffer_);
307 if (buffer_->Length() == 0) {
308 buffer_->Swap(last_read_buffer_);
309 return ExecuteCallbackAndReset(status);
310 }
311 // Doing another read. Let's keep the AsyncIOState alive a bit longer.
312 io_state_ = std::move(io_state);
313 status = io_state_->endpoint->DoTcpRead(buffer_);
314 if (!status.ok()) {
315 io_state_.reset();
316 ExecuteCallbackAndReset(status);
317 }
318 }
319
MaybeFinishIfDataHasAlreadyBeenRead()320 bool WindowsEndpoint::HandleReadClosure::MaybeFinishIfDataHasAlreadyBeenRead() {
321 if (last_read_buffer_.Length() > 0) {
322 buffer_->Swap(last_read_buffer_);
323 io_state_->endpoint->thread_pool_->Run(
324 [this]() { ExecuteCallbackAndReset(absl::OkStatus()); });
325 return true;
326 }
327 return false;
328 }
329
ExecuteCallbackAndReset(absl::Status status)330 void WindowsEndpoint::HandleReadClosure::ExecuteCallbackAndReset(
331 absl::Status status) {
332 auto cb = std::move(cb_);
333 Reset();
334 cb(status);
335 }
336
DonateSpareSlices(SliceBuffer * buffer)337 void WindowsEndpoint::HandleReadClosure::DonateSpareSlices(
338 SliceBuffer* buffer) {
339 // Donee buffer must be empty.
340 GPR_ASSERT(buffer->Length() == 0);
341 // HandleReadClosure must be in the reset state.
342 GPR_ASSERT(buffer_ == nullptr);
343 buffer->Swap(last_read_buffer_);
344 }
345
Run()346 void WindowsEndpoint::HandleWriteClosure::Run() {
347 // Deletes the shared_ptr when this closure returns
348 auto io_state = std::move(io_state_);
349 GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Write Event",
350 io_state->endpoint);
351 auto cb = std::move(cb_);
352 const auto result = io_state->socket->write_info()->result();
353 absl::Status status;
354 if (result.wsa_error != 0) {
355 status = GRPC_WSA_ERROR(result.wsa_error, "WSASend");
356 } else {
357 GPR_ASSERT(result.bytes_transferred == buffer_->Length());
358 }
359 Reset();
360 cb(status);
361 }
362
363 // ---- AsyncIOState ----
364
AsyncIOState(WindowsEndpoint * endpoint,std::unique_ptr<WinSocket> socket,std::shared_ptr<EventEngine> engine)365 WindowsEndpoint::AsyncIOState::AsyncIOState(WindowsEndpoint* endpoint,
366 std::unique_ptr<WinSocket> socket,
367 std::shared_ptr<EventEngine> engine)
368 : endpoint(endpoint),
369 socket(std::move(socket)),
370 engine(std::move(engine)) {}
371
~AsyncIOState()372 WindowsEndpoint::AsyncIOState::~AsyncIOState() {
373 socket->Shutdown(DEBUG_LOCATION, "~AsyncIOState");
374 }
375
376 } // namespace experimental
377 } // namespace grpc_event_engine
378
379 #endif // GPR_WINDOWS
380