1 // Copyright 2023 The gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H 16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <memory> 21 22 #include <grpc/event_engine/event_engine.h> 23 24 #include "src/core/lib/gprpp/sync.h" 25 #include "src/core/lib/iomgr/port.h" 26 27 #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) 28 29 // IWYU pragma: no_include <ares_build.h> 30 31 #include <sys/ioctl.h> 32 #include <sys/socket.h> 33 #include <sys/uio.h> 34 #include <unistd.h> 35 36 #include <string> 37 #include <unordered_set> 38 #include <utility> 39 40 #include <ares.h> 41 42 #include "absl/functional/any_invocable.h" 43 #include "absl/status/status.h" 44 #include "absl/strings/str_cat.h" 45 46 #include "src/core/lib/event_engine/grpc_polled_fd.h" 47 #include "src/core/lib/event_engine/posix_engine/event_poller.h" 48 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" 49 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" 50 51 namespace grpc_event_engine { 52 namespace experimental { 53 54 class GrpcPolledFdPosix : public GrpcPolledFd { 55 public: GrpcPolledFdPosix(ares_socket_t as,EventHandle * handle)56 GrpcPolledFdPosix(ares_socket_t as, EventHandle* handle) 57 : name_(absl::StrCat("c-ares fd: ", static_cast<int>(as))), 58 as_(as), 59 handle_(handle) {} 60 ~GrpcPolledFdPosix()61 ~GrpcPolledFdPosix() override { 62 // c-ares library will close the fd. This fd may be picked up immediately by 63 // another thread and should not be closed by the following OrphanHandle. 64 int phony_release_fd; 65 handle_->OrphanHandle(/*on_done=*/nullptr, &phony_release_fd, 66 "c-ares query finished"); 67 } 68 RegisterForOnReadableLocked(absl::AnyInvocable<void (absl::Status)> read_closure)69 void RegisterForOnReadableLocked( 70 absl::AnyInvocable<void(absl::Status)> read_closure) override { 71 handle_->NotifyOnRead(new PosixEngineClosure(std::move(read_closure), 72 /*is_permanent=*/false)); 73 } 74 RegisterForOnWriteableLocked(absl::AnyInvocable<void (absl::Status)> write_closure)75 void RegisterForOnWriteableLocked( 76 absl::AnyInvocable<void(absl::Status)> write_closure) override { 77 handle_->NotifyOnWrite(new PosixEngineClosure(std::move(write_closure), 78 /*is_permanent=*/false)); 79 } 80 IsFdStillReadableLocked()81 bool IsFdStillReadableLocked() override { 82 size_t bytes_available = 0; 83 return ioctl(handle_->WrappedFd(), FIONREAD, &bytes_available) == 0 && 84 bytes_available > 0; 85 } 86 ShutdownLocked(absl::Status error)87 bool ShutdownLocked(absl::Status error) override { 88 handle_->ShutdownHandle(error); 89 return true; 90 } 91 GetWrappedAresSocketLocked()92 ares_socket_t GetWrappedAresSocketLocked() override { return as_; } 93 GetName()94 const char* GetName() const override { return name_.c_str(); } 95 96 private: 97 const std::string name_; 98 const ares_socket_t as_; 99 EventHandle* handle_; 100 }; 101 102 class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory { 103 public: GrpcPolledFdFactoryPosix(PosixEventPoller * poller)104 explicit GrpcPolledFdFactoryPosix(PosixEventPoller* poller) 105 : poller_(poller) {} 106 ~GrpcPolledFdFactoryPosix()107 ~GrpcPolledFdFactoryPosix() override { 108 for (auto& fd : owned_fds_) { 109 close(fd); 110 } 111 } 112 Initialize(grpc_core::Mutex *,EventEngine *)113 void Initialize(grpc_core::Mutex*, EventEngine*) override {} 114 NewGrpcPolledFdLocked(ares_socket_t as)115 std::unique_ptr<GrpcPolledFd> NewGrpcPolledFdLocked( 116 ares_socket_t as) override { 117 owned_fds_.insert(as); 118 return std::make_unique<GrpcPolledFdPosix>( 119 as, 120 poller_->CreateHandle(as, "c-ares socket", poller_->CanTrackErrors())); 121 } 122 ConfigureAresChannelLocked(ares_channel channel)123 void ConfigureAresChannelLocked(ares_channel channel) override { 124 ares_set_socket_functions(channel, &kSockFuncs, this); 125 ares_set_socket_configure_callback( 126 channel, &GrpcPolledFdFactoryPosix::ConfigureSocket, nullptr); 127 } 128 129 private: 130 /// Overridden socket API for c-ares Socket(int af,int type,int protocol,void *)131 static ares_socket_t Socket(int af, int type, int protocol, 132 void* /*user_data*/) { 133 return socket(af, type, protocol); 134 } 135 136 /// Overridden connect API for c-ares Connect(ares_socket_t as,const struct sockaddr * target,ares_socklen_t target_len,void *)137 static int Connect(ares_socket_t as, const struct sockaddr* target, 138 ares_socklen_t target_len, void* /*user_data*/) { 139 return connect(as, target, target_len); 140 } 141 142 /// Overridden writev API for c-ares WriteV(ares_socket_t as,const struct iovec * iov,int iovec_count,void *)143 static ares_ssize_t WriteV(ares_socket_t as, const struct iovec* iov, 144 int iovec_count, void* /*user_data*/) { 145 return writev(as, iov, iovec_count); 146 } 147 148 /// Overridden recvfrom API for c-ares RecvFrom(ares_socket_t as,void * data,size_t data_len,int flags,struct sockaddr * from,ares_socklen_t * from_len,void *)149 static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len, 150 int flags, struct sockaddr* from, 151 ares_socklen_t* from_len, void* /*user_data*/) { 152 return recvfrom(as, data, data_len, flags, from, from_len); 153 } 154 155 /// Overridden close API for c-ares Close(ares_socket_t as,void * user_data)156 static int Close(ares_socket_t as, void* user_data) { 157 GrpcPolledFdFactoryPosix* self = 158 static_cast<GrpcPolledFdFactoryPosix*>(user_data); 159 if (self->owned_fds_.find(as) == self->owned_fds_.end()) { 160 // c-ares owns this fd, grpc has never seen it 161 return close(as); 162 } 163 return 0; 164 } 165 166 /// Because we're using socket API overrides, c-ares won't 167 /// perform its typical configuration on the socket. See 168 /// https://github.com/c-ares/c-ares/blob/bad62225b7f6b278b92e8e85a255600b629ef517/src/lib/ares_process.c#L1018. 169 /// So we use the configure socket callback override and copy default 170 /// settings that c-ares would normally apply on posix platforms: 171 /// - non-blocking 172 /// - cloexec flag 173 /// - disable nagle ConfigureSocket(ares_socket_t fd,int type,void *)174 static int ConfigureSocket(ares_socket_t fd, int type, void* /*user_data*/) { 175 // clang-format off 176 #define RETURN_IF_ERROR(expr) if (!(expr).ok()) { return -1; } 177 // clang-format on 178 PosixSocketWrapper sock(fd); 179 RETURN_IF_ERROR(sock.SetSocketNonBlocking(1)); 180 RETURN_IF_ERROR(sock.SetSocketCloexec(1)); 181 if (type == SOCK_STREAM) { 182 RETURN_IF_ERROR(sock.SetSocketLowLatency(1)); 183 } 184 return 0; 185 } 186 187 const struct ares_socket_functions kSockFuncs = { 188 &GrpcPolledFdFactoryPosix::Socket /* socket */, 189 &GrpcPolledFdFactoryPosix::Close /* close */, 190 &GrpcPolledFdFactoryPosix::Connect /* connect */, 191 &GrpcPolledFdFactoryPosix::RecvFrom /* recvfrom */, 192 &GrpcPolledFdFactoryPosix::WriteV /* writev */, 193 }; 194 195 PosixEventPoller* poller_; 196 // fds that are used/owned by grpc - we (grpc) will close them rather than 197 // c-ares 198 std::unordered_set<ares_socket_t> owned_fds_; 199 }; 200 201 } // namespace experimental 202 } // namespace grpc_event_engine 203 204 #endif // GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) 205 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H 206