xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/chaotic_good/client/chaotic_good_connector.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/ext/transport/chaotic_good/client/chaotic_good_connector.h"
18 
19 #include <cstdint>
20 #include <memory>
21 #include <utility>
22 
23 #include "absl/random/bit_gen_ref.h"
24 #include "absl/status/status.h"
25 #include "absl/status/statusor.h"
26 
27 #include <grpc/event_engine/event_engine.h>
28 
29 #include "src/core/client_channel/client_channel_factory.h"
30 #include "src/core/client_channel/client_channel_filter.h"
31 #include "src/core/ext/transport/chaotic_good/client_transport.h"
32 #include "src/core/ext/transport/chaotic_good/frame.h"
33 #include "src/core/ext/transport/chaotic_good/frame_header.h"
34 #include "src/core/ext/transport/chaotic_good/settings_metadata.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/config/core_configuration.h"
37 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
38 #include "src/core/lib/event_engine/extensions/chaotic_good_extension.h"
39 #include "src/core/lib/event_engine/query_extensions.h"
40 #include "src/core/lib/event_engine/tcp_socket_utils.h"
41 #include "src/core/lib/gprpp/debug_location.h"
42 #include "src/core/lib/gprpp/no_destruct.h"
43 #include "src/core/lib/gprpp/ref_counted_ptr.h"
44 #include "src/core/lib/gprpp/time.h"
45 #include "src/core/lib/iomgr/closure.h"
46 #include "src/core/lib/iomgr/error.h"
47 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
48 #include "src/core/lib/iomgr/exec_ctx.h"
49 #include "src/core/lib/promise/activity.h"
50 #include "src/core/lib/promise/context.h"
51 #include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
52 #include "src/core/lib/promise/latch.h"
53 #include "src/core/lib/promise/race.h"
54 #include "src/core/lib/promise/sleep.h"
55 #include "src/core/lib/promise/try_seq.h"
56 #include "src/core/lib/promise/wait_for_callback.h"
57 #include "src/core/lib/resource_quota/arena.h"
58 #include "src/core/lib/resource_quota/resource_quota.h"
59 #include "src/core/lib/slice/slice.h"
60 #include "src/core/lib/slice/slice_buffer.h"
61 #include "src/core/lib/surface/api_trace.h"
62 #include "src/core/lib/surface/channel.h"
63 #include "src/core/lib/surface/channel_create.h"
64 #include "src/core/lib/transport/error_utils.h"
65 #include "src/core/lib/transport/handshaker.h"
66 #include "src/core/lib/transport/promise_endpoint.h"
67 
68 namespace grpc_core {
69 namespace chaotic_good {
70 using grpc_event_engine::experimental::EventEngine;
namespace {
// Value passed as the last SettingsMetadata field when announcing the data
// connection.
constexpr int32_t kDataAlignmentBytes = 64;
// Number of seconds allowed for each endpoint connect attempt and for the
// data endpoint setup as a whole.
constexpr int32_t kTimeoutSecs = 120;
}  // namespace
75 
ChaoticGoodConnector(std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)76 ChaoticGoodConnector::ChaoticGoodConnector(
77     std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
78     : event_engine_(std::move(event_engine)),
79       handshake_mgr_(std::make_shared<HandshakeManager>()) {}
80 
~ChaoticGoodConnector()81 ChaoticGoodConnector::~ChaoticGoodConnector() {
82   GPR_ASSERT(notify_ == nullptr);
83   if (connect_activity_ != nullptr) {
84     connect_activity_.reset();
85   }
86 }
87 
DataEndpointReadSettingsFrame(RefCountedPtr<ChaoticGoodConnector> self)88 auto ChaoticGoodConnector::DataEndpointReadSettingsFrame(
89     RefCountedPtr<ChaoticGoodConnector> self) {
90   return TrySeq(
91       self->data_endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize),
92       [self](Slice slice) mutable {
93         // Read setting frame;
94         // Parse frame header
95         auto frame_header_ =
96             FrameHeader::Parse(reinterpret_cast<const uint8_t*>(
97                 GRPC_SLICE_START_PTR(slice.c_slice())));
98         return If(
99             frame_header_.ok(),
100             [frame_header_ = *frame_header_, self]() {
101               auto frame_header_length = frame_header_.GetFrameLength();
102               return TrySeq(self->data_endpoint_.Read(frame_header_length),
103                             []() { return absl::OkStatus(); });
104             },
105             [status = frame_header_.status()]() { return status; });
106       });
107 }
108 
DataEndpointWriteSettingsFrame(RefCountedPtr<ChaoticGoodConnector> self)109 auto ChaoticGoodConnector::DataEndpointWriteSettingsFrame(
110     RefCountedPtr<ChaoticGoodConnector> self) {
111   // Serialize setting frame.
112   SettingsFrame frame;
113   // frame.header set connectiion_type: control
114   frame.headers = SettingsMetadata{SettingsMetadata::ConnectionType::kData,
115                                    self->connection_id_, kDataAlignmentBytes}
116                       .ToMetadataBatch();
117   auto write_buffer = frame.Serialize(&self->hpack_compressor_);
118   return self->data_endpoint_.Write(std::move(write_buffer.control));
119 }
120 
WaitForDataEndpointSetup(RefCountedPtr<ChaoticGoodConnector> self)121 auto ChaoticGoodConnector::WaitForDataEndpointSetup(
122     RefCountedPtr<ChaoticGoodConnector> self) {
123   // Data endpoint on_connect callback.
124   grpc_event_engine::experimental::EventEngine::OnConnectCallback
125       on_data_endpoint_connect =
126           [self](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>>
127                      endpoint) mutable {
128             ExecCtx exec_ctx;
129             if (!endpoint.ok() || self->handshake_mgr_ == nullptr) {
130               ExecCtx::Run(DEBUG_LOCATION,
131                            std::exchange(self->notify_, nullptr),
132                            GRPC_ERROR_CREATE("connect endpoint failed"));
133               return;
134             }
135             auto* chaotic_good_ext =
136                 grpc_event_engine::experimental::QueryExtension<
137                     grpc_event_engine::experimental::ChaoticGoodExtension>(
138                     endpoint.value().get());
139             if (chaotic_good_ext != nullptr) {
140               chaotic_good_ext->EnableStatsCollection(
141                   /*is_control_channel=*/false);
142             }
143             self->data_endpoint_ =
144                 PromiseEndpoint(std::move(endpoint.value()), SliceBuffer());
145             self->data_endpoint_ready_.Set();
146           };
147   self->event_engine_->Connect(
148       std::move(on_data_endpoint_connect), *self->resolved_addr_,
149       grpc_event_engine::experimental::ChannelArgsEndpointConfig(
150           self->args_.channel_args),
151       ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
152           "data_endpoint_connection"),
153       std::chrono::seconds(kTimeoutSecs));
154 
155   return TrySeq(Race(
156       TrySeq(self->data_endpoint_ready_.Wait(),
157              [self]() mutable {
158                return TrySeq(DataEndpointWriteSettingsFrame(self),
159                              DataEndpointReadSettingsFrame(self),
160                              []() -> absl::Status { return absl::OkStatus(); });
161              }),
162       TrySeq(Sleep(Timestamp::Now() + Duration::Seconds(kTimeoutSecs)),
163              []() -> absl::Status {
164                return absl::DeadlineExceededError(
165                    "Data endpoint connect deadline exceeded.");
166              })));
167 }
168 
ControlEndpointReadSettingsFrame(RefCountedPtr<ChaoticGoodConnector> self)169 auto ChaoticGoodConnector::ControlEndpointReadSettingsFrame(
170     RefCountedPtr<ChaoticGoodConnector> self) {
171   return TrySeq(
172       self->control_endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize),
173       [self](Slice slice) {
174         // Parse frame header
175         auto frame_header = FrameHeader::Parse(reinterpret_cast<const uint8_t*>(
176             GRPC_SLICE_START_PTR(slice.c_slice())));
177         return If(
178             frame_header.ok(),
179             TrySeq(
180                 self->control_endpoint_.Read(frame_header->GetFrameLength()),
181                 [frame_header = *frame_header, self](SliceBuffer buffer) {
182                   // Deserialize setting frame.
183                   SettingsFrame frame;
184                   BufferPair buffer_pair{std::move(buffer), SliceBuffer()};
185                   auto status = frame.Deserialize(
186                       &self->hpack_parser_, frame_header,
187                       absl::BitGenRef(self->bitgen_), GetContext<Arena>(),
188                       std::move(buffer_pair), FrameLimits{});
189                   if (!status.ok()) return status;
190                   if (frame.headers == nullptr) {
191                     return absl::UnavailableError("no settings headers");
192                   }
193                   auto settings_metadata =
194                       SettingsMetadata::FromMetadataBatch(*frame.headers);
195                   if (!settings_metadata.ok()) {
196                     return settings_metadata.status();
197                   }
198                   if (!settings_metadata->connection_id.has_value()) {
199                     return absl::UnavailableError(
200                         "no connection id in settings frame");
201                   }
202                   self->connection_id_ = *settings_metadata->connection_id;
203                   return absl::OkStatus();
204                 },
205                 WaitForDataEndpointSetup(self)),
206             [status = frame_header.status()]() { return status; });
207       });
208 }
209 
ControlEndpointWriteSettingsFrame(RefCountedPtr<ChaoticGoodConnector> self)210 auto ChaoticGoodConnector::ControlEndpointWriteSettingsFrame(
211     RefCountedPtr<ChaoticGoodConnector> self) {
212   // Serialize setting frame.
213   SettingsFrame frame;
214   // frame.header set connectiion_type: control
215   frame.headers = SettingsMetadata{SettingsMetadata::ConnectionType::kControl,
216                                    absl::nullopt, absl::nullopt}
217                       .ToMetadataBatch();
218   auto write_buffer = frame.Serialize(&self->hpack_compressor_);
219   return self->control_endpoint_.Write(std::move(write_buffer.control));
220 }
221 
Connect(const Args & args,Result * result,grpc_closure * notify)222 void ChaoticGoodConnector::Connect(const Args& args, Result* result,
223                                    grpc_closure* notify) {
224   {
225     MutexLock lock(&mu_);
226     result_ = result;
227     if (is_shutdown_) {
228       GPR_ASSERT(notify_ == nullptr);
229       ExecCtx::Run(DEBUG_LOCATION, notify,
230                    GRPC_ERROR_CREATE("connector shutdown"));
231       return;
232     }
233   }
234   args_ = args;
235   notify_ = notify;
236   resolved_addr_ = EventEngine::ResolvedAddress(
237       reinterpret_cast<const sockaddr*>(args_.address->addr),
238       args_.address->len);
239   GPR_ASSERT(resolved_addr_.value().address() != nullptr);
240   grpc_event_engine::experimental::EventEngine::OnConnectCallback on_connect =
241       [self = RefAsSubclass<ChaoticGoodConnector>()](
242           absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>>
243               endpoint) mutable {
244         ExecCtx exec_ctx;
245         if (!endpoint.ok() || self->handshake_mgr_ == nullptr) {
246           auto endpoint_status = endpoint.status();
247           auto error = GRPC_ERROR_CREATE_REFERENCING("connect endpoint failed",
248                                                      &endpoint_status, 1);
249           ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr),
250                        error);
251           return;
252         }
253         auto* p = self.release();
254         auto* chaotic_good_ext =
255             grpc_event_engine::experimental::QueryExtension<
256                 grpc_event_engine::experimental::ChaoticGoodExtension>(
257                 endpoint.value().get());
258         if (chaotic_good_ext != nullptr) {
259           chaotic_good_ext->EnableStatsCollection(/*is_control_channel=*/true);
260           chaotic_good_ext->UseMemoryQuota(
261               ResourceQuota::Default()->memory_quota());
262         }
263         p->handshake_mgr_->DoHandshake(
264             grpc_event_engine_endpoint_create(std::move(endpoint.value())),
265             p->args_.channel_args, p->args_.deadline, nullptr /* acceptor */,
266             OnHandshakeDone, p);
267       };
268   event_engine_->Connect(
269       std::move(on_connect), *resolved_addr_,
270       grpc_event_engine::experimental::ChannelArgsEndpointConfig(
271           args_.channel_args),
272       ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
273           "data_endpoint_connection"),
274       std::chrono::seconds(kTimeoutSecs));
275 }
276 
OnHandshakeDone(void * arg,grpc_error_handle error)277 void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) {
278   auto* args = static_cast<HandshakerArgs*>(arg);
279   RefCountedPtr<ChaoticGoodConnector> self(
280       static_cast<ChaoticGoodConnector*>(args->user_data));
281   grpc_slice_buffer_destroy(args->read_buffer);
282   gpr_free(args->read_buffer);
283   // Start receiving setting frames;
284   {
285     MutexLock lock(&self->mu_);
286     if (!error.ok() || self->is_shutdown_) {
287       if (error.ok()) {
288         error = GRPC_ERROR_CREATE("connector shutdown");
289         // We were shut down after handshaking completed successfully, so
290         // destroy the endpoint here.
291         if (args->endpoint != nullptr) {
292           grpc_endpoint_shutdown(args->endpoint, error);
293           grpc_endpoint_destroy(args->endpoint);
294         }
295       }
296       self->result_->Reset();
297       ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr),
298                    error);
299       return;
300     }
301   }
302   if (args->endpoint != nullptr) {
303     GPR_ASSERT(grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
304         args->endpoint));
305     self->control_endpoint_ = PromiseEndpoint(
306         grpc_event_engine::experimental::
307             grpc_take_wrapped_event_engine_endpoint(args->endpoint),
308         SliceBuffer());
309     auto activity = MakeActivity(
310         [self] {
311           return TrySeq(ControlEndpointWriteSettingsFrame(self),
312                         ControlEndpointReadSettingsFrame(self),
313                         []() { return absl::OkStatus(); });
314         },
315         EventEngineWakeupScheduler(self->event_engine_),
316         [self](absl::Status status) {
317           if (grpc_chaotic_good_trace.enabled()) {
318             gpr_log(GPR_INFO, "ChaoticGoodConnector::OnHandshakeDone: %s",
319                     status.ToString().c_str());
320           }
321           if (status.ok()) {
322             MutexLock lock(&self->mu_);
323             self->result_->transport = new ChaoticGoodClientTransport(
324                 std::move(self->control_endpoint_),
325                 std::move(self->data_endpoint_), self->args_.channel_args,
326                 self->event_engine_, std::move(self->hpack_parser_),
327                 std::move(self->hpack_compressor_));
328             self->result_->channel_args = self->args_.channel_args;
329             ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr),
330                          status);
331           } else if (self->notify_ != nullptr) {
332             ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr),
333                          status);
334           }
335         },
336         self->arena_.get(), self->event_engine_.get());
337     MutexLock lock(&self->mu_);
338     if (!self->is_shutdown_) {
339       self->connect_activity_ = std::move(activity);
340     }
341   } else {
342     // Handshaking succeeded but there is no endpoint.
343     MutexLock lock(&self->mu_);
344     self->result_->Reset();
345     auto error = GRPC_ERROR_CREATE("handshake complete with empty endpoint.");
346     ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr), error);
347   }
348 }
349 
350 namespace {
351 
352 class ChaoticGoodChannelFactory final : public ClientChannelFactory {
353  public:
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & args)354   RefCountedPtr<Subchannel> CreateSubchannel(
355       const grpc_resolved_address& address, const ChannelArgs& args) override {
356     return Subchannel::Create(
357         MakeOrphanable<ChaoticGoodConnector>(
358             args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
359         address, args);
360   }
361 };
362 
363 }  // namespace
364 }  // namespace chaotic_good
365 }  // namespace grpc_core
366 
grpc_chaotic_good_channel_create(const char * target,const grpc_channel_args * args)367 grpc_channel* grpc_chaotic_good_channel_create(const char* target,
368                                                const grpc_channel_args* args) {
369   grpc_core::ExecCtx exec_ctx;
370   GRPC_API_TRACE("grpc_chaotic_good_channel_create(target=%s,  args=%p)", 2,
371                  (target, (void*)args));
372   grpc_channel* channel = nullptr;
373   grpc_error_handle error;
374   // Create channel.
375   auto r = grpc_core::ChannelCreate(
376       target,
377       grpc_core::CoreConfiguration::Get()
378           .channel_args_preconditioning()
379           .PreconditionChannelArgs(args)
380           .SetObject(
381               grpc_core::NoDestructSingleton<
382                   grpc_core::chaotic_good::ChaoticGoodChannelFactory>::Get()),
383       GRPC_CLIENT_CHANNEL, nullptr);
384   if (r.ok()) {
385     return r->release()->c_ptr();
386   }
387   error = absl_status_to_grpc_error(r.status());
388   intptr_t integer;
389   grpc_status_code status = GRPC_STATUS_INTERNAL;
390   if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
391                          &integer)) {
392     status = static_cast<grpc_status_code>(integer);
393   }
394   channel = grpc_lame_client_channel_create(
395       target, status, "Failed to create secure client channel");
396   return channel;
397 }
398