1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H
16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <functional>
21 #include <string>
22 #include <utility>
23 
24 #include "absl/status/status.h"
25 #include "absl/status/statusor.h"
26 
27 #include <grpc/event_engine/endpoint_config.h>
28 #include <grpc/event_engine/event_engine.h>
29 #include <grpc/grpc.h>
30 #include <grpc/support/log.h>
31 
32 #include "src/core/lib/gprpp/ref_counted_ptr.h"
33 #include "src/core/lib/iomgr/port.h"
34 #include "src/core/lib/iomgr/socket_mutator.h"
35 #include "src/core/lib/resource_quota/resource_quota.h"
36 
37 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
38 #include <sys/socket.h>
39 #endif
40 
41 #ifdef GRPC_LINUX_ERRQUEUE
42 #ifndef SO_ZEROCOPY
43 #define SO_ZEROCOPY 60
44 #endif
45 #ifndef SO_EE_ORIGIN_ZEROCOPY
46 #define SO_EE_ORIGIN_ZEROCOPY 5
47 #endif
48 #endif  // ifdef GRPC_LINUX_ERRQUEUE
49 
50 namespace grpc_event_engine {
51 namespace experimental {
52 
53 struct PosixTcpOptions {
54   static constexpr int kDefaultReadChunkSize = 8192;
55   static constexpr int kDefaultMinReadChunksize = 256;
56   static constexpr int kDefaultMaxReadChunksize = 4 * 1024 * 1024;
57   static constexpr int kZerocpTxEnabledDefault = 0;
58   static constexpr int kMaxChunkSize = 32 * 1024 * 1024;
59   static constexpr int kDefaultMaxSends = 4;
60   static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;
61   // Let the system decide the proper buffer size.
62   static constexpr int kReadBufferSizeUnset = -1;
63   int tcp_read_chunk_size = kDefaultReadChunkSize;
64   int tcp_min_read_chunk_size = kDefaultMinReadChunksize;
65   int tcp_max_read_chunk_size = kDefaultMaxReadChunksize;
66   int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold;
67   int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends;
68   int tcp_receive_buffer_size = kReadBufferSizeUnset;
69   bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault;
70   int keep_alive_time_ms = 0;
71   int keep_alive_timeout_ms = 0;
72   bool expand_wildcard_addrs = false;
73   bool allow_reuse_port = false;
74   grpc_core::RefCountedPtr<grpc_core::ResourceQuota> resource_quota;
75   struct grpc_socket_mutator* socket_mutator = nullptr;
76   PosixTcpOptions() = default;
77   // Move ctor
PosixTcpOptionsPosixTcpOptions78   PosixTcpOptions(PosixTcpOptions&& other) noexcept {
79     socket_mutator = std::exchange(other.socket_mutator, nullptr);
80     resource_quota = std::move(other.resource_quota);
81     CopyIntegerOptions(other);
82   }
83   // Move assignment
84   PosixTcpOptions& operator=(PosixTcpOptions&& other) noexcept {
85     if (socket_mutator != nullptr) {
86       grpc_socket_mutator_unref(socket_mutator);
87     }
88     socket_mutator = std::exchange(other.socket_mutator, nullptr);
89     resource_quota = std::move(other.resource_quota);
90     CopyIntegerOptions(other);
91     return *this;
92   }
93   // Copy ctor
PosixTcpOptionsPosixTcpOptions94   PosixTcpOptions(const PosixTcpOptions& other) {
95     if (other.socket_mutator != nullptr) {
96       socket_mutator = grpc_socket_mutator_ref(other.socket_mutator);
97     }
98     resource_quota = other.resource_quota;
99     CopyIntegerOptions(other);
100   }
101   // Copy assignment
102   PosixTcpOptions& operator=(const PosixTcpOptions& other) {
103     if (&other == this) {
104       return *this;
105     }
106     if (socket_mutator != nullptr) {
107       grpc_socket_mutator_unref(socket_mutator);
108       socket_mutator = nullptr;
109     }
110     if (other.socket_mutator != nullptr) {
111       socket_mutator = grpc_socket_mutator_ref(other.socket_mutator);
112     }
113     resource_quota = other.resource_quota;
114     CopyIntegerOptions(other);
115     return *this;
116   }
117   // Destructor.
~PosixTcpOptionsPosixTcpOptions118   ~PosixTcpOptions() {
119     if (socket_mutator != nullptr) {
120       grpc_socket_mutator_unref(socket_mutator);
121     }
122   }
123 
124  private:
CopyIntegerOptionsPosixTcpOptions125   void CopyIntegerOptions(const PosixTcpOptions& other) {
126     tcp_read_chunk_size = other.tcp_read_chunk_size;
127     tcp_min_read_chunk_size = other.tcp_min_read_chunk_size;
128     tcp_max_read_chunk_size = other.tcp_max_read_chunk_size;
129     tcp_tx_zerocopy_send_bytes_threshold =
130         other.tcp_tx_zerocopy_send_bytes_threshold;
131     tcp_tx_zerocopy_max_simultaneous_sends =
132         other.tcp_tx_zerocopy_max_simultaneous_sends;
133     tcp_tx_zero_copy_enabled = other.tcp_tx_zero_copy_enabled;
134     keep_alive_time_ms = other.keep_alive_time_ms;
135     keep_alive_timeout_ms = other.keep_alive_timeout_ms;
136     expand_wildcard_addrs = other.expand_wildcard_addrs;
137     allow_reuse_port = other.allow_reuse_port;
138   }
139 };
140 
141 PosixTcpOptions TcpOptionsFromEndpointConfig(
142     const grpc_event_engine::experimental::EndpointConfig& config);
143 
144 // a wrapper for accept or accept4
145 int Accept4(int sockfd,
146             grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
147             int nonblock, int cloexec);
148 
149 // Unlink the path pointed to by the given address if it refers to UDS path.
150 void UnlinkIfUnixDomainSocket(
151     const EventEngine::ResolvedAddress& resolved_addr);
152 
153 class PosixSocketWrapper {
154  public:
PosixSocketWrapper(int fd)155   explicit PosixSocketWrapper(int fd) : fd_(fd) { GPR_ASSERT(fd_ > 0); }
156 
PosixSocketWrapper()157   PosixSocketWrapper() : fd_(-1){};
158 
159   ~PosixSocketWrapper() = default;
160 
161   // Instruct the kernel to wait for specified number of bytes to be received on
162   // the socket before generating an interrupt for packet receive. If the call
163   // succeeds, it returns the number of bytes (wait threshold) that was actually
164   // set.
165   absl::StatusOr<int> SetSocketRcvLowat(int bytes);
166 
167   // Set socket to use zerocopy
168   absl::Status SetSocketZeroCopy();
169 
170   // Set socket to non blocking mode
171   absl::Status SetSocketNonBlocking(int non_blocking);
172 
173   // Set socket to close on exec
174   absl::Status SetSocketCloexec(int close_on_exec);
175 
176   // Set socket to reuse old addresses
177   absl::Status SetSocketReuseAddr(int reuse);
178 
179   // Disable nagle algorithm
180   absl::Status SetSocketLowLatency(int low_latency);
181 
182   // Set SO_REUSEPORT
183   absl::Status SetSocketReusePort(int reuse);
184 
185   // Override default Tcp user timeout values if necessary.
186   void TrySetSocketTcpUserTimeout(const PosixTcpOptions& options,
187                                   bool is_client);
188 
189   // Tries to set SO_NOSIGPIPE if available on this platform.
190   // If SO_NO_SIGPIPE is not available, returns not OK status.
191   absl::Status SetSocketNoSigpipeIfPossible();
192 
193   // Tries to set IP_PKTINFO if available on this platform. If IP_PKTINFO is not
194   // available, returns not OK status.
195   absl::Status SetSocketIpPktInfoIfPossible();
196 
197   // Tries to set IPV6_RECVPKTINFO if available on this platform. If
198   // IPV6_RECVPKTINFO is not available, returns not OK status.
199   absl::Status SetSocketIpv6RecvPktInfoIfPossible();
200 
201   // Tries to set the socket's send buffer to given size.
202   absl::Status SetSocketSndBuf(int buffer_size_bytes);
203 
204   // Tries to set the socket's receive buffer to given size.
205   absl::Status SetSocketRcvBuf(int buffer_size_bytes);
206 
207   // Tries to set the socket using a grpc_socket_mutator
208   absl::Status SetSocketMutator(grpc_fd_usage usage,
209                                 grpc_socket_mutator* mutator);
210 
211   // Extracts the first socket mutator from config if any and applies on the fd.
212   absl::Status ApplySocketMutatorInOptions(grpc_fd_usage usage,
213                                            const PosixTcpOptions& options);
214 
215   // Return LocalAddress as EventEngine::ResolvedAddress
216   absl::StatusOr<EventEngine::ResolvedAddress> LocalAddress();
217 
218   // Return PeerAddress as EventEngine::ResolvedAddress
219   absl::StatusOr<EventEngine::ResolvedAddress> PeerAddress();
220 
221   // Return LocalAddress as string
222   absl::StatusOr<std::string> LocalAddressString();
223 
224   // Return PeerAddress as string
225   absl::StatusOr<std::string> PeerAddressString();
226 
227   // An enum to keep track of IPv4/IPv6 socket modes.
228 
229   // Currently, this information is only used when a socket is first created,
230   // but in the future we may wish to store it alongside the fd.  This would let
231   // calls like sendto() know which family to use without asking the kernel
232   // first.
233   enum DSMode {
234     // Uninitialized, or a non-IP socket.
235     DSMODE_NONE,
236     // AF_INET only.
237     DSMODE_IPV4,
238     // AF_INET6 only, because IPV6_V6ONLY could not be cleared.
239     DSMODE_IPV6,
240     // AF_INET6, which also supports ::ffff-mapped IPv4 addresses.
241     DSMODE_DUALSTACK
242   };
243 
244   // Returns the underlying file-descriptor.
Fd()245   int Fd() const { return fd_; }
246 
247   // Static methods
248 
249   // Configure default values for tcp user timeout to be used by client
250   // and server side sockets.
251   static void ConfigureDefaultTcpUserTimeout(bool enable, int timeout,
252                                              bool is_client);
253 
254   // Return true if SO_REUSEPORT is supported
255   static bool IsSocketReusePortSupported();
256 
257   // Returns true if this system can create AF_INET6 sockets bound to ::1.
258   // The value is probed once, and cached for the life of the process.
259 
260   // This is more restrictive than checking for socket(AF_INET6) to succeed,
261   // because Linux with "net.ipv6.conf.all.disable_ipv6 = 1" is able to create
262   // and bind IPv6 sockets, but cannot connect to a getsockname() of [::]:port
263   // without a valid loopback interface.  Rather than expose this half-broken
264   // state to library users, we turn off IPv6 sockets.
265   static bool IsIpv6LoopbackAvailable();
266 
267   // Creates a new socket for connecting to (or listening on) an address.
268 
269   // If addr is AF_INET6, this creates an IPv6 socket first.  If that fails,
270   // and addr is within ::ffff:0.0.0.0/96, then it automatically falls back to
271   // an IPv4 socket.
272 
273   // If addr is AF_INET, AF_UNIX, or anything else, then this is similar to
274   // calling socket() directly.
275 
276   // Returns an PosixSocketWrapper on success, otherwise returns a not-OK
277   // absl::Status
278 
279   // The dsmode output indicates which address family was actually created.
280   static absl::StatusOr<PosixSocketWrapper> CreateDualStackSocket(
281       std::function<int(int /*domain*/, int /*type*/, int /*protocol*/)>
282           socket_factory,
283       const experimental::EventEngine::ResolvedAddress& addr, int type,
284       int protocol, DSMode& dsmode);
285 
286   struct PosixSocketCreateResult;
287   // Return a PosixSocketCreateResult which manages a configured, unbound,
288   // unconnected TCP client fd.
289   //  options: may contain custom tcp settings for the fd.
290   //  target_addr: the destination address.
291   //
292   // Returns: Not-OK status on error. Otherwise it returns a
293   // PosixSocketWrapper::PosixSocketCreateResult type which includes a sock
294   // of type PosixSocketWrapper and a mapped_target_addr which is
295   // target_addr mapped to an address appropriate to the type of socket FD
296   // created. For example, if target_addr is IPv4 and dual stack sockets are
297   // available, mapped_target_addr will be an IPv4-mapped IPv6 address.
298   //
299   static absl::StatusOr<PosixSocketCreateResult>
300   CreateAndPrepareTcpClientSocket(
301       const PosixTcpOptions& options,
302       const EventEngine::ResolvedAddress& target_addr);
303 
304  private:
305   int fd_;
306 };
307 
308 struct PosixSocketWrapper::PosixSocketCreateResult {
309   PosixSocketWrapper sock;
310   EventEngine::ResolvedAddress mapped_target_addr;
311 };
312 
313 }  // namespace experimental
314 }  // namespace grpc_event_engine
315 
316 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H
317