1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
17 
18 #include <atomic>
19 #include <memory>
20 
21 #include "absl/status/status.h"
22 #include "absl/strings/string_view.h"
23 
24 #include <grpc/event_engine/event_engine.h>
25 #include <grpc/impl/codegen/slice.h>
26 #include <grpc/slice.h>
27 #include <grpc/slice_buffer.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/time.h>
30 
31 #include "src/core/lib/event_engine/posix.h"
32 #include "src/core/lib/event_engine/shim.h"
33 #include "src/core/lib/event_engine/tcp_socket_utils.h"
34 #include "src/core/lib/event_engine/trace.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/construct_destruct.h"
37 #include "src/core/lib/gprpp/debug_location.h"
38 #include "src/core/lib/gprpp/sync.h"
39 #include "src/core/lib/iomgr/closure.h"
40 #include "src/core/lib/iomgr/endpoint.h"
41 #include "src/core/lib/iomgr/error.h"
42 #include "src/core/lib/iomgr/event_engine_shims/closure.h"
43 #include "src/core/lib/iomgr/exec_ctx.h"
44 #include "src/core/lib/iomgr/port.h"
45 #include "src/core/lib/slice/slice_string_helpers.h"
46 #include "src/core/lib/transport/error_utils.h"
47 
48 extern grpc_core::TraceFlag grpc_tcp_trace;
49 
50 namespace grpc_event_engine {
51 namespace experimental {
52 namespace {
53 
54 constexpr int64_t kShutdownBit = static_cast<int64_t>(1) << 32;
55 
56 // A wrapper class to manage Event Engine endpoint ref counting and
57 // asynchronous shutdown.
58 class EventEngineEndpointWrapper {
59  public:
60   struct grpc_event_engine_endpoint {
61     grpc_endpoint base;
62     EventEngineEndpointWrapper* wrapper;
63     std::aligned_storage<sizeof(SliceBuffer), alignof(SliceBuffer)>::type
64         read_buffer;
65     std::aligned_storage<sizeof(SliceBuffer), alignof(SliceBuffer)>::type
66         write_buffer;
67   };
68 
69   explicit EventEngineEndpointWrapper(
70       std::unique_ptr<EventEngine::Endpoint> endpoint);
71 
Fd()72   int Fd() {
73     grpc_core::MutexLock lock(&mu_);
74     return fd_;
75   }
76 
PeerAddress()77   absl::string_view PeerAddress() {
78     grpc_core::MutexLock lock(&mu_);
79     return peer_address_;
80   }
81 
LocalAddress()82   absl::string_view LocalAddress() {
83     grpc_core::MutexLock lock(&mu_);
84     return local_address_;
85   }
86 
Ref()87   void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
Unref()88   void Unref() {
89     if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
90       delete this;
91     }
92   }
93 
94   // Returns a managed grpc_endpoint object. It retains ownership of the
95   // object.
GetGrpcEndpoint()96   grpc_endpoint* GetGrpcEndpoint() { return &eeep_->base; }
97 
98   // Read using the underlying EventEngine endpoint object.
Read(grpc_closure * read_cb,grpc_slice_buffer * pending_read_buffer,const EventEngine::Endpoint::ReadArgs * args)99   bool Read(grpc_closure* read_cb, grpc_slice_buffer* pending_read_buffer,
100             const EventEngine::Endpoint::ReadArgs* args) {
101     Ref();
102     pending_read_cb_ = read_cb;
103     pending_read_buffer_ = pending_read_buffer;
104     // TODO(vigneshbabu): Use SliceBufferCast<> here.
105     grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer),
106                          SliceBuffer::TakeCSliceBuffer(*pending_read_buffer_));
107     SliceBuffer* read_buffer =
108         reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
109     read_buffer->Clear();
110     return endpoint_->Read(
111         [this](absl::Status status) { FinishPendingRead(status); }, read_buffer,
112         args);
113   }
114 
FinishPendingRead(absl::Status status)115   void FinishPendingRead(absl::Status status) {
116     auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
117     grpc_slice_buffer_move_into(read_buffer->c_slice_buffer(),
118                                 pending_read_buffer_);
119     read_buffer->~SliceBuffer();
120     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
121       size_t i;
122       gpr_log(GPR_INFO, "TCP: %p READ (peer=%s) error=%s", eeep_->wrapper,
123               std::string(eeep_->wrapper->PeerAddress()).c_str(),
124               status.ToString().c_str());
125       if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
126         for (i = 0; i < pending_read_buffer_->count; i++) {
127           char* dump = grpc_dump_slice(pending_read_buffer_->slices[i],
128                                        GPR_DUMP_HEX | GPR_DUMP_ASCII);
129           gpr_log(GPR_DEBUG, "READ DATA: %s", dump);
130           gpr_free(dump);
131         }
132       }
133     }
134     pending_read_buffer_ = nullptr;
135     grpc_closure* cb = pending_read_cb_;
136     pending_read_cb_ = nullptr;
137     if (grpc_core::ExecCtx::Get() == nullptr) {
138       grpc_core::ApplicationCallbackExecCtx app_ctx;
139       grpc_core::ExecCtx exec_ctx;
140       grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
141     } else {
142       grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
143     }
144     // For the ref taken in EventEngineEndpointWrapper::Read().
145     Unref();
146   }
147 
148   // Write using the underlying EventEngine endpoint object
Write(grpc_closure * write_cb,grpc_slice_buffer * slices,const EventEngine::Endpoint::WriteArgs * args)149   bool Write(grpc_closure* write_cb, grpc_slice_buffer* slices,
150              const EventEngine::Endpoint::WriteArgs* args) {
151     Ref();
152     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
153       size_t i;
154       gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s)", this,
155               std::string(PeerAddress()).c_str());
156       if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
157         for (i = 0; i < slices->count; i++) {
158           char* dump =
159               grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
160           gpr_log(GPR_DEBUG, "WRITE DATA: %s", dump);
161           gpr_free(dump);
162         }
163       }
164     }
165     // TODO(vigneshbabu): Use SliceBufferCast<> here.
166     grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer),
167                          SliceBuffer::TakeCSliceBuffer(*slices));
168     SliceBuffer* write_buffer =
169         reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
170     pending_write_cb_ = write_cb;
171     return endpoint_->Write(
172         [this](absl::Status status) { FinishPendingWrite(status); },
173         write_buffer, args);
174   }
175 
FinishPendingWrite(absl::Status status)176   void FinishPendingWrite(absl::Status status) {
177     auto* write_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
178     write_buffer->~SliceBuffer();
179     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
180       gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s) error=%s", this,
181               std::string(PeerAddress()).c_str(), status.ToString().c_str());
182     }
183     grpc_closure* cb = pending_write_cb_;
184     pending_write_cb_ = nullptr;
185     if (grpc_core::ExecCtx::Get() == nullptr) {
186       grpc_core::ApplicationCallbackExecCtx app_ctx;
187       grpc_core::ExecCtx exec_ctx;
188       grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
189     } else {
190       grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
191     }
192     // For the ref taken in EventEngineEndpointWrapper::Write().
193     Unref();
194   }
195 
196   // Returns true if the endpoint is not yet shutdown. In that case, it also
197   // acquires a shutdown ref. Otherwise it returns false and doesn't modify
198   // the shutdown ref.
ShutdownRef()199   bool ShutdownRef() {
200     int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
201     while (true) {
202       if (curr & kShutdownBit) {
203         return false;
204       }
205       if (shutdown_ref_.compare_exchange_strong(curr, curr + 1,
206                                                 std::memory_order_acq_rel,
207                                                 std::memory_order_relaxed)) {
208         return true;
209       }
210     }
211   }
212 
213   // Decrement the shutdown ref. If this is the last shutdown ref, it also
214   // deletes the underlying event engine endpoint. Deletion of the event
215   // engine endpoint should trigger execution of any pending read/write
216   // callbacks with NOT-OK status.
ShutdownUnref()217   void ShutdownUnref() {
218     if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
219         kShutdownBit + 1) {
220       if (EventEngineSupportsFd() && fd_ > 0 && on_release_fd_) {
221         reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
222             ->Shutdown(std::move(on_release_fd_));
223       }
224       OnShutdownInternal();
225     }
226   }
227 
228   // If trigger shutdown is called the first time, it sets the shutdown bit
229   // and decrements the shutdown ref. If trigger shutdown has been called
230   // before or in parallel, only one of them would win the race. The other
231   // invocation would simply return.
TriggerShutdown(absl::AnyInvocable<void (absl::StatusOr<int>)> on_release_fd)232   void TriggerShutdown(
233       absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
234     if (EventEngineSupportsFd()) {
235       on_release_fd_ = std::move(on_release_fd);
236     }
237     int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
238     while (true) {
239       if (curr & kShutdownBit) {
240         return;
241       }
242       if (shutdown_ref_.compare_exchange_strong(curr, curr | kShutdownBit,
243                                                 std::memory_order_acq_rel,
244                                                 std::memory_order_relaxed)) {
245         Ref();
246         if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
247             kShutdownBit + 1) {
248           if (EventEngineSupportsFd() && fd_ > 0 && on_release_fd_) {
249             reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
250                 ->Shutdown(std::move(on_release_fd_));
251           }
252           OnShutdownInternal();
253         }
254         return;
255       }
256     }
257   }
258 
CanTrackErrors()259   bool CanTrackErrors() {
260     if (EventEngineSupportsFd()) {
261       return reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
262           ->CanTrackErrors();
263     } else {
264       return false;
265     }
266   }
267 
268  private:
OnShutdownInternal()269   void OnShutdownInternal() {
270     {
271       grpc_core::MutexLock lock(&mu_);
272       fd_ = -1;
273       local_address_ = "";
274       peer_address_ = "";
275     }
276     endpoint_.reset();
277     // For the Ref taken in TriggerShutdown
278     Unref();
279   }
280   std::unique_ptr<EventEngine::Endpoint> endpoint_;
281   std::unique_ptr<grpc_event_engine_endpoint> eeep_;
282   std::atomic<int64_t> refs_{1};
283   std::atomic<int64_t> shutdown_ref_{1};
284   absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_;
285   grpc_core::Mutex mu_;
286   grpc_closure* pending_read_cb_;
287   grpc_closure* pending_write_cb_;
288   grpc_slice_buffer* pending_read_buffer_;
289   std::string peer_address_;
290   std::string local_address_;
291   int fd_{-1};
292 };
293 
294 // Read from the endpoint and place the data in slices slice buffer. The
295 // provided closure is also invoked asynchronously.
EndpointRead(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool,int min_progress_size)296 void EndpointRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
297                   grpc_closure* cb, bool /* urgent */, int min_progress_size) {
298   auto* eeep =
299       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
300           ep);
301   if (!eeep->wrapper->ShutdownRef()) {
302     // Shutdown has already been triggered on the endpoint.
303     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
304     return;
305   }
306 
307   EventEngine::Endpoint::ReadArgs read_args = {min_progress_size};
308   if (eeep->wrapper->Read(cb, slices, &read_args)) {
309     // Read succeeded immediately. Run the callback inline.
310     eeep->wrapper->FinishPendingRead(absl::OkStatus());
311   }
312 
313   eeep->wrapper->ShutdownUnref();
314 }
315 
316 // Write the data from slices and invoke the provided closure asynchronously
317 // after the write is complete.
EndpointWrite(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg,int max_frame_size)318 void EndpointWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
319                    grpc_closure* cb, void* arg, int max_frame_size) {
320   auto* eeep =
321       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
322           ep);
323   if (!eeep->wrapper->ShutdownRef()) {
324     // Shutdown has already been triggered on the endpoint.
325     grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
326     return;
327   }
328 
329   EventEngine::Endpoint::WriteArgs write_args = {arg, max_frame_size};
330   if (eeep->wrapper->Write(cb, slices, &write_args)) {
331     // Write succeeded immediately. Run the callback inline.
332     eeep->wrapper->FinishPendingWrite(absl::OkStatus());
333   }
334   eeep->wrapper->ShutdownUnref();
335 }
336 
EndpointAddToPollset(grpc_endpoint *,grpc_pollset *)337 void EndpointAddToPollset(grpc_endpoint* /* ep */,
338                           grpc_pollset* /* pollset */) {}
EndpointAddToPollsetSet(grpc_endpoint *,grpc_pollset_set *)339 void EndpointAddToPollsetSet(grpc_endpoint* /* ep */,
340                              grpc_pollset_set* /* pollset */) {}
EndpointDeleteFromPollsetSet(grpc_endpoint *,grpc_pollset_set *)341 void EndpointDeleteFromPollsetSet(grpc_endpoint* /* ep */,
342                                   grpc_pollset_set* /* pollset */) {}
343 /// After shutdown, all endpoint operations except destroy are no-op,
344 /// and will return some kind of sane default (empty strings, nullptrs, etc).
345 /// It is the caller's responsibility to ensure that calls to EndpointShutdown
346 /// are synchronized.
EndpointShutdown(grpc_endpoint * ep,grpc_error_handle why)347 void EndpointShutdown(grpc_endpoint* ep, grpc_error_handle why) {
348   auto* eeep =
349       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
350           ep);
351   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
352     gpr_log(GPR_INFO, "TCP Endpoint %p shutdown why=%s", eeep->wrapper,
353             why.ToString().c_str());
354   }
355   GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Shutdown:%s", eeep->wrapper,
356                           why.ToString().c_str());
357   eeep->wrapper->TriggerShutdown(nullptr);
358 }
359 
360 // Attempts to free the underlying data structures.
EndpointDestroy(grpc_endpoint * ep)361 void EndpointDestroy(grpc_endpoint* ep) {
362   auto* eeep =
363       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
364           ep);
365   GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Destroy", eeep->wrapper);
366   eeep->wrapper->Unref();
367 }
368 
EndpointGetPeerAddress(grpc_endpoint * ep)369 absl::string_view EndpointGetPeerAddress(grpc_endpoint* ep) {
370   auto* eeep =
371       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
372           ep);
373   return eeep->wrapper->PeerAddress();
374 }
375 
EndpointGetLocalAddress(grpc_endpoint * ep)376 absl::string_view EndpointGetLocalAddress(grpc_endpoint* ep) {
377   auto* eeep =
378       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
379           ep);
380   return eeep->wrapper->LocalAddress();
381 }
382 
EndpointGetFd(grpc_endpoint * ep)383 int EndpointGetFd(grpc_endpoint* ep) {
384   auto* eeep =
385       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
386           ep);
387   return eeep->wrapper->Fd();
388 }
389 
EndpointCanTrackErr(grpc_endpoint * ep)390 bool EndpointCanTrackErr(grpc_endpoint* ep) {
391   auto* eeep =
392       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
393           ep);
394   return eeep->wrapper->CanTrackErrors();
395 }
396 
397 grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
398     EndpointRead,
399     EndpointWrite,
400     EndpointAddToPollset,
401     EndpointAddToPollsetSet,
402     EndpointDeleteFromPollsetSet,
403     EndpointShutdown,
404     EndpointDestroy,
405     EndpointGetPeerAddress,
406     EndpointGetLocalAddress,
407     EndpointGetFd,
408     EndpointCanTrackErr};
409 
EventEngineEndpointWrapper(std::unique_ptr<EventEngine::Endpoint> endpoint)410 EventEngineEndpointWrapper::EventEngineEndpointWrapper(
411     std::unique_ptr<EventEngine::Endpoint> endpoint)
412     : endpoint_(std::move(endpoint)),
413       eeep_(std::make_unique<grpc_event_engine_endpoint>()) {
414   eeep_->base.vtable = &grpc_event_engine_endpoint_vtable;
415   eeep_->wrapper = this;
416   auto local_addr = ResolvedAddressToURI(endpoint_->GetLocalAddress());
417   if (local_addr.ok()) {
418     local_address_ = *local_addr;
419   }
420   auto peer_addr = ResolvedAddressToURI(endpoint_->GetPeerAddress());
421   if (peer_addr.ok()) {
422     peer_address_ = *peer_addr;
423   }
424   if (EventEngineSupportsFd()) {
425     fd_ = reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
426               ->GetWrappedFd();
427   } else {
428     fd_ = -1;
429   }
430   GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Create", eeep_->wrapper);
431 }
432 
433 }  // namespace
434 
grpc_event_engine_endpoint_create(std::unique_ptr<EventEngine::Endpoint> ee_endpoint)435 grpc_endpoint* grpc_event_engine_endpoint_create(
436     std::unique_ptr<EventEngine::Endpoint> ee_endpoint) {
437   GPR_DEBUG_ASSERT(ee_endpoint != nullptr);
438   auto wrapper = new EventEngineEndpointWrapper(std::move(ee_endpoint));
439   return wrapper->GetGrpcEndpoint();
440 }
441 
grpc_is_event_engine_endpoint(grpc_endpoint * ep)442 bool grpc_is_event_engine_endpoint(grpc_endpoint* ep) {
443   return ep->vtable == &grpc_event_engine_endpoint_vtable;
444 }
445 
grpc_event_engine_endpoint_destroy_and_release_fd(grpc_endpoint * ep,int * fd,grpc_closure * on_release_fd)446 void grpc_event_engine_endpoint_destroy_and_release_fd(
447     grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd) {
448   auto* eeep =
449       reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
450           ep);
451   if (fd == nullptr || on_release_fd == nullptr) {
452     if (fd != nullptr) {
453       *fd = -1;
454     }
455     eeep->wrapper->TriggerShutdown(nullptr);
456   } else {
457     *fd = -1;
458     eeep->wrapper->TriggerShutdown(
459         [fd, on_release_fd](absl::StatusOr<int> release_fd) {
460           if (release_fd.ok()) {
461             *fd = *release_fd;
462           }
463           RunEventEngineClosure(on_release_fd,
464                                 absl_status_to_grpc_error(release_fd.status()));
465         });
466   }
467   eeep->wrapper->Unref();
468 }
469 
470 }  // namespace experimental
471 }  // namespace grpc_event_engine
472