1 // Copyright 2022 The gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H 16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <functional> 21 #include <string> 22 #include <utility> 23 24 #include "absl/status/status.h" 25 #include "absl/status/statusor.h" 26 27 #include <grpc/event_engine/endpoint_config.h> 28 #include <grpc/event_engine/event_engine.h> 29 #include <grpc/grpc.h> 30 #include <grpc/support/log.h> 31 32 #include "src/core/lib/gprpp/ref_counted_ptr.h" 33 #include "src/core/lib/iomgr/port.h" 34 #include "src/core/lib/iomgr/socket_mutator.h" 35 #include "src/core/lib/resource_quota/resource_quota.h" 36 37 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON 38 #include <sys/socket.h> 39 #endif 40 41 #ifdef GRPC_LINUX_ERRQUEUE 42 #ifndef SO_ZEROCOPY 43 #define SO_ZEROCOPY 60 44 #endif 45 #ifndef SO_EE_ORIGIN_ZEROCOPY 46 #define SO_EE_ORIGIN_ZEROCOPY 5 47 #endif 48 #endif // ifdef GRPC_LINUX_ERRQUEUE 49 50 namespace grpc_event_engine { 51 namespace experimental { 52 53 struct PosixTcpOptions { 54 static constexpr int kDefaultReadChunkSize = 8192; 55 static constexpr int kDefaultMinReadChunksize = 256; 56 static constexpr int kDefaultMaxReadChunksize = 4 * 1024 * 1024; 57 static constexpr int kZerocpTxEnabledDefault = 0; 58 static constexpr int kMaxChunkSize = 32 * 1024 * 1024; 59 static constexpr int kDefaultMaxSends = 4; 60 static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; 61 // Let the system decide the proper buffer size. 62 static constexpr int kReadBufferSizeUnset = -1; 63 int tcp_read_chunk_size = kDefaultReadChunkSize; 64 int tcp_min_read_chunk_size = kDefaultMinReadChunksize; 65 int tcp_max_read_chunk_size = kDefaultMaxReadChunksize; 66 int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold; 67 int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends; 68 int tcp_receive_buffer_size = kReadBufferSizeUnset; 69 bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault; 70 int keep_alive_time_ms = 0; 71 int keep_alive_timeout_ms = 0; 72 bool expand_wildcard_addrs = false; 73 bool allow_reuse_port = false; 74 grpc_core::RefCountedPtr<grpc_core::ResourceQuota> resource_quota; 75 struct grpc_socket_mutator* socket_mutator = nullptr; 76 PosixTcpOptions() = default; 77 // Move ctor PosixTcpOptionsPosixTcpOptions78 PosixTcpOptions(PosixTcpOptions&& other) noexcept { 79 socket_mutator = std::exchange(other.socket_mutator, nullptr); 80 resource_quota = std::move(other.resource_quota); 81 CopyIntegerOptions(other); 82 } 83 // Move assignment 84 PosixTcpOptions& operator=(PosixTcpOptions&& other) noexcept { 85 if (socket_mutator != nullptr) { 86 grpc_socket_mutator_unref(socket_mutator); 87 } 88 socket_mutator = std::exchange(other.socket_mutator, nullptr); 89 resource_quota = std::move(other.resource_quota); 90 CopyIntegerOptions(other); 91 return *this; 92 } 93 // Copy ctor PosixTcpOptionsPosixTcpOptions94 PosixTcpOptions(const PosixTcpOptions& other) { 95 if (other.socket_mutator != nullptr) { 96 socket_mutator = grpc_socket_mutator_ref(other.socket_mutator); 97 } 98 resource_quota = other.resource_quota; 99 CopyIntegerOptions(other); 100 } 101 // Copy assignment 102 PosixTcpOptions& operator=(const PosixTcpOptions& other) { 103 if (&other == this) { 104 return *this; 105 } 106 if (socket_mutator != nullptr) { 107 grpc_socket_mutator_unref(socket_mutator); 108 socket_mutator = nullptr; 109 } 110 if (other.socket_mutator != nullptr) { 111 socket_mutator = grpc_socket_mutator_ref(other.socket_mutator); 112 } 113 resource_quota = other.resource_quota; 114 CopyIntegerOptions(other); 115 return *this; 116 } 117 // Destructor. ~PosixTcpOptionsPosixTcpOptions118 ~PosixTcpOptions() { 119 if (socket_mutator != nullptr) { 120 grpc_socket_mutator_unref(socket_mutator); 121 } 122 } 123 124 private: CopyIntegerOptionsPosixTcpOptions125 void CopyIntegerOptions(const PosixTcpOptions& other) { 126 tcp_read_chunk_size = other.tcp_read_chunk_size; 127 tcp_min_read_chunk_size = other.tcp_min_read_chunk_size; 128 tcp_max_read_chunk_size = other.tcp_max_read_chunk_size; 129 tcp_tx_zerocopy_send_bytes_threshold = 130 other.tcp_tx_zerocopy_send_bytes_threshold; 131 tcp_tx_zerocopy_max_simultaneous_sends = 132 other.tcp_tx_zerocopy_max_simultaneous_sends; 133 tcp_tx_zero_copy_enabled = other.tcp_tx_zero_copy_enabled; 134 keep_alive_time_ms = other.keep_alive_time_ms; 135 keep_alive_timeout_ms = other.keep_alive_timeout_ms; 136 expand_wildcard_addrs = other.expand_wildcard_addrs; 137 allow_reuse_port = other.allow_reuse_port; 138 } 139 }; 140 141 PosixTcpOptions TcpOptionsFromEndpointConfig( 142 const grpc_event_engine::experimental::EndpointConfig& config); 143 144 // a wrapper for accept or accept4 145 int Accept4(int sockfd, 146 grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr, 147 int nonblock, int cloexec); 148 149 // Unlink the path pointed to by the given address if it refers to UDS path. 150 void UnlinkIfUnixDomainSocket( 151 const EventEngine::ResolvedAddress& resolved_addr); 152 153 class PosixSocketWrapper { 154 public: PosixSocketWrapper(int fd)155 explicit PosixSocketWrapper(int fd) : fd_(fd) { GPR_ASSERT(fd_ > 0); } 156 PosixSocketWrapper()157 PosixSocketWrapper() : fd_(-1){}; 158 159 ~PosixSocketWrapper() = default; 160 161 // Instruct the kernel to wait for specified number of bytes to be received on 162 // the socket before generating an interrupt for packet receive. If the call 163 // succeeds, it returns the number of bytes (wait threshold) that was actually 164 // set. 165 absl::StatusOr<int> SetSocketRcvLowat(int bytes); 166 167 // Set socket to use zerocopy 168 absl::Status SetSocketZeroCopy(); 169 170 // Set socket to non blocking mode 171 absl::Status SetSocketNonBlocking(int non_blocking); 172 173 // Set socket to close on exec 174 absl::Status SetSocketCloexec(int close_on_exec); 175 176 // Set socket to reuse old addresses 177 absl::Status SetSocketReuseAddr(int reuse); 178 179 // Disable nagle algorithm 180 absl::Status SetSocketLowLatency(int low_latency); 181 182 // Set SO_REUSEPORT 183 absl::Status SetSocketReusePort(int reuse); 184 185 // Override default Tcp user timeout values if necessary. 186 void TrySetSocketTcpUserTimeout(const PosixTcpOptions& options, 187 bool is_client); 188 189 // Tries to set SO_NOSIGPIPE if available on this platform. 190 // If SO_NO_SIGPIPE is not available, returns not OK status. 191 absl::Status SetSocketNoSigpipeIfPossible(); 192 193 // Tries to set IP_PKTINFO if available on this platform. If IP_PKTINFO is not 194 // available, returns not OK status. 195 absl::Status SetSocketIpPktInfoIfPossible(); 196 197 // Tries to set IPV6_RECVPKTINFO if available on this platform. If 198 // IPV6_RECVPKTINFO is not available, returns not OK status. 199 absl::Status SetSocketIpv6RecvPktInfoIfPossible(); 200 201 // Tries to set the socket's send buffer to given size. 202 absl::Status SetSocketSndBuf(int buffer_size_bytes); 203 204 // Tries to set the socket's receive buffer to given size. 205 absl::Status SetSocketRcvBuf(int buffer_size_bytes); 206 207 // Tries to set the socket using a grpc_socket_mutator 208 absl::Status SetSocketMutator(grpc_fd_usage usage, 209 grpc_socket_mutator* mutator); 210 211 // Extracts the first socket mutator from config if any and applies on the fd. 212 absl::Status ApplySocketMutatorInOptions(grpc_fd_usage usage, 213 const PosixTcpOptions& options); 214 215 // Return LocalAddress as EventEngine::ResolvedAddress 216 absl::StatusOr<EventEngine::ResolvedAddress> LocalAddress(); 217 218 // Return PeerAddress as EventEngine::ResolvedAddress 219 absl::StatusOr<EventEngine::ResolvedAddress> PeerAddress(); 220 221 // Return LocalAddress as string 222 absl::StatusOr<std::string> LocalAddressString(); 223 224 // Return PeerAddress as string 225 absl::StatusOr<std::string> PeerAddressString(); 226 227 // An enum to keep track of IPv4/IPv6 socket modes. 228 229 // Currently, this information is only used when a socket is first created, 230 // but in the future we may wish to store it alongside the fd. This would let 231 // calls like sendto() know which family to use without asking the kernel 232 // first. 233 enum DSMode { 234 // Uninitialized, or a non-IP socket. 235 DSMODE_NONE, 236 // AF_INET only. 237 DSMODE_IPV4, 238 // AF_INET6 only, because IPV6_V6ONLY could not be cleared. 239 DSMODE_IPV6, 240 // AF_INET6, which also supports ::ffff-mapped IPv4 addresses. 241 DSMODE_DUALSTACK 242 }; 243 244 // Returns the underlying file-descriptor. Fd()245 int Fd() const { return fd_; } 246 247 // Static methods 248 249 // Configure default values for tcp user timeout to be used by client 250 // and server side sockets. 251 static void ConfigureDefaultTcpUserTimeout(bool enable, int timeout, 252 bool is_client); 253 254 // Return true if SO_REUSEPORT is supported 255 static bool IsSocketReusePortSupported(); 256 257 // Returns true if this system can create AF_INET6 sockets bound to ::1. 258 // The value is probed once, and cached for the life of the process. 259 260 // This is more restrictive than checking for socket(AF_INET6) to succeed, 261 // because Linux with "net.ipv6.conf.all.disable_ipv6 = 1" is able to create 262 // and bind IPv6 sockets, but cannot connect to a getsockname() of [::]:port 263 // without a valid loopback interface. Rather than expose this half-broken 264 // state to library users, we turn off IPv6 sockets. 265 static bool IsIpv6LoopbackAvailable(); 266 267 // Creates a new socket for connecting to (or listening on) an address. 268 269 // If addr is AF_INET6, this creates an IPv6 socket first. If that fails, 270 // and addr is within ::ffff:0.0.0.0/96, then it automatically falls back to 271 // an IPv4 socket. 272 273 // If addr is AF_INET, AF_UNIX, or anything else, then this is similar to 274 // calling socket() directly. 275 276 // Returns an PosixSocketWrapper on success, otherwise returns a not-OK 277 // absl::Status 278 279 // The dsmode output indicates which address family was actually created. 280 static absl::StatusOr<PosixSocketWrapper> CreateDualStackSocket( 281 std::function<int(int /*domain*/, int /*type*/, int /*protocol*/)> 282 socket_factory, 283 const experimental::EventEngine::ResolvedAddress& addr, int type, 284 int protocol, DSMode& dsmode); 285 286 struct PosixSocketCreateResult; 287 // Return a PosixSocketCreateResult which manages a configured, unbound, 288 // unconnected TCP client fd. 289 // options: may contain custom tcp settings for the fd. 290 // target_addr: the destination address. 291 // 292 // Returns: Not-OK status on error. Otherwise it returns a 293 // PosixSocketWrapper::PosixSocketCreateResult type which includes a sock 294 // of type PosixSocketWrapper and a mapped_target_addr which is 295 // target_addr mapped to an address appropriate to the type of socket FD 296 // created. For example, if target_addr is IPv4 and dual stack sockets are 297 // available, mapped_target_addr will be an IPv4-mapped IPv6 address. 298 // 299 static absl::StatusOr<PosixSocketCreateResult> 300 CreateAndPrepareTcpClientSocket( 301 const PosixTcpOptions& options, 302 const EventEngine::ResolvedAddress& target_addr); 303 304 private: 305 int fd_; 306 }; 307 308 struct PosixSocketWrapper::PosixSocketCreateResult { 309 PosixSocketWrapper sock; 310 EventEngine::ResolvedAddress mapped_target_addr; 311 }; 312 313 } // namespace experimental 314 } // namespace grpc_event_engine 315 316 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H 317