1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
17
18 #include <atomic>
19 #include <memory>
20 #include <utility>
21
22 #include "absl/functional/any_invocable.h"
23 #include "absl/status/status.h"
24 #include "absl/status/statusor.h"
25 #include "absl/strings/string_view.h"
26
27 #include <grpc/event_engine/event_engine.h>
28 #include <grpc/impl/codegen/slice.h>
29 #include <grpc/slice.h>
30 #include <grpc/slice_buffer.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/time.h>
33
34 #include "src/core/lib/event_engine/extensions/can_track_errors.h"
35 #include "src/core/lib/event_engine/extensions/supports_fd.h"
36 #include "src/core/lib/event_engine/query_extensions.h"
37 #include "src/core/lib/event_engine/shim.h"
38 #include "src/core/lib/event_engine/tcp_socket_utils.h"
39 #include "src/core/lib/event_engine/trace.h"
40 #include "src/core/lib/gpr/string.h"
41 #include "src/core/lib/gprpp/construct_destruct.h"
42 #include "src/core/lib/gprpp/debug_location.h"
43 #include "src/core/lib/gprpp/sync.h"
44 #include "src/core/lib/iomgr/closure.h"
45 #include "src/core/lib/iomgr/endpoint.h"
46 #include "src/core/lib/iomgr/error.h"
47 #include "src/core/lib/iomgr/event_engine_shims/closure.h"
48 #include "src/core/lib/iomgr/exec_ctx.h"
49 #include "src/core/lib/iomgr/port.h"
50 #include "src/core/lib/slice/slice_string_helpers.h"
51 #include "src/core/lib/transport/error_utils.h"
52
53 extern grpc_core::TraceFlag grpc_tcp_trace;
54
55 namespace grpc_event_engine {
56 namespace experimental {
57 namespace {
58
// Flag bit stored in EventEngineEndpointWrapper::shutdown_ref_. The bit is
// OR-ed into the counter once shutdown has been triggered; the bits below it
// hold the count of outstanding shutdown refs.
constexpr int64_t kShutdownBit = int64_t{1} << 32;
60
61 // A wrapper class to manage Event Engine endpoint ref counting and
62 // asynchronous shutdown.
63 class EventEngineEndpointWrapper {
64 public:
65 struct grpc_event_engine_endpoint {
66 grpc_endpoint base;
67 EventEngineEndpointWrapper* wrapper;
68 alignas(SliceBuffer) char read_buffer[sizeof(SliceBuffer)];
69 alignas(SliceBuffer) char write_buffer[sizeof(SliceBuffer)];
70 };
71
72 explicit EventEngineEndpointWrapper(
73 std::unique_ptr<EventEngine::Endpoint> endpoint);
74
endpoint()75 EventEngine::Endpoint* endpoint() { return endpoint_.get(); }
76
ReleaseEndpoint()77 std::unique_ptr<EventEngine::Endpoint> ReleaseEndpoint() {
78 return std::move(endpoint_);
79 }
80
Fd()81 int Fd() {
82 grpc_core::MutexLock lock(&mu_);
83 return fd_;
84 }
85
PeerAddress()86 absl::string_view PeerAddress() { return peer_address_; }
87
LocalAddress()88 absl::string_view LocalAddress() { return local_address_; }
89
Ref()90 void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
Unref()91 void Unref() {
92 if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
93 delete this;
94 }
95 }
96
97 // Returns a managed grpc_endpoint object. It retains ownership of the
98 // object.
GetGrpcEndpoint()99 grpc_endpoint* GetGrpcEndpoint() { return &eeep_->base; }
100
101 // Read using the underlying EventEngine endpoint object.
Read(grpc_closure * read_cb,grpc_slice_buffer * pending_read_buffer,const EventEngine::Endpoint::ReadArgs * args)102 bool Read(grpc_closure* read_cb, grpc_slice_buffer* pending_read_buffer,
103 const EventEngine::Endpoint::ReadArgs* args) {
104 Ref();
105 pending_read_cb_ = read_cb;
106 pending_read_buffer_ = pending_read_buffer;
107 // TODO(vigneshbabu): Use SliceBufferCast<> here.
108 grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer),
109 SliceBuffer::TakeCSliceBuffer(*pending_read_buffer_));
110 SliceBuffer* read_buffer =
111 reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
112 read_buffer->Clear();
113 return endpoint_->Read(
114 [this](absl::Status status) { FinishPendingRead(status); }, read_buffer,
115 args);
116 }
117
FinishPendingRead(absl::Status status)118 void FinishPendingRead(absl::Status status) {
119 auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
120 grpc_slice_buffer_move_into(read_buffer->c_slice_buffer(),
121 pending_read_buffer_);
122 read_buffer->~SliceBuffer();
123 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
124 size_t i;
125 gpr_log(GPR_INFO, "TCP: %p READ error=%s", eeep_->wrapper,
126 status.ToString().c_str());
127 if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
128 for (i = 0; i < pending_read_buffer_->count; i++) {
129 char* dump = grpc_dump_slice(pending_read_buffer_->slices[i],
130 GPR_DUMP_HEX | GPR_DUMP_ASCII);
131 gpr_log(GPR_DEBUG, "READ DATA: %s", dump);
132 gpr_free(dump);
133 }
134 }
135 }
136 pending_read_buffer_ = nullptr;
137 grpc_closure* cb = pending_read_cb_;
138 pending_read_cb_ = nullptr;
139 if (grpc_core::ExecCtx::Get() == nullptr) {
140 grpc_core::ApplicationCallbackExecCtx app_ctx;
141 grpc_core::ExecCtx exec_ctx;
142 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
143 } else {
144 grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
145 }
146 // For the ref taken in EventEngineEndpointWrapper::Read().
147 Unref();
148 }
149
150 // Write using the underlying EventEngine endpoint object
Write(grpc_closure * write_cb,grpc_slice_buffer * slices,const EventEngine::Endpoint::WriteArgs * args)151 bool Write(grpc_closure* write_cb, grpc_slice_buffer* slices,
152 const EventEngine::Endpoint::WriteArgs* args) {
153 Ref();
154 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
155 size_t i;
156 gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s)", this,
157 std::string(PeerAddress()).c_str());
158 if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
159 for (i = 0; i < slices->count; i++) {
160 char* dump =
161 grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
162 gpr_log(GPR_DEBUG, "WRITE DATA: %s", dump);
163 gpr_free(dump);
164 }
165 }
166 }
167 // TODO(vigneshbabu): Use SliceBufferCast<> here.
168 grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer),
169 SliceBuffer::TakeCSliceBuffer(*slices));
170 SliceBuffer* write_buffer =
171 reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
172 pending_write_cb_ = write_cb;
173 return endpoint_->Write(
174 [this](absl::Status status) { FinishPendingWrite(status); },
175 write_buffer, args);
176 }
177
FinishPendingWrite(absl::Status status)178 void FinishPendingWrite(absl::Status status) {
179 auto* write_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
180 write_buffer->~SliceBuffer();
181 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
182 gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s) error=%s", this,
183 std::string(PeerAddress()).c_str(), status.ToString().c_str());
184 }
185 grpc_closure* cb = pending_write_cb_;
186 pending_write_cb_ = nullptr;
187 if (grpc_core::ExecCtx::Get() == nullptr) {
188 grpc_core::ApplicationCallbackExecCtx app_ctx;
189 grpc_core::ExecCtx exec_ctx;
190 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
191 } else {
192 grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
193 }
194 // For the ref taken in EventEngineEndpointWrapper::Write().
195 Unref();
196 }
197
198 // Returns true if the endpoint is not yet shutdown. In that case, it also
199 // acquires a shutdown ref. Otherwise it returns false and doesn't modify
200 // the shutdown ref.
ShutdownRef()201 bool ShutdownRef() {
202 int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
203 while (true) {
204 if (curr & kShutdownBit) {
205 return false;
206 }
207 if (shutdown_ref_.compare_exchange_strong(curr, curr + 1,
208 std::memory_order_acq_rel,
209 std::memory_order_relaxed)) {
210 return true;
211 }
212 }
213 }
214
215 // Decrement the shutdown ref. If this is the last shutdown ref, it also
216 // deletes the underlying event engine endpoint. Deletion of the event
217 // engine endpoint should trigger execution of any pending read/write
218 // callbacks with NOT-OK status.
ShutdownUnref()219 void ShutdownUnref() {
220 if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
221 kShutdownBit + 1) {
222 auto* supports_fd =
223 QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
224 if (supports_fd != nullptr && fd_ > 0 && on_release_fd_) {
225 supports_fd->Shutdown(std::move(on_release_fd_));
226 }
227 OnShutdownInternal();
228 }
229 }
230
231 // If trigger shutdown is called the first time, it sets the shutdown bit
232 // and decrements the shutdown ref. If trigger shutdown has been called
233 // before or in parallel, only one of them would win the race. The other
234 // invocation would simply return.
TriggerShutdown(absl::AnyInvocable<void (absl::StatusOr<int>)> on_release_fd)235 void TriggerShutdown(
236 absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
237 auto* supports_fd =
238 QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
239 if (supports_fd != nullptr) {
240 on_release_fd_ = std::move(on_release_fd);
241 }
242 int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
243 while (true) {
244 if (curr & kShutdownBit) {
245 return;
246 }
247 if (shutdown_ref_.compare_exchange_strong(curr, curr | kShutdownBit,
248 std::memory_order_acq_rel,
249 std::memory_order_relaxed)) {
250 Ref();
251 if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
252 kShutdownBit + 1) {
253 if (supports_fd != nullptr && fd_ > 0 && on_release_fd_) {
254 supports_fd->Shutdown(std::move(on_release_fd_));
255 }
256 OnShutdownInternal();
257 }
258 return;
259 }
260 }
261 }
262
CanTrackErrors()263 bool CanTrackErrors() {
264 auto* can_track_errors =
265 QueryExtension<EndpointCanTrackErrorsExtension>(endpoint_.get());
266 if (can_track_errors != nullptr) {
267 return can_track_errors->CanTrackErrors();
268 } else {
269 return false;
270 }
271 }
272
273 private:
OnShutdownInternal()274 void OnShutdownInternal() {
275 {
276 grpc_core::MutexLock lock(&mu_);
277 fd_ = -1;
278 }
279 endpoint_.reset();
280 // For the Ref taken in TriggerShutdown
281 Unref();
282 }
283 std::unique_ptr<EventEngine::Endpoint> endpoint_;
284 std::unique_ptr<grpc_event_engine_endpoint> eeep_;
285 std::atomic<int64_t> refs_{1};
286 std::atomic<int64_t> shutdown_ref_{1};
287 absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_;
288 grpc_core::Mutex mu_;
289 grpc_closure* pending_read_cb_;
290 grpc_closure* pending_write_cb_;
291 grpc_slice_buffer* pending_read_buffer_;
292 const std::string peer_address_{
293 ResolvedAddressToURI(endpoint_->GetPeerAddress()).value_or("")};
294 const std::string local_address_{
295 ResolvedAddressToURI(endpoint_->GetLocalAddress()).value_or("")};
296 int fd_{-1};
297 };
298
299 // Read from the endpoint and place the data in slices slice buffer. The
300 // provided closure is also invoked asynchronously.
EndpointRead(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool,int min_progress_size)301 void EndpointRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
302 grpc_closure* cb, bool /* urgent */, int min_progress_size) {
303 auto* eeep =
304 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
305 ep);
306 if (!eeep->wrapper->ShutdownRef()) {
307 // Shutdown has already been triggered on the endpoint.
308 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
309 return;
310 }
311
312 EventEngine::Endpoint::ReadArgs read_args = {min_progress_size};
313 if (eeep->wrapper->Read(cb, slices, &read_args)) {
314 // Read succeeded immediately. Run the callback inline.
315 eeep->wrapper->FinishPendingRead(absl::OkStatus());
316 }
317
318 eeep->wrapper->ShutdownUnref();
319 }
320
321 // Write the data from slices and invoke the provided closure asynchronously
322 // after the write is complete.
EndpointWrite(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg,int max_frame_size)323 void EndpointWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
324 grpc_closure* cb, void* arg, int max_frame_size) {
325 auto* eeep =
326 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
327 ep);
328 if (!eeep->wrapper->ShutdownRef()) {
329 // Shutdown has already been triggered on the endpoint.
330 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
331 return;
332 }
333
334 EventEngine::Endpoint::WriteArgs write_args = {arg, max_frame_size};
335 if (eeep->wrapper->Write(cb, slices, &write_args)) {
336 // Write succeeded immediately. Run the callback inline.
337 eeep->wrapper->FinishPendingWrite(absl::OkStatus());
338 }
339 eeep->wrapper->ShutdownUnref();
340 }
341
EndpointAddToPollset(grpc_endpoint *,grpc_pollset *)342 void EndpointAddToPollset(grpc_endpoint* /* ep */,
343 grpc_pollset* /* pollset */) {}
EndpointAddToPollsetSet(grpc_endpoint *,grpc_pollset_set *)344 void EndpointAddToPollsetSet(grpc_endpoint* /* ep */,
345 grpc_pollset_set* /* pollset */) {}
EndpointDeleteFromPollsetSet(grpc_endpoint *,grpc_pollset_set *)346 void EndpointDeleteFromPollsetSet(grpc_endpoint* /* ep */,
347 grpc_pollset_set* /* pollset */) {}
348 /// After shutdown, all endpoint operations except destroy are no-op,
349 /// and will return some kind of sane default (empty strings, nullptrs, etc).
350 /// It is the caller's responsibility to ensure that calls to EndpointShutdown
351 /// are synchronized.
EndpointShutdown(grpc_endpoint * ep,grpc_error_handle why)352 void EndpointShutdown(grpc_endpoint* ep, grpc_error_handle why) {
353 auto* eeep =
354 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
355 ep);
356 if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
357 gpr_log(GPR_INFO, "TCP Endpoint %p shutdown why=%s", eeep->wrapper,
358 why.ToString().c_str());
359 }
360 GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Shutdown:%s", eeep->wrapper,
361 why.ToString().c_str());
362 eeep->wrapper->TriggerShutdown(nullptr);
363 }
364
365 // Attempts to free the underlying data structures.
EndpointDestroy(grpc_endpoint * ep)366 void EndpointDestroy(grpc_endpoint* ep) {
367 auto* eeep =
368 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
369 ep);
370 GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Destroy", eeep->wrapper);
371 eeep->wrapper->Unref();
372 }
373
EndpointGetPeerAddress(grpc_endpoint * ep)374 absl::string_view EndpointGetPeerAddress(grpc_endpoint* ep) {
375 auto* eeep =
376 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
377 ep);
378 return eeep->wrapper->PeerAddress();
379 }
380
EndpointGetLocalAddress(grpc_endpoint * ep)381 absl::string_view EndpointGetLocalAddress(grpc_endpoint* ep) {
382 auto* eeep =
383 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
384 ep);
385 return eeep->wrapper->LocalAddress();
386 }
387
EndpointGetFd(grpc_endpoint * ep)388 int EndpointGetFd(grpc_endpoint* ep) {
389 auto* eeep =
390 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
391 ep);
392 return eeep->wrapper->Fd();
393 }
394
EndpointCanTrackErr(grpc_endpoint * ep)395 bool EndpointCanTrackErr(grpc_endpoint* ep) {
396 auto* eeep =
397 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
398 ep);
399 return eeep->wrapper->CanTrackErrors();
400 }
401
402 grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
403 EndpointRead,
404 EndpointWrite,
405 EndpointAddToPollset,
406 EndpointAddToPollsetSet,
407 EndpointDeleteFromPollsetSet,
408 EndpointShutdown,
409 EndpointDestroy,
410 EndpointGetPeerAddress,
411 EndpointGetLocalAddress,
412 EndpointGetFd,
413 EndpointCanTrackErr};
414
EventEngineEndpointWrapper(std::unique_ptr<EventEngine::Endpoint> endpoint)415 EventEngineEndpointWrapper::EventEngineEndpointWrapper(
416 std::unique_ptr<EventEngine::Endpoint> endpoint)
417 : endpoint_(std::move(endpoint)),
418 eeep_(std::make_unique<grpc_event_engine_endpoint>()) {
419 eeep_->base.vtable = &grpc_event_engine_endpoint_vtable;
420 eeep_->wrapper = this;
421 auto* supports_fd =
422 QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
423 if (supports_fd != nullptr) {
424 fd_ = supports_fd->GetWrappedFd();
425 } else {
426 fd_ = -1;
427 }
428 GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Create", eeep_->wrapper);
429 }
430
431 } // namespace
432
grpc_event_engine_endpoint_create(std::unique_ptr<EventEngine::Endpoint> ee_endpoint)433 grpc_endpoint* grpc_event_engine_endpoint_create(
434 std::unique_ptr<EventEngine::Endpoint> ee_endpoint) {
435 GPR_DEBUG_ASSERT(ee_endpoint != nullptr);
436 auto wrapper = new EventEngineEndpointWrapper(std::move(ee_endpoint));
437 return wrapper->GetGrpcEndpoint();
438 }
439
grpc_is_event_engine_endpoint(grpc_endpoint * ep)440 bool grpc_is_event_engine_endpoint(grpc_endpoint* ep) {
441 return ep->vtable == &grpc_event_engine_endpoint_vtable;
442 }
443
grpc_get_wrapped_event_engine_endpoint(grpc_endpoint * ep)444 EventEngine::Endpoint* grpc_get_wrapped_event_engine_endpoint(
445 grpc_endpoint* ep) {
446 if (!grpc_is_event_engine_endpoint(ep)) {
447 return nullptr;
448 }
449 auto* eeep =
450 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
451 ep);
452 return eeep->wrapper->endpoint();
453 }
454
grpc_take_wrapped_event_engine_endpoint(grpc_endpoint * ep)455 std::unique_ptr<EventEngine::Endpoint> grpc_take_wrapped_event_engine_endpoint(
456 grpc_endpoint* ep) {
457 if (!grpc_is_event_engine_endpoint(ep)) {
458 return nullptr;
459 }
460 auto* eeep =
461 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
462 ep);
463 auto endpoint = eeep->wrapper->ReleaseEndpoint();
464 delete eeep->wrapper;
465 return endpoint;
466 }
467
grpc_event_engine_endpoint_destroy_and_release_fd(grpc_endpoint * ep,int * fd,grpc_closure * on_release_fd)468 void grpc_event_engine_endpoint_destroy_and_release_fd(
469 grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd) {
470 auto* eeep =
471 reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
472 ep);
473 if (fd == nullptr || on_release_fd == nullptr) {
474 if (fd != nullptr) {
475 *fd = -1;
476 }
477 eeep->wrapper->TriggerShutdown(nullptr);
478 } else {
479 *fd = -1;
480 eeep->wrapper->TriggerShutdown(
481 [fd, on_release_fd](absl::StatusOr<int> release_fd) {
482 if (release_fd.ok()) {
483 *fd = *release_fd;
484 }
485 RunEventEngineClosure(on_release_fd,
486 absl_status_to_grpc_error(release_fd.status()));
487 });
488 }
489 eeep->wrapper->Unref();
490 }
491
492 } // namespace experimental
493 } // namespace grpc_event_engine
494