1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
17
18 #include <errno.h>
19 #include <inttypes.h>
20 #include <limits.h>
21
22 #include <algorithm>
23 #include <cctype>
24 #include <cstdint>
25 #include <cstdlib>
26 #include <memory>
27 #include <string>
28 #include <type_traits>
29
30 #include "absl/functional/any_invocable.h"
31 #include "absl/status/status.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/types/optional.h"
35
36 #include <grpc/event_engine/internal/slice_cast.h>
37 #include <grpc/event_engine/slice.h>
38 #include <grpc/event_engine/slice_buffer.h>
39 #include <grpc/status.h>
40 #include <grpc/support/log.h>
41
42 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
43 #include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
44 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
45 #include "src/core/lib/event_engine/tcp_socket_utils.h"
46 #include "src/core/lib/experiments/experiments.h"
47 #include "src/core/lib/gprpp/debug_location.h"
48 #include "src/core/lib/gprpp/load_file.h"
49 #include "src/core/lib/gprpp/ref_counted_ptr.h"
50 #include "src/core/lib/gprpp/status_helper.h"
51 #include "src/core/lib/gprpp/strerror.h"
52 #include "src/core/lib/gprpp/time.h"
53 #include "src/core/lib/resource_quota/resource_quota.h"
54 #include "src/core/lib/slice/slice.h"
55
56 #ifdef GRPC_POSIX_SOCKET_TCP
57 #ifdef GRPC_LINUX_ERRQUEUE
58 #include <dirent.h> // IWYU pragma: keep
59 #include <linux/capability.h> // IWYU pragma: keep
60 #include <linux/errqueue.h> // IWYU pragma: keep
61 #include <linux/netlink.h> // IWYU pragma: keep
62 #include <sys/prctl.h> // IWYU pragma: keep
63 #include <sys/resource.h> // IWYU pragma: keep
64 #endif
65 #include <netinet/in.h> // IWYU pragma: keep
66
67 #ifndef SOL_TCP
68 #define SOL_TCP IPPROTO_TCP
69 #endif
70
71 #ifndef TCP_INQ
72 #define TCP_INQ 36
73 #define TCP_CM_INQ TCP_INQ
74 #endif
75
76 #ifdef GRPC_HAVE_MSG_NOSIGNAL
77 #define SENDMSG_FLAGS MSG_NOSIGNAL
78 #else
79 #define SENDMSG_FLAGS 0
80 #endif
81
82 // TCP zero copy sendmsg flag.
83 // NB: We define this here as a fallback in case we're using an older set of
84 // library headers that has not defined MSG_ZEROCOPY. Since this constant is
85 // part of the kernel, we are guaranteed it will never change/disagree so
86 // defining it here is safe.
87 #ifndef MSG_ZEROCOPY
88 #define MSG_ZEROCOPY 0x4000000
89 #endif
90
91 #define MAX_READ_IOVEC 64
92
93 namespace grpc_event_engine {
94 namespace experimental {
95
96 namespace {
97
98 // A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
99 // of bytes sent.
TcpSend(int fd,const struct msghdr * msg,int * saved_errno,int additional_flags=0)100 ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno,
101 int additional_flags = 0) {
102 ssize_t sent_length;
103 do {
104 sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);
105 } while (sent_length < 0 && (*saved_errno = errno) == EINTR);
106 return sent_length;
107 }
108
109 #ifdef GRPC_LINUX_ERRQUEUE
110
// Evaluates to true when prctl(PR_CAPBSET_READ, cap, 0) returns a positive
// value for capability \a cap.
#define CAP_IS_SUPPORTED(cap) (prctl(PR_CAPBSET_READ, (cap), 0) > 0)
112
// Strips trailing whitespace (every character std::isspace accepts, e.g.
// spaces, tabs and newlines) from \a s in place.
void rtrim(std::string& s) {
  size_t keep = s.size();
  while (keep > 0 && std::isspace(static_cast<unsigned char>(s[keep - 1]))) {
    --keep;
  }
  s.erase(keep);
}
120
ParseUlimitMemLockFromFile(std::string file_name)121 uint64_t ParseUlimitMemLockFromFile(std::string file_name) {
122 static std::string kHardMemlockPrefix = "* hard memlock";
123 auto result = grpc_core::LoadFile(file_name, false);
124 if (!result.ok()) {
125 return 0;
126 }
127 std::string file_contents(reinterpret_cast<const char*>((*result).begin()),
128 (*result).length());
129 // Find start position containing prefix.
130 size_t start = file_contents.find(kHardMemlockPrefix);
131 if (start == std::string::npos) {
132 return 0;
133 }
134 // Find position of next newline after prefix.
135 size_t end = file_contents.find(start, '\n');
136 // Extract substring between prefix and next newline.
137 auto memlock_value_string = file_contents.substr(
138 start + kHardMemlockPrefix.length() + 1, end - start);
139 rtrim(memlock_value_string);
140 if (memlock_value_string == "unlimited" ||
141 memlock_value_string == "infinity") {
142 return UINT64_MAX;
143 } else {
144 return std::atoi(memlock_value_string.c_str());
145 }
146 }
147
148 // Ulimit hard memlock controls per socket limit for maximum locked memory in
149 // RAM. Parses all files under /etc/security/limits.d/ and
150 // /etc/security/limits.conf file for a line of the following format:
151 // * hard memlock <value>
152 // It extracts the first valid <value> and returns it. A value of UINT64_MAX
153 // represents unlimited or infinity. Hard memlock value should be set to
154 // allow zerocopy sendmsgs to succeed. It controls the maximum amount of
155 // memory that can be locked by a socket in RAM.
GetUlimitHardMemLock()156 uint64_t GetUlimitHardMemLock() {
157 static const uint64_t kUlimitHardMemLock = []() -> uint64_t {
158 if (CAP_IS_SUPPORTED(CAP_SYS_RESOURCE)) {
159 // hard memlock ulimit is ignored for privileged user.
160 return UINT64_MAX;
161 }
162 if (auto dir = opendir("/etc/security/limits.d")) {
163 while (auto f = readdir(dir)) {
164 if (f->d_name[0] == '.') {
165 continue; // Skip everything that starts with a dot
166 }
167 uint64_t hard_memlock = ParseUlimitMemLockFromFile(
168 absl::StrCat("/etc/security/limits.d/", std::string(f->d_name)));
169 if (hard_memlock != 0) {
170 return hard_memlock;
171 }
172 }
173 closedir(dir);
174 }
175 return ParseUlimitMemLockFromFile("/etc/security/limits.conf");
176 }();
177 return kUlimitHardMemLock;
178 }
179
180 // RLIMIT_MEMLOCK controls per process limit for maximum locked memory in RAM.
GetRLimitMemLockMax()181 uint64_t GetRLimitMemLockMax() {
182 static const uint64_t kRlimitMemLock = []() -> uint64_t {
183 if (CAP_IS_SUPPORTED(CAP_SYS_RESOURCE)) {
184 // RLIMIT_MEMLOCK is ignored for privileged user.
185 return UINT64_MAX;
186 }
187 struct rlimit limit;
188 if (getrlimit(RLIMIT_MEMLOCK, &limit) != 0) {
189 return 0;
190 }
191 return static_cast<uint64_t>(limit.rlim_max);
192 }();
193 return kRlimitMemLock;
194 }
195
// Tells whether \a cmsg is a "receive error" control message at the IP layer:
// either (SOL_IPV6, IPV6_RECVERR) or (SOL_IP, IP_RECVERR).
bool CmsgIsIpLevel(const cmsghdr& cmsg) {
  switch (cmsg.cmsg_level) {
    case SOL_IPV6:
      return cmsg.cmsg_type == IPV6_RECVERR;
    case SOL_IP:
      return cmsg.cmsg_type == IP_RECVERR;
    default:
      return false;
  }
}
201
CmsgIsZeroCopy(const cmsghdr & cmsg)202 bool CmsgIsZeroCopy(const cmsghdr& cmsg) {
203 if (!CmsgIsIpLevel(cmsg)) {
204 return false;
205 }
206 auto serr = reinterpret_cast<const sock_extended_err*> CMSG_DATA(&cmsg);
207 return serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY;
208 }
209 #endif // GRPC_LINUX_ERRQUEUE
210
PosixOSError(int error_no,const char * call_name)211 absl::Status PosixOSError(int error_no, const char* call_name) {
212 absl::Status s = absl::UnknownError(grpc_core::StrError(error_no));
213 grpc_core::StatusSetInt(&s, grpc_core::StatusIntProperty::kErrorNo, error_no);
214 grpc_core::StatusSetStr(&s, grpc_core::StatusStrProperty::kOsError,
215 grpc_core::StrError(error_no));
216 grpc_core::StatusSetStr(&s, grpc_core::StatusStrProperty::kSyscall,
217 call_name);
218 return s;
219 }
220
221 } // namespace
222
223 #if defined(IOV_MAX) && IOV_MAX < 260
224 #define MAX_WRITE_IOVEC IOV_MAX
225 #else
226 #define MAX_WRITE_IOVEC 260
227 #endif
PopulateIovs(size_t * unwind_slice_idx,size_t * unwind_byte_idx,size_t * sending_length,iovec * iov)228 msg_iovlen_type TcpZerocopySendRecord::PopulateIovs(size_t* unwind_slice_idx,
229 size_t* unwind_byte_idx,
230 size_t* sending_length,
231 iovec* iov) {
232 msg_iovlen_type iov_size;
233 *unwind_slice_idx = out_offset_.slice_idx;
234 *unwind_byte_idx = out_offset_.byte_idx;
235 for (iov_size = 0;
236 out_offset_.slice_idx != buf_.Count() && iov_size != MAX_WRITE_IOVEC;
237 iov_size++) {
238 MutableSlice& slice = internal::SliceCast<MutableSlice>(
239 buf_.MutableSliceAt(out_offset_.slice_idx));
240 iov[iov_size].iov_base = slice.begin();
241 iov[iov_size].iov_len = slice.length() - out_offset_.byte_idx;
242 *sending_length += iov[iov_size].iov_len;
243 ++(out_offset_.slice_idx);
244 out_offset_.byte_idx = 0;
245 }
246 GPR_DEBUG_ASSERT(iov_size > 0);
247 return iov_size;
248 }
249
UpdateOffsetForBytesSent(size_t sending_length,size_t actually_sent)250 void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
251 size_t actually_sent) {
252 size_t trailing = sending_length - actually_sent;
253 while (trailing > 0) {
254 size_t slice_length;
255 out_offset_.slice_idx--;
256 slice_length = buf_.RefSlice(out_offset_.slice_idx).length();
257 if (slice_length > trailing) {
258 out_offset_.byte_idx = slice_length - trailing;
259 break;
260 } else {
261 trailing -= slice_length;
262 }
263 }
264 }
265
AddToEstimate(size_t bytes)266 void PosixEndpointImpl::AddToEstimate(size_t bytes) {
267 bytes_read_this_round_ += static_cast<double>(bytes);
268 }
269
FinishEstimate()270 void PosixEndpointImpl::FinishEstimate() {
271 // If we read >80% of the target buffer in one read loop, increase the size of
272 // the target buffer to either the amount read, or twice its previous value.
273 if (bytes_read_this_round_ > target_length_ * 0.8) {
274 target_length_ = std::max(2 * target_length_, bytes_read_this_round_);
275 } else {
276 target_length_ = 0.99 * target_length_ + 0.01 * bytes_read_this_round_;
277 }
278 bytes_read_this_round_ = 0;
279 }
280
TcpAnnotateError(absl::Status src_error)281 absl::Status PosixEndpointImpl::TcpAnnotateError(absl::Status src_error) {
282 auto peer_string = ResolvedAddressToNormalizedString(peer_address_);
283
284 grpc_core::StatusSetStr(&src_error,
285 grpc_core::StatusStrProperty::kTargetAddress,
286 peer_string.ok() ? *peer_string : "");
287 grpc_core::StatusSetInt(&src_error, grpc_core::StatusIntProperty::kFd,
288 handle_->WrappedFd());
289 grpc_core::StatusSetInt(&src_error, grpc_core::StatusIntProperty::kRpcStatus,
290 GRPC_STATUS_UNAVAILABLE);
291 return src_error;
292 }
293
// Returns true if data available to read or error other than EAGAIN.
//
// Reads from fd_ into the slices of incoming_buffer_ with recvmsg, repeating
// until the buffer is full, the kernel reports nothing more is queued
// (inq_ == 0), or recvmsg stops returning data.
//   - Returns false when the very first recvmsg fails with EAGAIN, or (with
//     the frame size tuning experiment on) when fewer than min_progress_size_
//     bytes have been gathered so far; \a status is then either untouched or
//     OK.
//   - Returns true with \a status OK when data is ready in incoming_buffer_.
//   - Returns true with \a status set to an annotated error on end of stream
//     or on a recvmsg failure other than EAGAIN; incoming_buffer_ is cleared.
bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
  struct msghdr msg;
  struct iovec iov[MAX_READ_IOVEC];
  ssize_t read_bytes;
  size_t total_read_bytes = 0;
  // At most MAX_READ_IOVEC slices of the incoming buffer are offered to the
  // kernel.
  size_t iov_len = std::min<size_t>(MAX_READ_IOVEC, incoming_buffer_->Count());
#ifdef GRPC_LINUX_ERRQUEUE
  constexpr size_t cmsg_alloc_space =
      CMSG_SPACE(sizeof(scm_timestamping)) + CMSG_SPACE(sizeof(int));
#else
  constexpr size_t cmsg_alloc_space = 24;  // CMSG_SPACE(sizeof(int))
#endif  // GRPC_LINUX_ERRQUEUE
  char cmsgbuf[cmsg_alloc_space];
  // Point each iovec entry at the corresponding slice of the incoming buffer.
  for (size_t i = 0; i < iov_len; i++) {
    MutableSlice& slice =
        internal::SliceCast<MutableSlice>(incoming_buffer_->MutableSliceAt(i));
    iov[i].iov_base = slice.begin();
    iov[i].iov_len = slice.length();
  }

  GPR_ASSERT(incoming_buffer_->Length() != 0);
  GPR_DEBUG_ASSERT(min_progress_size_ > 0);

  do {
    // Assume there is something on the queue. If we receive TCP_INQ from
    // kernel, we will update this value, otherwise, we have to assume there is
    // always something to read until we get EAGAIN.
    inq_ = 1;

    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = static_cast<msg_iovlen_type>(iov_len);
    // Control-message space is only supplied when inq_capable_ is set; it is
    // where the TCP_CM_INQ message is looked for further down.
    if (inq_capable_) {
      msg.msg_control = cmsgbuf;
      msg.msg_controllen = sizeof(cmsgbuf);
    } else {
      msg.msg_control = nullptr;
      msg.msg_controllen = 0;
    }
    msg.msg_flags = 0;

    // Retry the read for as long as it is interrupted by a signal.
    do {
      read_bytes = recvmsg(fd_, &msg, 0);
    } while (read_bytes < 0 && errno == EINTR);

    if (read_bytes < 0 && errno == EAGAIN) {
      // NB: After calling call_read_cb a parallel call of the read handler may
      // be running.
      if (total_read_bytes > 0) {
        // Earlier iterations already produced data; deliver it below.
        break;
      }
      // Nothing was read at all in this call.
      FinishEstimate();
      inq_ = 0;
      return false;
    }

    // We have read something in previous reads. We need to deliver those bytes
    // to the upper layer.
    if (read_bytes <= 0 && total_read_bytes >= 1) {
      // inq_ stays non-zero here, which makes the next Read() call attempt a
      // read right away instead of waiting for a poller notification.
      inq_ = 1;
      break;
    }

    if (read_bytes <= 0) {
      // 0 read size ==> end of stream
      incoming_buffer_->Clear();
      if (read_bytes == 0) {
        status = TcpAnnotateError(absl::InternalError("Socket closed"));
      } else {
        status = TcpAnnotateError(absl::InternalError(
            absl::StrCat("recvmsg:", grpc_core::StrError(errno))));
      }
      return true;
    }

    AddToEstimate(static_cast<size_t>(read_bytes));
    GPR_DEBUG_ASSERT((size_t)read_bytes <=
                     incoming_buffer_->Length() - total_read_bytes);

#ifdef GRPC_HAVE_TCP_INQ
    if (inq_capable_) {
      GPR_DEBUG_ASSERT(!(msg.msg_flags & MSG_CTRUNC));
      // Look for the TCP_CM_INQ control message and copy its int payload into
      // inq_.
      struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
      for (; cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
        if (cmsg->cmsg_level == SOL_TCP && cmsg->cmsg_type == TCP_CM_INQ &&
            cmsg->cmsg_len == CMSG_LEN(sizeof(int))) {
          inq_ = *reinterpret_cast<int*>(CMSG_DATA(cmsg));
          break;
        }
      }
    }
#endif  // GRPC_HAVE_TCP_INQ

    total_read_bytes += read_bytes;
    // Stop once nothing more is queued or the incoming buffer is full.
    if (inq_ == 0 || total_read_bytes == incoming_buffer_->Length()) {
      break;
    }

    // We had a partial read, and still have space to read more data. So, adjust
    // IOVs and try to read more.
    // `remaining` counts the bytes of this read not yet attributed to an iovec
    // entry; entries that were filled completely are dropped, a partially
    // filled entry is trimmed, and the survivors are compacted to the front.
    size_t remaining = read_bytes;
    size_t j = 0;
    for (size_t i = 0; i < iov_len; i++) {
      if (remaining >= iov[i].iov_len) {
        remaining -= iov[i].iov_len;
        continue;
      }
      if (remaining > 0) {
        iov[j].iov_base = static_cast<char*>(iov[i].iov_base) + remaining;
        iov[j].iov_len = iov[i].iov_len - remaining;
        remaining = 0;
      } else {
        iov[j].iov_base = iov[i].iov_base;
        iov[j].iov_len = iov[i].iov_len;
      }
      ++j;
    }
    iov_len = j;
  } while (true);

  if (inq_ == 0) {
    FinishEstimate();
  }

  GPR_DEBUG_ASSERT(total_read_bytes > 0);
  status = absl::OkStatus();
  if (grpc_core::IsTcpFrameSizeTuningEnabled()) {
    // Update min progress size based on the total number of bytes read in
    // this round.
    min_progress_size_ -= total_read_bytes;
    if (min_progress_size_ > 0) {
      // There is still some bytes left to be read before we can signal
      // the read as complete. Append the bytes read so far into
      // last_read_buffer which serves as a staging buffer. Return false
      // to indicate tcp_handle_read needs to be scheduled again.
      incoming_buffer_->MoveFirstNBytesIntoSliceBuffer(total_read_bytes,
                                                       last_read_buffer_);
      return false;
    } else {
      // The required number of bytes have been read. Append the bytes
      // read in this round into last_read_buffer. Then swap last_read_buffer
      // and incoming_buffer. Now incoming buffer contains all the bytes
      // read since the start of the last tcp_read operation. last_read_buffer
      // would contain any spare space left in the incoming buffer. This
      // space will be used in the next tcp_read operation.
      min_progress_size_ = 1;
      incoming_buffer_->MoveFirstNBytesIntoSliceBuffer(total_read_bytes,
                                                       last_read_buffer_);
      incoming_buffer_->Swap(last_read_buffer_);
      return true;
    }
  }
  // Move the unused tail of the incoming buffer into last_read_buffer_ so that
  // incoming_buffer_ holds exactly the bytes that were read.
  if (total_read_bytes < incoming_buffer_->Length()) {
    incoming_buffer_->MoveLastNBytesIntoSliceBuffer(
        incoming_buffer_->Length() - total_read_bytes, last_read_buffer_);
  }
  return true;
}
454
PerformReclamation()455 void PosixEndpointImpl::PerformReclamation() {
456 read_mu_.Lock();
457 if (incoming_buffer_ != nullptr) {
458 incoming_buffer_->Clear();
459 }
460 has_posted_reclaimer_ = false;
461 read_mu_.Unlock();
462 }
463
MaybePostReclaimer()464 void PosixEndpointImpl::MaybePostReclaimer() {
465 if (!has_posted_reclaimer_) {
466 has_posted_reclaimer_ = true;
467 memory_owner_.PostReclaimer(
468 grpc_core::ReclamationPass::kBenign,
469 [self = Ref(DEBUG_LOCATION, "Posix Reclaimer")](
470 absl::optional<grpc_core::ReclamationSweep> sweep) {
471 if (sweep.has_value()) {
472 self->PerformReclamation();
473 }
474 });
475 }
476 }
477
UpdateRcvLowat()478 void PosixEndpointImpl::UpdateRcvLowat() {
479 if (!grpc_core::IsTcpRcvLowatEnabled()) return;
480
481 // TODO(ctiller): Check if supported by OS.
482 // TODO(ctiller): Allow some adjustments instead of hardcoding things.
483
484 static constexpr int kRcvLowatMax = 16 * 1024 * 1024;
485 static constexpr int kRcvLowatThreshold = 16 * 1024;
486
487 int remaining = std::min({static_cast<int>(incoming_buffer_->Length()),
488 kRcvLowatMax, min_progress_size_});
489
490 // Setting SO_RCVLOWAT for small quantities does not save on CPU.
491 if (remaining < kRcvLowatThreshold) {
492 remaining = 0;
493 }
494
495 // If zerocopy is off, wake shortly before the full RPC is here. More can
496 // show up partway through recvmsg() since it takes a while to copy data.
497 // So an early wakeup aids latency.
498 if (!tcp_zerocopy_send_ctx_->Enabled() && remaining > 0) {
499 remaining -= kRcvLowatThreshold;
500 }
501
502 // We still do not know the RPC size. Do not set SO_RCVLOWAT.
503 if (set_rcvlowat_ <= 1 && remaining <= 1) return;
504
505 // Previous value is still valid. No change needed in SO_RCVLOWAT.
506 if (set_rcvlowat_ == remaining) {
507 return;
508 }
509 auto result = sock_.SetSocketRcvLowat(remaining);
510 if (result.ok()) {
511 set_rcvlowat_ = *result;
512 } else {
513 gpr_log(GPR_ERROR, "%s",
514 absl::StrCat("ERROR in SO_RCVLOWAT: ", result.status().message())
515 .c_str());
516 }
517 }
518
MaybeMakeReadSlices()519 void PosixEndpointImpl::MaybeMakeReadSlices() {
520 static const int kBigAlloc = 64 * 1024;
521 static const int kSmallAlloc = 8 * 1024;
522 if (incoming_buffer_->Length() < static_cast<size_t>(min_progress_size_)) {
523 size_t allocate_length = min_progress_size_;
524 const size_t target_length = static_cast<size_t>(target_length_);
525 // If memory pressure is low and we think there will be more than
526 // min_progress_size bytes to read, allocate a bit more.
527 const bool low_memory_pressure =
528 memory_owner_.GetPressureInfo().pressure_control_value < 0.8;
529 if (low_memory_pressure && target_length > allocate_length) {
530 allocate_length = target_length;
531 }
532 int extra_wanted =
533 allocate_length - static_cast<int>(incoming_buffer_->Length());
534 if (extra_wanted >=
535 (low_memory_pressure ? kSmallAlloc * 3 / 2 : kBigAlloc)) {
536 while (extra_wanted > 0) {
537 extra_wanted -= kBigAlloc;
538 incoming_buffer_->AppendIndexed(
539 Slice(memory_owner_.MakeSlice(kBigAlloc)));
540 }
541 } else {
542 while (extra_wanted > 0) {
543 extra_wanted -= kSmallAlloc;
544 incoming_buffer_->AppendIndexed(
545 Slice(memory_owner_.MakeSlice(kSmallAlloc)));
546 }
547 }
548 MaybePostReclaimer();
549 }
550 }
551
HandleRead(absl::Status status)552 void PosixEndpointImpl::HandleRead(absl::Status status) {
553 read_mu_.Lock();
554 if (status.ok() && memory_owner_.is_valid()) {
555 MaybeMakeReadSlices();
556 if (!TcpDoRead(status)) {
557 UpdateRcvLowat();
558 // We've consumed the edge, request a new one.
559 read_mu_.Unlock();
560 handle_->NotifyOnRead(on_read_);
561 return;
562 }
563 } else {
564 if (!memory_owner_.is_valid()) {
565 status = absl::UnknownError("Shutting down endpoint");
566 }
567 incoming_buffer_->Clear();
568 last_read_buffer_.Clear();
569 }
570 absl::AnyInvocable<void(absl::Status)> cb = std::move(read_cb_);
571 read_cb_ = nullptr;
572 incoming_buffer_ = nullptr;
573 read_mu_.Unlock();
574 cb(status);
575 Unref();
576 }
577
// Starts a read into \a buffer.
//
// Returns true when data was read synchronously: \a buffer then holds the
// data and \a on_read is NOT invoked. Returns false otherwise; \a on_read is
// then run later, either from HandleRead() once the poller reports the fd
// readable, or via engine_->Run() when the immediate read attempt failed.
//
// \a args may be null. When it is non-null and the frame size tuning
// experiment is enabled, args->read_hint_bytes (clamped to at least 1)
// becomes the minimum number of bytes to gather before completing the read.
bool PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
                             SliceBuffer* buffer,
                             const EventEngine::Endpoint::ReadArgs* args) {
  grpc_core::ReleasableMutexLock lock(&read_mu_);
  // Only one read may be outstanding at a time.
  GPR_ASSERT(read_cb_ == nullptr);
  incoming_buffer_ = buffer;
  incoming_buffer_->Clear();
  // Start from whatever the previous read left in last_read_buffer_.
  incoming_buffer_->Swap(last_read_buffer_);
  if (args != nullptr && grpc_core::IsTcpFrameSizeTuningEnabled()) {
    min_progress_size_ = std::max(static_cast<int>(args->read_hint_bytes), 1);
  } else {
    min_progress_size_ = 1;
  }
  // Take a ref for the duration of this read; it is dropped by the Unref()
  // that accompanies completion (below, or in HandleRead()).
  Ref().release();
  if (is_first_read_) {
    read_cb_ = std::move(on_read);
    UpdateRcvLowat();
    // Endpoint read called for the very first time. Register read callback
    // with the polling engine.
    is_first_read_ = false;
    lock.Release();
    handle_->NotifyOnRead(on_read_);
  } else if (inq_ == 0) {
    read_cb_ = std::move(on_read);
    UpdateRcvLowat();
    lock.Release();
    // Upper layer asked to read more but we know there is no pending data to
    // read from previous reads. So, wait for POLLIN.
    handle_->NotifyOnRead(on_read_);
  } else {
    // There may be data pending already: try to read right away.
    absl::Status status;
    MaybeMakeReadSlices();
    if (!TcpDoRead(status)) {
      UpdateRcvLowat();
      read_cb_ = std::move(on_read);
      // We've consumed the edge, request a new one.
      lock.Release();
      handle_->NotifyOnRead(on_read_);
      return false;
    }
    if (!status.ok()) {
      // Read failed immediately. Schedule the on_read callback to run
      // asynchronously.
      lock.Release();
      engine_->Run([on_read = std::move(on_read), status]() mutable {
        on_read(status);
      });
      Unref();
      return false;
    }
    // Read succeeded immediately. Return true and don't run the on_read
    // callback.
    incoming_buffer_ = nullptr;
    Unref();
    return true;
  }
  return false;
}
636
637 #ifdef GRPC_LINUX_ERRQUEUE
TcpGetSendZerocopyRecord(SliceBuffer & buf)638 TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord(
639 SliceBuffer& buf) {
640 TcpZerocopySendRecord* zerocopy_send_record = nullptr;
641 const bool use_zerocopy =
642 tcp_zerocopy_send_ctx_->Enabled() &&
643 tcp_zerocopy_send_ctx_->ThresholdBytes() < buf.Length();
644 if (use_zerocopy) {
645 zerocopy_send_record = tcp_zerocopy_send_ctx_->GetSendRecord();
646 if (zerocopy_send_record == nullptr) {
647 ProcessErrors();
648 zerocopy_send_record = tcp_zerocopy_send_ctx_->GetSendRecord();
649 }
650 if (zerocopy_send_record != nullptr) {
651 zerocopy_send_record->PrepareForSends(buf);
652 GPR_DEBUG_ASSERT(buf.Count() == 0);
653 GPR_DEBUG_ASSERT(buf.Length() == 0);
654 outgoing_byte_idx_ = 0;
655 outgoing_buffer_ = nullptr;
656 }
657 }
658 return zerocopy_send_record;
659 }
660
// For linux platforms, reads the socket's error queue and processes error
// messages from the queue.
//
// Keeps calling recvmsg(MSG_ERRQUEUE) until it fails, yields no control data,
// or yields a control message that is neither a zerocopy notification nor a
// timestamp. Returns true if at least one zerocopy or timestamp control
// message was handled, false otherwise.
bool PosixEndpointImpl::ProcessErrors() {
  bool processed_err = false;
  // No payload is expected from the error queue; only control messages are
  // of interest, so the iovec is empty.
  struct iovec iov;
  iov.iov_base = nullptr;
  iov.iov_len = 0;
  struct msghdr msg;
  msg.msg_name = nullptr;
  msg.msg_namelen = 0;
  msg.msg_iov = &iov;
  msg.msg_iovlen = 0;
  msg.msg_flags = 0;
  // Allocate enough space so we don't need to keep increasing this as size of
  // OPT_STATS increase.
  constexpr size_t cmsg_alloc_space =
      CMSG_SPACE(sizeof(scm_timestamping)) +
      CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) +
      CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t)));
  // Allocate aligned space for cmsgs received along with timestamps.
  union {
    char rbuf[cmsg_alloc_space];
    struct cmsghdr align;
  } aligned_buf;
  msg.msg_control = aligned_buf.rbuf;
  int r, saved_errno;
  while (true) {
    // recvmsg overwrites msg_controllen, so restore the full capacity before
    // every call.
    msg.msg_controllen = sizeof(aligned_buf.rbuf);
    do {
      r = recvmsg(fd_, &msg, MSG_ERRQUEUE);
      saved_errno = errno;
    } while (r < 0 && saved_errno == EINTR);

    if (r < 0 && saved_errno == EAGAIN) {
      return processed_err;  // No more errors to process
    } else if (r < 0) {
      // Any other failure also ends processing.
      return processed_err;
    }
    if (GPR_UNLIKELY((msg.msg_flags & MSG_CTRUNC) != 0)) {
      gpr_log(GPR_ERROR, "Error message was truncated.");
    }

    if (msg.msg_controllen == 0) {
      // There was no control message found. It was probably spurious.
      return processed_err;
    }
    // Set when at least one control message of this recvmsg was handled.
    bool seen = false;
    for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
         cmsg = CMSG_NXTHDR(&msg, cmsg)) {
      if (CmsgIsZeroCopy(*cmsg)) {
        ProcessZerocopy(cmsg);
        seen = true;
        processed_err = true;
      } else if (cmsg->cmsg_level == SOL_SOCKET &&
                 cmsg->cmsg_type == SCM_TIMESTAMPING) {
        // ProcessTimestamp may consume the control messages that follow the
        // timestamp; it returns the last one it consumed so that the loop
        // increment moves past it.
        cmsg = ProcessTimestamp(&msg, cmsg);
        seen = true;
        processed_err = true;
      } else {
        // Got a control message that is not a timestamp or zerocopy. Don't know
        // how to handle this.
        return processed_err;
      }
    }
    if (!seen) {
      return processed_err;
    }
  }
}
730
ZerocopyDisableAndWaitForRemaining()731 void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() {
732 tcp_zerocopy_send_ctx_->Shutdown();
733 while (!tcp_zerocopy_send_ctx_->AllSendRecordsEmpty()) {
734 ProcessErrors();
735 }
736 }
737
738 // Reads \a cmsg to process zerocopy control messages.
ProcessZerocopy(struct cmsghdr * cmsg)739 void PosixEndpointImpl::ProcessZerocopy(struct cmsghdr* cmsg) {
740 GPR_DEBUG_ASSERT(cmsg);
741 auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(cmsg));
742 GPR_DEBUG_ASSERT(serr->ee_errno == 0);
743 GPR_DEBUG_ASSERT(serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY);
744 const uint32_t lo = serr->ee_info;
745 const uint32_t hi = serr->ee_data;
746 for (uint32_t seq = lo; seq <= hi; ++seq) {
747 // TODO(arjunroy): It's likely that lo and hi refer to zerocopy sequence
748 // numbers that are generated by a single call to grpc_endpoint_write; ie.
749 // we can batch the unref operation. So, check if record is the same for
750 // both; if so, batch the unref/put.
751 TcpZerocopySendRecord* record =
752 tcp_zerocopy_send_ctx_->ReleaseSendRecord(seq);
753 GPR_DEBUG_ASSERT(record);
754 UnrefMaybePutZerocopySendRecord(record);
755 }
756 if (tcp_zerocopy_send_ctx_->UpdateZeroCopyOptMemStateAfterFree()) {
757 handle_->SetWritable();
758 }
759 }
760
761 // Reads \a cmsg to derive timestamps from the control messages. If a valid
762 // timestamp is found, the traced buffer list is updated with this timestamp.
763 // The caller of this function should be looping on the control messages found
764 // in \a msg. \a cmsg should point to the control message that the caller wants
765 // processed. On return, a pointer to a control message is returned. On the next
766 // iteration, CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg.
ProcessTimestamp(msghdr * msg,struct cmsghdr * cmsg)767 struct cmsghdr* PosixEndpointImpl::ProcessTimestamp(msghdr* msg,
768 struct cmsghdr* cmsg) {
769 auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
770 cmsghdr* opt_stats = nullptr;
771 if (next_cmsg == nullptr) {
772 return cmsg;
773 }
774
775 // Check if next_cmsg is an OPT_STATS msg.
776 if (next_cmsg->cmsg_level == SOL_SOCKET &&
777 next_cmsg->cmsg_type == SCM_TIMESTAMPING_OPT_STATS) {
778 opt_stats = next_cmsg;
779 next_cmsg = CMSG_NXTHDR(msg, opt_stats);
780 if (next_cmsg == nullptr) {
781 return opt_stats;
782 }
783 }
784
785 if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
786 !(next_cmsg->cmsg_type == IP_RECVERR ||
787 next_cmsg->cmsg_type == IPV6_RECVERR)) {
788 return cmsg;
789 }
790
791 auto tss = reinterpret_cast<scm_timestamping*>(CMSG_DATA(cmsg));
792 auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
793 if (serr->ee_errno != ENOMSG ||
794 serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
795 gpr_log(GPR_ERROR, "Unexpected control message");
796 return cmsg;
797 }
798 traced_buffers_.ProcessTimestamp(serr, opt_stats, tss);
799 return next_cmsg;
800 }
801
HandleError(absl::Status status)802 void PosixEndpointImpl::HandleError(absl::Status status) {
803 if (!status.ok() ||
804 stop_error_notification_.load(std::memory_order_relaxed)) {
805 // We aren't going to register to hear on error anymore, so it is safe to
806 // unref.
807 Unref();
808 return;
809 }
810 // We are still interested in collecting timestamps, so let's try reading
811 // them.
812 if (!ProcessErrors()) {
813 // This might not a timestamps error. Set the read and write closures to be
814 // ready.
815 handle_->SetReadable();
816 handle_->SetWritable();
817 }
818 handle_->NotifyOnError(on_error_);
819 }
820
WriteWithTimestamps(struct msghdr * msg,size_t sending_length,ssize_t * sent_length,int * saved_errno,int additional_flags)821 bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* msg,
822 size_t sending_length,
823 ssize_t* sent_length,
824 int* saved_errno,
825 int additional_flags) {
826 if (!socket_ts_enabled_) {
827 uint32_t opt = kTimestampingSocketOptions;
828 if (setsockopt(fd_, SOL_SOCKET, SO_TIMESTAMPING, static_cast<void*>(&opt),
829 sizeof(opt)) != 0) {
830 return false;
831 }
832 bytes_counter_ = -1;
833 socket_ts_enabled_ = true;
834 }
835 // Set control message to indicate that you want timestamps.
836 union {
837 char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
838 struct cmsghdr align;
839 } u;
840 cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf);
841 cmsg->cmsg_level = SOL_SOCKET;
842 cmsg->cmsg_type = SO_TIMESTAMPING;
843 cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
844 *reinterpret_cast<int*>(CMSG_DATA(cmsg)) = kTimestampingRecordingOptions;
845 msg->msg_control = u.cmsg_buf;
846 msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));
847
848 // If there was an error on sendmsg the logic in tcp_flush will handle it.
849 ssize_t length = TcpSend(fd_, msg, saved_errno, additional_flags);
850 *sent_length = length;
851 // Only save timestamps if all the bytes were taken by sendmsg.
852 if (sending_length == static_cast<size_t>(length)) {
853 traced_buffers_.AddNewEntry(static_cast<uint32_t>(bytes_counter_ + length),
854 fd_, outgoing_buffer_arg_);
855 outgoing_buffer_arg_ = nullptr;
856 }
857 return true;
858 }
859
860 #else // GRPC_LINUX_ERRQUEUE
// Variant compiled when GRPC_LINUX_ERRQUEUE is not defined: never hands out a
// zerocopy send record.
TcpZerocopySendRecord* PosixEndpointImpl::TcpGetSendZerocopyRecord(
    SliceBuffer& /*buf*/) {
  return nullptr;
}
865
// Variant compiled when GRPC_LINUX_ERRQUEUE is not defined: error handling is
// unavailable, so reaching this function crashes the process.
void PosixEndpointImpl::HandleError(absl::Status /*status*/) {
  grpc_core::Crash("Error handling not supported on this platform");
}
869
// Variant compiled when GRPC_LINUX_ERRQUEUE is not defined: intentionally a
// no-op.
void PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() {}
871
// Variant compiled when GRPC_LINUX_ERRQUEUE is not defined: writing with
// timestamps is unavailable, so reaching this function crashes the process.
bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* /*msg*/,
                                            size_t /*sending_length*/,
                                            ssize_t* /*sent_length*/,
                                            int* /*saved_errno*/,
                                            int /*additional_flags*/) {
  grpc_core::Crash("Write with timestamps not supported for this platform");
}
879 #endif // GRPC_LINUX_ERRQUEUE
880
UnrefMaybePutZerocopySendRecord(TcpZerocopySendRecord * record)881 void PosixEndpointImpl::UnrefMaybePutZerocopySendRecord(
882 TcpZerocopySendRecord* record) {
883 if (record->Unref()) {
884 tcp_zerocopy_send_ctx_->PutSendRecord(record);
885 }
886 }
887
888 // If outgoing_buffer_arg is filled, shuts down the list early, so that any
889 // release operations needed can be performed on the arg.
TcpShutdownTracedBufferList()890 void PosixEndpointImpl::TcpShutdownTracedBufferList() {
891 if (outgoing_buffer_arg_ != nullptr) {
892 traced_buffers_.Shutdown(outgoing_buffer_arg_,
893 absl::InternalError("TracedBuffer list shutdown"));
894 outgoing_buffer_arg_ = nullptr;
895 }
896 }
897
898 // returns true if done, false if pending; if returning true, *error is set
DoFlushZerocopy(TcpZerocopySendRecord * record,absl::Status & status)899 bool PosixEndpointImpl::DoFlushZerocopy(TcpZerocopySendRecord* record,
900 absl::Status& status) {
901 msg_iovlen_type iov_size;
902 ssize_t sent_length = 0;
903 size_t sending_length;
904 size_t unwind_slice_idx;
905 size_t unwind_byte_idx;
906 bool tried_sending_message;
907 int saved_errno;
908 msghdr msg;
909 bool constrained;
910 status = absl::OkStatus();
911 // iov consumes a large space. Keep it as the last item on the stack to
912 // improve locality. After all, we expect only the first elements of it
913 // being populated in most cases.
914 iovec iov[MAX_WRITE_IOVEC];
915 while (true) {
916 sending_length = 0;
917 iov_size = record->PopulateIovs(&unwind_slice_idx, &unwind_byte_idx,
918 &sending_length, iov);
919 msg.msg_name = nullptr;
920 msg.msg_namelen = 0;
921 msg.msg_iov = iov;
922 msg.msg_iovlen = iov_size;
923 msg.msg_flags = 0;
924 tried_sending_message = false;
925 constrained = false;
926 // Before calling sendmsg (with or without timestamps): we
927 // take a single ref on the zerocopy send record.
928 tcp_zerocopy_send_ctx_->NoteSend(record);
929 saved_errno = 0;
930 if (outgoing_buffer_arg_ != nullptr) {
931 if (!ts_capable_ ||
932 !WriteWithTimestamps(&msg, sending_length, &sent_length, &saved_errno,
933 MSG_ZEROCOPY)) {
934 // We could not set socket options to collect Fathom timestamps.
935 // Fallback on writing without timestamps.
936 ts_capable_ = false;
937 TcpShutdownTracedBufferList();
938 } else {
939 tried_sending_message = true;
940 }
941 }
942 if (!tried_sending_message) {
943 msg.msg_control = nullptr;
944 msg.msg_controllen = 0;
945 sent_length = TcpSend(fd_, &msg, &saved_errno, MSG_ZEROCOPY);
946 }
947 if (tcp_zerocopy_send_ctx_->UpdateZeroCopyOptMemStateAfterSend(
948 saved_errno == ENOBUFS, constrained) ||
949 constrained) {
950 // If constrained, is true it implies that we received an ENOBUFS error
951 // but there are no un-acked z-copy records. This situation may arise
952 // because the per-process RLIMIT_MEMLOCK limit or the per-socket hard
953 // memlock ulimit on the machine may be very small. These limits control
954 // the max number of bytes a process/socket can respectively pin to RAM.
955 // Tx0cp respects these limits and if a sendmsg tries to send more than
956 // this limit, the kernel may return ENOBUFS error. Print a warning
957 // message here to allow help with debugging. Grpc should not attempt to
958 // raise the limit values.
959 if (!constrained) {
960 handle_->SetWritable();
961 } else {
962 #ifdef GRPC_LINUX_ERRQUEUE
963 GRPC_LOG_EVERY_N_SEC(
964 1, GPR_INFO,
965 "Tx0cp encountered an ENOBUFS error possibly because one or "
966 "both of RLIMIT_MEMLOCK or hard memlock ulimit values are too "
967 "small for the intended user. Current system value of "
968 "RLIMIT_MEMLOCK is %" PRIu64 " and hard memlock ulimit is %" PRIu64
969 ".Consider increasing these values appropriately for the intended "
970 "user.",
971 GetRLimitMemLockMax(), GetUlimitHardMemLock());
972 #endif
973 }
974 }
975 if (sent_length < 0) {
976 // If this particular send failed, drop ref taken earlier in this method.
977 tcp_zerocopy_send_ctx_->UndoSend();
978 if (saved_errno == EAGAIN || saved_errno == ENOBUFS) {
979 record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
980 return false;
981 } else {
982 status = TcpAnnotateError(PosixOSError(saved_errno, "sendmsg"));
983 TcpShutdownTracedBufferList();
984 return true;
985 }
986 }
987 bytes_counter_ += sent_length;
988 record->UpdateOffsetForBytesSent(sending_length,
989 static_cast<size_t>(sent_length));
990 if (record->AllSlicesSent()) {
991 return true;
992 }
993 }
994 }
995
TcpFlushZerocopy(TcpZerocopySendRecord * record,absl::Status & status)996 bool PosixEndpointImpl::TcpFlushZerocopy(TcpZerocopySendRecord* record,
997 absl::Status& status) {
998 bool done = DoFlushZerocopy(record, status);
999 if (done) {
1000 // Either we encountered an error, or we successfully sent all the bytes.
1001 // In either case, we're done with this record.
1002 UnrefMaybePutZerocopySendRecord(record);
1003 }
1004 return done;
1005 }
1006
TcpFlush(absl::Status & status)1007 bool PosixEndpointImpl::TcpFlush(absl::Status& status) {
1008 struct msghdr msg;
1009 struct iovec iov[MAX_WRITE_IOVEC];
1010 msg_iovlen_type iov_size;
1011 ssize_t sent_length = 0;
1012 size_t sending_length;
1013 size_t trailing;
1014 size_t unwind_slice_idx;
1015 size_t unwind_byte_idx;
1016 int saved_errno;
1017 status = absl::OkStatus();
1018
1019 // We always start at zero, because we eagerly unref and trim the slice
1020 // buffer as we write
1021 size_t outgoing_slice_idx = 0;
1022
1023 while (true) {
1024 sending_length = 0;
1025 unwind_slice_idx = outgoing_slice_idx;
1026 unwind_byte_idx = outgoing_byte_idx_;
1027 for (iov_size = 0; outgoing_slice_idx != outgoing_buffer_->Count() &&
1028 iov_size != MAX_WRITE_IOVEC;
1029 iov_size++) {
1030 MutableSlice& slice = internal::SliceCast<MutableSlice>(
1031 outgoing_buffer_->MutableSliceAt(outgoing_slice_idx));
1032 iov[iov_size].iov_base = slice.begin() + outgoing_byte_idx_;
1033 iov[iov_size].iov_len = slice.length() - outgoing_byte_idx_;
1034
1035 sending_length += iov[iov_size].iov_len;
1036 outgoing_slice_idx++;
1037 outgoing_byte_idx_ = 0;
1038 }
1039 GPR_ASSERT(iov_size > 0);
1040
1041 msg.msg_name = nullptr;
1042 msg.msg_namelen = 0;
1043 msg.msg_iov = iov;
1044 msg.msg_iovlen = iov_size;
1045 msg.msg_flags = 0;
1046 bool tried_sending_message = false;
1047 saved_errno = 0;
1048 if (outgoing_buffer_arg_ != nullptr) {
1049 if (!ts_capable_ || !WriteWithTimestamps(&msg, sending_length,
1050 &sent_length, &saved_errno, 0)) {
1051 // We could not set socket options to collect Fathom timestamps.
1052 // Fallback on writing without timestamps.
1053 ts_capable_ = false;
1054 TcpShutdownTracedBufferList();
1055 } else {
1056 tried_sending_message = true;
1057 }
1058 }
1059 if (!tried_sending_message) {
1060 msg.msg_control = nullptr;
1061 msg.msg_controllen = 0;
1062 sent_length = TcpSend(fd_, &msg, &saved_errno);
1063 }
1064
1065 if (sent_length < 0) {
1066 if (saved_errno == EAGAIN || saved_errno == ENOBUFS) {
1067 outgoing_byte_idx_ = unwind_byte_idx;
1068 // unref all and forget about all slices that have been written to this
1069 // point
1070 for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
1071 outgoing_buffer_->TakeFirst();
1072 }
1073 return false;
1074 } else {
1075 status = TcpAnnotateError(PosixOSError(saved_errno, "sendmsg"));
1076 outgoing_buffer_->Clear();
1077 TcpShutdownTracedBufferList();
1078 return true;
1079 }
1080 }
1081
1082 GPR_ASSERT(outgoing_byte_idx_ == 0);
1083 bytes_counter_ += sent_length;
1084 trailing = sending_length - static_cast<size_t>(sent_length);
1085 while (trailing > 0) {
1086 size_t slice_length;
1087 outgoing_slice_idx--;
1088 slice_length = outgoing_buffer_->RefSlice(outgoing_slice_idx).length();
1089 if (slice_length > trailing) {
1090 outgoing_byte_idx_ = slice_length - trailing;
1091 break;
1092 } else {
1093 trailing -= slice_length;
1094 }
1095 }
1096 if (outgoing_slice_idx == outgoing_buffer_->Count()) {
1097 outgoing_buffer_->Clear();
1098 return true;
1099 }
1100 }
1101 }
1102
HandleWrite(absl::Status status)1103 void PosixEndpointImpl::HandleWrite(absl::Status status) {
1104 if (!status.ok()) {
1105 absl::AnyInvocable<void(absl::Status)> cb_ = std::move(write_cb_);
1106 write_cb_ = nullptr;
1107 if (current_zerocopy_send_ != nullptr) {
1108 UnrefMaybePutZerocopySendRecord(current_zerocopy_send_);
1109 current_zerocopy_send_ = nullptr;
1110 }
1111 cb_(status);
1112 Unref();
1113 return;
1114 }
1115 bool flush_result = current_zerocopy_send_ != nullptr
1116 ? TcpFlushZerocopy(current_zerocopy_send_, status)
1117 : TcpFlush(status);
1118 if (!flush_result) {
1119 GPR_DEBUG_ASSERT(status.ok());
1120 handle_->NotifyOnWrite(on_write_);
1121 } else {
1122 absl::AnyInvocable<void(absl::Status)> cb_ = std::move(write_cb_);
1123 write_cb_ = nullptr;
1124 current_zerocopy_send_ = nullptr;
1125 cb_(status);
1126 Unref();
1127 }
1128 }
1129
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const EventEngine::Endpoint::WriteArgs * args)1130 bool PosixEndpointImpl::Write(
1131 absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
1132 const EventEngine::Endpoint::WriteArgs* args) {
1133 absl::Status status = absl::OkStatus();
1134 TcpZerocopySendRecord* zerocopy_send_record = nullptr;
1135
1136 GPR_ASSERT(write_cb_ == nullptr);
1137 GPR_DEBUG_ASSERT(current_zerocopy_send_ == nullptr);
1138 GPR_DEBUG_ASSERT(data != nullptr);
1139
1140 if (data->Length() == 0) {
1141 TcpShutdownTracedBufferList();
1142 if (handle_->IsHandleShutdown()) {
1143 status = TcpAnnotateError(absl::InternalError("EOF"));
1144 engine_->Run([on_writable = std::move(on_writable), status]() mutable {
1145 on_writable(status);
1146 });
1147 return false;
1148 }
1149 return true;
1150 }
1151
1152 zerocopy_send_record = TcpGetSendZerocopyRecord(*data);
1153 if (zerocopy_send_record == nullptr) {
1154 // Either not enough bytes, or couldn't allocate a zerocopy context.
1155 outgoing_buffer_ = data;
1156 outgoing_byte_idx_ = 0;
1157 }
1158 if (args != nullptr) {
1159 outgoing_buffer_arg_ = args->google_specific;
1160 }
1161 if (outgoing_buffer_arg_) {
1162 GPR_ASSERT(poller_->CanTrackErrors());
1163 }
1164
1165 bool flush_result = zerocopy_send_record != nullptr
1166 ? TcpFlushZerocopy(zerocopy_send_record, status)
1167 : TcpFlush(status);
1168 if (!flush_result) {
1169 Ref().release();
1170 write_cb_ = std::move(on_writable);
1171 current_zerocopy_send_ = zerocopy_send_record;
1172 handle_->NotifyOnWrite(on_write_);
1173 return false;
1174 }
1175 if (!status.ok()) {
1176 // Write failed immediately. Schedule the on_writable callback to run
1177 // asynchronously.
1178 engine_->Run([on_writable = std::move(on_writable), status]() mutable {
1179 on_writable(status);
1180 });
1181 return false;
1182 }
1183 // Write succeeded immediately. Return true and don't run the on_writable
1184 // callback.
1185 return true;
1186 }
1187
MaybeShutdown(absl::Status why,absl::AnyInvocable<void (absl::StatusOr<int>)> on_release_fd)1188 void PosixEndpointImpl::MaybeShutdown(
1189 absl::Status why,
1190 absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
1191 if (poller_->CanTrackErrors()) {
1192 ZerocopyDisableAndWaitForRemaining();
1193 stop_error_notification_.store(true, std::memory_order_release);
1194 handle_->SetHasError();
1195 }
1196 on_release_fd_ = std::move(on_release_fd);
1197 grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus,
1198 GRPC_STATUS_UNAVAILABLE);
1199 handle_->ShutdownHandle(why);
1200 read_mu_.Lock();
1201 memory_owner_.Reset();
1202 read_mu_.Unlock();
1203 Unref();
1204 }
1205
~PosixEndpointImpl()1206 PosixEndpointImpl ::~PosixEndpointImpl() {
1207 int release_fd = -1;
1208 handle_->OrphanHandle(on_done_,
1209 on_release_fd_ == nullptr ? nullptr : &release_fd, "");
1210 if (on_release_fd_ != nullptr) {
1211 engine_->Run([on_release_fd = std::move(on_release_fd_),
1212 release_fd]() mutable { on_release_fd(release_fd); });
1213 }
1214 delete on_read_;
1215 delete on_write_;
1216 delete on_error_;
1217 }
1218
PosixEndpointImpl(EventHandle * handle,PosixEngineClosure * on_done,std::shared_ptr<EventEngine> engine,MemoryAllocator &&,const PosixTcpOptions & options)1219 PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle,
1220 PosixEngineClosure* on_done,
1221 std::shared_ptr<EventEngine> engine,
1222 MemoryAllocator&& /*allocator*/,
1223 const PosixTcpOptions& options)
1224 : sock_(PosixSocketWrapper(handle->WrappedFd())),
1225 on_done_(on_done),
1226 traced_buffers_(),
1227 handle_(handle),
1228 poller_(handle->Poller()),
1229 engine_(engine) {
1230 PosixSocketWrapper sock(handle->WrappedFd());
1231 fd_ = handle_->WrappedFd();
1232 GPR_ASSERT(options.resource_quota != nullptr);
1233 auto peer_addr_string = sock.PeerAddressString();
1234 mem_quota_ = options.resource_quota->memory_quota();
1235 memory_owner_ = mem_quota_->CreateMemoryOwner(
1236 peer_addr_string.ok() ? *peer_addr_string : "");
1237 self_reservation_ = memory_owner_.MakeReservation(sizeof(PosixEndpointImpl));
1238 auto local_address = sock.LocalAddress();
1239 if (local_address.ok()) {
1240 local_address_ = *local_address;
1241 }
1242 auto peer_address = sock.PeerAddress();
1243 if (peer_address.ok()) {
1244 peer_address_ = *peer_address;
1245 }
1246 target_length_ = static_cast<double>(options.tcp_read_chunk_size);
1247 bytes_read_this_round_ = 0;
1248 min_read_chunk_size_ = options.tcp_min_read_chunk_size;
1249 max_read_chunk_size_ = options.tcp_max_read_chunk_size;
1250 bool zerocopy_enabled =
1251 options.tcp_tx_zero_copy_enabled && poller_->CanTrackErrors();
1252 #ifdef GRPC_LINUX_ERRQUEUE
1253 if (zerocopy_enabled) {
1254 if (GetRLimitMemLockMax() == 0) {
1255 zerocopy_enabled = false;
1256 gpr_log(
1257 GPR_ERROR,
1258 "Tx zero-copy will not be used by gRPC since RLIMIT_MEMLOCK value is "
1259 "not set. Consider raising its value with setrlimit().");
1260 } else if (GetUlimitHardMemLock() == 0) {
1261 zerocopy_enabled = false;
1262 gpr_log(GPR_ERROR,
1263 "Tx zero-copy will not be used by gRPC since hard memlock ulimit "
1264 "value is not set. Use ulimit -l <value> to set its value.");
1265 } else {
1266 const int enable = 1;
1267 if (setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)) !=
1268 0) {
1269 zerocopy_enabled = false;
1270 gpr_log(GPR_ERROR, "Failed to set zerocopy options on the socket.");
1271 }
1272 }
1273
1274 if (zerocopy_enabled) {
1275 gpr_log(GPR_INFO,
1276 "Tx-zero copy enabled for gRPC sends. RLIMIT_MEMLOCK value = "
1277 "%" PRIu64 ",ulimit hard memlock value = %" PRIu64,
1278 GetRLimitMemLockMax(), GetUlimitHardMemLock());
1279 }
1280 }
1281 #endif // GRPC_LINUX_ERRQUEUE
1282 tcp_zerocopy_send_ctx_ = std::make_unique<TcpZerocopySendCtx>(
1283 zerocopy_enabled, options.tcp_tx_zerocopy_max_simultaneous_sends,
1284 options.tcp_tx_zerocopy_send_bytes_threshold);
1285 #ifdef GRPC_HAVE_TCP_INQ
1286 int one = 1;
1287 if (setsockopt(fd_, SOL_TCP, TCP_INQ, &one, sizeof(one)) == 0) {
1288 inq_capable_ = true;
1289 } else {
1290 gpr_log(GPR_DEBUG, "cannot set inq fd=%d errno=%d", fd_, errno);
1291 inq_capable_ = false;
1292 }
1293 #else
1294 inq_capable_ = false;
1295 #endif // GRPC_HAVE_TCP_INQ
1296
1297 on_read_ = PosixEngineClosure::ToPermanentClosure(
1298 [this](absl::Status status) { HandleRead(std::move(status)); });
1299 on_write_ = PosixEngineClosure::ToPermanentClosure(
1300 [this](absl::Status status) { HandleWrite(std::move(status)); });
1301 on_error_ = PosixEngineClosure::ToPermanentClosure(
1302 [this](absl::Status status) { HandleError(std::move(status)); });
1303
1304 // Start being notified on errors if poller can track errors.
1305 if (poller_->CanTrackErrors()) {
1306 Ref().release();
1307 handle_->NotifyOnError(on_error_);
1308 }
1309 }
1310
CreatePosixEndpoint(EventHandle * handle,PosixEngineClosure * on_shutdown,std::shared_ptr<EventEngine> engine,MemoryAllocator && allocator,const PosixTcpOptions & options)1311 std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
1312 EventHandle* handle, PosixEngineClosure* on_shutdown,
1313 std::shared_ptr<EventEngine> engine, MemoryAllocator&& allocator,
1314 const PosixTcpOptions& options) {
1315 GPR_DEBUG_ASSERT(handle != nullptr);
1316 return std::make_unique<PosixEndpoint>(handle, on_shutdown, std::move(engine),
1317 std::move(allocator), options);
1318 }
1319
1320 } // namespace experimental
1321 } // namespace grpc_event_engine
1322
1323 #else // GRPC_POSIX_SOCKET_TCP
1324
1325 namespace grpc_event_engine {
1326 namespace experimental {
1327
CreatePosixEndpoint(EventHandle *,PosixEngineClosure *,std::shared_ptr<EventEngine>,const PosixTcpOptions &)1328 std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
1329 EventHandle* /*handle*/, PosixEngineClosure* /*on_shutdown*/,
1330 std::shared_ptr<EventEngine> /*engine*/,
1331 const PosixTcpOptions& /*options*/) {
1332 grpc_core::Crash("Cannot create PosixEndpoint on this platform");
1333 }
1334
1335 } // namespace experimental
1336 } // namespace grpc_event_engine
1337
1338 #endif // GRPC_POSIX_SOCKET_TCP
1339