xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/iomgr/event_engine_shims/endpoint.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
17 
18 #include <atomic>
19 #include <memory>
20 #include <utility>
21 
22 #include "absl/functional/any_invocable.h"
23 #include "absl/status/status.h"
24 #include "absl/status/statusor.h"
25 #include "absl/strings/string_view.h"
26 
27 #include <grpc/event_engine/event_engine.h>
28 #include <grpc/impl/codegen/slice.h>
29 #include <grpc/slice.h>
30 #include <grpc/slice_buffer.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/time.h>
33 
34 #include "src/core/lib/event_engine/extensions/can_track_errors.h"
35 #include "src/core/lib/event_engine/extensions/supports_fd.h"
36 #include "src/core/lib/event_engine/query_extensions.h"
37 #include "src/core/lib/event_engine/shim.h"
38 #include "src/core/lib/event_engine/tcp_socket_utils.h"
39 #include "src/core/lib/event_engine/trace.h"
40 #include "src/core/lib/gpr/string.h"
41 #include "src/core/lib/gprpp/construct_destruct.h"
42 #include "src/core/lib/gprpp/debug_location.h"
43 #include "src/core/lib/gprpp/sync.h"
44 #include "src/core/lib/iomgr/closure.h"
45 #include "src/core/lib/iomgr/endpoint.h"
46 #include "src/core/lib/iomgr/error.h"
47 #include "src/core/lib/iomgr/event_engine_shims/closure.h"
48 #include "src/core/lib/iomgr/exec_ctx.h"
49 #include "src/core/lib/iomgr/port.h"
50 #include "src/core/lib/slice/slice_string_helpers.h"
51 #include "src/core/lib/transport/error_utils.h"
52 
53 extern grpc_core::TraceFlag grpc_tcp_trace;
54 
55 namespace grpc_event_engine {
56 namespace experimental {
57 namespace {
58 
// shutdown_ref_ packs two values into one atomic: the low 32 bits count
// outstanding shutdown refs, and this bit (bit 32) marks that shutdown has
// been triggered, after which ShutdownRef() refuses new refs.
constexpr int64_t kShutdownBit = static_cast<int64_t>(1) << 32;
60 
61 // A wrapper class to manage Event Engine endpoint ref counting and
62 // asynchronous shutdown.
63 class EventEngineEndpointWrapper {
64  public:
65   struct grpc_event_engine_endpoint {
66     grpc_endpoint base;
67     EventEngineEndpointWrapper* wrapper;
68     alignas(SliceBuffer) char read_buffer[sizeof(SliceBuffer)];
69     alignas(SliceBuffer) char write_buffer[sizeof(SliceBuffer)];
70   };
71 
72   explicit EventEngineEndpointWrapper(
73       std::unique_ptr<EventEngine::Endpoint> endpoint);
74 
endpoint()75   EventEngine::Endpoint* endpoint() { return endpoint_.get(); }
76 
ReleaseEndpoint()77   std::unique_ptr<EventEngine::Endpoint> ReleaseEndpoint() {
78     return std::move(endpoint_);
79   }
80 
Fd()81   int Fd() {
82     grpc_core::MutexLock lock(&mu_);
83     return fd_;
84   }
85 
PeerAddress()86   absl::string_view PeerAddress() { return peer_address_; }
87 
LocalAddress()88   absl::string_view LocalAddress() { return local_address_; }
89 
Ref()90   void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
Unref()91   void Unref() {
92     if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
93       delete this;
94     }
95   }
96 
97   // Returns a managed grpc_endpoint object. It retains ownership of the
98   // object.
GetGrpcEndpoint()99   grpc_endpoint* GetGrpcEndpoint() { return &eeep_->base; }
100 
101   // Read using the underlying EventEngine endpoint object.
Read(grpc_closure * read_cb,grpc_slice_buffer * pending_read_buffer,const EventEngine::Endpoint::ReadArgs * args)102   bool Read(grpc_closure* read_cb, grpc_slice_buffer* pending_read_buffer,
103             const EventEngine::Endpoint::ReadArgs* args) {
104     Ref();
105     pending_read_cb_ = read_cb;
106     pending_read_buffer_ = pending_read_buffer;
107     // TODO(vigneshbabu): Use SliceBufferCast<> here.
108     grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer),
109                          SliceBuffer::TakeCSliceBuffer(*pending_read_buffer_));
110     SliceBuffer* read_buffer =
111         reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
112     read_buffer->Clear();
113     return endpoint_->Read(
114         [this](absl::Status status) { FinishPendingRead(status); }, read_buffer,
115         args);
116   }
117 
FinishPendingRead(absl::Status status)118   void FinishPendingRead(absl::Status status) {
119     auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
120     grpc_slice_buffer_move_into(read_buffer->c_slice_buffer(),
121                                 pending_read_buffer_);
122     read_buffer->~SliceBuffer();
123     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
124       size_t i;
125       gpr_log(GPR_INFO, "TCP: %p READ error=%s", eeep_->wrapper,
126               status.ToString().c_str());
127       if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
128         for (i = 0; i < pending_read_buffer_->count; i++) {
129           char* dump = grpc_dump_slice(pending_read_buffer_->slices[i],
130                                        GPR_DUMP_HEX | GPR_DUMP_ASCII);
131           gpr_log(GPR_DEBUG, "READ DATA: %s", dump);
132           gpr_free(dump);
133         }
134       }
135     }
136     pending_read_buffer_ = nullptr;
137     grpc_closure* cb = pending_read_cb_;
138     pending_read_cb_ = nullptr;
139     if (grpc_core::ExecCtx::Get() == nullptr) {
140       grpc_core::ApplicationCallbackExecCtx app_ctx;
141       grpc_core::ExecCtx exec_ctx;
142       grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
143     } else {
144       grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
145     }
146     // For the ref taken in EventEngineEndpointWrapper::Read().
147     Unref();
148   }
149 
150   // Write using the underlying EventEngine endpoint object
Write(grpc_closure * write_cb,grpc_slice_buffer * slices,const EventEngine::Endpoint::WriteArgs * args)151   bool Write(grpc_closure* write_cb, grpc_slice_buffer* slices,
152              const EventEngine::Endpoint::WriteArgs* args) {
153     Ref();
154     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
155       size_t i;
156       gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s)", this,
157               std::string(PeerAddress()).c_str());
158       if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
159         for (i = 0; i < slices->count; i++) {
160           char* dump =
161               grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
162           gpr_log(GPR_DEBUG, "WRITE DATA: %s", dump);
163           gpr_free(dump);
164         }
165       }
166     }
167     // TODO(vigneshbabu): Use SliceBufferCast<> here.
168     grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer),
169                          SliceBuffer::TakeCSliceBuffer(*slices));
170     SliceBuffer* write_buffer =
171         reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
172     pending_write_cb_ = write_cb;
173     return endpoint_->Write(
174         [this](absl::Status status) { FinishPendingWrite(status); },
175         write_buffer, args);
176   }
177 
FinishPendingWrite(absl::Status status)178   void FinishPendingWrite(absl::Status status) {
179     auto* write_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
180     write_buffer->~SliceBuffer();
181     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
182       gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s) error=%s", this,
183               std::string(PeerAddress()).c_str(), status.ToString().c_str());
184     }
185     grpc_closure* cb = pending_write_cb_;
186     pending_write_cb_ = nullptr;
187     if (grpc_core::ExecCtx::Get() == nullptr) {
188       grpc_core::ApplicationCallbackExecCtx app_ctx;
189       grpc_core::ExecCtx exec_ctx;
190       grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
191     } else {
192       grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
193     }
194     // For the ref taken in EventEngineEndpointWrapper::Write().
195     Unref();
196   }
197 
198   // Returns true if the endpoint is not yet shutdown. In that case, it also
199   // acquires a shutdown ref. Otherwise it returns false and doesn't modify
200   // the shutdown ref.
ShutdownRef()201   bool ShutdownRef() {
202     int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
203     while (true) {
204       if (curr & kShutdownBit) {
205         return false;
206       }
207       if (shutdown_ref_.compare_exchange_strong(curr, curr + 1,
208                                                 std::memory_order_acq_rel,
209                                                 std::memory_order_relaxed)) {
210         return true;
211       }
212     }
213   }
214 
215   // Decrement the shutdown ref. If this is the last shutdown ref, it also
216   // deletes the underlying event engine endpoint. Deletion of the event
217   // engine endpoint should trigger execution of any pending read/write
218   // callbacks with NOT-OK status.
ShutdownUnref()219   void ShutdownUnref() {
220     if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
221         kShutdownBit + 1) {
222       auto* supports_fd =
223           QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
224       if (supports_fd != nullptr && fd_ > 0 && on_release_fd_) {
225         supports_fd->Shutdown(std::move(on_release_fd_));
226       }
227       OnShutdownInternal();
228     }
229   }
230 
231   // If trigger shutdown is called the first time, it sets the shutdown bit
232   // and decrements the shutdown ref. If trigger shutdown has been called
233   // before or in parallel, only one of them would win the race. The other
234   // invocation would simply return.
TriggerShutdown(absl::AnyInvocable<void (absl::StatusOr<int>)> on_release_fd)235   void TriggerShutdown(
236       absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
237     auto* supports_fd =
238         QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
239     if (supports_fd != nullptr) {
240       on_release_fd_ = std::move(on_release_fd);
241     }
242     int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
243     while (true) {
244       if (curr & kShutdownBit) {
245         return;
246       }
247       if (shutdown_ref_.compare_exchange_strong(curr, curr | kShutdownBit,
248                                                 std::memory_order_acq_rel,
249                                                 std::memory_order_relaxed)) {
250         Ref();
251         if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
252             kShutdownBit + 1) {
253           if (supports_fd != nullptr && fd_ > 0 && on_release_fd_) {
254             supports_fd->Shutdown(std::move(on_release_fd_));
255           }
256           OnShutdownInternal();
257         }
258         return;
259       }
260     }
261   }
262 
CanTrackErrors()263   bool CanTrackErrors() {
264     auto* can_track_errors =
265         QueryExtension<EndpointCanTrackErrorsExtension>(endpoint_.get());
266     if (can_track_errors != nullptr) {
267       return can_track_errors->CanTrackErrors();
268     } else {
269       return false;
270     }
271   }
272 
273  private:
OnShutdownInternal()274   void OnShutdownInternal() {
275     {
276       grpc_core::MutexLock lock(&mu_);
277       fd_ = -1;
278     }
279     endpoint_.reset();
280     // For the Ref taken in TriggerShutdown
281     Unref();
282   }
283   std::unique_ptr<EventEngine::Endpoint> endpoint_;
284   std::unique_ptr<grpc_event_engine_endpoint> eeep_;
285   std::atomic<int64_t> refs_{1};
286   std::atomic<int64_t> shutdown_ref_{1};
287   absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_;
288   grpc_core::Mutex mu_;
289   grpc_closure* pending_read_cb_;
290   grpc_closure* pending_write_cb_;
291   grpc_slice_buffer* pending_read_buffer_;
292   const std::string peer_address_{
293       ResolvedAddressToURI(endpoint_->GetPeerAddress()).value_or("")};
294   const std::string local_address_{
295       ResolvedAddressToURI(endpoint_->GetLocalAddress()).value_or("")};
296   int fd_{-1};
297 };
298 
299 // Read from the endpoint and place the data in slices slice buffer. The
300 // provided closure is also invoked asynchronously.
EndpointRead(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool,int min_progress_size)301 void EndpointRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
302                   grpc_closure* cb, bool /* urgent */, int min_progress_size) {
303   auto* eeep =
304       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
305           ep);
306   if (!eeep->wrapper->ShutdownRef()) {
307     // Shutdown has already been triggered on the endpoint.
308     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
309     return;
310   }
311 
312   EventEngine::Endpoint::ReadArgs read_args = {min_progress_size};
313   if (eeep->wrapper->Read(cb, slices, &read_args)) {
314     // Read succeeded immediately. Run the callback inline.
315     eeep->wrapper->FinishPendingRead(absl::OkStatus());
316   }
317 
318   eeep->wrapper->ShutdownUnref();
319 }
320 
321 // Write the data from slices and invoke the provided closure asynchronously
322 // after the write is complete.
EndpointWrite(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg,int max_frame_size)323 void EndpointWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
324                    grpc_closure* cb, void* arg, int max_frame_size) {
325   auto* eeep =
326       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
327           ep);
328   if (!eeep->wrapper->ShutdownRef()) {
329     // Shutdown has already been triggered on the endpoint.
330     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
331     return;
332   }
333 
334   EventEngine::Endpoint::WriteArgs write_args = {arg, max_frame_size};
335   if (eeep->wrapper->Write(cb, slices, &write_args)) {
336     // Write succeeded immediately. Run the callback inline.
337     eeep->wrapper->FinishPendingWrite(absl::OkStatus());
338   }
339   eeep->wrapper->ShutdownUnref();
340 }
341 
// Pollset plumbing is intentionally a no-op for EventEngine-backed endpoints
// (presumably the EventEngine drives its own I/O readiness — see shim.h).
void EndpointAddToPollset(grpc_endpoint* /* ep */,
                          grpc_pollset* /* pollset */) {}
void EndpointAddToPollsetSet(grpc_endpoint* /* ep */,
                             grpc_pollset_set* /* pollset */) {}
void EndpointDeleteFromPollsetSet(grpc_endpoint* /* ep */,
                                  grpc_pollset_set* /* pollset */) {}
348 /// After shutdown, all endpoint operations except destroy are no-op,
349 /// and will return some kind of sane default (empty strings, nullptrs, etc).
350 /// It is the caller's responsibility to ensure that calls to EndpointShutdown
351 /// are synchronized.
EndpointShutdown(grpc_endpoint * ep,grpc_error_handle why)352 void EndpointShutdown(grpc_endpoint* ep, grpc_error_handle why) {
353   auto* eeep =
354       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
355           ep);
356   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
357     gpr_log(GPR_INFO, "TCP Endpoint %p shutdown why=%s", eeep->wrapper,
358             why.ToString().c_str());
359   }
360   GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Shutdown:%s", eeep->wrapper,
361                           why.ToString().c_str());
362   eeep->wrapper->TriggerShutdown(nullptr);
363 }
364 
365 // Attempts to free the underlying data structures.
EndpointDestroy(grpc_endpoint * ep)366 void EndpointDestroy(grpc_endpoint* ep) {
367   auto* eeep =
368       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
369           ep);
370   GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Destroy", eeep->wrapper);
371   eeep->wrapper->Unref();
372 }
373 
EndpointGetPeerAddress(grpc_endpoint * ep)374 absl::string_view EndpointGetPeerAddress(grpc_endpoint* ep) {
375   auto* eeep =
376       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
377           ep);
378   return eeep->wrapper->PeerAddress();
379 }
380 
EndpointGetLocalAddress(grpc_endpoint * ep)381 absl::string_view EndpointGetLocalAddress(grpc_endpoint* ep) {
382   auto* eeep =
383       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
384           ep);
385   return eeep->wrapper->LocalAddress();
386 }
387 
EndpointGetFd(grpc_endpoint * ep)388 int EndpointGetFd(grpc_endpoint* ep) {
389   auto* eeep =
390       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
391           ep);
392   return eeep->wrapper->Fd();
393 }
394 
EndpointCanTrackErr(grpc_endpoint * ep)395 bool EndpointCanTrackErr(grpc_endpoint* ep) {
396   auto* eeep =
397       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
398           ep);
399   return eeep->wrapper->CanTrackErrors();
400 }
401 
// vtable wiring iomgr's grpc_endpoint operations to the EventEngine-backed
// implementations above. Order must match grpc_endpoint_vtable's fields.
grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
    EndpointRead,
    EndpointWrite,
    EndpointAddToPollset,
    EndpointAddToPollsetSet,
    EndpointDeleteFromPollsetSet,
    EndpointShutdown,
    EndpointDestroy,
    EndpointGetPeerAddress,
    EndpointGetLocalAddress,
    EndpointGetFd,
    EndpointCanTrackErr};
414 
EventEngineEndpointWrapper(std::unique_ptr<EventEngine::Endpoint> endpoint)415 EventEngineEndpointWrapper::EventEngineEndpointWrapper(
416     std::unique_ptr<EventEngine::Endpoint> endpoint)
417     : endpoint_(std::move(endpoint)),
418       eeep_(std::make_unique<grpc_event_engine_endpoint>()) {
419   eeep_->base.vtable = &grpc_event_engine_endpoint_vtable;
420   eeep_->wrapper = this;
421   auto* supports_fd =
422       QueryExtension<EndpointSupportsFdExtension>(endpoint_.get());
423   if (supports_fd != nullptr) {
424     fd_ = supports_fd->GetWrappedFd();
425   } else {
426     fd_ = -1;
427   }
428   GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Create", eeep_->wrapper);
429 }
430 
431 }  // namespace
432 
grpc_event_engine_endpoint_create(std::unique_ptr<EventEngine::Endpoint> ee_endpoint)433 grpc_endpoint* grpc_event_engine_endpoint_create(
434     std::unique_ptr<EventEngine::Endpoint> ee_endpoint) {
435   GPR_DEBUG_ASSERT(ee_endpoint != nullptr);
436   auto wrapper = new EventEngineEndpointWrapper(std::move(ee_endpoint));
437   return wrapper->GetGrpcEndpoint();
438 }
439 
// True iff `ep` was created by grpc_event_engine_endpoint_create, identified
// by its vtable pointer.
bool grpc_is_event_engine_endpoint(grpc_endpoint* ep) {
  return ep->vtable == &grpc_event_engine_endpoint_vtable;
}
443 
grpc_get_wrapped_event_engine_endpoint(grpc_endpoint * ep)444 EventEngine::Endpoint* grpc_get_wrapped_event_engine_endpoint(
445     grpc_endpoint* ep) {
446   if (!grpc_is_event_engine_endpoint(ep)) {
447     return nullptr;
448   }
449   auto* eeep =
450       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
451           ep);
452   return eeep->wrapper->endpoint();
453 }
454 
grpc_take_wrapped_event_engine_endpoint(grpc_endpoint * ep)455 std::unique_ptr<EventEngine::Endpoint> grpc_take_wrapped_event_engine_endpoint(
456     grpc_endpoint* ep) {
457   if (!grpc_is_event_engine_endpoint(ep)) {
458     return nullptr;
459   }
460   auto* eeep =
461       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
462           ep);
463   auto endpoint = eeep->wrapper->ReleaseEndpoint();
464   delete eeep->wrapper;
465   return endpoint;
466 }
467 
grpc_event_engine_endpoint_destroy_and_release_fd(grpc_endpoint * ep,int * fd,grpc_closure * on_release_fd)468 void grpc_event_engine_endpoint_destroy_and_release_fd(
469     grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd) {
470   auto* eeep =
471       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
472           ep);
473   if (fd == nullptr || on_release_fd == nullptr) {
474     if (fd != nullptr) {
475       *fd = -1;
476     }
477     eeep->wrapper->TriggerShutdown(nullptr);
478   } else {
479     *fd = -1;
480     eeep->wrapper->TriggerShutdown(
481         [fd, on_release_fd](absl::StatusOr<int> release_fd) {
482           if (release_fd.ok()) {
483             *fd = *release_fd;
484           }
485           RunEventEngineClosure(on_release_fd,
486                                 absl_status_to_grpc_error(release_fd.status()));
487         });
488   }
489   eeep->wrapper->Unref();
490 }
491 
492 }  // namespace experimental
493 }  // namespace grpc_event_engine
494