1 // Copyright 2023 The gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H 15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H 16 #include <grpc/support/port_platform.h> 17 18 #ifdef GPR_APPLE 19 20 #include <CoreFoundation/CoreFoundation.h> 21 22 #include "absl/strings/str_format.h" 23 24 #include <grpc/event_engine/event_engine.h> 25 26 #include "src/core/lib/address_utils/sockaddr_utils.h" 27 #include "src/core/lib/event_engine/cf_engine/cf_engine.h" 28 #include "src/core/lib/event_engine/cf_engine/cftype_unique_ref.h" 29 #include "src/core/lib/event_engine/posix_engine/lockfree_event.h" 30 #include "src/core/lib/event_engine/tcp_socket_utils.h" 31 #include "src/core/lib/gprpp/host_port.h" 32 #include "src/core/lib/gprpp/ref_counted.h" 33 #include "src/core/lib/gprpp/ref_counted_ptr.h" 34 35 namespace grpc_event_engine { 36 namespace experimental { 37 38 class CFStreamEndpointImpl 39 : public grpc_core::RefCounted<CFStreamEndpointImpl> { 40 public: 41 CFStreamEndpointImpl(std::shared_ptr<CFEventEngine> engine, 42 MemoryAllocator memory_allocator); 43 ~CFStreamEndpointImpl(); 44 45 void Shutdown(); 46 47 bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, 48 const EventEngine::Endpoint::ReadArgs* args); 49 bool Write(absl::AnyInvocable<void(absl::Status)> on_writable, 50 SliceBuffer* data, const EventEngine::Endpoint::WriteArgs* args); 51 GetPeerAddress()52 const EventEngine::ResolvedAddress& GetPeerAddress() const { 53 return peer_address_; 54 } GetLocalAddress()55 const EventEngine::ResolvedAddress& GetLocalAddress() const { 56 return local_address_; 57 } 58 59 public: 60 void Connect(absl::AnyInvocable<void(absl::Status)> on_connect, 61 EventEngine::ResolvedAddress addr); 62 bool CancelConnect(absl::Status status); 63 64 private: 65 void DoWrite(absl::AnyInvocable<void(absl::Status)> on_writable, 66 SliceBuffer* data); 67 void DoRead(absl::AnyInvocable<void(absl::Status)> on_read, 68 SliceBuffer* buffer); 69 70 private: Retain(void * info)71 static void* Retain(void* info) { 72 auto that = static_cast<CFStreamEndpointImpl*>(info); 73 return that->Ref().release(); 74 } 75 Release(void * info)76 static void Release(void* info) { 77 auto that = static_cast<CFStreamEndpointImpl*>(info); 78 that->Unref(); 79 } 80 81 static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type, 82 void* client_callback_info); 83 static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, 84 void* client_callback_info); 85 86 private: 87 CFTypeUniqueRef<CFReadStreamRef> cf_read_stream_; 88 CFTypeUniqueRef<CFWriteStreamRef> cf_write_stream_; 89 90 std::shared_ptr<CFEventEngine> engine_; 91 92 EventEngine::ResolvedAddress peer_address_; 93 EventEngine::ResolvedAddress local_address_; 94 std::string peer_address_string_; 95 std::string local_address_string_; 96 MemoryAllocator memory_allocator_; 97 98 LockfreeEvent open_event_; 99 LockfreeEvent read_event_; 100 LockfreeEvent write_event_; 101 }; 102 103 class CFStreamEndpoint : public EventEngine::Endpoint { 104 public: CFStreamEndpoint(std::shared_ptr<CFEventEngine> engine,MemoryAllocator memory_allocator)105 CFStreamEndpoint(std::shared_ptr<CFEventEngine> engine, 106 MemoryAllocator memory_allocator) { 107 impl_ = grpc_core::MakeRefCounted<CFStreamEndpointImpl>( 108 std::move(engine), std::move(memory_allocator)); 109 } ~CFStreamEndpoint()110 ~CFStreamEndpoint() override { impl_->Shutdown(); } 111 Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs * args)112 bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, 113 const ReadArgs* args) override { 114 return impl_->Read(std::move(on_read), buffer, args); 115 } Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs * args)116 bool Write(absl::AnyInvocable<void(absl::Status)> on_writable, 117 SliceBuffer* data, const WriteArgs* args) override { 118 return impl_->Write(std::move(on_writable), data, args); 119 } 120 GetPeerAddress()121 const EventEngine::ResolvedAddress& GetPeerAddress() const override { 122 return impl_->GetPeerAddress(); 123 } GetLocalAddress()124 const EventEngine::ResolvedAddress& GetLocalAddress() const override { 125 return impl_->GetLocalAddress(); 126 } 127 128 public: Connect(absl::AnyInvocable<void (absl::Status)> on_connect,EventEngine::ResolvedAddress addr)129 void Connect(absl::AnyInvocable<void(absl::Status)> on_connect, 130 EventEngine::ResolvedAddress addr) { 131 impl_->Connect(std::move(on_connect), std::move(addr)); 132 } CancelConnect(absl::Status status)133 bool CancelConnect(absl::Status status) { 134 return impl_->CancelConnect(std::move(status)); 135 } 136 137 private: 138 grpc_core::RefCountedPtr<CFStreamEndpointImpl> impl_; 139 }; 140 141 } // namespace experimental 142 } // namespace grpc_event_engine 143 144 #endif // GPR_APPLE 145 146 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H 147