1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 
18 #include "absl/cleanup/cleanup.h"
19 #include "absl/functional/any_invocable.h"
20 #include "absl/status/status.h"
21 #include "absl/strings/str_format.h"
22 
23 #include <grpc/event_engine/memory_allocator.h>
24 #include <grpc/support/log_windows.h>
25 
26 #include "src/core/lib/event_engine/tcp_socket_utils.h"
27 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
28 #include "src/core/lib/event_engine/trace.h"
29 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
30 #include "src/core/lib/gprpp/debug_location.h"
31 #include "src/core/lib/gprpp/status_helper.h"
32 #include "src/core/lib/iomgr/error.h"
33 
34 namespace grpc_event_engine {
35 namespace experimental {
36 
37 namespace {
// Size, in bytes, of the slice that Read() appends when the buffer it was given
// holds fewer bytes than this.
constexpr size_t kDefaultTargetReadSize = 8192;
// Upper bound on the number of slices (one WSABUF each) handed to a single
// WSARecv call in DoTcpRead(); also the inline capacity of the WSABUF vector
// built in Write().
constexpr int kMaxWSABUFCount = 16;
40 
DumpSliceBuffer(SliceBuffer * buffer,absl::string_view context_string)41 void DumpSliceBuffer(SliceBuffer* buffer, absl::string_view context_string) {
42   for (size_t i = 0; i < buffer->Count(); i++) {
43     auto str = buffer->MutableSliceAt(i).as_string_view();
44     gpr_log(GPR_INFO, "%s: %.*s", context_string.data(), str.length(),
45             str.data());
46   }
47 }
48 
49 }  // namespace
50 
WindowsEndpoint(const EventEngine::ResolvedAddress & peer_address,std::unique_ptr<WinSocket> socket,MemoryAllocator && allocator,const EndpointConfig &,ThreadPool * thread_pool,std::shared_ptr<EventEngine> engine)51 WindowsEndpoint::WindowsEndpoint(
52     const EventEngine::ResolvedAddress& peer_address,
53     std::unique_ptr<WinSocket> socket, MemoryAllocator&& allocator,
54     const EndpointConfig& /* config */, ThreadPool* thread_pool,
55     std::shared_ptr<EventEngine> engine)
56     : peer_address_(peer_address),
57       allocator_(std::move(allocator)),
58       thread_pool_(thread_pool),
59       io_state_(std::make_shared<AsyncIOState>(this, std::move(socket),
60                                                std::move(engine))) {
61   char addr[EventEngine::ResolvedAddress::MAX_SIZE_BYTES];
62   int addr_len = sizeof(addr);
63   if (getsockname(io_state_->socket->raw_socket(),
64                   reinterpret_cast<sockaddr*>(addr), &addr_len) < 0) {
65     grpc_core::Crash(absl::StrFormat(
66         "Unrecoverable error: Failed to get local socket name. %s",
67         GRPC_WSA_ERROR(WSAGetLastError(), "getsockname").ToString().c_str()));
68   }
69   local_address_ =
70       EventEngine::ResolvedAddress(reinterpret_cast<sockaddr*>(addr), addr_len);
71   local_address_string_ = *ResolvedAddressToURI(local_address_);
72   peer_address_string_ = *ResolvedAddressToURI(peer_address_);
73 }
74 
~WindowsEndpoint()75 WindowsEndpoint::~WindowsEndpoint() {
76   io_state_->socket->Shutdown(DEBUG_LOCATION, "~WindowsEndpoint");
77   GRPC_EVENT_ENGINE_ENDPOINT_TRACE("~WindowsEndpoint::%p", this);
78 }
79 
DoTcpRead(SliceBuffer * buffer)80 absl::Status WindowsEndpoint::DoTcpRead(SliceBuffer* buffer) {
81   GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p reading", this);
82   if (io_state_->socket->IsShutdown()) {
83     return absl::UnavailableError("Socket is shutting down.");
84   }
85   // Prepare the WSABUF struct
86   GPR_ASSERT(buffer->Count() <= kMaxWSABUFCount);
87   WSABUF wsa_buffers[kMaxWSABUFCount];
88   for (size_t i = 0; i < buffer->Count(); i++) {
89     auto& slice = buffer->MutableSliceAt(i);
90     wsa_buffers[i].buf = (char*)slice.begin();
91     wsa_buffers[i].len = slice.size();
92   }
93   DWORD bytes_read = 0;
94   DWORD flags = 0;
95   // First try a synchronous, non-blocking read.
96   int status =
97       WSARecv(io_state_->socket->raw_socket(), wsa_buffers,
98               (DWORD)buffer->Count(), &bytes_read, &flags, nullptr, nullptr);
99   int wsa_error = status == 0 ? 0 : WSAGetLastError();
100   if (wsa_error != WSAEWOULDBLOCK) {
101     // Data or some error was returned immediately.
102     io_state_->socket->read_info()->SetResult(
103         {/*wsa_error=*/wsa_error, /*bytes_read=*/bytes_read});
104     thread_pool_->Run(&io_state_->handle_read_event);
105     return absl::OkStatus();
106   }
107   // If the endpoint has already received some data, and the next call would
108   // block, return the data in case that is all the data the reader expects.
109   if (io_state_->handle_read_event.MaybeFinishIfDataHasAlreadyBeenRead()) {
110     return absl::OkStatus();
111   }
112   // Otherwise, let's retry, by queuing a read.
113   status = WSARecv(io_state_->socket->raw_socket(), wsa_buffers,
114                    (DWORD)buffer->Count(), &bytes_read, &flags,
115                    io_state_->socket->read_info()->overlapped(), nullptr);
116   wsa_error = status == 0 ? 0 : WSAGetLastError();
117   if (wsa_error != 0 && wsa_error != WSA_IO_PENDING) {
118     // Async read returned immediately with an error
119     return GRPC_WSA_ERROR(
120         wsa_error,
121         absl::StrFormat("WindowsEndpont::%p Read failed", this).c_str());
122   }
123   io_state_->socket->NotifyOnRead(&io_state_->handle_read_event);
124   return absl::OkStatus();
125 }
126 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)127 bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
128                            SliceBuffer* buffer, const ReadArgs* /* args */) {
129   if (io_state_->socket->IsShutdown()) {
130     thread_pool_->Run([on_read = std::move(on_read)]() mutable {
131       on_read(absl::UnavailableError("Socket is shutting down."));
132     });
133     return false;
134   }
135   buffer->Clear();
136   io_state_->handle_read_event.DonateSpareSlices(buffer);
137   // TODO(hork): sometimes args->read_hint_bytes is 1, which is not useful.
138   // Choose an appropriate size.
139   size_t min_read_size = kDefaultTargetReadSize;
140   if (buffer->Length() < min_read_size && buffer->Count() < kMaxWSABUFCount) {
141     buffer->AppendIndexed(Slice(allocator_.MakeSlice(min_read_size)));
142   }
143   io_state_->handle_read_event.Prime(io_state_, buffer, std::move(on_read));
144   auto status = DoTcpRead(buffer);
145   if (!status.ok()) {
146     // The read could not be completed.
147     io_state_->endpoint->thread_pool_->Run([this, status]() {
148       io_state_->handle_read_event.ExecuteCallbackAndReset(status);
149     });
150   }
151   return false;
152 }
153 
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)154 bool WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
155                             SliceBuffer* data, const WriteArgs* /* args */) {
156   GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p writing", this);
157   if (io_state_->socket->IsShutdown()) {
158     thread_pool_->Run([on_writable = std::move(on_writable)]() mutable {
159       on_writable(absl::UnavailableError("Socket is shutting down."));
160     });
161     return false;
162   }
163   if (grpc_event_engine_endpoint_data_trace.enabled()) {
164     for (size_t i = 0; i < data->Count(); i++) {
165       auto str = data->RefSlice(i).as_string_view();
166       gpr_log(GPR_INFO, "WindowsEndpoint::%p WRITE (peer=%s): %.*s", this,
167               peer_address_string_.c_str(), str.length(), str.data());
168     }
169   }
170   GPR_ASSERT(data->Count() <= UINT_MAX);
171   absl::InlinedVector<WSABUF, kMaxWSABUFCount> buffers(data->Count());
172   for (size_t i = 0; i < data->Count(); i++) {
173     auto& slice = data->MutableSliceAt(i);
174     GPR_ASSERT(slice.size() <= ULONG_MAX);
175     buffers[i].len = slice.size();
176     buffers[i].buf = (char*)slice.begin();
177   }
178   // First, let's try a synchronous, non-blocking write.
179   DWORD bytes_sent;
180   int status = WSASend(io_state_->socket->raw_socket(), buffers.data(),
181                        (DWORD)buffers.size(), &bytes_sent, 0, nullptr, nullptr);
182   size_t async_buffers_offset = 0;
183   if (status == 0) {
184     if (bytes_sent == data->Length()) {
185       // Write completed, exiting early
186       thread_pool_->Run(
187           [cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); });
188       return false;
189     }
190     // The data was not completely delivered, we should send the rest of it by
191     // doing an async write operation.
192     for (size_t i = 0; i < data->Count(); i++) {
193       if (buffers[i].len > bytes_sent) {
194         buffers[i].buf += bytes_sent;
195         buffers[i].len -= bytes_sent;
196         break;
197       }
198       bytes_sent -= buffers[i].len;
199       async_buffers_offset++;
200     }
201   } else {
202     // We would kind of expect to get a WSAEWOULDBLOCK here, especially on a
203     // busy connection that has its send queue filled up. But if we don't,
204     // then we can avoid doing an async write operation at all.
205     int wsa_error = WSAGetLastError();
206     if (wsa_error != WSAEWOULDBLOCK) {
207       thread_pool_->Run([cb = std::move(on_writable), wsa_error]() mutable {
208         cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
209       });
210       return false;
211     }
212   }
213   auto write_info = io_state_->socket->write_info();
214   status =
215       WSASend(io_state_->socket->raw_socket(), &buffers[async_buffers_offset],
216               (DWORD)(data->Count() - async_buffers_offset), nullptr, 0,
217               write_info->overlapped(), nullptr);
218   if (status != 0) {
219     int wsa_error = WSAGetLastError();
220     if (wsa_error != WSA_IO_PENDING) {
221       thread_pool_->Run([cb = std::move(on_writable), wsa_error]() mutable {
222         cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
223       });
224       return false;
225     }
226   }
227   // As all is now setup, we can now ask for the IOCP notification. It may
228   // trigger the callback immediately however, but no matter.
229   io_state_->handle_write_event.Prime(io_state_, data, std::move(on_writable));
230   io_state_->socket->NotifyOnWrite(&io_state_->handle_write_event);
231   return false;
232 }
GetPeerAddress() const233 const EventEngine::ResolvedAddress& WindowsEndpoint::GetPeerAddress() const {
234   return peer_address_;
235 }
GetLocalAddress() const236 const EventEngine::ResolvedAddress& WindowsEndpoint::GetLocalAddress() const {
237   return local_address_;
238 }
239 
240 // ---- Handle{Read|Write}Closure ----
241 namespace {
AbortOnEvent(absl::Status)242 void AbortOnEvent(absl::Status) {
243   grpc_core::Crash(
244       "INTERNAL ERROR: Asked to handle read/write event with an invalid "
245       "callback");
246 }
247 }  // namespace
248 
Reset()249 void WindowsEndpoint::HandleReadClosure::Reset() {
250   io_state_.reset();
251   cb_ = &AbortOnEvent;
252   buffer_ = nullptr;
253 }
254 
Reset()255 void WindowsEndpoint::HandleWriteClosure::Reset() {
256   io_state_.reset();
257   cb_ = &AbortOnEvent;
258   buffer_ = nullptr;
259 }
260 
Prime(std::shared_ptr<AsyncIOState> io_state,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> cb)261 void WindowsEndpoint::HandleReadClosure::Prime(
262     std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
263     absl::AnyInvocable<void(absl::Status)> cb) {
264   io_state_ = std::move(io_state);
265   cb_ = std::move(cb);
266   buffer_ = buffer;
267 }
268 
Prime(std::shared_ptr<AsyncIOState> io_state,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> cb)269 void WindowsEndpoint::HandleWriteClosure::Prime(
270     std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
271     absl::AnyInvocable<void(absl::Status)> cb) {
272   io_state_ = std::move(io_state);
273   cb_ = std::move(cb);
274   buffer_ = buffer;
275 }
276 
Run()277 void WindowsEndpoint::HandleReadClosure::Run() {
278   // Deletes the shared_ptr when this closure returns
279   auto io_state = std::move(io_state_);
280   GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Read Event",
281                                    io_state->endpoint);
282   absl::Status status;
283   const auto result = io_state->socket->read_info()->result();
284   if (result.wsa_error != 0) {
285     status = GRPC_WSA_ERROR(result.wsa_error, "Async Read Error");
286     buffer_->Clear();
287     return ExecuteCallbackAndReset(status);
288   }
289   if (result.bytes_transferred == 0) {
290     // Either the endpoint is shut down or we've seen the end of the stream
291     if (grpc_event_engine_endpoint_data_trace.enabled()) {
292       DumpSliceBuffer(
293           buffer_, absl::StrFormat("WindowsEndpoint::%p READ (peer=%s)",
294                                    io_state->endpoint,
295                                    io_state->endpoint->peer_address_string_));
296     }
297     status = absl::UnavailableError("End of TCP stream");
298     grpc_core::StatusSetInt(&status, grpc_core::StatusIntProperty::kRpcStatus,
299                             GRPC_STATUS_UNAVAILABLE);
300     buffer_->Swap(last_read_buffer_);
301     return ExecuteCallbackAndReset(status);
302   }
303   GPR_DEBUG_ASSERT(result.bytes_transferred > 0);
304   GPR_DEBUG_ASSERT(result.bytes_transferred <= buffer_->Length());
305   buffer_->MoveFirstNBytesIntoSliceBuffer(result.bytes_transferred,
306                                           last_read_buffer_);
307   if (buffer_->Length() == 0) {
308     buffer_->Swap(last_read_buffer_);
309     return ExecuteCallbackAndReset(status);
310   }
311   // Doing another read. Let's keep the AsyncIOState alive a bit longer.
312   io_state_ = std::move(io_state);
313   status = io_state_->endpoint->DoTcpRead(buffer_);
314   if (!status.ok()) {
315     io_state_.reset();
316     ExecuteCallbackAndReset(status);
317   }
318 }
319 
MaybeFinishIfDataHasAlreadyBeenRead()320 bool WindowsEndpoint::HandleReadClosure::MaybeFinishIfDataHasAlreadyBeenRead() {
321   if (last_read_buffer_.Length() > 0) {
322     buffer_->Swap(last_read_buffer_);
323     io_state_->endpoint->thread_pool_->Run(
324         [this]() { ExecuteCallbackAndReset(absl::OkStatus()); });
325     return true;
326   }
327   return false;
328 }
329 
ExecuteCallbackAndReset(absl::Status status)330 void WindowsEndpoint::HandleReadClosure::ExecuteCallbackAndReset(
331     absl::Status status) {
332   auto cb = std::move(cb_);
333   Reset();
334   cb(status);
335 }
336 
DonateSpareSlices(SliceBuffer * buffer)337 void WindowsEndpoint::HandleReadClosure::DonateSpareSlices(
338     SliceBuffer* buffer) {
339   // Donee buffer must be empty.
340   GPR_ASSERT(buffer->Length() == 0);
341   // HandleReadClosure must be in the reset state.
342   GPR_ASSERT(buffer_ == nullptr);
343   buffer->Swap(last_read_buffer_);
344 }
345 
Run()346 void WindowsEndpoint::HandleWriteClosure::Run() {
347   // Deletes the shared_ptr when this closure returns
348   auto io_state = std::move(io_state_);
349   GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Write Event",
350                                    io_state->endpoint);
351   auto cb = std::move(cb_);
352   const auto result = io_state->socket->write_info()->result();
353   absl::Status status;
354   if (result.wsa_error != 0) {
355     status = GRPC_WSA_ERROR(result.wsa_error, "WSASend");
356   } else {
357     GPR_ASSERT(result.bytes_transferred == buffer_->Length());
358   }
359   Reset();
360   cb(status);
361 }
362 
363 // ---- AsyncIOState ----
364 
AsyncIOState(WindowsEndpoint * endpoint,std::unique_ptr<WinSocket> socket,std::shared_ptr<EventEngine> engine)365 WindowsEndpoint::AsyncIOState::AsyncIOState(WindowsEndpoint* endpoint,
366                                             std::unique_ptr<WinSocket> socket,
367                                             std::shared_ptr<EventEngine> engine)
368     : endpoint(endpoint),
369       socket(std::move(socket)),
370       engine(std::move(engine)) {}
371 
~AsyncIOState()372 WindowsEndpoint::AsyncIOState::~AsyncIOState() {
373   socket->Shutdown(DEBUG_LOCATION, "~AsyncIOState");
374 }
375 
376 }  // namespace experimental
377 }  // namespace grpc_event_engine
378 
379 #endif  // GPR_WINDOWS
380