1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/ext/transport/chaotic_good/client/chaotic_good_connector.h"
18
19 #include <cstdint>
20 #include <memory>
21 #include <utility>
22
23 #include "absl/random/bit_gen_ref.h"
24 #include "absl/status/status.h"
25 #include "absl/status/statusor.h"
26
27 #include <grpc/event_engine/event_engine.h>
28
29 #include "src/core/client_channel/client_channel_factory.h"
30 #include "src/core/client_channel/client_channel_filter.h"
31 #include "src/core/ext/transport/chaotic_good/client_transport.h"
32 #include "src/core/ext/transport/chaotic_good/frame.h"
33 #include "src/core/ext/transport/chaotic_good/frame_header.h"
34 #include "src/core/ext/transport/chaotic_good/settings_metadata.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/config/core_configuration.h"
37 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
38 #include "src/core/lib/event_engine/extensions/chaotic_good_extension.h"
39 #include "src/core/lib/event_engine/query_extensions.h"
40 #include "src/core/lib/event_engine/tcp_socket_utils.h"
41 #include "src/core/lib/gprpp/debug_location.h"
42 #include "src/core/lib/gprpp/no_destruct.h"
43 #include "src/core/lib/gprpp/ref_counted_ptr.h"
44 #include "src/core/lib/gprpp/time.h"
45 #include "src/core/lib/iomgr/closure.h"
46 #include "src/core/lib/iomgr/error.h"
47 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
48 #include "src/core/lib/iomgr/exec_ctx.h"
49 #include "src/core/lib/promise/activity.h"
50 #include "src/core/lib/promise/context.h"
51 #include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
52 #include "src/core/lib/promise/latch.h"
53 #include "src/core/lib/promise/race.h"
54 #include "src/core/lib/promise/sleep.h"
55 #include "src/core/lib/promise/try_seq.h"
56 #include "src/core/lib/promise/wait_for_callback.h"
57 #include "src/core/lib/resource_quota/arena.h"
58 #include "src/core/lib/resource_quota/resource_quota.h"
59 #include "src/core/lib/slice/slice.h"
60 #include "src/core/lib/slice/slice_buffer.h"
61 #include "src/core/lib/surface/api_trace.h"
62 #include "src/core/lib/surface/channel.h"
63 #include "src/core/lib/surface/channel_create.h"
64 #include "src/core/lib/transport/error_utils.h"
65 #include "src/core/lib/transport/handshaker.h"
66 #include "src/core/lib/transport/promise_endpoint.h"
67
68 namespace grpc_core {
69 namespace chaotic_good {
70 using grpc_event_engine::experimental::EventEngine;
71 namespace {
72 const int32_t kDataAlignmentBytes = 64;
73 const int32_t kTimeoutSecs = 120;
74 } // namespace
75
ChaoticGoodConnector(std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)76 ChaoticGoodConnector::ChaoticGoodConnector(
77 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
78 : event_engine_(std::move(event_engine)),
79 handshake_mgr_(std::make_shared<HandshakeManager>()) {}
80
~ChaoticGoodConnector()81 ChaoticGoodConnector::~ChaoticGoodConnector() {
82 GPR_ASSERT(notify_ == nullptr);
83 if (connect_activity_ != nullptr) {
84 connect_activity_.reset();
85 }
86 }
87
DataEndpointReadSettingsFrame(RefCountedPtr<ChaoticGoodConnector> self)88 auto ChaoticGoodConnector::DataEndpointReadSettingsFrame(
89 RefCountedPtr<ChaoticGoodConnector> self) {
90 return TrySeq(
91 self->data_endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize),
92 [self](Slice slice) mutable {
93 // Read setting frame;
94 // Parse frame header
95 auto frame_header_ =
96 FrameHeader::Parse(reinterpret_cast<const uint8_t*>(
97 GRPC_SLICE_START_PTR(slice.c_slice())));
98 return If(
99 frame_header_.ok(),
100 [frame_header_ = *frame_header_, self]() {
101 auto frame_header_length = frame_header_.GetFrameLength();
102 return TrySeq(self->data_endpoint_.Read(frame_header_length),
103 []() { return absl::OkStatus(); });
104 },
105 [status = frame_header_.status()]() { return status; });
106 });
107 }
108
DataEndpointWriteSettingsFrame(RefCountedPtr<ChaoticGoodConnector> self)109 auto ChaoticGoodConnector::DataEndpointWriteSettingsFrame(
110 RefCountedPtr<ChaoticGoodConnector> self) {
111 // Serialize setting frame.
112 SettingsFrame frame;
113 // frame.header set connectiion_type: control
114 frame.headers = SettingsMetadata{SettingsMetadata::ConnectionType::kData,
115 self->connection_id_, kDataAlignmentBytes}
116 .ToMetadataBatch();
117 auto write_buffer = frame.Serialize(&self->hpack_compressor_);
118 return self->data_endpoint_.Write(std::move(write_buffer.control));
119 }
120
WaitForDataEndpointSetup(RefCountedPtr<ChaoticGoodConnector> self)121 auto ChaoticGoodConnector::WaitForDataEndpointSetup(
122 RefCountedPtr<ChaoticGoodConnector> self) {
123 // Data endpoint on_connect callback.
124 grpc_event_engine::experimental::EventEngine::OnConnectCallback
125 on_data_endpoint_connect =
126 [self](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>>
127 endpoint) mutable {
128 ExecCtx exec_ctx;
129 if (!endpoint.ok() || self->handshake_mgr_ == nullptr) {
130 ExecCtx::Run(DEBUG_LOCATION,
131 std::exchange(self->notify_, nullptr),
132 GRPC_ERROR_CREATE("connect endpoint failed"));
133 return;
134 }
135 auto* chaotic_good_ext =
136 grpc_event_engine::experimental::QueryExtension<
137 grpc_event_engine::experimental::ChaoticGoodExtension>(
138 endpoint.value().get());
139 if (chaotic_good_ext != nullptr) {
140 chaotic_good_ext->EnableStatsCollection(
141 /*is_control_channel=*/false);
142 }
143 self->data_endpoint_ =
144 PromiseEndpoint(std::move(endpoint.value()), SliceBuffer());
145 self->data_endpoint_ready_.Set();
146 };
147 self->event_engine_->Connect(
148 std::move(on_data_endpoint_connect), *self->resolved_addr_,
149 grpc_event_engine::experimental::ChannelArgsEndpointConfig(
150 self->args_.channel_args),
151 ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
152 "data_endpoint_connection"),
153 std::chrono::seconds(kTimeoutSecs));
154
155 return TrySeq(Race(
156 TrySeq(self->data_endpoint_ready_.Wait(),
157 [self]() mutable {
158 return TrySeq(DataEndpointWriteSettingsFrame(self),
159 DataEndpointReadSettingsFrame(self),
160 []() -> absl::Status { return absl::OkStatus(); });
161 }),
162 TrySeq(Sleep(Timestamp::Now() + Duration::Seconds(kTimeoutSecs)),
163 []() -> absl::Status {
164 return absl::DeadlineExceededError(
165 "Data endpoint connect deadline exceeded.");
166 })));
167 }
168
ControlEndpointReadSettingsFrame(RefCountedPtr<ChaoticGoodConnector> self)169 auto ChaoticGoodConnector::ControlEndpointReadSettingsFrame(
170 RefCountedPtr<ChaoticGoodConnector> self) {
171 return TrySeq(
172 self->control_endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize),
173 [self](Slice slice) {
174 // Parse frame header
175 auto frame_header = FrameHeader::Parse(reinterpret_cast<const uint8_t*>(
176 GRPC_SLICE_START_PTR(slice.c_slice())));
177 return If(
178 frame_header.ok(),
179 TrySeq(
180 self->control_endpoint_.Read(frame_header->GetFrameLength()),
181 [frame_header = *frame_header, self](SliceBuffer buffer) {
182 // Deserialize setting frame.
183 SettingsFrame frame;
184 BufferPair buffer_pair{std::move(buffer), SliceBuffer()};
185 auto status = frame.Deserialize(
186 &self->hpack_parser_, frame_header,
187 absl::BitGenRef(self->bitgen_), GetContext<Arena>(),
188 std::move(buffer_pair), FrameLimits{});
189 if (!status.ok()) return status;
190 if (frame.headers == nullptr) {
191 return absl::UnavailableError("no settings headers");
192 }
193 auto settings_metadata =
194 SettingsMetadata::FromMetadataBatch(*frame.headers);
195 if (!settings_metadata.ok()) {
196 return settings_metadata.status();
197 }
198 if (!settings_metadata->connection_id.has_value()) {
199 return absl::UnavailableError(
200 "no connection id in settings frame");
201 }
202 self->connection_id_ = *settings_metadata->connection_id;
203 return absl::OkStatus();
204 },
205 WaitForDataEndpointSetup(self)),
206 [status = frame_header.status()]() { return status; });
207 });
208 }
209
ControlEndpointWriteSettingsFrame(RefCountedPtr<ChaoticGoodConnector> self)210 auto ChaoticGoodConnector::ControlEndpointWriteSettingsFrame(
211 RefCountedPtr<ChaoticGoodConnector> self) {
212 // Serialize setting frame.
213 SettingsFrame frame;
214 // frame.header set connectiion_type: control
215 frame.headers = SettingsMetadata{SettingsMetadata::ConnectionType::kControl,
216 absl::nullopt, absl::nullopt}
217 .ToMetadataBatch();
218 auto write_buffer = frame.Serialize(&self->hpack_compressor_);
219 return self->control_endpoint_.Write(std::move(write_buffer.control));
220 }
221
Connect(const Args & args,Result * result,grpc_closure * notify)222 void ChaoticGoodConnector::Connect(const Args& args, Result* result,
223 grpc_closure* notify) {
224 {
225 MutexLock lock(&mu_);
226 result_ = result;
227 if (is_shutdown_) {
228 GPR_ASSERT(notify_ == nullptr);
229 ExecCtx::Run(DEBUG_LOCATION, notify,
230 GRPC_ERROR_CREATE("connector shutdown"));
231 return;
232 }
233 }
234 args_ = args;
235 notify_ = notify;
236 resolved_addr_ = EventEngine::ResolvedAddress(
237 reinterpret_cast<const sockaddr*>(args_.address->addr),
238 args_.address->len);
239 GPR_ASSERT(resolved_addr_.value().address() != nullptr);
240 grpc_event_engine::experimental::EventEngine::OnConnectCallback on_connect =
241 [self = RefAsSubclass<ChaoticGoodConnector>()](
242 absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>>
243 endpoint) mutable {
244 ExecCtx exec_ctx;
245 if (!endpoint.ok() || self->handshake_mgr_ == nullptr) {
246 auto endpoint_status = endpoint.status();
247 auto error = GRPC_ERROR_CREATE_REFERENCING("connect endpoint failed",
248 &endpoint_status, 1);
249 ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr),
250 error);
251 return;
252 }
253 auto* p = self.release();
254 auto* chaotic_good_ext =
255 grpc_event_engine::experimental::QueryExtension<
256 grpc_event_engine::experimental::ChaoticGoodExtension>(
257 endpoint.value().get());
258 if (chaotic_good_ext != nullptr) {
259 chaotic_good_ext->EnableStatsCollection(/*is_control_channel=*/true);
260 chaotic_good_ext->UseMemoryQuota(
261 ResourceQuota::Default()->memory_quota());
262 }
263 p->handshake_mgr_->DoHandshake(
264 grpc_event_engine_endpoint_create(std::move(endpoint.value())),
265 p->args_.channel_args, p->args_.deadline, nullptr /* acceptor */,
266 OnHandshakeDone, p);
267 };
268 event_engine_->Connect(
269 std::move(on_connect), *resolved_addr_,
270 grpc_event_engine::experimental::ChannelArgsEndpointConfig(
271 args_.channel_args),
272 ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
273 "data_endpoint_connection"),
274 std::chrono::seconds(kTimeoutSecs));
275 }
276
OnHandshakeDone(void * arg,grpc_error_handle error)277 void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) {
278 auto* args = static_cast<HandshakerArgs*>(arg);
279 RefCountedPtr<ChaoticGoodConnector> self(
280 static_cast<ChaoticGoodConnector*>(args->user_data));
281 grpc_slice_buffer_destroy(args->read_buffer);
282 gpr_free(args->read_buffer);
283 // Start receiving setting frames;
284 {
285 MutexLock lock(&self->mu_);
286 if (!error.ok() || self->is_shutdown_) {
287 if (error.ok()) {
288 error = GRPC_ERROR_CREATE("connector shutdown");
289 // We were shut down after handshaking completed successfully, so
290 // destroy the endpoint here.
291 if (args->endpoint != nullptr) {
292 grpc_endpoint_shutdown(args->endpoint, error);
293 grpc_endpoint_destroy(args->endpoint);
294 }
295 }
296 self->result_->Reset();
297 ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr),
298 error);
299 return;
300 }
301 }
302 if (args->endpoint != nullptr) {
303 GPR_ASSERT(grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
304 args->endpoint));
305 self->control_endpoint_ = PromiseEndpoint(
306 grpc_event_engine::experimental::
307 grpc_take_wrapped_event_engine_endpoint(args->endpoint),
308 SliceBuffer());
309 auto activity = MakeActivity(
310 [self] {
311 return TrySeq(ControlEndpointWriteSettingsFrame(self),
312 ControlEndpointReadSettingsFrame(self),
313 []() { return absl::OkStatus(); });
314 },
315 EventEngineWakeupScheduler(self->event_engine_),
316 [self](absl::Status status) {
317 if (grpc_chaotic_good_trace.enabled()) {
318 gpr_log(GPR_INFO, "ChaoticGoodConnector::OnHandshakeDone: %s",
319 status.ToString().c_str());
320 }
321 if (status.ok()) {
322 MutexLock lock(&self->mu_);
323 self->result_->transport = new ChaoticGoodClientTransport(
324 std::move(self->control_endpoint_),
325 std::move(self->data_endpoint_), self->args_.channel_args,
326 self->event_engine_, std::move(self->hpack_parser_),
327 std::move(self->hpack_compressor_));
328 self->result_->channel_args = self->args_.channel_args;
329 ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr),
330 status);
331 } else if (self->notify_ != nullptr) {
332 ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr),
333 status);
334 }
335 },
336 self->arena_.get(), self->event_engine_.get());
337 MutexLock lock(&self->mu_);
338 if (!self->is_shutdown_) {
339 self->connect_activity_ = std::move(activity);
340 }
341 } else {
342 // Handshaking succeeded but there is no endpoint.
343 MutexLock lock(&self->mu_);
344 self->result_->Reset();
345 auto error = GRPC_ERROR_CREATE("handshake complete with empty endpoint.");
346 ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr), error);
347 }
348 }
349
350 namespace {
351
352 class ChaoticGoodChannelFactory final : public ClientChannelFactory {
353 public:
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & args)354 RefCountedPtr<Subchannel> CreateSubchannel(
355 const grpc_resolved_address& address, const ChannelArgs& args) override {
356 return Subchannel::Create(
357 MakeOrphanable<ChaoticGoodConnector>(
358 args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
359 address, args);
360 }
361 };
362
363 } // namespace
364 } // namespace chaotic_good
365 } // namespace grpc_core
366
grpc_chaotic_good_channel_create(const char * target,const grpc_channel_args * args)367 grpc_channel* grpc_chaotic_good_channel_create(const char* target,
368 const grpc_channel_args* args) {
369 grpc_core::ExecCtx exec_ctx;
370 GRPC_API_TRACE("grpc_chaotic_good_channel_create(target=%s, args=%p)", 2,
371 (target, (void*)args));
372 grpc_channel* channel = nullptr;
373 grpc_error_handle error;
374 // Create channel.
375 auto r = grpc_core::ChannelCreate(
376 target,
377 grpc_core::CoreConfiguration::Get()
378 .channel_args_preconditioning()
379 .PreconditionChannelArgs(args)
380 .SetObject(
381 grpc_core::NoDestructSingleton<
382 grpc_core::chaotic_good::ChaoticGoodChannelFactory>::Get()),
383 GRPC_CLIENT_CHANNEL, nullptr);
384 if (r.ok()) {
385 return r->release()->c_ptr();
386 }
387 error = absl_status_to_grpc_error(r.status());
388 intptr_t integer;
389 grpc_status_code status = GRPC_STATUS_INTERNAL;
390 if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
391 &integer)) {
392 status = static_cast<grpc_status_code>(integer);
393 }
394 channel = grpc_lame_client_channel_create(
395 target, status, "Failed to create secure client channel");
396 return channel;
397 }
398