1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/ext/transport/chaotic_good/server/chaotic_good_server.h"
18
19 #include <cstdint>
20 #include <memory>
21 #include <random>
22 #include <string>
23 #include <utility>
24 #include <vector>
25
26 #include "absl/random/bit_gen_ref.h"
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29
30 #include <grpc/event_engine/event_engine.h>
31 #include <grpc/grpc.h>
32 #include <grpc/slice.h>
33 #include <grpc/support/log.h>
34
35 #include "src/core/ext/transport/chaotic_good/frame.h"
36 #include "src/core/ext/transport/chaotic_good/frame_header.h"
37 #include "src/core/ext/transport/chaotic_good/server_transport.h"
38 #include "src/core/ext/transport/chaotic_good/settings_metadata.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
41 #include "src/core/lib/event_engine/event_engine_context.h"
42 #include "src/core/lib/event_engine/extensions/chaotic_good_extension.h"
43 #include "src/core/lib/event_engine/query_extensions.h"
44 #include "src/core/lib/event_engine/resolved_address_internal.h"
45 #include "src/core/lib/event_engine/tcp_socket_utils.h"
46 #include "src/core/lib/gprpp/orphanable.h"
47 #include "src/core/lib/gprpp/ref_counted_ptr.h"
48 #include "src/core/lib/gprpp/status_helper.h"
49 #include "src/core/lib/gprpp/sync.h"
50 #include "src/core/lib/gprpp/time.h"
51 #include "src/core/lib/iomgr/error.h"
52 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
53 #include "src/core/lib/promise/activity.h"
54 #include "src/core/lib/promise/context.h"
55 #include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
56 #include "src/core/lib/promise/if.h"
57 #include "src/core/lib/promise/latch.h"
58 #include "src/core/lib/promise/race.h"
59 #include "src/core/lib/promise/sleep.h"
60 #include "src/core/lib/promise/try_seq.h"
61 #include "src/core/lib/resource_quota/arena.h"
62 #include "src/core/lib/resource_quota/resource_quota.h"
63 #include "src/core/lib/slice/slice.h"
64 #include "src/core/lib/slice/slice_buffer.h"
65 #include "src/core/lib/surface/server.h"
66 #include "src/core/lib/transport/error_utils.h"
67 #include "src/core/lib/transport/handshaker.h"
68 #include "src/core/lib/transport/metadata.h"
69 #include "src/core/lib/transport/metadata_batch.h"
70 #include "src/core/lib/transport/promise_endpoint.h"
71
72 namespace grpc_core {
73 namespace chaotic_good {
74
75 namespace {
76 const Duration kConnectionDeadline = Duration::Seconds(120);
77 } // namespace
78
79 using grpc_event_engine::experimental::EventEngine;
ChaoticGoodServerListener(Server * server,const ChannelArgs & args,absl::AnyInvocable<std::string ()> connection_id_generator)80 ChaoticGoodServerListener::ChaoticGoodServerListener(
81 Server* server, const ChannelArgs& args,
82 absl::AnyInvocable<std::string()> connection_id_generator)
83 : server_(server),
84 args_(args),
85 event_engine_(
86 args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
87 connection_id_generator_(std::move(connection_id_generator)) {}
88
~ChaoticGoodServerListener()89 ChaoticGoodServerListener::~ChaoticGoodServerListener() {
90 if (on_destroy_done_ != nullptr) {
91 event_engine_->Run([on_destroy_done = on_destroy_done_]() {
92 ExecCtx exec_ctx;
93 ExecCtx::Run(DEBUG_LOCATION, on_destroy_done, absl::OkStatus());
94 });
95 }
96 }
97
Bind(grpc_event_engine::experimental::EventEngine::ResolvedAddress addr)98 absl::StatusOr<int> ChaoticGoodServerListener::Bind(
99 grpc_event_engine::experimental::EventEngine::ResolvedAddress addr) {
100 if (grpc_chaotic_good_trace.enabled()) {
101 auto str = grpc_event_engine::experimental::ResolvedAddressToString(addr);
102 gpr_log(GPR_INFO, "CHAOTIC_GOOD: Listen on %s",
103 str.ok() ? str->c_str() : str.status().ToString().c_str());
104 }
105 EventEngine::Listener::AcceptCallback accept_cb =
106 [self = Ref()](std::unique_ptr<EventEngine::Endpoint> ep,
107 MemoryAllocator) {
108 ExecCtx exec_ctx;
109 MutexLock lock(&self->mu_);
110 if (self->shutdown_) return;
111 self->connection_list_.emplace(
112 MakeOrphanable<ActiveConnection>(self, std::move(ep)));
113 };
114 auto shutdown_cb = [](absl::Status status) {
115 if (!status.ok()) {
116 gpr_log(GPR_ERROR, "Server accept connection failed: %s",
117 StatusToString(status).c_str());
118 }
119 };
120 GPR_ASSERT(event_engine_ != nullptr);
121 auto ee_listener = event_engine_->CreateListener(
122 std::move(accept_cb), std::move(shutdown_cb),
123 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args_),
124 std::make_unique<MemoryQuota>("chaotic_good_server_listener"));
125 if (!ee_listener.ok()) {
126 gpr_log(GPR_ERROR, "Bind failed: %s",
127 ee_listener.status().ToString().c_str());
128 return ee_listener.status();
129 }
130 ee_listener_ = std::move(ee_listener.value());
131 auto port_num = ee_listener_->Bind(addr);
132 if (!port_num.ok()) {
133 return port_num.status();
134 }
135 return port_num;
136 }
137
StartListening()138 absl::Status ChaoticGoodServerListener::StartListening() {
139 GPR_ASSERT(ee_listener_ != nullptr);
140 auto status = ee_listener_->Start();
141 if (!status.ok()) {
142 gpr_log(GPR_ERROR, "Start listening failed: %s", status.ToString().c_str());
143 } else if (grpc_chaotic_good_trace.enabled()) {
144 gpr_log(GPR_INFO, "CHAOTIC_GOOD: Started listening");
145 }
146 return status;
147 }
148
ActiveConnection(RefCountedPtr<ChaoticGoodServerListener> listener,std::unique_ptr<EventEngine::Endpoint> endpoint)149 ChaoticGoodServerListener::ActiveConnection::ActiveConnection(
150 RefCountedPtr<ChaoticGoodServerListener> listener,
151 std::unique_ptr<EventEngine::Endpoint> endpoint)
152 : memory_allocator_(listener->memory_allocator_), listener_(listener) {
153 handshaking_state_ = MakeRefCounted<HandshakingState>(Ref());
154 handshaking_state_->Start(std::move(endpoint));
155 }
156
~ActiveConnection()157 ChaoticGoodServerListener::ActiveConnection::~ActiveConnection() {
158 if (receive_settings_activity_ != nullptr) receive_settings_activity_.reset();
159 }
160
Orphan()161 void ChaoticGoodServerListener::ActiveConnection::Orphan() {
162 if (grpc_chaotic_good_trace.enabled()) {
163 gpr_log(GPR_INFO, "ActiveConnection::Orphan() %p", this);
164 }
165 if (handshaking_state_ != nullptr) {
166 handshaking_state_->Shutdown();
167 handshaking_state_.reset();
168 }
169 ActivityPtr activity;
170 {
171 MutexLock lock(&mu_);
172 orphaned_ = true;
173 activity = std::move(receive_settings_activity_);
174 }
175 activity.reset();
176 Unref();
177 }
178
NewConnectionID()179 void ChaoticGoodServerListener::ActiveConnection::NewConnectionID() {
180 bool has_new_id = false;
181 MutexLock lock(&listener_->mu_);
182 while (!has_new_id) {
183 connection_id_ = listener_->connection_id_generator_();
184 if (!listener_->connectivity_map_.contains(connection_id_)) {
185 has_new_id = true;
186 }
187 }
188 listener_->connectivity_map_.emplace(
189 connection_id_, std::make_shared<InterActivityLatch<PromiseEndpoint>>());
190 }
191
Done(absl::optional<absl::string_view> error)192 void ChaoticGoodServerListener::ActiveConnection::Done(
193 absl::optional<absl::string_view> error) {
194 if (error.has_value()) {
195 gpr_log(GPR_ERROR, "ActiveConnection::Done:%p %s", this,
196 std::string(*error).c_str());
197 }
198 // Can easily be holding various locks here: bounce through EE to ensure no
199 // deadlocks.
200 listener_->event_engine_->Run([self = Ref()]() {
201 ExecCtx exec_ctx;
202 OrphanablePtr<ActiveConnection> con;
203 MutexLock lock(&self->listener_->mu_);
204 auto v = self->listener_->connection_list_.extract(self.get());
205 if (!v.empty()) con = std::move(v.value());
206 });
207 }
208
HandshakingState(RefCountedPtr<ActiveConnection> connection)209 ChaoticGoodServerListener::ActiveConnection::HandshakingState::HandshakingState(
210 RefCountedPtr<ActiveConnection> connection)
211 : memory_allocator_(connection->memory_allocator_),
212 connection_(std::move(connection)),
213 handshake_mgr_(MakeRefCounted<HandshakeManager>()) {}
214
Start(std::unique_ptr<EventEngine::Endpoint> endpoint)215 void ChaoticGoodServerListener::ActiveConnection::HandshakingState::Start(
216 std::unique_ptr<EventEngine::Endpoint> endpoint) {
217 handshake_mgr_->DoHandshake(
218 grpc_event_engine_endpoint_create(std::move(endpoint)),
219 connection_->args(), GetConnectionDeadline(), nullptr, OnHandshakeDone,
220 Ref().release());
221 }
222
// Builds a promise that reads a single settings frame from the connection's
// endpoint and resolves to absl::StatusOr<bool>:
//   * true  - the peer announced a control endpoint;
//   * false - the peer announced a data endpoint, in which case the
//             connection's connection_id_ and data_alignment_ have been
//             populated from the frame.
// The promise fails if the frame header cannot be parsed, the frame cannot
// be deserialized, the frame carries no headers, the headers cannot be
// converted to SettingsMetadata, or a data endpoint omits the connection id
// or the alignment.
223 auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
EndpointReadSettingsFrame(RefCountedPtr<HandshakingState> self)224 EndpointReadSettingsFrame(RefCountedPtr<HandshakingState> self) {
225 return TrySeq(
// Step 1: read exactly one frame header's worth of bytes.
226 self->connection_->endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize),
227 [self](Slice slice) {
228 // Parse frame header
229 auto frame_header = FrameHeader::Parse(reinterpret_cast<const uint8_t*>(
230 GRPC_SLICE_START_PTR(slice.c_slice())));
// Step 2: continue with the payload only if the header parsed; otherwise
// surface the parse error.
231 return If(
232 frame_header.ok(),
// NOTE(review): `frame_header` is a local of the enclosing lambda and is
// captured by reference here and in the else-branch below. This presumably
// relies on If() invoking the selected branch before the enclosing lambda
// returns - confirm. The continuation that runs later takes its own copy.
233 [self, &frame_header]() {
234 return TrySeq(
235 self->connection_->endpoint_.Read(
236 frame_header->GetFrameLength()),
237 [frame_header = *frame_header,
238 self](SliceBuffer buffer) -> absl::StatusOr<bool> {
239 // Read Setting frame.
240 SettingsFrame frame;
241 // Deserialize frame from read buffer.
242 BufferPair buffer_pair{std::move(buffer), SliceBuffer()};
243 auto status = frame.Deserialize(
244 &self->connection_->hpack_parser_, frame_header,
245 absl::BitGenRef(self->connection_->bitgen_),
246 GetContext<Arena>(), std::move(buffer_pair),
247 FrameLimits{});
248 if (!status.ok()) return status;
249 if (frame.headers == nullptr) {
250 return absl::UnavailableError("no settings headers");
251 }
252 auto settings_metadata =
253 SettingsMetadata::FromMetadataBatch(*frame.headers);
254 if (!settings_metadata.ok()) {
255 return settings_metadata.status();
256 }
257 const bool is_control_endpoint =
258 settings_metadata->connection_type ==
259 SettingsMetadata::ConnectionType::kControl;
// A data endpoint must say which connection it belongs to and which
// alignment it uses; a control endpoint needs neither here.
260 if (!is_control_endpoint) {
261 if (!settings_metadata->connection_id.has_value()) {
262 return absl::UnavailableError(
263 "no connection id in data endpoint settings frame");
264 }
265 if (!settings_metadata->alignment.has_value()) {
266 return absl::UnavailableError(
267 "no alignment in data endpoint settings frame");
268 }
269 // Get connection-id and data-alignment for data endpoint.
270 self->connection_->connection_id_ =
271 *settings_metadata->connection_id;
272 self->connection_->data_alignment_ =
273 *settings_metadata->alignment;
274 }
275 return is_control_endpoint;
276 });
277 },
// Header parse failed: produce a promise that yields the parse status
// (copied into the inner lambda).
278 [&frame_header]() {
279 return [r = frame_header.status()]() -> absl::StatusOr<bool> {
280 return r;
281 };
282 });
283 });
284 }
285
// Builds a promise, used on the control endpoint, that races two outcomes:
//   1. the latch registered under this connection's id is set with the data
//      endpoint, after which a ChaoticGoodServerTransport is created from
//      both endpoints and handed to the server via SetupTransport(); or
//   2. a timer of kConnectionDeadline expires, in which case the connection
//      id is erased from the listener's connectivity map and a
//      DeadlineExceeded status is returned.
// In the code visible in this function the map entry is erased only on the
// timeout path.
286 auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
WaitForDataEndpointSetup(RefCountedPtr<HandshakingState> self)287 WaitForDataEndpointSetup(RefCountedPtr<HandshakingState> self) {
288 return Race(
289 TrySeq(
290 []() {
291 // TODO(ladynana): find a way to resolve SeqState to actual
292 // value.
293 return absl::OkStatus();
294 },
295 [self]() {
296 MutexLock lock(&self->connection_->listener_->mu_);
// NOTE(review): the result of find() is dereferenced without comparing it to
// end(); this relies on the entry for connection_id_ still being in the map
// at this point - confirm.
297 auto latch = self->connection_->listener_->connectivity_map_
298 .find(self->connection_->connection_id_)
299 ->second;
300 return latch->Wait();
301 },
// Runs with the data endpoint delivered through the latch. The listener
// mutex is held for the whole body, including the SetupTransport() call.
302 [self](PromiseEndpoint ret) -> absl::Status {
303 MutexLock lock(&self->connection_->listener_->mu_);
304 if (grpc_chaotic_good_trace.enabled()) {
305 gpr_log(
306 GPR_INFO, "%p Data endpoint setup done: shutdown=%s",
307 self->connection_.get(),
308 self->connection_->listener_->shutdown_ ? "true" : "false");
309 }
// Do not create a transport once the listener has begun shutting down.
310 if (self->connection_->listener_->shutdown_) {
311 return absl::UnavailableError("Server shutdown");
312 }
// The control endpoint, HPACK parser and HPACK compressor are moved out of
// the connection into the new transport.
313 return self->connection_->listener_->server_->SetupTransport(
314 new ChaoticGoodServerTransport(
315 self->connection_->args(),
316 std::move(self->connection_->endpoint_), std::move(ret),
317 self->connection_->listener_->event_engine_,
318 std::move(self->connection_->hpack_parser_),
319 std::move(self->connection_->hpack_compressor_)),
320 nullptr, self->connection_->args(), nullptr);
321 }),
322 // Set timeout for waiting data endpoint connect.
323 TrySeq(
324 // Timer for the deadline: now + kConnectionDeadline.
325 Sleep(Timestamp::Now() + kConnectionDeadline),
326 [self]() mutable -> absl::Status {
327 MutexLock lock(&self->connection_->listener_->mu_);
328 // Delete connection id from map when timeout;
329 self->connection_->listener_->connectivity_map_.erase(
330 self->connection_->connection_id_);
331 return absl::DeadlineExceededError("Deadline exceeded.");
332 }));
333 }
334
335 auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
ControlEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self)336 ControlEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self) {
337 self->connection_->NewConnectionID();
338 SettingsFrame frame;
339 frame.headers =
340 SettingsMetadata{absl::nullopt, self->connection_->connection_id_,
341 absl::nullopt}
342 .ToMetadataBatch();
343 auto write_buffer = frame.Serialize(&self->connection_->hpack_compressor_);
344 return TrySeq(
345 self->connection_->endpoint_.Write(std::move(write_buffer.control)),
346 WaitForDataEndpointSetup(self));
347 }
348
349 auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
DataEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self)350 DataEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self) {
351 // Send data endpoint setting frame
352 SettingsFrame frame;
353 frame.headers =
354 SettingsMetadata{absl::nullopt, self->connection_->connection_id_,
355 self->connection_->data_alignment_}
356 .ToMetadataBatch();
357 auto write_buffer = frame.Serialize(&self->connection_->hpack_compressor_);
358 return TrySeq(
359 self->connection_->endpoint_.Write(std::move(write_buffer.control)),
360 [self]() mutable {
361 MutexLock lock(&self->connection_->listener_->mu_);
362 // Set endpoint to latch
363 auto it = self->connection_->listener_->connectivity_map_.find(
364 self->connection_->connection_id_);
365 if (it == self->connection_->listener_->connectivity_map_.end()) {
366 return absl::InternalError(
367 absl::StrCat("Connection not in map: ",
368 absl::CEscape(self->connection_->connection_id_)));
369 }
370 it->second->Set(std::move(self->connection_->endpoint_));
371 return absl::OkStatus();
372 });
373 }
374
375 auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
EndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self,bool is_control_endpoint)376 EndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self,
377 bool is_control_endpoint) {
378 return If(
379 is_control_endpoint,
380 [&self] { return ControlEndpointWriteSettingsFrame(self); },
381 [&self] { return DataEndpointWriteSettingsFrame(self); });
382 }
383
// Handshake completion callback registered by Start().
//
// `arg` is cast to HandshakerArgs*; its user_data is the HandshakingState
// pointer released in Start(). On failure (error status or missing endpoint)
// the connection is finished via Done(). On success the endpoint is wrapped
// in a PromiseEndpoint and an activity is started that reads the peer's
// settings frame (bounded by kConnectionDeadline), replies according to the
// endpoint kind, and finally calls Done() on the connection.
384 void ChaoticGoodServerListener::ActiveConnection::HandshakingState::
OnHandshakeDone(void * arg,grpc_error_handle error)385 OnHandshakeDone(void* arg, grpc_error_handle error) {
386 auto* args = static_cast<HandshakerArgs*>(arg);
387 GPR_ASSERT(args != nullptr);
// Presumably adopts the reference released in Start() rather than taking a
// new one - confirm against RefCountedPtr's raw-pointer constructor.
388 RefCountedPtr<HandshakingState> self(
389 static_cast<HandshakingState*>(args->user_data));
// The read buffer is destroyed and freed unconditionally, before the error
// checks below.
390 grpc_slice_buffer_destroy(args->read_buffer);
391 gpr_free(args->read_buffer);
392 if (!error.ok()) {
393 self->connection_->Done(
394 absl::StrCat("Handshake failed: ", StatusToString(error)));
395 return;
396 }
397 if (args->endpoint == nullptr) {
398 self->connection_->Done("Server handshake done but has empty endpoint.");
399 return;
400 }
// Only EventEngine-backed endpoints are supported; unwrap the underlying
// EventEngine endpoint.
401 GPR_ASSERT(grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
402 args->endpoint));
403 auto ee_endpoint =
404 grpc_event_engine::experimental::grpc_take_wrapped_event_engine_endpoint(
405 args->endpoint);
// Query the optional extension before ee_endpoint is moved away; the raw
// pointer is null-checked before each use below.
406 auto* chaotic_good_ext = grpc_event_engine::experimental::QueryExtension<
407 grpc_event_engine::experimental::ChaoticGoodExtension>(ee_endpoint.get());
408 self->connection_->endpoint_ =
409 PromiseEndpoint(std::move(ee_endpoint), SliceBuffer());
410 auto activity = MakeActivity(
411 [self, chaotic_good_ext]() {
412 return TrySeq(
// Whichever finishes first wins: the settings frame read or the timer.
413 Race(EndpointReadSettingsFrame(self),
414 TrySeq(Sleep(Timestamp::Now() + kConnectionDeadline),
415 []() -> absl::StatusOr<bool> {
416 return absl::DeadlineExceededError(
417 "Waiting for initial settings frame");
418 })),
419 [self, chaotic_good_ext](bool is_control_endpoint) {
420 if (chaotic_good_ext != nullptr) {
421 chaotic_good_ext->EnableStatsCollection(is_control_endpoint);
422 if (is_control_endpoint) {
423 // Control endpoint should use the default memory quota
424 chaotic_good_ext->UseMemoryQuota(
425 ResourceQuota::Default()->memory_quota());
426 }
427 }
428 return EndpointWriteSettingsFrame(self, is_control_endpoint);
429 });
430 },
431 EventEngineWakeupScheduler(self->connection_->listener_->event_engine_),
// Completion handler: the connection is finished either way; a failure
// additionally carries a message for Done() to log.
432 [self](absl::Status status) {
433 if (!status.ok()) {
434 self->connection_->Done(
435 absl::StrCat("Server setting frame handling failed: ",
436 StatusToString(status)));
437 } else {
438 self->connection_->Done();
439 }
440 },
441 self->connection_->arena_.get(),
442 self->connection_->listener_->event_engine_.get());
// Store the activity only if the connection has not been orphaned in the
// meantime; otherwise the local `activity` is simply dropped on return.
443 MutexLock lock(&self->connection_->mu_);
444 if (self->connection_->orphaned_) return;
445 self->connection_->receive_settings_activity_ = std::move(activity);
446 }
447
448 Timestamp ChaoticGoodServerListener::ActiveConnection::HandshakingState::
GetConnectionDeadline()449 GetConnectionDeadline() {
450 if (connection_->args().Contains(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS)) {
451 return Timestamp::Now() +
452 connection_->args()
453 .GetDurationFromIntMillis(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS)
454 .value();
455 }
456 return Timestamp::Now() + kConnectionDeadline;
457 }
458
Orphan()459 void ChaoticGoodServerListener::Orphan() {
460 if (grpc_chaotic_good_trace.enabled()) {
461 gpr_log(GPR_INFO, "ChaoticGoodServerListener::Orphan()");
462 }
463 {
464 absl::flat_hash_set<OrphanablePtr<ActiveConnection>> connection_list;
465 MutexLock lock(&mu_);
466 connection_list = std::move(connection_list_);
467 shutdown_ = true;
468 }
469 ee_listener_.reset();
470 Unref();
471 };
472
473 } // namespace chaotic_good
474 } // namespace grpc_core
475
grpc_server_add_chaotic_good_port(grpc_server * server,const char * addr)476 int grpc_server_add_chaotic_good_port(grpc_server* server, const char* addr) {
477 grpc_core::ExecCtx exec_ctx;
478 auto* const core_server = grpc_core::Server::FromC(server);
479 const std::string parsed_addr = grpc_core::URI::PercentDecode(addr);
480 const auto resolved_or = grpc_core::GetDNSResolver()->LookupHostnameBlocking(
481 parsed_addr, absl::StrCat(0xd20));
482 if (!resolved_or.ok()) {
483 gpr_log(GPR_ERROR, "Failed to resolve %s: %s", addr,
484 resolved_or.status().ToString().c_str());
485 return 0;
486 }
487 int port_num = 0;
488 for (const auto& resolved_addr : resolved_or.value()) {
489 auto listener = grpc_core::MakeOrphanable<
490 grpc_core::chaotic_good::ChaoticGoodServerListener>(
491 core_server, core_server->channel_args());
492 const auto ee_addr =
493 grpc_event_engine::experimental::CreateResolvedAddress(resolved_addr);
494 gpr_log(GPR_INFO, "BIND: %s",
495 grpc_event_engine::experimental::ResolvedAddressToString(ee_addr)
496 ->c_str());
497 auto bind_result = listener->Bind(ee_addr);
498 if (!bind_result.ok()) {
499 gpr_log(GPR_ERROR, "Failed to bind to %s: %s", addr,
500 bind_result.status().ToString().c_str());
501 return 0;
502 }
503 if (port_num == 0) {
504 port_num = bind_result.value();
505 } else {
506 GPR_ASSERT(port_num == bind_result.value());
507 }
508 core_server->AddListener(std::move(listener));
509 }
510 return port_num;
511 }
512