1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H
15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H
16 #include <grpc/support/port_platform.h>
17 
18 #ifdef GPR_WINDOWS
19 
20 #include <grpc/event_engine/event_engine.h>
21 
22 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
23 #include "src/core/lib/event_engine/windows/win_socket.h"
24 
25 namespace grpc_event_engine {
26 namespace experimental {
27 
// An EventEngine::Endpoint implementation that operates on a WinSocket. This
// header only declares the class when GPR_WINDOWS is defined.
//
// The contract of the overridden methods (including the meaning of the bool
// returned by Read and Write) is set by the EventEngine::Endpoint interface;
// it is not restated here because this header does not define it.
28 class WindowsEndpoint : public EventEngine::Endpoint {
29  public:
     // Takes ownership of `socket` (passed as a unique_ptr) and receives
     // `allocator` as an rvalue. `thread_pool` is a raw pointer and `engine` is
     // a shared_ptr.
     // NOTE(review): `thread_pool` is stored as a raw pointer member, so it
     // presumably must outlive this endpoint -- confirm against the caller.
30   WindowsEndpoint(const EventEngine::ResolvedAddress& peer_address,
31                   std::unique_ptr<WinSocket> socket,
32                   MemoryAllocator&& allocator, const EndpointConfig& config,
33                   ThreadPool* thread_pool, std::shared_ptr<EventEngine> engine);
34   ~WindowsEndpoint() override;
     // Overrides EventEngine::Endpoint::Read. `on_read` receives an
     // absl::Status; `buffer` is a caller-provided SliceBuffer.
35   bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
36             const ReadArgs* args) override;
     // Overrides EventEngine::Endpoint::Write. `on_writable` receives an
     // absl::Status; `data` is a caller-provided SliceBuffer.
37   bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
38              SliceBuffer* data, const WriteArgs* args) override;
     // Address accessors. Both return references, so the result is presumably
     // only valid while this endpoint is alive (likely refer to peer_address_ /
     // local_address_ below -- TODO confirm in the implementation).
39   const EventEngine::ResolvedAddress& GetPeerAddress() const override;
40   const EventEngine::ResolvedAddress& GetLocalAddress() const override;
41 
42  private:
     // Forward declaration: the closures below hold a shared_ptr to this type,
     // and the type itself embeds both closures by value.
43   struct AsyncIOState;
44 
45   // Permanent closure type for Read callbacks
46   class HandleReadClosure : public EventEngine::Closure {
47    public:
48     void Run() override;
       // Supplies the per-request data (shared IO state, destination buffer and
       // completion callback) -- presumably stored in the members below until
       // Reset() is called; confirm in the implementation.
49     void Prime(std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
50                absl::AnyInvocable<void(absl::Status)> cb);
51     // Resets the per-request data
52     void Reset();
53     // Run the callback with whatever data is available, and reset state.
54     //
55     // Returns true if the callback has been called with some data. Returns
56     // false if no data has been read.
57     bool MaybeFinishIfDataHasAlreadyBeenRead();
58     // Execute the callback and reset.
59     void ExecuteCallbackAndReset(absl::Status status);
60     // Swap any leftover slices into the provided buffer
61     void DonateSpareSlices(SliceBuffer* buffer);
62 
63    private:
       // Per-request data. `buffer_` is a raw pointer to a SliceBuffer that this
       // closure does not own.
64     std::shared_ptr<AsyncIOState> io_state_;
65     absl::AnyInvocable<void(absl::Status)> cb_;
66     SliceBuffer* buffer_ = nullptr;
       // Owned by the closure itself. NOTE(review): presumably holds the slices
       // referred to as "leftover"/"spare" above -- confirm in the
       // implementation.
67     SliceBuffer last_read_buffer_;
68   };
69 
70   // Permanent closure type for Write callbacks
71   class HandleWriteClosure : public EventEngine::Closure {
72    public:
73     void Run() override;
       // Supplies the per-request data (shared IO state, source buffer and
       // completion callback); same signature as HandleReadClosure::Prime.
74     void Prime(std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
75                absl::AnyInvocable<void(absl::Status)> cb);
76     // Resets the per-request data
77     void Reset();
78 
79    private:
       // Per-request data. `buffer_` is a raw pointer to a SliceBuffer that this
       // closure does not own.
80     std::shared_ptr<AsyncIOState> io_state_;
81     absl::AnyInvocable<void(absl::Status)> cb_;
82     SliceBuffer* buffer_ = nullptr;
83   };
84 
85   // A class to manage the data that must outlive the Endpoint.
86   //
87   // Once an endpoint is done and destroyed, there still may be overlapped
88   // operations pending. To clean up safely, this data must outlive the
89   // Endpoint, and be destroyed asynchronously when all pending overlapped
90   // events are complete.
91   struct AsyncIOState {
92     AsyncIOState(WindowsEndpoint* endpoint, std::unique_ptr<WinSocket> socket,
93                  std::shared_ptr<EventEngine> engine);
94     ~AsyncIOState();
       // Raw back-pointer to the endpoint; the pointer itself is const.
       // NOTE(review): this state is documented to outlive the endpoint, so this
       // pointer can dangle after the endpoint is destroyed -- confirm it is
       // never dereferenced past that point.
95     WindowsEndpoint* const endpoint;
       // The socket is owned here rather than by the endpoint.
96     std::unique_ptr<WinSocket> socket;
       // The closures are embedded by value, and each can hold a shared_ptr to an
       // AsyncIOState. NOTE(review): if a closure is primed with the state that
       // contains it, that is a self-reference which presumably is released by
       // the closure's Reset() -- confirm.
97     HandleReadClosure handle_read_event;
98     HandleWriteClosure handle_write_event;
99     std::shared_ptr<EventEngine> engine;
100   };
101 
102   // Perform the low-level calls and execute the HandleReadClosure
103   // asynchronously.
104   absl::Status DoTcpRead(SliceBuffer* buffer);
105 
      // NOTE(review): the *_string_ members are presumably textual forms of the
      // corresponding addresses -- confirm where they are populated.
106   EventEngine::ResolvedAddress peer_address_;
107   std::string peer_address_string_;
108   EventEngine::ResolvedAddress local_address_;
109   std::string local_address_string_;
110   MemoryAllocator allocator_;
      // Raw pointer; nothing in this header expresses ownership of the pool.
111   ThreadPool* thread_pool_;
      // Held via shared_ptr so that the state can outlive this endpoint (see the
      // AsyncIOState comment above).
112   std::shared_ptr<AsyncIOState> io_state_;
113 };
114 
115 }  // namespace experimental
116 }  // namespace grpc_event_engine
117 
118 #endif
119 
120 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H
121