// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"

#include <atomic>
#include <memory>

#include "absl/status/status.h"
#include "absl/strings/string_view.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/slice.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/event_engine_shims/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/error_utils.h"

extern grpc_core::TraceFlag grpc_tcp_trace;

namespace grpc_event_engine {
namespace experimental {
namespace {

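// Bit layout of shutdown_ref_: the low 32 bits count outstanding refs (one
// initial ref plus one per in-flight read/write), and kShutdownBit (bit 32)
// records that shutdown has been requested. The underlying endpoint is
// destroyed once the bit is set and the ref count reaches zero.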
constexpr int64_t kShutdownBit = static_cast<int64_t>(1) << 32;

// A wrapper class to manage Event Engine endpoint ref counting and
// asynchronous shutdown.
class EventEngineEndpointWrapper {
 public:
  struct grpc_event_engine_endpoint {
    grpc_endpoint base;
    EventEngineEndpointWrapper* wrapper;
    std::aligned_storage<sizeof(SliceBuffer), alignof(SliceBuffer)>::type
        read_buffer;
    std::aligned_storage<sizeof(SliceBuffer), alignof(SliceBuffer)>::type
        write_buffer;
  };

  explicit EventEngineEndpointWrapper(
      std::unique_ptr<EventEngine::Endpoint> endpoint);

  int Fd() {
    grpc_core::MutexLock lock(&mu_);
    return fd_;
  }

  absl::string_view PeerAddress() {
    grpc_core::MutexLock lock(&mu_);
    return peer_address_;
  }

  absl::string_view LocalAddress() {
    grpc_core::MutexLock lock(&mu_);
    return local_address_;
  }

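  // Ref() and Unref() manage the lifetime of the wrapper object itself; the
  // wrapper is deleted when the last ref is dropped.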
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
  void Unref() {
    if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      delete this;
    }
  }

  // Returns a managed grpc_endpoint object. It retains ownership of the
  // object.
  grpc_endpoint* GetGrpcEndpoint() { return &eeep_->base; }

  // Read using the underlying EventEngine endpoint object.
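  // Returns true if the read completed synchronously; the caller must then
  // invoke FinishPendingRead itself (see EndpointRead below).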
  bool Read(grpc_closure* read_cb, grpc_slice_buffer* pending_read_buffer,
            const EventEngine::Endpoint::ReadArgs* args) {
    Ref();
    pending_read_cb_ = read_cb;
    pending_read_buffer_ = pending_read_buffer;
    // TODO(vigneshbabu): Use SliceBufferCast<> here.
    grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer),
                         SliceBuffer::TakeCSliceBuffer(*pending_read_buffer_));
    SliceBuffer* read_buffer =
        reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
    read_buffer->Clear();
    return endpoint_->Read(
        [this](absl::Status status) { FinishPendingRead(status); },
        read_buffer, args);
  }

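  // Moves the bytes read into the caller's slice buffer, destroys the
  // placement-constructed SliceBuffer, and runs the pending read callback.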
  void FinishPendingRead(absl::Status status) {
    auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
    grpc_slice_buffer_move_into(read_buffer->c_slice_buffer(),
                                pending_read_buffer_);
    read_buffer->~SliceBuffer();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      size_t i;
      gpr_log(GPR_INFO, "TCP: %p READ (peer=%s) error=%s", eeep_->wrapper,
              std::string(eeep_->wrapper->PeerAddress()).c_str(),
              status.ToString().c_str());
      if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
        for (i = 0; i < pending_read_buffer_->count; i++) {
          char* dump = grpc_dump_slice(pending_read_buffer_->slices[i],
                                       GPR_DUMP_HEX | GPR_DUMP_ASCII);
          gpr_log(GPR_DEBUG, "READ DATA: %s", dump);
          gpr_free(dump);
        }
      }
    }
    pending_read_buffer_ = nullptr;
    grpc_closure* cb = pending_read_cb_;
    pending_read_cb_ = nullptr;
    if (grpc_core::ExecCtx::Get() == nullptr) {
      grpc_core::ApplicationCallbackExecCtx app_ctx;
      grpc_core::ExecCtx exec_ctx;
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
    }
    // For the ref taken in EventEngineEndpointWrapper::Read().
    Unref();
  }

  // Write using the underlying EventEngine endpoint object.
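  // Returns true if the write completed synchronously; the caller must then
  // invoke FinishPendingWrite itself (see EndpointWrite below).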
  bool Write(grpc_closure* write_cb, grpc_slice_buffer* slices,
             const EventEngine::Endpoint::WriteArgs* args) {
    Ref();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      size_t i;
      gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s)", this,
              std::string(PeerAddress()).c_str());
      if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
        for (i = 0; i < slices->count; i++) {
          char* dump =
              grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
          gpr_log(GPR_DEBUG, "WRITE DATA: %s", dump);
          gpr_free(dump);
        }
      }
    }
    // TODO(vigneshbabu): Use SliceBufferCast<> here.
    grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer),
                         SliceBuffer::TakeCSliceBuffer(*slices));
    SliceBuffer* write_buffer =
        reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
    pending_write_cb_ = write_cb;
    return endpoint_->Write(
        [this](absl::Status status) { FinishPendingWrite(status); },
        write_buffer, args);
  }

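  // Destroys the placement-constructed write SliceBuffer and runs the
  // pending write callback.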
  void FinishPendingWrite(absl::Status status) {
    auto* write_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
    write_buffer->~SliceBuffer();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
      gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s) error=%s", this,
              std::string(PeerAddress()).c_str(), status.ToString().c_str());
    }
    grpc_closure* cb = pending_write_cb_;
    pending_write_cb_ = nullptr;
    if (grpc_core::ExecCtx::Get() == nullptr) {
      grpc_core::ApplicationCallbackExecCtx app_ctx;
      grpc_core::ExecCtx exec_ctx;
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
    }
    // For the ref taken in EventEngineEndpointWrapper::Write().
    Unref();
  }

  // Returns true if the endpoint is not yet shut down. In that case, it also
  // acquires a shutdown ref. Otherwise it returns false and doesn't modify
  // the shutdown ref.
  bool ShutdownRef() {
    int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
    while (true) {
      if (curr & kShutdownBit) {
        return false;
      }
      if (shutdown_ref_.compare_exchange_strong(curr, curr + 1,
                                                std::memory_order_acq_rel,
                                                std::memory_order_relaxed)) {
        return true;
      }
    }
  }

  // Decrements the shutdown ref. If this is the last shutdown ref, it also
  // deletes the underlying event engine endpoint. Deletion of the event
  // engine endpoint should trigger execution of any pending read/write
  // callbacks with a NOT-OK status.
  void ShutdownUnref() {
    if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
        kShutdownBit + 1) {
      if (EventEngineSupportsFd() && fd_ > 0 && on_release_fd_) {
        reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
            ->Shutdown(std::move(on_release_fd_));
      }
      OnShutdownInternal();
    }
  }

  // The first call to TriggerShutdown sets the shutdown bit and drops the
  // initial shutdown ref. If TriggerShutdown is called again, or concurrently
  // from multiple threads, only one invocation wins the race; the others
  // simply return.
  void TriggerShutdown(
      absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
    if (EventEngineSupportsFd()) {
      on_release_fd_ = std::move(on_release_fd);
    }
    int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
    while (true) {
      if (curr & kShutdownBit) {
        return;
      }
      if (shutdown_ref_.compare_exchange_strong(curr, curr | kShutdownBit,
                                                std::memory_order_acq_rel,
                                                std::memory_order_relaxed)) {
        Ref();
        if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
            kShutdownBit + 1) {
          if (EventEngineSupportsFd() && fd_ > 0 && on_release_fd_) {
            reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
                ->Shutdown(std::move(on_release_fd_));
          }
          OnShutdownInternal();
        }
        return;
      }
    }
  }

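  // Reports whether the wrapped endpoint can track errors; only posix
  // endpoints with fd support do.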
  bool CanTrackErrors() {
    if (EventEngineSupportsFd()) {
      return reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
          ->CanTrackErrors();
    } else {
      return false;
    }
  }

 private:
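  // Clears the cached fd and addresses, releases the underlying EventEngine
  // endpoint, and drops the ref taken by TriggerShutdown.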
  void OnShutdownInternal() {
    {
      grpc_core::MutexLock lock(&mu_);
      fd_ = -1;
      local_address_ = "";
      peer_address_ = "";
    }
    endpoint_.reset();
    // For the Ref taken in TriggerShutdown.
    Unref();
  }
  std::unique_ptr<EventEngine::Endpoint> endpoint_;
  std::unique_ptr<grpc_event_engine_endpoint> eeep_;
  std::atomic<int64_t> refs_{1};
  std::atomic<int64_t> shutdown_ref_{1};
  absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_;
  grpc_core::Mutex mu_;
  grpc_closure* pending_read_cb_;
  grpc_closure* pending_write_cb_;
  grpc_slice_buffer* pending_read_buffer_;
  std::string peer_address_;
  std::string local_address_;
  int fd_{-1};
};

// Reads from the endpoint and places the data in the `slices` slice buffer.
// The provided closure is invoked when the read completes.
void EndpointRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
                  grpc_closure* cb, bool /* urgent */, int min_progress_size) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  if (!eeep->wrapper->ShutdownRef()) {
    // Shutdown has already been triggered on the endpoint.
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
    return;
  }

  EventEngine::Endpoint::ReadArgs read_args = {min_progress_size};
  if (eeep->wrapper->Read(cb, slices, &read_args)) {
    // Read succeeded immediately. Run the callback inline.
    eeep->wrapper->FinishPendingRead(absl::OkStatus());
  }

  eeep->wrapper->ShutdownUnref();
}

// Writes the data from `slices` and invokes the provided closure
// asynchronously after the write is complete.
void EndpointWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
                   grpc_closure* cb, void* arg, int max_frame_size) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  if (!eeep->wrapper->ShutdownRef()) {
    // Shutdown has already been triggered on the endpoint.
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
    return;
  }

  EventEngine::Endpoint::WriteArgs write_args = {arg, max_frame_size};
  if (eeep->wrapper->Write(cb, slices, &write_args)) {
    // Write succeeded immediately. Run the callback inline.
    eeep->wrapper->FinishPendingWrite(absl::OkStatus());
  }
  eeep->wrapper->ShutdownUnref();
}

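// Polling is driven by the EventEngine itself, so the iomgr pollset hooks
// are no-ops.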
void EndpointAddToPollset(grpc_endpoint* /* ep */,
                          grpc_pollset* /* pollset */) {}
void EndpointAddToPollsetSet(grpc_endpoint* /* ep */,
                             grpc_pollset_set* /* pollset_set */) {}
void EndpointDeleteFromPollsetSet(grpc_endpoint* /* ep */,
                                  grpc_pollset_set* /* pollset_set */) {}
/// After shutdown, all endpoint operations except destroy are no-ops and
/// return sane defaults (empty strings, nullptrs, etc.). It is the caller's
/// responsibility to ensure that calls to EndpointShutdown are synchronized.
void EndpointShutdown(grpc_endpoint* ep, grpc_error_handle why) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
    gpr_log(GPR_INFO, "TCP Endpoint %p shutdown why=%s", eeep->wrapper,
            why.ToString().c_str());
  }
  GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Shutdown:%s", eeep->wrapper,
                          why.ToString().c_str());
  eeep->wrapper->TriggerShutdown(nullptr);
}

// Attempts to free the underlying data structures.
void EndpointDestroy(grpc_endpoint* ep) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Destroy", eeep->wrapper);
  eeep->wrapper->Unref();
}

absl::string_view EndpointGetPeerAddress(grpc_endpoint* ep) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  return eeep->wrapper->PeerAddress();
}

absl::string_view EndpointGetLocalAddress(grpc_endpoint* ep) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  return eeep->wrapper->LocalAddress();
}

int EndpointGetFd(grpc_endpoint* ep) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  return eeep->wrapper->Fd();
}

bool EndpointCanTrackErr(grpc_endpoint* ep) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  return eeep->wrapper->CanTrackErrors();
}

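// The grpc_endpoint vtable that routes iomgr endpoint operations to the
// EventEngine shim functions above.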
grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
    EndpointRead,
    EndpointWrite,
    EndpointAddToPollset,
    EndpointAddToPollsetSet,
    EndpointDeleteFromPollsetSet,
    EndpointShutdown,
    EndpointDestroy,
    EndpointGetPeerAddress,
    EndpointGetLocalAddress,
    EndpointGetFd,
    EndpointCanTrackErr};

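// The constructor caches the local and peer addresses and, when the
// EventEngine supports fds, the wrapped fd, so accessors can serve them
// without touching the underlying endpoint.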
EventEngineEndpointWrapper::EventEngineEndpointWrapper(
    std::unique_ptr<EventEngine::Endpoint> endpoint)
    : endpoint_(std::move(endpoint)),
      eeep_(std::make_unique<grpc_event_engine_endpoint>()) {
  eeep_->base.vtable = &grpc_event_engine_endpoint_vtable;
  eeep_->wrapper = this;
  auto local_addr = ResolvedAddressToURI(endpoint_->GetLocalAddress());
  if (local_addr.ok()) {
    local_address_ = *local_addr;
  }
  auto peer_addr = ResolvedAddressToURI(endpoint_->GetPeerAddress());
  if (peer_addr.ok()) {
    peer_address_ = *peer_addr;
  }
  if (EventEngineSupportsFd()) {
    fd_ = reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
              ->GetWrappedFd();
  } else {
    fd_ = -1;
  }
  GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Create", eeep_->wrapper);
}

}  // namespace

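// Takes ownership of the EventEngine endpoint and returns a grpc_endpoint
// backed by it. The wrapper starts with a single ref, which is dropped by
// EndpointDestroy.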
grpc_endpoint* grpc_event_engine_endpoint_create(
    std::unique_ptr<EventEngine::Endpoint> ee_endpoint) {
  GPR_DEBUG_ASSERT(ee_endpoint != nullptr);
  auto wrapper = new EventEngineEndpointWrapper(std::move(ee_endpoint));
  return wrapper->GetGrpcEndpoint();
}

bool grpc_is_event_engine_endpoint(grpc_endpoint* ep) {
  return ep->vtable == &grpc_event_engine_endpoint_vtable;
}

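// Shuts the endpoint down and releases the wrapper's ref. When both fd and
// on_release_fd are non-null, the underlying file descriptor is released into
// *fd and on_release_fd is run with the release status; otherwise the
// endpoint is simply shut down.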
void grpc_event_engine_endpoint_destroy_and_release_fd(
    grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd) {
  auto* eeep =
      reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
          ep);
  if (fd == nullptr || on_release_fd == nullptr) {
    if (fd != nullptr) {
      *fd = -1;
    }
    eeep->wrapper->TriggerShutdown(nullptr);
  } else {
    *fd = -1;
    eeep->wrapper->TriggerShutdown(
        [fd, on_release_fd](absl::StatusOr<int> release_fd) {
          if (release_fd.ok()) {
            *fd = *release_fd;
          }
          RunEventEngineClosure(on_release_fd,
                                absl_status_to_grpc_error(release_fd.status()));
        });
  }
  eeep->wrapper->Unref();
}

}  // namespace experimental
}  // namespace grpc_event_engine