xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H
16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <memory>
21 
22 #include <grpc/event_engine/event_engine.h>
23 
24 #include "src/core/lib/gprpp/sync.h"
25 #include "src/core/lib/iomgr/port.h"
26 
27 #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
28 
29 // IWYU pragma: no_include <ares_build.h>
30 
31 #include <sys/ioctl.h>
32 #include <sys/socket.h>
33 #include <sys/uio.h>
34 #include <unistd.h>
35 
36 #include <string>
37 #include <unordered_set>
38 #include <utility>
39 
40 #include <ares.h>
41 
42 #include "absl/functional/any_invocable.h"
43 #include "absl/status/status.h"
44 #include "absl/strings/str_cat.h"
45 
46 #include "src/core/lib/event_engine/grpc_polled_fd.h"
47 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
48 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
49 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
50 
51 namespace grpc_event_engine {
52 namespace experimental {
53 
54 class GrpcPolledFdPosix : public GrpcPolledFd {
55  public:
GrpcPolledFdPosix(ares_socket_t as,EventHandle * handle)56   GrpcPolledFdPosix(ares_socket_t as, EventHandle* handle)
57       : name_(absl::StrCat("c-ares fd: ", static_cast<int>(as))),
58         as_(as),
59         handle_(handle) {}
60 
~GrpcPolledFdPosix()61   ~GrpcPolledFdPosix() override {
62     // c-ares library will close the fd. This fd may be picked up immediately by
63     // another thread and should not be closed by the following OrphanHandle.
64     int phony_release_fd;
65     handle_->OrphanHandle(/*on_done=*/nullptr, &phony_release_fd,
66                           "c-ares query finished");
67   }
68 
RegisterForOnReadableLocked(absl::AnyInvocable<void (absl::Status)> read_closure)69   void RegisterForOnReadableLocked(
70       absl::AnyInvocable<void(absl::Status)> read_closure) override {
71     handle_->NotifyOnRead(new PosixEngineClosure(std::move(read_closure),
72                                                  /*is_permanent=*/false));
73   }
74 
RegisterForOnWriteableLocked(absl::AnyInvocable<void (absl::Status)> write_closure)75   void RegisterForOnWriteableLocked(
76       absl::AnyInvocable<void(absl::Status)> write_closure) override {
77     handle_->NotifyOnWrite(new PosixEngineClosure(std::move(write_closure),
78                                                   /*is_permanent=*/false));
79   }
80 
IsFdStillReadableLocked()81   bool IsFdStillReadableLocked() override {
82     size_t bytes_available = 0;
83     return ioctl(handle_->WrappedFd(), FIONREAD, &bytes_available) == 0 &&
84            bytes_available > 0;
85   }
86 
ShutdownLocked(absl::Status error)87   bool ShutdownLocked(absl::Status error) override {
88     handle_->ShutdownHandle(error);
89     return true;
90   }
91 
GetWrappedAresSocketLocked()92   ares_socket_t GetWrappedAresSocketLocked() override { return as_; }
93 
GetName()94   const char* GetName() const override { return name_.c_str(); }
95 
96  private:
97   const std::string name_;
98   const ares_socket_t as_;
99   EventHandle* handle_;
100 };
101 
102 class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
103  public:
GrpcPolledFdFactoryPosix(PosixEventPoller * poller)104   explicit GrpcPolledFdFactoryPosix(PosixEventPoller* poller)
105       : poller_(poller) {}
106 
~GrpcPolledFdFactoryPosix()107   ~GrpcPolledFdFactoryPosix() override {
108     for (auto& fd : owned_fds_) {
109       close(fd);
110     }
111   }
112 
Initialize(grpc_core::Mutex *,EventEngine *)113   void Initialize(grpc_core::Mutex*, EventEngine*) override {}
114 
NewGrpcPolledFdLocked(ares_socket_t as)115   std::unique_ptr<GrpcPolledFd> NewGrpcPolledFdLocked(
116       ares_socket_t as) override {
117     owned_fds_.insert(as);
118     return std::make_unique<GrpcPolledFdPosix>(
119         as,
120         poller_->CreateHandle(as, "c-ares socket", poller_->CanTrackErrors()));
121   }
122 
ConfigureAresChannelLocked(ares_channel channel)123   void ConfigureAresChannelLocked(ares_channel channel) override {
124     ares_set_socket_functions(channel, &kSockFuncs, this);
125     ares_set_socket_configure_callback(
126         channel, &GrpcPolledFdFactoryPosix::ConfigureSocket, nullptr);
127   }
128 
129  private:
130   /// Overridden socket API for c-ares
Socket(int af,int type,int protocol,void *)131   static ares_socket_t Socket(int af, int type, int protocol,
132                               void* /*user_data*/) {
133     return socket(af, type, protocol);
134   }
135 
136   /// Overridden connect API for c-ares
Connect(ares_socket_t as,const struct sockaddr * target,ares_socklen_t target_len,void *)137   static int Connect(ares_socket_t as, const struct sockaddr* target,
138                      ares_socklen_t target_len, void* /*user_data*/) {
139     return connect(as, target, target_len);
140   }
141 
142   /// Overridden writev API for c-ares
WriteV(ares_socket_t as,const struct iovec * iov,int iovec_count,void *)143   static ares_ssize_t WriteV(ares_socket_t as, const struct iovec* iov,
144                              int iovec_count, void* /*user_data*/) {
145     return writev(as, iov, iovec_count);
146   }
147 
148   /// Overridden recvfrom API for c-ares
RecvFrom(ares_socket_t as,void * data,size_t data_len,int flags,struct sockaddr * from,ares_socklen_t * from_len,void *)149   static ares_ssize_t RecvFrom(ares_socket_t as, void* data, size_t data_len,
150                                int flags, struct sockaddr* from,
151                                ares_socklen_t* from_len, void* /*user_data*/) {
152     return recvfrom(as, data, data_len, flags, from, from_len);
153   }
154 
155   /// Overridden close API for c-ares
Close(ares_socket_t as,void * user_data)156   static int Close(ares_socket_t as, void* user_data) {
157     GrpcPolledFdFactoryPosix* self =
158         static_cast<GrpcPolledFdFactoryPosix*>(user_data);
159     if (self->owned_fds_.find(as) == self->owned_fds_.end()) {
160       // c-ares owns this fd, grpc has never seen it
161       return close(as);
162     }
163     return 0;
164   }
165 
166   /// Because we're using socket API overrides, c-ares won't
167   /// perform its typical configuration on the socket. See
168   /// https://github.com/c-ares/c-ares/blob/bad62225b7f6b278b92e8e85a255600b629ef517/src/lib/ares_process.c#L1018.
169   /// So we use the configure socket callback override and copy default
170   /// settings that c-ares would normally apply on posix platforms:
171   ///   - non-blocking
172   ///   - cloexec flag
173   ///   - disable nagle
ConfigureSocket(ares_socket_t fd,int type,void *)174   static int ConfigureSocket(ares_socket_t fd, int type, void* /*user_data*/) {
175     // clang-format off
176 #define RETURN_IF_ERROR(expr) if (!(expr).ok()) { return -1; }
177     // clang-format on
178     PosixSocketWrapper sock(fd);
179     RETURN_IF_ERROR(sock.SetSocketNonBlocking(1));
180     RETURN_IF_ERROR(sock.SetSocketCloexec(1));
181     if (type == SOCK_STREAM) {
182       RETURN_IF_ERROR(sock.SetSocketLowLatency(1));
183     }
184     return 0;
185   }
186 
187   const struct ares_socket_functions kSockFuncs = {
188       &GrpcPolledFdFactoryPosix::Socket /* socket */,
189       &GrpcPolledFdFactoryPosix::Close /* close */,
190       &GrpcPolledFdFactoryPosix::Connect /* connect */,
191       &GrpcPolledFdFactoryPosix::RecvFrom /* recvfrom */,
192       &GrpcPolledFdFactoryPosix::WriteV /* writev */,
193   };
194 
195   PosixEventPoller* poller_;
196   // fds that are used/owned by grpc - we (grpc) will close them rather than
197   // c-ares
198   std::unordered_set<ares_socket_t> owned_fds_;
199 };
200 
201 }  // namespace experimental
202 }  // namespace grpc_event_engine
203 
204 #endif  // GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
205 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_GRPC_POLLED_FD_POSIX_H
206