1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
17 
18 #include <errno.h>
19 #include <inttypes.h>
20 #include <limits.h>
21 
22 #include <algorithm>
23 #include <cctype>
24 #include <cstdint>
25 #include <cstdlib>
26 #include <memory>
27 #include <string>
28 #include <type_traits>
29 
30 #include "absl/functional/any_invocable.h"
31 #include "absl/status/status.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/types/optional.h"
35 
36 #include <grpc/event_engine/internal/slice_cast.h>
37 #include <grpc/event_engine/slice.h>
38 #include <grpc/event_engine/slice_buffer.h>
39 #include <grpc/status.h>
40 #include <grpc/support/log.h>
41 
42 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
43 #include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
44 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
45 #include "src/core/lib/event_engine/tcp_socket_utils.h"
46 #include "src/core/lib/experiments/experiments.h"
47 #include "src/core/lib/gprpp/debug_location.h"
48 #include "src/core/lib/gprpp/load_file.h"
49 #include "src/core/lib/gprpp/ref_counted_ptr.h"
50 #include "src/core/lib/gprpp/status_helper.h"
51 #include "src/core/lib/gprpp/strerror.h"
52 #include "src/core/lib/gprpp/time.h"
53 #include "src/core/lib/resource_quota/resource_quota.h"
54 #include "src/core/lib/slice/slice.h"
55 
56 #ifdef GRPC_POSIX_SOCKET_TCP
57 #ifdef GRPC_LINUX_ERRQUEUE
58 #include <dirent.h>            // IWYU pragma: keep
59 #include <linux/capability.h>  // IWYU pragma: keep
60 #include <linux/errqueue.h>    // IWYU pragma: keep
61 #include <linux/netlink.h>     // IWYU pragma: keep
62 #include <sys/prctl.h>         // IWYU pragma: keep
63 #include <sys/resource.h>      // IWYU pragma: keep
64 #endif
65 #include <netinet/in.h>  // IWYU pragma: keep
66 
67 #ifndef SOL_TCP
68 #define SOL_TCP IPPROTO_TCP
69 #endif
70 
71 #ifndef TCP_INQ
72 #define TCP_INQ 36
73 #define TCP_CM_INQ TCP_INQ
74 #endif
75 
76 #ifdef GRPC_HAVE_MSG_NOSIGNAL
77 #define SENDMSG_FLAGS MSG_NOSIGNAL
78 #else
79 #define SENDMSG_FLAGS 0
80 #endif
81 
82 // TCP zero copy sendmsg flag.
83 // NB: We define this here as a fallback in case we're using an older set of
84 // library headers that has not defined MSG_ZEROCOPY. Since this constant is
85 // part of the kernel, we are guaranteed it will never change/disagree so
86 // defining it here is safe.
87 #ifndef MSG_ZEROCOPY
88 #define MSG_ZEROCOPY 0x4000000
89 #endif
90 
91 #define MAX_READ_IOVEC 64
92 
93 namespace grpc_event_engine {
94 namespace experimental {
95 
96 namespace {
97 
98 // A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
99 // of bytes sent.
TcpSend(int fd,const struct msghdr * msg,int * saved_errno,int additional_flags=0)100 ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno,
101                 int additional_flags = 0) {
102   ssize_t sent_length;
103   do {
104     sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);
105   } while (sent_length < 0 && (*saved_errno = errno) == EINTR);
106   return sent_length;
107 }
108 
109 #ifdef GRPC_LINUX_ERRQUEUE
110 
111 #define CAP_IS_SUPPORTED(cap) (prctl(PR_CAPBSET_READ, (cap), 0) > 0)
112 
// Strips every trailing whitespace character (as classified by std::isspace,
// so spaces, tabs and newlines alike) from \a s, in place.
void rtrim(std::string& s) {
  while (!s.empty() && std::isspace(static_cast<unsigned char>(s.back()))) {
    s.pop_back();
  }
}
120 
ParseUlimitMemLockFromFile(std::string file_name)121 uint64_t ParseUlimitMemLockFromFile(std::string file_name) {
122   static std::string kHardMemlockPrefix = "* hard memlock";
123   auto result = grpc_core::LoadFile(file_name, false);
124   if (!result.ok()) {
125     return 0;
126   }
127   std::string file_contents(reinterpret_cast<const char*>((*result).begin()),
128                             (*result).length());
129   // Find start position containing prefix.
130   size_t start = file_contents.find(kHardMemlockPrefix);
131   if (start == std::string::npos) {
132     return 0;
133   }
134   // Find position of next newline after prefix.
135   size_t end = file_contents.find(start, '\n');
136   // Extract substring between prefix and next newline.
137   auto memlock_value_string = file_contents.substr(
138       start + kHardMemlockPrefix.length() + 1, end - start);
139   rtrim(memlock_value_string);
140   if (memlock_value_string == "unlimited" ||
141       memlock_value_string == "infinity") {
142     return UINT64_MAX;
143   } else {
144     return std::atoi(memlock_value_string.c_str());
145   }
146 }
147 
148 // Ulimit hard memlock controls per socket limit for maximum locked memory in
149 // RAM. Parses all files under  /etc/security/limits.d/ and
150 // /etc/security/limits.conf file for a line of the following format:
151 // * hard memlock <value>
152 // It extracts the first valid <value> and returns it. A value of UINT64_MAX
153 // represents unlimited or infinity. Hard memlock value should be set to
154 // allow zerocopy sendmsgs to succeed. It controls the maximum amount of
155 // memory that can be locked by a socket in RAM.
GetUlimitHardMemLock()156 uint64_t GetUlimitHardMemLock() {
157   static const uint64_t kUlimitHardMemLock = []() -> uint64_t {
158     if (CAP_IS_SUPPORTED(CAP_SYS_RESOURCE)) {
159       // hard memlock ulimit is ignored for privileged user.
160       return UINT64_MAX;
161     }
162     if (auto dir = opendir("/etc/security/limits.d")) {
163       while (auto f = readdir(dir)) {
164         if (f->d_name[0] == '.') {
165           continue;  // Skip everything that starts with a dot
166         }
167         uint64_t hard_memlock = ParseUlimitMemLockFromFile(
168             absl::StrCat("/etc/security/limits.d/", std::string(f->d_name)));
169         if (hard_memlock != 0) {
170           return hard_memlock;
171         }
172       }
173       closedir(dir);
174     }
175     return ParseUlimitMemLockFromFile("/etc/security/limits.conf");
176   }();
177   return kUlimitHardMemLock;
178 }
179 
180 // RLIMIT_MEMLOCK controls per process limit for maximum locked memory in RAM.
GetRLimitMemLockMax()181 uint64_t GetRLimitMemLockMax() {
182   static const uint64_t kRlimitMemLock = []() -> uint64_t {
183     if (CAP_IS_SUPPORTED(CAP_SYS_RESOURCE)) {
184       // RLIMIT_MEMLOCK is ignored for privileged user.
185       return UINT64_MAX;
186     }
187     struct rlimit limit;
188     if (getrlimit(RLIMIT_MEMLOCK, &limit) != 0) {
189       return 0;
190     }
191     return static_cast<uint64_t>(limit.rlim_max);
192   }();
193   return kRlimitMemLock;
194 }
195 
196 // Whether the cmsg received from error queue is of the IPv4 or IPv6 levels.
CmsgIsIpLevel(const cmsghdr & cmsg)197 bool CmsgIsIpLevel(const cmsghdr& cmsg) {
198   return (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR) ||
199          (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR);
200 }
201 
CmsgIsZeroCopy(const cmsghdr & cmsg)202 bool CmsgIsZeroCopy(const cmsghdr& cmsg) {
203   if (!CmsgIsIpLevel(cmsg)) {
204     return false;
205   }
206   auto serr = reinterpret_cast<const sock_extended_err*> CMSG_DATA(&cmsg);
207   return serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY;
208 }
209 #endif  // GRPC_LINUX_ERRQUEUE
210 
PosixOSError(int error_no,const char * call_name)211 absl::Status PosixOSError(int error_no, const char* call_name) {
212   absl::Status s = absl::UnknownError(grpc_core::StrError(error_no));
213   grpc_core::StatusSetInt(&s, grpc_core::StatusIntProperty::kErrorNo, error_no);
214   grpc_core::StatusSetStr(&s, grpc_core::StatusStrProperty::kOsError,
215                           grpc_core::StrError(error_no));
216   grpc_core::StatusSetStr(&s, grpc_core::StatusStrProperty::kSyscall,
217                           call_name);
218   return s;
219 }
220 
221 }  // namespace
222 
223 #if defined(IOV_MAX) && IOV_MAX < 260
224 #define MAX_WRITE_IOVEC IOV_MAX
225 #else
226 #define MAX_WRITE_IOVEC 260
227 #endif
PopulateIovs(size_t * unwind_slice_idx,size_t * unwind_byte_idx,size_t * sending_length,iovec * iov)228 msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
229                                                     size_t* unwind_byte_idx,
230                                                     size_t* sending_length,
231                                                     iovec* iov) {
232   msg_iovlen_type iov_size;
233   *unwind_slice_idx = out_offset_.slice_idx;
234   *unwind_byte_idx = out_offset_.byte_idx;
235   for (iov_size = 0;
236        out_offset_.slice_idx != buf_.Count() && iov_size != MAX_WRITE_IOVEC;
237        iov_size++) {
238     MutableSlice& slice = internal::SliceCast<MutableSlice>(
239         buf_.MutableSliceAt(out_offset_.slice_idx));
240     iov[iov_size].iov_base = slice.begin();
241     iov[iov_size].iov_len = slice.length() - out_offset_.byte_idx;
242     *sending_length += iov[iov_size].iov_len;
243     ++(out_offset_.slice_idx);
244     out_offset_.byte_idx = 0;
245   }
246   GPR_DEBUG_ASSERT(iov_size > 0);
247   return iov_size;
248 }
249 
UpdateOffsetForBytesSent(size_t sending_length,size_t actually_sent)250 void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
251                                                      size_t actually_sent) {
252   size_t trailing = sending_length - actually_sent;
253   while (trailing > 0) {
254     size_t slice_length;
255     out_offset_.slice_idx--;
256     slice_length = buf_.RefSlice(out_offset_.slice_idx).length();
257     if (slice_length > trailing) {
258       out_offset_.byte_idx = slice_length - trailing;
259       break;
260     } else {
261       trailing -= slice_length;
262     }
263   }
264 }
265 
AddToEstimate(size_t bytes)266 void PosixEndpointImpl::AddToEstimate(size_t bytes) {
267   bytes_read_this_round_ += static_cast<double>(bytes);
268 }
269 
FinishEstimate()270 void PosixEndpointImpl::FinishEstimate() {
271   // If we read >80% of the target buffer in one read loop, increase the size of
272   // the target buffer to either the amount read, or twice its previous value.
273   if (bytes_read_this_round_ > target_length_ * 0.8) {
274     target_length_ = std::max(2 * target_length_, bytes_read_this_round_);
275   } else {
276     target_length_ = 0.99 * target_length_ + 0.01 * bytes_read_this_round_;
277   }
278   bytes_read_this_round_ = 0;
279 }
280 
TcpAnnotateError(absl::Status src_error)281 absl::Status PosixEndpointImpl::TcpAnnotateError(absl::Status src_error) {
282   auto peer_string = ResolvedAddressToNormalizedString(peer_address_);
283 
284   grpc_core::StatusSetStr(&src_error,
285                           grpc_core::StatusStrProperty::kTargetAddress,
286                           peer_string.ok() ? *peer_string : "");
287   grpc_core::StatusSetInt(&src_error, grpc_core::StatusIntProperty::kFd,
288                           handle_->WrappedFd());
289   grpc_core::StatusSetInt(&src_error, grpc_core::StatusIntProperty::kRpcStatus,
290                           GRPC_STATUS_UNAVAILABLE);
291   return src_error;
292 }
293 
294 // Returns true if data available to read or error other than EAGAIN.
TcpDoRead(absl::Status & status)295 bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
296   struct msghdr msg;
297   struct iovec iov[MAX_READ_IOVEC];
298   ssize_t read_bytes;
299   size_t total_read_bytes = 0;
300   size_t iov_len = std::min<size_t>(MAX_READ_IOVEC, incoming_buffer_->Count());
301 #ifdef GRPC_LINUX_ERRQUEUE
302   constexpr size_t cmsg_alloc_space =
303       CMSG_SPACE(sizeof(scm_timestamping)) + CMSG_SPACE(sizeof(int));
304 #else
305   constexpr size_t cmsg_alloc_space = 24;  // CMSG_SPACE(sizeof(int))
306 #endif  // GRPC_LINUX_ERRQUEUE
307   char cmsgbuf[cmsg_alloc_space];
308   for (size_t i = 0; i < iov_len; i++) {
309     MutableSlice& slice =
310         internal::SliceCast<MutableSlice>(incoming_buffer_->MutableSliceAt(i));
311     iov[i].iov_base = slice.begin();
312     iov[i].iov_len = slice.length();
313   }
314 
315   GPR_ASSERT(incoming_buffer_->Length() != 0);
316   GPR_DEBUG_ASSERT(min_progress_size_ > 0);
317 
318   do {
319     // Assume there is something on the queue. If we receive TCP_INQ from
320     // kernel, we will update this value, otherwise, we have to assume there is
321     // always something to read until we get EAGAIN.
322     inq_ = 1;
323 
324     msg.msg_name = nullptr;
325     msg.msg_namelen = 0;
326     msg.msg_iov = iov;
327     msg.msg_iovlen = static_cast<msg_iovlen_type>(iov_len);
328     if (inq_capable_) {
329       msg.msg_control = cmsgbuf;
330       msg.msg_controllen = sizeof(cmsgbuf);
331     } else {
332       msg.msg_control = nullptr;
333       msg.msg_controllen = 0;
334     }
335     msg.msg_flags = 0;
336 
337     do {
338       read_bytes = recvmsg(fd_, &msg, 0);
339     } while (read_bytes < 0 && errno == EINTR);
340 
341     if (read_bytes < 0 && errno == EAGAIN) {
342       // NB: After calling call_read_cb a parallel call of the read handler may
343       // be running.
344       if (total_read_bytes > 0) {
345         break;
346       }
347       FinishEstimate();
348       inq_ = 0;
349       return false;
350     }
351 
352     // We have read something in previous reads. We need to deliver those bytes
353     // to the upper layer.
354     if (read_bytes <= 0 && total_read_bytes >= 1) {
355       inq_ = 1;
356       break;
357     }
358 
359     if (read_bytes <= 0) {
360       // 0 read size ==> end of stream
361       incoming_buffer_->Clear();
362       if (read_bytes == 0) {
363         status = TcpAnnotateError(absl::InternalError("Socket closed"));
364       } else {
365         status = TcpAnnotateError(absl::InternalError(
366             absl::StrCat("recvmsg:", grpc_core::StrError(errno))));
367       }
368       return true;
369     }
370 
371     AddToEstimate(static_cast<size_t>(read_bytes));
372     GPR_DEBUG_ASSERT((size_t)read_bytes <=
373                      incoming_buffer_->Length() - total_read_bytes);
374 
375 #ifdef GRPC_HAVE_TCP_INQ
376     if (inq_capable_) {
377       GPR_DEBUG_ASSERT(!(msg.msg_flags & MSG_CTRUNC));
378       struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
379       for (; cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
380         if (cmsg->cmsg_level == SOL_TCP && cmsg->cmsg_type == TCP_CM_INQ &&
381             cmsg->cmsg_len == CMSG_LEN(sizeof(int))) {
382           inq_ = *reinterpret_cast<int*>(CMSG_DATA(cmsg));
383           break;
384         }
385       }
386     }
387 #endif  // GRPC_HAVE_TCP_INQ
388 
389     total_read_bytes += read_bytes;
390     if (inq_ == 0 || total_read_bytes == incoming_buffer_->Length()) {
391       break;
392     }
393 
394     // We had a partial read, and still have space to read more data. So, adjust
395     // IOVs and try to read more.
396     size_t remaining = read_bytes;
397     size_t j = 0;
398     for (size_t i = 0; i < iov_len; i++) {
399       if (remaining >= iov[i].iov_len) {
400         remaining -= iov[i].iov_len;
401         continue;
402       }
403       if (remaining > 0) {
404         iov[j].iov_base = static_cast<char*>(iov[i].iov_base) + remaining;
405         iov[j].iov_len = iov[i].iov_len - remaining;
406         remaining = 0;
407       } else {
408         iov[j].iov_base = iov[i].iov_base;
409         iov[j].iov_len = iov[i].iov_len;
410       }
411       ++j;
412     }
413     iov_len = j;
414   } while (true);
415 
416   if (inq_ == 0) {
417     FinishEstimate();
418   }
419 
420   GPR_DEBUG_ASSERT(total_read_bytes > 0);
421   status = absl::OkStatus();
422   if (grpc_core::IsTcpFrameSizeTuningEnabled()) {
423     // Update min progress size based on the total number of bytes read in
424     // this round.
425     min_progress_size_ -= total_read_bytes;
426     if (min_progress_size_ > 0) {
427       // There is still some bytes left to be read before we can signal
428       // the read as complete. Append the bytes read so far into
429       // last_read_buffer which serves as a staging buffer. Return false
430       // to indicate tcp_handle_read needs to be scheduled again.
431       incoming_buffer_->MoveFirstNBytesIntoSliceBuffer(total_read_bytes,
432                                                        last_read_buffer_);
433       return false;
434     } else {
435       // The required number of bytes have been read. Append the bytes
436       // read in this round into last_read_buffer. Then swap last_read_buffer
437       // and incoming_buffer. Now incoming buffer contains all the bytes
438       // read since the start of the last tcp_read operation. last_read_buffer
439       // would contain any spare space left in the incoming buffer. This
440       // space will be used in the next tcp_read operation.
441       min_progress_size_ = 1;
442       incoming_buffer_->MoveFirstNBytesIntoSliceBuffer(total_read_bytes,
443                                                        last_read_buffer_);
444       incoming_buffer_->Swap(last_read_buffer_);
445       return true;
446     }
447   }
448   if (total_read_bytes < incoming_buffer_->Length()) {
449     incoming_buffer_->MoveLastNBytesIntoSliceBuffer(
450         incoming_buffer_->Length() - total_read_bytes, last_read_buffer_);
451   }
452   return true;
453 }
454 
PerformReclamation()455 void PosixEndpointImpl::PerformReclamation() {
456   read_mu_.Lock();
457   if (incoming_buffer_ != nullptr) {
458     incoming_buffer_->Clear();
459   }
460   has_posted_reclaimer_ = false;
461   read_mu_.Unlock();
462 }
463 
MaybePostReclaimer()464 void PosixEndpointImpl::MaybePostReclaimer() {
465   if (!has_posted_reclaimer_) {
466     has_posted_reclaimer_ = true;
467     memory_owner_.PostReclaimer(
468         grpc_core::ReclamationPass::kBenign,
469         [self = Ref(DEBUG_LOCATION, "Posix Reclaimer")](
470             absl::optional<grpc_core::ReclamationSweep> sweep) {
471           if (sweep.has_value()) {
472             self->PerformReclamation();
473           }
474         });
475   }
476 }
477 
UpdateRcvLowat()478 void PosixEndpointImpl::UpdateRcvLowat() {
479   if (!grpc_core::IsTcpRcvLowatEnabled()) return;
480 
481   // TODO(ctiller): Check if supported by OS.
482   // TODO(ctiller): Allow some adjustments instead of hardcoding things.
483 
484   static constexpr int kRcvLowatMax = 16 * 1024 * 1024;
485   static constexpr int kRcvLowatThreshold = 16 * 1024;
486 
487   int remaining = std::min({static_cast<int>(incoming_buffer_->Length()),
488                             kRcvLowatMax, min_progress_size_});
489 
490   // Setting SO_RCVLOWAT for small quantities does not save on CPU.
491   if (remaining < kRcvLowatThreshold) {
492     remaining = 0;
493   }
494 
495   // If zerocopy is off, wake shortly before the full RPC is here. More can
496   // show up partway through recvmsg() since it takes a while to copy data.
497   // So an early wakeup aids latency.
498   if (!tcp_zerocopy_send_ctx_->Enabled() && remaining > 0) {
499     remaining -= kRcvLowatThreshold;
500   }
501 
502   // We still do not know the RPC size. Do not set SO_RCVLOWAT.
503   if (set_rcvlowat_ <= 1 && remaining <= 1) return;
504 
505   // Previous value is still valid. No change needed in SO_RCVLOWAT.
506   if (set_rcvlowat_ == remaining) {
507     return;
508   }
509   auto result = sock_.SetSocketRcvLowat(remaining);
510   if (result.ok()) {
511     set_rcvlowat_ = *result;
512   } else {
513     gpr_log(GPR_ERROR, "%s",
514             absl::StrCat("ERROR in SO_RCVLOWAT: ", result.status().message())
515                 .c_str());
516   }
517 }
518 
MaybeMakeReadSlices()519 void PosixEndpointImpl::MaybeMakeReadSlices() {
520   static const int kBigAlloc = 64 * 1024;
521   static const int kSmallAlloc = 8 * 1024;
522   if (incoming_buffer_->Length() < static_cast<size_t>(min_progress_size_)) {
523     size_t allocate_length = min_progress_size_;
524     const size_t target_length = static_cast<size_t>(target_length_);
525     // If memory pressure is low and we think there will be more than
526     // min_progress_size bytes to read, allocate a bit more.
527     const bool low_memory_pressure =
528         memory_owner_.GetPressureInfo().pressure_control_value < 0.8;
529     if (low_memory_pressure && target_length > allocate_length) {
530       allocate_length = target_length;
531     }
532     int extra_wanted =
533         allocate_length - static_cast<int>(incoming_buffer_->Length());
534     if (extra_wanted >=
535         (low_memory_pressure ? kSmallAlloc * 3 / 2 : kBigAlloc)) {
536       while (extra_wanted > 0) {
537         extra_wanted -= kBigAlloc;
538         incoming_buffer_->AppendIndexed(
539             Slice(memory_owner_.MakeSlice(kBigAlloc)));
540       }
541     } else {
542       while (extra_wanted > 0) {
543         extra_wanted -= kSmallAlloc;
544         incoming_buffer_->AppendIndexed(
545             Slice(memory_owner_.MakeSlice(kSmallAlloc)));
546       }
547     }
548     MaybePostReclaimer();
549   }
550 }
551 
HandleRead(absl::Status status)552 void PosixEndpointImpl::HandleRead(absl::Status status) {
553   read_mu_.Lock();
554   if (status.ok() && memory_owner_.is_valid()) {
555     MaybeMakeReadSlices();
556     if (!TcpDoRead(status)) {
557       UpdateRcvLowat();
558       // We've consumed the edge, request a new one.
559       read_mu_.Unlock();
560       handle_->NotifyOnRead(on_read_);
561       return;
562     }
563   } else {
564     if (!memory_owner_.is_valid()) {
565       status = absl::UnknownError("Shutting down endpoint");
566     }
567     incoming_buffer_->Clear();
568     last_read_buffer_.Clear();
569   }
570   absl::AnyInvocable<void(absl::Status)> cb = std::move(read_cb_);
571   read_cb_ = nullptr;
572   incoming_buffer_ = nullptr;
573   read_mu_.Unlock();
574   cb(status);
575   Unref();
576 }
577 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const EventEngine::Endpoint::ReadArgs * args)578 bool PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
579                              SliceBuffer* buffer,
580                              const EventEngine::Endpoint::ReadArgs* args) {
581   grpc_core::ReleasableMutexLock lock(&read_mu_);
582   GPR_ASSERT(read_cb_ == nullptr);
583   incoming_buffer_ = buffer;
584   incoming_buffer_->Clear();
585   incoming_buffer_->Swap(last_read_buffer_);
586   if (args != nullptr && grpc_core::IsTcpFrameSizeTuningEnabled()) {
587     min_progress_size_ = std::max(static_cast<int>(args->read_hint_bytes), 1);
588   } else {
589     min_progress_size_ = 1;
590   }
591   Ref().release();
592   if (is_first_read_) {
593     read_cb_ = std::move(on_read);
594     UpdateRcvLowat();
595     // Endpoint read called for the very first time. Register read callback
596     // with the polling engine.
597     is_first_read_ = false;
598     lock.Release();
599     handle_->NotifyOnRead(on_read_);
600   } else if (inq_ == 0) {
601     read_cb_ = std::move(on_read);
602     UpdateRcvLowat();
603     lock.Release();
604     // Upper layer asked to read more but we know there is no pending data to
605     // read from previous reads. So, wait for POLLIN.
606     handle_->NotifyOnRead(on_read_);
607   } else {
608     absl::Status status;
609     MaybeMakeReadSlices();
610     if (!TcpDoRead(status)) {
611       UpdateRcvLowat();
612       read_cb_ = std::move(on_read);
613       // We've consumed the edge, request a new one.
614       lock.Release();
615       handle_->NotifyOnRead(on_read_);
616       return false;
617     }
618     if (!status.ok()) {
619       // Read failed immediately. Schedule the on_read callback to run
620       // asynchronously.
621       lock.Release();
622       engine_->Run([on_read = std::move(on_read), status]() mutable {
623         on_read(status);
624       });
625       Unref();
626       return false;
627     }
628     // Read succeeded immediately. Return true and don't run the on_read
629     // callback.
630     incoming_buffer_ = nullptr;
631     Unref();
632     return true;
633   }
634   return false;
635 }
636 
637 #ifdef GRPC_LINUX_ERRQUEUE
TcpGetSendZerocopyRecord(SliceBuffer & buf)638 TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord(
639     SliceBuffer& buf) {
640   TcpZerocopySendRecord* zerocopy_send_record = nullptr;
641   const bool use_zerocopy =
642       tcp_zerocopy_send_ctx_->Enabled() &&
643       tcp_zerocopy_send_ctx_->ThresholdBytes() < buf.Length();
644   if (use_zerocopy) {
645     zerocopy_send_record = tcp_zerocopy_send_ctx_->GetSendRecord();
646     if (zerocopy_send_record == nullptr) {
647       ProcessErrors();
648       zerocopy_send_record = tcp_zerocopy_send_ctx_->GetSendRecord();
649     }
650     if (zerocopy_send_record != nullptr) {
651       zerocopy_send_record->PrepareForSends(buf);
652       GPR_DEBUG_ASSERT(buf.Count() == 0);
653       GPR_DEBUG_ASSERT(buf.Length() == 0);
654       outgoing_byte_idx_ = 0;
655       outgoing_buffer_ = nullptr;
656     }
657   }
658   return zerocopy_send_record;
659 }
660 
661 // For linux platforms, reads the socket's error queue and processes error
662 // messages from the queue.
ProcessErrors()663 bool PosixEndpointImpl::ProcessErrors() {
664   bool processed_err = false;
665   struct iovec iov;
666   iov.iov_base = nullptr;
667   iov.iov_len = 0;
668   struct msghdr msg;
669   msg.msg_name = nullptr;
670   msg.msg_namelen = 0;
671   msg.msg_iov = &iov;
672   msg.msg_iovlen = 0;
673   msg.msg_flags = 0;
674   // Allocate enough space so we don't need to keep increasing this as size of
675   // OPT_STATS increase.
676   constexpr size_t cmsg_alloc_space =
677       CMSG_SPACE(sizeof(scm_timestamping)) +
678       CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) +
679       CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t)));
680   // Allocate aligned space for cmsgs received along with timestamps.
681   union {
682     char rbuf[cmsg_alloc_space];
683     struct cmsghdr align;
684   } aligned_buf;
685   msg.msg_control = aligned_buf.rbuf;
686   int r, saved_errno;
687   while (true) {
688     msg.msg_controllen = sizeof(aligned_buf.rbuf);
689     do {
690       r = recvmsg(fd_, &msg, MSG_ERRQUEUE);
691       saved_errno = errno;
692     } while (r < 0 && saved_errno == EINTR);
693 
694     if (r < 0 && saved_errno == EAGAIN) {
695       return processed_err;  // No more errors to process
696     } else if (r < 0) {
697       return processed_err;
698     }
699     if (GPR_UNLIKELY((msg.msg_flags & MSG_CTRUNC) != 0)) {
700       gpr_log(GPR_ERROR, "Error message was truncated.");
701     }
702 
703     if (msg.msg_controllen == 0) {
704       // There was no control message found. It was probably spurious.
705       return processed_err;
706     }
707     bool seen = false;
708     for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
709          cmsg = CMSG_NXTHDR(&msg, cmsg)) {
710       if (CmsgIsZeroCopy(*cmsg)) {
711         ProcessZerocopy(cmsg);
712         seen = true;
713         processed_err = true;
714       } else if (cmsg->cmsg_level == SOL_SOCKET &&
715                  cmsg->cmsg_type == SCM_TIMESTAMPING) {
716         cmsg = ProcessTimestamp(&msg, cmsg);
717         seen = true;
718         processed_err = true;
719       } else {
720         // Got a control message that is not a timestamp or zerocopy. Don't know
721         // how to handle this.
722         return processed_err;
723       }
724     }
725     if (!seen) {
726       return processed_err;
727     }
728   }
729 }
730 
ZerocopyDisableAndWaitForRemaining()731 void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() {
732   tcp_zerocopy_send_ctx_->Shutdown();
733   while (!tcp_zerocopy_send_ctx_->AllSendRecordsEmpty()) {
734     ProcessErrors();
735   }
736 }
737 
738 // Reads \a cmsg to process zerocopy control messages.
ProcessZerocopy(struct cmsghdr * cmsg)739 void PosixEndpointImpl::ProcessZerocopy(struct cmsghdr* cmsg) {
740   GPR_DEBUG_ASSERT(cmsg);
741   auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(cmsg));
742   GPR_DEBUG_ASSERT(serr->ee_errno == 0);
743   GPR_DEBUG_ASSERT(serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY);
744   const uint32_t lo = serr->ee_info;
745   const uint32_t hi = serr->ee_data;
746   for (uint32_t seq = lo; seq <= hi; ++seq) {
747     // TODO(arjunroy): It's likely that lo and hi refer to zerocopy sequence
748     // numbers that are generated by a single call to grpc_endpoint_write; ie.
749     // we can batch the unref operation. So, check if record is the same for
750     // both; if so, batch the unref/put.
751     TcpZerocopySendRecord* record =
752         tcp_zerocopy_send_ctx_->ReleaseSendRecord(seq);
753     GPR_DEBUG_ASSERT(record);
754     UnrefMaybePutZerocopySendRecord(record);
755   }
756   if (tcp_zerocopy_send_ctx_->UpdateZeroCopyOptMemStateAfterFree()) {
757     handle_->SetWritable();
758   }
759 }
760 
761 // Reads \a cmsg to derive timestamps from the control messages. If a valid
762 // timestamp is found, the traced buffer list is updated with this timestamp.
763 // The caller of this function should be looping on the control messages found
764 // in \a msg. \a cmsg should point to the control message that the caller wants
765 // processed. On return, a pointer to a control message is returned. On the next
766 // iteration, CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg.
ProcessTimestamp(msghdr * msg,struct cmsghdr * cmsg)767 struct cmsghdr* PosixEndpointImpl::ProcessTimestamp(msghdr* msg,
768                                                     struct cmsghdr* cmsg) {
769   auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
770   cmsghdr* opt_stats = nullptr;
771   if (next_cmsg == nullptr) {
772     return cmsg;
773   }
774 
775   // Check if next_cmsg is an OPT_STATS msg.
776   if (next_cmsg->cmsg_level == SOL_SOCKET &&
777       next_cmsg->cmsg_type == SCM_TIMESTAMPING_OPT_STATS) {
778     opt_stats = next_cmsg;
779     next_cmsg = CMSG_NXTHDR(msg, opt_stats);
780     if (next_cmsg == nullptr) {
781       return opt_stats;
782     }
783   }
784 
785   if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
786       !(next_cmsg->cmsg_type == IP_RECVERR ||
787         next_cmsg->cmsg_type == IPV6_RECVERR)) {
788     return cmsg;
789   }
790 
791   auto tss = reinterpret_cast<scm_timestamping*>(CMSG_DATA(cmsg));
792   auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
793   if (serr->ee_errno != ENOMSG ||
794       serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
795     gpr_log(GPR_ERROR, "Unexpected control message");
796     return cmsg;
797   }
798   traced_buffers_.ProcessTimestamp(serr, opt_stats, tss);
799   return next_cmsg;
800 }
801 
HandleError(absl::Status status)802 void PosixEndpointImpl::HandleError(absl::Status status) {
803   if (!status.ok() ||
804       stop_error_notification_.load(std::memory_order_relaxed)) {
805     // We aren't going to register to hear on error anymore, so it is safe to
806     // unref.
807     Unref();
808     return;
809   }
810   // We are still interested in collecting timestamps, so let's try reading
811   // them.
812   if (!ProcessErrors()) {
813     // This might not a timestamps error. Set the read and write closures to be
814     // ready.
815     handle_->SetReadable();
816     handle_->SetWritable();
817   }
818   handle_->NotifyOnError(on_error_);
819 }
820 
WriteWithTimestamps(struct msghdr * msg,size_t sending_length,ssize_t * sent_length,int * saved_errno,int additional_flags)821 bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* msg,
822                                             size_t sending_length,
823                                             ssize_t* sent_length,
824                                             int* saved_errno,
825                                             int additional_flags) {
826   if (!socket_ts_enabled_) {
827     uint32_t opt = kTimestampingSocketOptions;
828     if (setsockopt(fd_, SOL_SOCKET, SO_TIMESTAMPING, static_cast<void*>(&opt),
829                    sizeof(opt)) != 0) {
830       return false;
831     }
832     bytes_counter_ = -1;
833     socket_ts_enabled_ = true;
834   }
835   // Set control message to indicate that you want timestamps.
836   union {
837     char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
838     struct cmsghdr align;
839   } u;
840   cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf);
841   cmsg->cmsg_level = SOL_SOCKET;
842   cmsg->cmsg_type = SO_TIMESTAMPING;
843   cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
844   *reinterpret_cast<int*>(CMSG_DATA(cmsg)) = kTimestampingRecordingOptions;
845   msg->msg_control = u.cmsg_buf;
846   msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));
847 
848   // If there was an error on sendmsg the logic in tcp_flush will handle it.
849   ssize_t length = TcpSend(fd_, msg, saved_errno, additional_flags);
850   *sent_length = length;
851   // Only save timestamps if all the bytes were taken by sendmsg.
852   if (sending_length == static_cast<size_t>(length)) {
853     traced_buffers_.AddNewEntry(static_cast<uint32_t>(bytes_counter_ + length),
854                                 fd_, outgoing_buffer_arg_);
855     outgoing_buffer_arg_ = nullptr;
856   }
857   return true;
858 }
859 
860 #else   // GRPC_LINUX_ERRQUEUE
TcpGetSendZerocopyRecord(SliceBuffer &)861 TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord(
862     SliceBuffer& /*buf*/) {
863   return nullptr;
864 }
865 
HandleError(absl::Status)866 void PosixEndpointImpl::HandleError(absl::Status /*status*/) {
867   grpc_core::Crash("Error handling not supported on this platform");
868 }
869 
ZerocopyDisableAndWaitForRemaining()870 void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() {}
871 
WriteWithTimestamps(struct msghdr *,size_t,ssize_t *,int *,int)872 bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* /*msg*/,
873                                             size_t /*sending_length*/,
874                                             ssize_t* /*sent_length*/,
875                                             int* /*saved_errno*/,
876                                             int /*additional_flags*/) {
877   grpc_core::Crash("Write with timestamps not supported for this platform");
878 }
879 #endif  // GRPC_LINUX_ERRQUEUE
880 
UnrefMaybePutZerocopySendRecord(TcpZerocopySendRecord * record)881 void PosixEndpointImpl::UnrefMaybePutZerocopySendRecord(
882     TcpZerocopySendRecord* record) {
883   if (record->Unref()) {
884     tcp_zerocopy_send_ctx_->PutSendRecord(record);
885   }
886 }
887 
888 // If outgoing_buffer_arg is filled, shuts down the list early, so that any
889 // release operations needed can be performed on the arg.
TcpShutdownTracedBufferList()890 void PosixEndpointImpl::TcpShutdownTracedBufferList() {
891   if (outgoing_buffer_arg_ != nullptr) {
892     traced_buffers_.Shutdown(outgoing_buffer_arg_,
893                              absl::InternalError("TracedBuffer list shutdown"));
894     outgoing_buffer_arg_ = nullptr;
895   }
896 }
897 
898 // returns true if done, false if pending; if returning true, *error is set
DoFlushZerocopy(TcpZerocopySendRecord * record,absl::Status & status)899 bool PosixEndpointImpl::DoFlushZerocopy(TcpZerocopySendRecord* record,
900                                         absl::Status& status) {
901   msg_iovlen_type iov_size;
902   ssize_t sent_length = 0;
903   size_t sending_length;
904   size_t unwind_slice_idx;
905   size_t unwind_byte_idx;
906   bool tried_sending_message;
907   int saved_errno;
908   msghdr msg;
909   bool constrained;
910   status = absl::OkStatus();
911   // iov consumes a large space. Keep it as the last item on the stack to
912   // improve locality. After all, we expect only the first elements of it
913   // being populated in most cases.
914   iovec iov[MAX_WRITE_IOVEC];
915   while (true) {
916     sending_length = 0;
917     iov_size = record->PopulateIovs(&unwind_slice_idx, &unwind_byte_idx,
918                                     &sending_length, iov);
919     msg.msg_name = nullptr;
920     msg.msg_namelen = 0;
921     msg.msg_iov = iov;
922     msg.msg_iovlen = iov_size;
923     msg.msg_flags = 0;
924     tried_sending_message = false;
925     constrained = false;
926     // Before calling sendmsg (with or without timestamps): we
927     // take a single ref on the zerocopy send record.
928     tcp_zerocopy_send_ctx_->NoteSend(record);
929     saved_errno = 0;
930     if (outgoing_buffer_arg_ != nullptr) {
931       if (!ts_capable_ ||
932           !WriteWithTimestamps(&msg, sending_length, &sent_length, &saved_errno,
933                                MSG_ZEROCOPY)) {
934         // We could not set socket options to collect Fathom timestamps.
935         // Fallback on writing without timestamps.
936         ts_capable_ = false;
937         TcpShutdownTracedBufferList();
938       } else {
939         tried_sending_message = true;
940       }
941     }
942     if (!tried_sending_message) {
943       msg.msg_control = nullptr;
944       msg.msg_controllen = 0;
945       sent_length = TcpSend(fd_, &msg, &saved_errno, MSG_ZEROCOPY);
946     }
947     if (tcp_zerocopy_send_ctx_->UpdateZeroCopyOptMemStateAfterSend(
948             saved_errno == ENOBUFS, constrained) ||
949         constrained) {
950       // If constrained, is true it implies that we received an ENOBUFS error
951       // but there are no un-acked z-copy records. This situation may arise
952       // because the per-process RLIMIT_MEMLOCK limit or the per-socket hard
953       // memlock ulimit on the machine may be very small. These limits control
954       // the max number of bytes a process/socket can respectively pin to RAM.
955       // Tx0cp respects these limits and if a sendmsg tries to send more than
956       // this limit, the kernel may return ENOBUFS error. Print a warning
957       // message here to allow help with debugging. Grpc should not attempt to
958       // raise the limit values.
959       if (!constrained) {
960         handle_->SetWritable();
961       } else {
962 #ifdef GRPC_LINUX_ERRQUEUE
963         GRPC_LOG_EVERY_N_SEC(
964             1, GPR_INFO,
965             "Tx0cp encountered an ENOBUFS error possibly because one or "
966             "both of RLIMIT_MEMLOCK or hard memlock ulimit values are too "
967             "small for the intended user. Current system value of "
968             "RLIMIT_MEMLOCK is %" PRIu64 " and hard memlock ulimit is %" PRIu64
969             ".Consider increasing these values appropriately for the intended "
970             "user.",
971             GetRLimitMemLockMax(), GetUlimitHardMemLock());
972 #endif
973       }
974     }
975     if (sent_length < 0) {
976       // If this particular send failed, drop ref taken earlier in this method.
977       tcp_zerocopy_send_ctx_->UndoSend();
978       if (saved_errno == EAGAIN || saved_errno == ENOBUFS) {
979         record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
980         return false;
981       } else {
982         status = TcpAnnotateError(PosixOSError(saved_errno, "sendmsg"));
983         TcpShutdownTracedBufferList();
984         return true;
985       }
986     }
987     bytes_counter_ += sent_length;
988     record->UpdateOffsetForBytesSent(sending_length,
989                                      static_cast<size_t>(sent_length));
990     if (record->AllSlicesSent()) {
991       return true;
992     }
993   }
994 }
995 
TcpFlushZerocopy(TcpZerocopySendRecord * record,absl::Status & status)996 bool PosixEndpointImpl::TcpFlushZerocopy(TcpZerocopySendRecord* record,
997                                          absl::Status& status) {
998   bool done = DoFlushZerocopy(record, status);
999   if (done) {
1000     // Either we encountered an error, or we successfully sent all the bytes.
1001     // In either case, we're done with this record.
1002     UnrefMaybePutZerocopySendRecord(record);
1003   }
1004   return done;
1005 }
1006 
TcpFlush(absl::Status & status)1007 bool PosixEndpointImpl::TcpFlush(absl::Status& status) {
1008   struct msghdr msg;
1009   struct iovec iov[MAX_WRITE_IOVEC];
1010   msg_iovlen_type iov_size;
1011   ssize_t sent_length = 0;
1012   size_t sending_length;
1013   size_t trailing;
1014   size_t unwind_slice_idx;
1015   size_t unwind_byte_idx;
1016   int saved_errno;
1017   status = absl::OkStatus();
1018 
1019   // We always start at zero, because we eagerly unref and trim the slice
1020   // buffer as we write
1021   size_t outgoing_slice_idx = 0;
1022 
1023   while (true) {
1024     sending_length = 0;
1025     unwind_slice_idx = outgoing_slice_idx;
1026     unwind_byte_idx = outgoing_byte_idx_;
1027     for (iov_size = 0; outgoing_slice_idx != outgoing_buffer_->Count() &&
1028                        iov_size != MAX_WRITE_IOVEC;
1029          iov_size++) {
1030       MutableSlice& slice = internal::SliceCast<MutableSlice>(
1031           outgoing_buffer_->MutableSliceAt(outgoing_slice_idx));
1032       iov[iov_size].iov_base = slice.begin() + outgoing_byte_idx_;
1033       iov[iov_size].iov_len = slice.length() - outgoing_byte_idx_;
1034 
1035       sending_length += iov[iov_size].iov_len;
1036       outgoing_slice_idx++;
1037       outgoing_byte_idx_ = 0;
1038     }
1039     GPR_ASSERT(iov_size > 0);
1040 
1041     msg.msg_name = nullptr;
1042     msg.msg_namelen = 0;
1043     msg.msg_iov = iov;
1044     msg.msg_iovlen = iov_size;
1045     msg.msg_flags = 0;
1046     bool tried_sending_message = false;
1047     saved_errno = 0;
1048     if (outgoing_buffer_arg_ != nullptr) {
1049       if (!ts_capable_ || !WriteWithTimestamps(&msg, sending_length,
1050                                                &sent_length, &saved_errno, 0)) {
1051         // We could not set socket options to collect Fathom timestamps.
1052         // Fallback on writing without timestamps.
1053         ts_capable_ = false;
1054         TcpShutdownTracedBufferList();
1055       } else {
1056         tried_sending_message = true;
1057       }
1058     }
1059     if (!tried_sending_message) {
1060       msg.msg_control = nullptr;
1061       msg.msg_controllen = 0;
1062       sent_length = TcpSend(fd_, &msg, &saved_errno);
1063     }
1064 
1065     if (sent_length < 0) {
1066       if (saved_errno == EAGAIN || saved_errno == ENOBUFS) {
1067         outgoing_byte_idx_ = unwind_byte_idx;
1068         // unref all and forget about all slices that have been written to this
1069         // point
1070         for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
1071           outgoing_buffer_->TakeFirst();
1072         }
1073         return false;
1074       } else {
1075         status = TcpAnnotateError(PosixOSError(saved_errno, "sendmsg"));
1076         outgoing_buffer_->Clear();
1077         TcpShutdownTracedBufferList();
1078         return true;
1079       }
1080     }
1081 
1082     GPR_ASSERT(outgoing_byte_idx_ == 0);
1083     bytes_counter_ += sent_length;
1084     trailing = sending_length - static_cast<size_t>(sent_length);
1085     while (trailing > 0) {
1086       size_t slice_length;
1087       outgoing_slice_idx--;
1088       slice_length = outgoing_buffer_->RefSlice(outgoing_slice_idx).length();
1089       if (slice_length > trailing) {
1090         outgoing_byte_idx_ = slice_length - trailing;
1091         break;
1092       } else {
1093         trailing -= slice_length;
1094       }
1095     }
1096     if (outgoing_slice_idx == outgoing_buffer_->Count()) {
1097       outgoing_buffer_->Clear();
1098       return true;
1099     }
1100   }
1101 }
1102 
HandleWrite(absl::Status status)1103 void PosixEndpointImpl::HandleWrite(absl::Status status) {
1104   if (!status.ok()) {
1105     absl::AnyInvocable<void(absl::Status)> cb_ = std::move(write_cb_);
1106     write_cb_ = nullptr;
1107     if (current_zerocopy_send_ != nullptr) {
1108       UnrefMaybePutZerocopySendRecord(current_zerocopy_send_);
1109       current_zerocopy_send_ = nullptr;
1110     }
1111     cb_(status);
1112     Unref();
1113     return;
1114   }
1115   bool flush_result = current_zerocopy_send_ != nullptr
1116                           ? TcpFlushZerocopy(current_zerocopy_send_, status)
1117                           : TcpFlush(status);
1118   if (!flush_result) {
1119     GPR_DEBUG_ASSERT(status.ok());
1120     handle_->NotifyOnWrite(on_write_);
1121   } else {
1122     absl::AnyInvocable<void(absl::Status)> cb_ = std::move(write_cb_);
1123     write_cb_ = nullptr;
1124     current_zerocopy_send_ = nullptr;
1125     cb_(status);
1126     Unref();
1127   }
1128 }
1129 
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const EventEngine::Endpoint::WriteArgs * args)1130 bool PosixEndpointImpl::Write(
1131     absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
1132     const EventEngine::Endpoint::WriteArgs* args) {
1133   absl::Status status = absl::OkStatus();
1134   TcpZerocopySendRecord* zerocopy_send_record = nullptr;
1135 
1136   GPR_ASSERT(write_cb_ == nullptr);
1137   GPR_DEBUG_ASSERT(current_zerocopy_send_ == nullptr);
1138   GPR_DEBUG_ASSERT(data != nullptr);
1139 
1140   if (data->Length() == 0) {
1141     TcpShutdownTracedBufferList();
1142     if (handle_->IsHandleShutdown()) {
1143       status = TcpAnnotateError(absl::InternalError("EOF"));
1144       engine_->Run([on_writable = std::move(on_writable), status]() mutable {
1145         on_writable(status);
1146       });
1147       return false;
1148     }
1149     return true;
1150   }
1151 
1152   zerocopy_send_record = TcpGetSendZerocopyRecord(*data);
1153   if (zerocopy_send_record == nullptr) {
1154     // Either not enough bytes, or couldn't allocate a zerocopy context.
1155     outgoing_buffer_ = data;
1156     outgoing_byte_idx_ = 0;
1157   }
1158   if (args != nullptr) {
1159     outgoing_buffer_arg_ = args->google_specific;
1160   }
1161   if (outgoing_buffer_arg_) {
1162     GPR_ASSERT(poller_->CanTrackErrors());
1163   }
1164 
1165   bool flush_result = zerocopy_send_record != nullptr
1166                           ? TcpFlushZerocopy(zerocopy_send_record, status)
1167                           : TcpFlush(status);
1168   if (!flush_result) {
1169     Ref().release();
1170     write_cb_ = std::move(on_writable);
1171     current_zerocopy_send_ = zerocopy_send_record;
1172     handle_->NotifyOnWrite(on_write_);
1173     return false;
1174   }
1175   if (!status.ok()) {
1176     // Write failed immediately. Schedule the on_writable callback to run
1177     // asynchronously.
1178     engine_->Run([on_writable = std::move(on_writable), status]() mutable {
1179       on_writable(status);
1180     });
1181     return false;
1182   }
1183   // Write succeeded immediately. Return true and don't run the on_writable
1184   // callback.
1185   return true;
1186 }
1187 
MaybeShutdown(absl::Status why,absl::AnyInvocable<void (absl::StatusOr<int>)> on_release_fd)1188 void PosixEndpointImpl::MaybeShutdown(
1189     absl::Status why,
1190     absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
1191   if (poller_->CanTrackErrors()) {
1192     ZerocopyDisableAndWaitForRemaining();
1193     stop_error_notification_.store(true, std::memory_order_release);
1194     handle_->SetHasError();
1195   }
1196   on_release_fd_ = std::move(on_release_fd);
1197   grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus,
1198                           GRPC_STATUS_UNAVAILABLE);
1199   handle_->ShutdownHandle(why);
1200   read_mu_.Lock();
1201   memory_owner_.Reset();
1202   read_mu_.Unlock();
1203   Unref();
1204 }
1205 
~PosixEndpointImpl()1206 PosixEndpointImpl ::~PosixEndpointImpl() {
1207   int release_fd = -1;
1208   handle_->OrphanHandle(on_done_,
1209                         on_release_fd_ == nullptr ? nullptr : &release_fd, "");
1210   if (on_release_fd_ != nullptr) {
1211     engine_->Run([on_release_fd = std::move(on_release_fd_),
1212                   release_fd]() mutable { on_release_fd(release_fd); });
1213   }
1214   delete on_read_;
1215   delete on_write_;
1216   delete on_error_;
1217 }
1218 
PosixEndpointImpl(EventHandle * handle,PosixEngineClosure * on_done,std::shared_ptr<EventEngine> engine,MemoryAllocator &&,const PosixTcpOptions & options)1219 PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle,
1220                                      PosixEngineClosure* on_done,
1221                                      std::shared_ptr<EventEngine> engine,
1222                                      MemoryAllocator&& /*allocator*/,
1223                                      const PosixTcpOptions& options)
1224     : sock_(PosixSocketWrapper(handle->WrappedFd())),
1225       on_done_(on_done),
1226       traced_buffers_(),
1227       handle_(handle),
1228       poller_(handle->Poller()),
1229       engine_(engine) {
1230   PosixSocketWrapper sock(handle->WrappedFd());
1231   fd_ = handle_->WrappedFd();
1232   GPR_ASSERT(options.resource_quota != nullptr);
1233   auto peer_addr_string = sock.PeerAddressString();
1234   mem_quota_ = options.resource_quota->memory_quota();
1235   memory_owner_ = mem_quota_->CreateMemoryOwner(
1236       peer_addr_string.ok() ? *peer_addr_string : "");
1237   self_reservation_ = memory_owner_.MakeReservation(sizeof(PosixEndpointImpl));
1238   auto local_address = sock.LocalAddress();
1239   if (local_address.ok()) {
1240     local_address_ = *local_address;
1241   }
1242   auto peer_address = sock.PeerAddress();
1243   if (peer_address.ok()) {
1244     peer_address_ = *peer_address;
1245   }
1246   target_length_ = static_cast<double>(options.tcp_read_chunk_size);
1247   bytes_read_this_round_ = 0;
1248   min_read_chunk_size_ = options.tcp_min_read_chunk_size;
1249   max_read_chunk_size_ = options.tcp_max_read_chunk_size;
1250   bool zerocopy_enabled =
1251       options.tcp_tx_zero_copy_enabled && poller_->CanTrackErrors();
1252 #ifdef GRPC_LINUX_ERRQUEUE
1253   if (zerocopy_enabled) {
1254     if (GetRLimitMemLockMax() == 0) {
1255       zerocopy_enabled = false;
1256       gpr_log(
1257           GPR_ERROR,
1258           "Tx zero-copy will not be used by gRPC since RLIMIT_MEMLOCK value is "
1259           "not set. Consider raising its value with setrlimit().");
1260     } else if (GetUlimitHardMemLock() == 0) {
1261       zerocopy_enabled = false;
1262       gpr_log(GPR_ERROR,
1263               "Tx zero-copy will not be used by gRPC since hard memlock ulimit "
1264               "value is not set. Use ulimit -l <value> to set its value.");
1265     } else {
1266       const int enable = 1;
1267       if (setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)) !=
1268           0) {
1269         zerocopy_enabled = false;
1270         gpr_log(GPR_ERROR, "Failed to set zerocopy options on the socket.");
1271       }
1272     }
1273 
1274     if (zerocopy_enabled) {
1275       gpr_log(GPR_INFO,
1276               "Tx-zero copy enabled for gRPC sends. RLIMIT_MEMLOCK value = "
1277               "%" PRIu64 ",ulimit hard memlock value = %" PRIu64,
1278               GetRLimitMemLockMax(), GetUlimitHardMemLock());
1279     }
1280   }
1281 #endif  // GRPC_LINUX_ERRQUEUE
1282   tcp_zerocopy_send_ctx_ = std::make_unique<TcpZerocopySendCtx>(
1283       zerocopy_enabled, options.tcp_tx_zerocopy_max_simultaneous_sends,
1284       options.tcp_tx_zerocopy_send_bytes_threshold);
1285 #ifdef GRPC_HAVE_TCP_INQ
1286   int one = 1;
1287   if (setsockopt(fd_, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) {
1288     inq_capable_ = true;
1289   } else {
1290     gpr_log(GPR_DEBUG, "cannot set inq fd=%d errno=%d", fd_, errno);
1291     inq_capable_ = false;
1292   }
1293 #else
1294   inq_capable_ = false;
1295 #endif  // GRPC_HAVE_TCP_INQ
1296 
1297   on_read_ = PosixEngineClosure::ToPermanentClosure(
1298       [this](absl::Status status) { HandleRead(std::move(status)); });
1299   on_write_ = PosixEngineClosure::ToPermanentClosure(
1300       [this](absl::Status status) { HandleWrite(std::move(status)); });
1301   on_error_ = PosixEngineClosure::ToPermanentClosure(
1302       [this](absl::Status status) { HandleError(std::move(status)); });
1303 
1304   // Start being notified on errors if poller can track errors.
1305   if (poller_->CanTrackErrors()) {
1306     Ref().release();
1307     handle_->NotifyOnError(on_error_);
1308   }
1309 }
1310 
CreatePosixEndpoint(EventHandle * handle,PosixEngineClosure * on_shutdown,std::shared_ptr<EventEngine> engine,MemoryAllocator && allocator,const PosixTcpOptions & options)1311 std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
1312     EventHandle* handle, PosixEngineClosure* on_shutdown,
1313     std::shared_ptr<EventEngine> engine, MemoryAllocator&& allocator,
1314     const PosixTcpOptions& options) {
1315   GPR_DEBUG_ASSERT(handle != nullptr);
1316   return std::make_unique<PosixEndpoint>(handle, on_shutdown, std::move(engine),
1317                                          std::move(allocator), options);
1318 }
1319 
1320 }  // namespace experimental
1321 }  // namespace grpc_event_engine
1322 
1323 #else  // GRPC_POSIX_SOCKET_TCP
1324 
1325 namespace grpc_event_engine {
1326 namespace experimental {
1327 
CreatePosixEndpoint(EventHandle *,PosixEngineClosure *,std::shared_ptr<EventEngine>,const PosixTcpOptions &)1328 std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
1329     EventHandle* /*handle*/, PosixEngineClosure* /*on_shutdown*/,
1330     std::shared_ptr<EventEngine> /*engine*/,
1331     const PosixTcpOptions& /*options*/) {
1332   grpc_core::Crash("Cannot create PosixEndpoint on this platform");
1333 }
1334 
1335 }  // namespace experimental
1336 }  // namespace grpc_event_engine
1337 
1338 #endif  // GRPC_POSIX_SOCKET_TCP
1339