1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H 15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H 16 #include <grpc/support/port_platform.h> 17 18 #ifdef GPR_WINDOWS 19 20 #include <grpc/event_engine/event_engine.h> 21 22 #include "src/core/lib/event_engine/thread_pool/thread_pool.h" 23 #include "src/core/lib/event_engine/windows/win_socket.h" 24 25 namespace grpc_event_engine { 26 namespace experimental { 27 28 class WindowsEndpoint : public EventEngine::Endpoint { 29 public: 30 WindowsEndpoint(const EventEngine::ResolvedAddress& peer_address, 31 std::unique_ptr<WinSocket> socket, 32 MemoryAllocator&& allocator, const EndpointConfig& config, 33 ThreadPool* thread_pool, std::shared_ptr<EventEngine> engine); 34 ~WindowsEndpoint() override; 35 bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, 36 const ReadArgs* args) override; 37 bool Write(absl::AnyInvocable<void(absl::Status)> on_writable, 38 SliceBuffer* data, const WriteArgs* args) override; 39 const EventEngine::ResolvedAddress& GetPeerAddress() const override; 40 const EventEngine::ResolvedAddress& GetLocalAddress() const override; 41 42 private: 43 struct AsyncIOState; 44 45 // Permanent closure type for Read callbacks 46 class HandleReadClosure : public EventEngine::Closure { 47 public: 48 void Run() override; 49 void Prime(std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer, 50 absl::AnyInvocable<void(absl::Status)> cb); 51 // Resets the per-request data 52 void Reset(); 53 // Run the callback with whatever data is available, and reset state. 54 // 55 // Returns true if the callback has been called with some data. Returns 56 // false if no data has been read. 57 bool MaybeFinishIfDataHasAlreadyBeenRead(); 58 // Execute the callback and reset. 59 void ExecuteCallbackAndReset(absl::Status status); 60 // Swap any leftover slices into the provided buffer 61 void DonateSpareSlices(SliceBuffer* buffer); 62 63 private: 64 std::shared_ptr<AsyncIOState> io_state_; 65 absl::AnyInvocable<void(absl::Status)> cb_; 66 SliceBuffer* buffer_ = nullptr; 67 SliceBuffer last_read_buffer_; 68 }; 69 70 // Permanent closure type for Write callbacks 71 class HandleWriteClosure : public EventEngine::Closure { 72 public: 73 void Run() override; 74 void Prime(std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer, 75 absl::AnyInvocable<void(absl::Status)> cb); 76 // Resets the per-request data 77 void Reset(); 78 79 private: 80 std::shared_ptr<AsyncIOState> io_state_; 81 absl::AnyInvocable<void(absl::Status)> cb_; 82 SliceBuffer* buffer_ = nullptr; 83 }; 84 85 // A class to manage the data that must outlive the Endpoint. 86 // 87 // Once an endpoint is done and destroyed, there still may be overlapped 88 // operations pending. To clean up safely, this data must outlive the 89 // Endpoint, and be destroyed asynchronously when all pending overlapped 90 // events are complete. 91 struct AsyncIOState { 92 AsyncIOState(WindowsEndpoint* endpoint, std::unique_ptr<WinSocket> socket, 93 std::shared_ptr<EventEngine> engine); 94 ~AsyncIOState(); 95 WindowsEndpoint* const endpoint; 96 std::unique_ptr<WinSocket> socket; 97 HandleReadClosure handle_read_event; 98 HandleWriteClosure handle_write_event; 99 std::shared_ptr<EventEngine> engine; 100 }; 101 102 // Perform the low-level calls and execute the HandleReadClosure 103 // asynchronously. 104 absl::Status DoTcpRead(SliceBuffer* buffer); 105 106 EventEngine::ResolvedAddress peer_address_; 107 std::string peer_address_string_; 108 EventEngine::ResolvedAddress local_address_; 109 std::string local_address_string_; 110 MemoryAllocator allocator_; 111 ThreadPool* thread_pool_; 112 std::shared_ptr<AsyncIOState> io_state_; 113 }; 114 115 } // namespace experimental 116 } // namespace grpc_event_engine 117 118 #endif 119 120 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H 121