1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H
15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H
16 #include <grpc/support/port_platform.h>
17 
18 #ifdef GPR_APPLE
19 
20 #include <CoreFoundation/CoreFoundation.h>
21 
22 #include "absl/strings/str_format.h"
23 
24 #include <grpc/event_engine/event_engine.h>
25 
26 #include "src/core/lib/address_utils/sockaddr_utils.h"
27 #include "src/core/lib/event_engine/cf_engine/cf_engine.h"
28 #include "src/core/lib/event_engine/cf_engine/cftype_unique_ref.h"
29 #include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
30 #include "src/core/lib/event_engine/tcp_socket_utils.h"
31 #include "src/core/lib/gprpp/host_port.h"
32 #include "src/core/lib/gprpp/ref_counted.h"
33 #include "src/core/lib/gprpp/ref_counted_ptr.h"
34 
35 namespace grpc_event_engine {
36 namespace experimental {
37 
38 class CFStreamEndpointImpl
39     : public grpc_core::RefCounted<CFStreamEndpointImpl> {
40  public:
41   CFStreamEndpointImpl(std::shared_ptr<CFEventEngine> engine,
42                        MemoryAllocator memory_allocator);
43   ~CFStreamEndpointImpl();
44 
45   void Shutdown();
46 
47   bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
48             const EventEngine::Endpoint::ReadArgs* args);
49   bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
50              SliceBuffer* data, const EventEngine::Endpoint::WriteArgs* args);
51 
GetPeerAddress()52   const EventEngine::ResolvedAddress& GetPeerAddress() const {
53     return peer_address_;
54   }
GetLocalAddress()55   const EventEngine::ResolvedAddress& GetLocalAddress() const {
56     return local_address_;
57   }
58 
59  public:
60   void Connect(absl::AnyInvocable<void(absl::Status)> on_connect,
61                EventEngine::ResolvedAddress addr);
62   bool CancelConnect(absl::Status status);
63 
64  private:
65   void DoWrite(absl::AnyInvocable<void(absl::Status)> on_writable,
66                SliceBuffer* data);
67   void DoRead(absl::AnyInvocable<void(absl::Status)> on_read,
68               SliceBuffer* buffer);
69 
70  private:
Retain(void * info)71   static void* Retain(void* info) {
72     auto that = static_cast<CFStreamEndpointImpl*>(info);
73     return that->Ref().release();
74   }
75 
Release(void * info)76   static void Release(void* info) {
77     auto that = static_cast<CFStreamEndpointImpl*>(info);
78     that->Unref();
79   }
80 
81   static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type,
82                            void* client_callback_info);
83   static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type,
84                             void* client_callback_info);
85 
86  private:
87   CFTypeUniqueRef<CFReadStreamRef> cf_read_stream_;
88   CFTypeUniqueRef<CFWriteStreamRef> cf_write_stream_;
89 
90   std::shared_ptr<CFEventEngine> engine_;
91 
92   EventEngine::ResolvedAddress peer_address_;
93   EventEngine::ResolvedAddress local_address_;
94   std::string peer_address_string_;
95   std::string local_address_string_;
96   MemoryAllocator memory_allocator_;
97 
98   LockfreeEvent open_event_;
99   LockfreeEvent read_event_;
100   LockfreeEvent write_event_;
101 };
102 
103 class CFStreamEndpoint : public EventEngine::Endpoint {
104  public:
CFStreamEndpoint(std::shared_ptr<CFEventEngine> engine,MemoryAllocator memory_allocator)105   CFStreamEndpoint(std::shared_ptr<CFEventEngine> engine,
106                    MemoryAllocator memory_allocator) {
107     impl_ = grpc_core::MakeRefCounted<CFStreamEndpointImpl>(
108         std::move(engine), std::move(memory_allocator));
109   }
~CFStreamEndpoint()110   ~CFStreamEndpoint() override { impl_->Shutdown(); }
111 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs * args)112   bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
113             const ReadArgs* args) override {
114     return impl_->Read(std::move(on_read), buffer, args);
115   }
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs * args)116   bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
117              SliceBuffer* data, const WriteArgs* args) override {
118     return impl_->Write(std::move(on_writable), data, args);
119   }
120 
GetPeerAddress()121   const EventEngine::ResolvedAddress& GetPeerAddress() const override {
122     return impl_->GetPeerAddress();
123   }
GetLocalAddress()124   const EventEngine::ResolvedAddress& GetLocalAddress() const override {
125     return impl_->GetLocalAddress();
126   }
127 
128  public:
Connect(absl::AnyInvocable<void (absl::Status)> on_connect,EventEngine::ResolvedAddress addr)129   void Connect(absl::AnyInvocable<void(absl::Status)> on_connect,
130                EventEngine::ResolvedAddress addr) {
131     impl_->Connect(std::move(on_connect), std::move(addr));
132   }
CancelConnect(absl::Status status)133   bool CancelConnect(absl::Status status) {
134     return impl_->CancelConnect(std::move(status));
135   }
136 
137  private:
138   grpc_core::RefCountedPtr<CFStreamEndpointImpl> impl_;
139 };
140 
141 }  // namespace experimental
142 }  // namespace grpc_event_engine
143 
144 #endif  // GPR_APPLE
145 
146 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H
147