xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/ext/transport/chaotic_good/server/chaotic_good_server.h"
18 
19 #include <cstdint>
20 #include <memory>
21 #include <random>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 
26 #include "absl/random/bit_gen_ref.h"
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 
30 #include <grpc/event_engine/event_engine.h>
31 #include <grpc/grpc.h>
32 #include <grpc/slice.h>
33 #include <grpc/support/log.h>
34 
35 #include "src/core/ext/transport/chaotic_good/frame.h"
36 #include "src/core/ext/transport/chaotic_good/frame_header.h"
37 #include "src/core/ext/transport/chaotic_good/server_transport.h"
38 #include "src/core/ext/transport/chaotic_good/settings_metadata.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
41 #include "src/core/lib/event_engine/event_engine_context.h"
42 #include "src/core/lib/event_engine/extensions/chaotic_good_extension.h"
43 #include "src/core/lib/event_engine/query_extensions.h"
44 #include "src/core/lib/event_engine/resolved_address_internal.h"
45 #include "src/core/lib/event_engine/tcp_socket_utils.h"
46 #include "src/core/lib/gprpp/orphanable.h"
47 #include "src/core/lib/gprpp/ref_counted_ptr.h"
48 #include "src/core/lib/gprpp/status_helper.h"
49 #include "src/core/lib/gprpp/sync.h"
50 #include "src/core/lib/gprpp/time.h"
51 #include "src/core/lib/iomgr/error.h"
52 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
53 #include "src/core/lib/promise/activity.h"
54 #include "src/core/lib/promise/context.h"
55 #include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
56 #include "src/core/lib/promise/if.h"
57 #include "src/core/lib/promise/latch.h"
58 #include "src/core/lib/promise/race.h"
59 #include "src/core/lib/promise/sleep.h"
60 #include "src/core/lib/promise/try_seq.h"
61 #include "src/core/lib/resource_quota/arena.h"
62 #include "src/core/lib/resource_quota/resource_quota.h"
63 #include "src/core/lib/slice/slice.h"
64 #include "src/core/lib/slice/slice_buffer.h"
65 #include "src/core/lib/surface/server.h"
66 #include "src/core/lib/transport/error_utils.h"
67 #include "src/core/lib/transport/handshaker.h"
68 #include "src/core/lib/transport/metadata.h"
69 #include "src/core/lib/transport/metadata_batch.h"
70 #include "src/core/lib/transport/promise_endpoint.h"
71 
72 namespace grpc_core {
73 namespace chaotic_good {
74 
namespace {
// Default time budget used throughout this file: the handshake deadline when
// GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS is absent, the wait for the first
// settings frame, and the wait for the data endpoint to connect.
const Duration kConnectionDeadline = Duration::Seconds(120);
}  // namespace
78 
79 using grpc_event_engine::experimental::EventEngine;
ChaoticGoodServerListener(Server * server,const ChannelArgs & args,absl::AnyInvocable<std::string ()> connection_id_generator)80 ChaoticGoodServerListener::ChaoticGoodServerListener(
81     Server* server, const ChannelArgs& args,
82     absl::AnyInvocable<std::string()> connection_id_generator)
83     : server_(server),
84       args_(args),
85       event_engine_(
86           args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
87       connection_id_generator_(std::move(connection_id_generator)) {}
88 
~ChaoticGoodServerListener()89 ChaoticGoodServerListener::~ChaoticGoodServerListener() {
90   if (on_destroy_done_ != nullptr) {
91     event_engine_->Run([on_destroy_done = on_destroy_done_]() {
92       ExecCtx exec_ctx;
93       ExecCtx::Run(DEBUG_LOCATION, on_destroy_done, absl::OkStatus());
94     });
95   }
96 }
97 
Bind(grpc_event_engine::experimental::EventEngine::ResolvedAddress addr)98 absl::StatusOr<int> ChaoticGoodServerListener::Bind(
99     grpc_event_engine::experimental::EventEngine::ResolvedAddress addr) {
100   if (grpc_chaotic_good_trace.enabled()) {
101     auto str = grpc_event_engine::experimental::ResolvedAddressToString(addr);
102     gpr_log(GPR_INFO, "CHAOTIC_GOOD: Listen on %s",
103             str.ok() ? str->c_str() : str.status().ToString().c_str());
104   }
105   EventEngine::Listener::AcceptCallback accept_cb =
106       [self = Ref()](std::unique_ptr<EventEngine::Endpoint> ep,
107                      MemoryAllocator) {
108         ExecCtx exec_ctx;
109         MutexLock lock(&self->mu_);
110         if (self->shutdown_) return;
111         self->connection_list_.emplace(
112             MakeOrphanable<ActiveConnection>(self, std::move(ep)));
113       };
114   auto shutdown_cb = [](absl::Status status) {
115     if (!status.ok()) {
116       gpr_log(GPR_ERROR, "Server accept connection failed: %s",
117               StatusToString(status).c_str());
118     }
119   };
120   GPR_ASSERT(event_engine_ != nullptr);
121   auto ee_listener = event_engine_->CreateListener(
122       std::move(accept_cb), std::move(shutdown_cb),
123       grpc_event_engine::experimental::ChannelArgsEndpointConfig(args_),
124       std::make_unique<MemoryQuota>("chaotic_good_server_listener"));
125   if (!ee_listener.ok()) {
126     gpr_log(GPR_ERROR, "Bind failed: %s",
127             ee_listener.status().ToString().c_str());
128     return ee_listener.status();
129   }
130   ee_listener_ = std::move(ee_listener.value());
131   auto port_num = ee_listener_->Bind(addr);
132   if (!port_num.ok()) {
133     return port_num.status();
134   }
135   return port_num;
136 }
137 
StartListening()138 absl::Status ChaoticGoodServerListener::StartListening() {
139   GPR_ASSERT(ee_listener_ != nullptr);
140   auto status = ee_listener_->Start();
141   if (!status.ok()) {
142     gpr_log(GPR_ERROR, "Start listening failed: %s", status.ToString().c_str());
143   } else if (grpc_chaotic_good_trace.enabled()) {
144     gpr_log(GPR_INFO, "CHAOTIC_GOOD: Started listening");
145   }
146   return status;
147 }
148 
// Creates the per-connection state for a freshly accepted `endpoint` and
// immediately starts the handshake on it. `listener` is the owning listener;
// its memory allocator is copied into this connection.
ChaoticGoodServerListener::ActiveConnection::ActiveConnection(
    RefCountedPtr<ChaoticGoodServerListener> listener,
    std::unique_ptr<EventEngine::Endpoint> endpoint)
    : memory_allocator_(listener->memory_allocator_), listener_(listener) {
  // The handshaking state is handed its own reference to this connection.
  handshaking_state_ = MakeRefCounted<HandshakingState>(Ref());
  handshaking_state_->Start(std::move(endpoint));
}
156 
~ActiveConnection()157 ChaoticGoodServerListener::ActiveConnection::~ActiveConnection() {
158   if (receive_settings_activity_ != nullptr) receive_settings_activity_.reset();
159 }
160 
Orphan()161 void ChaoticGoodServerListener::ActiveConnection::Orphan() {
162   if (grpc_chaotic_good_trace.enabled()) {
163     gpr_log(GPR_INFO, "ActiveConnection::Orphan() %p", this);
164   }
165   if (handshaking_state_ != nullptr) {
166     handshaking_state_->Shutdown();
167     handshaking_state_.reset();
168   }
169   ActivityPtr activity;
170   {
171     MutexLock lock(&mu_);
172     orphaned_ = true;
173     activity = std::move(receive_settings_activity_);
174   }
175   activity.reset();
176   Unref();
177 }
178 
NewConnectionID()179 void ChaoticGoodServerListener::ActiveConnection::NewConnectionID() {
180   bool has_new_id = false;
181   MutexLock lock(&listener_->mu_);
182   while (!has_new_id) {
183     connection_id_ = listener_->connection_id_generator_();
184     if (!listener_->connectivity_map_.contains(connection_id_)) {
185       has_new_id = true;
186     }
187   }
188   listener_->connectivity_map_.emplace(
189       connection_id_, std::make_shared<InterActivityLatch<PromiseEndpoint>>());
190 }
191 
Done(absl::optional<absl::string_view> error)192 void ChaoticGoodServerListener::ActiveConnection::Done(
193     absl::optional<absl::string_view> error) {
194   if (error.has_value()) {
195     gpr_log(GPR_ERROR, "ActiveConnection::Done:%p %s", this,
196             std::string(*error).c_str());
197   }
198   // Can easily be holding various locks here: bounce through EE to ensure no
199   // deadlocks.
200   listener_->event_engine_->Run([self = Ref()]() {
201     ExecCtx exec_ctx;
202     OrphanablePtr<ActiveConnection> con;
203     MutexLock lock(&self->listener_->mu_);
204     auto v = self->listener_->connection_list_.extract(self.get());
205     if (!v.empty()) con = std::move(v.value());
206   });
207 }
208 
// Captures the owning `connection`, a copy of its memory allocator, and a
// new HandshakeManager that Start() uses to run the handshake.
ChaoticGoodServerListener::ActiveConnection::HandshakingState::HandshakingState(
    RefCountedPtr<ActiveConnection> connection)
    // memory_allocator_ reads through `connection` before it is moved into
    // connection_ below.
    // NOTE(review): this relies on memory_allocator_ being declared before
    // connection_ in the class — confirm in the header.
    : memory_allocator_(connection->memory_allocator_),
      connection_(std::move(connection)),
      handshake_mgr_(MakeRefCounted<HandshakeManager>()) {}
214 
Start(std::unique_ptr<EventEngine::Endpoint> endpoint)215 void ChaoticGoodServerListener::ActiveConnection::HandshakingState::Start(
216     std::unique_ptr<EventEngine::Endpoint> endpoint) {
217   handshake_mgr_->DoHandshake(
218       grpc_event_engine_endpoint_create(std::move(endpoint)),
219       connection_->args(), GetConnectionDeadline(), nullptr, OnHandshakeDone,
220       Ref().release());
221 }
222 
223 auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
EndpointReadSettingsFrame(RefCountedPtr<HandshakingState> self)224     EndpointReadSettingsFrame(RefCountedPtr<HandshakingState> self) {
225   return TrySeq(
226       self->connection_->endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize),
227       [self](Slice slice) {
228         // Parse frame header
229         auto frame_header = FrameHeader::Parse(reinterpret_cast<const uint8_t*>(
230             GRPC_SLICE_START_PTR(slice.c_slice())));
231         return If(
232             frame_header.ok(),
233             [self, &frame_header]() {
234               return TrySeq(
235                   self->connection_->endpoint_.Read(
236                       frame_header->GetFrameLength()),
237                   [frame_header = *frame_header,
238                    self](SliceBuffer buffer) -> absl::StatusOr<bool> {
239                     // Read Setting frame.
240                     SettingsFrame frame;
241                     // Deserialize frame from read buffer.
242                     BufferPair buffer_pair{std::move(buffer), SliceBuffer()};
243                     auto status = frame.Deserialize(
244                         &self->connection_->hpack_parser_, frame_header,
245                         absl::BitGenRef(self->connection_->bitgen_),
246                         GetContext<Arena>(), std::move(buffer_pair),
247                         FrameLimits{});
248                     if (!status.ok()) return status;
249                     if (frame.headers == nullptr) {
250                       return absl::UnavailableError("no settings headers");
251                     }
252                     auto settings_metadata =
253                         SettingsMetadata::FromMetadataBatch(*frame.headers);
254                     if (!settings_metadata.ok()) {
255                       return settings_metadata.status();
256                     }
257                     const bool is_control_endpoint =
258                         settings_metadata->connection_type ==
259                         SettingsMetadata::ConnectionType::kControl;
260                     if (!is_control_endpoint) {
261                       if (!settings_metadata->connection_id.has_value()) {
262                         return absl::UnavailableError(
263                             "no connection id in data endpoint settings frame");
264                       }
265                       if (!settings_metadata->alignment.has_value()) {
266                         return absl::UnavailableError(
267                             "no alignment in data endpoint settings frame");
268                       }
269                       // Get connection-id and data-alignment for data endpoint.
270                       self->connection_->connection_id_ =
271                           *settings_metadata->connection_id;
272                       self->connection_->data_alignment_ =
273                           *settings_metadata->alignment;
274                     }
275                     return is_control_endpoint;
276                   });
277             },
278             [&frame_header]() {
279               return [r = frame_header.status()]() -> absl::StatusOr<bool> {
280                 return r;
281               };
282             });
283       });
284 }
285 
// Returns a promise that races two sequences:
//  1. wait on the latch registered under this connection's id until a data
//     endpoint is delivered, then build a ChaoticGoodServerTransport from the
//     control and data endpoints and hand it to the server;
//  2. sleep for kConnectionDeadline, then drop the connection id from the
//     connectivity map and fail with DeadlineExceeded.
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
    WaitForDataEndpointSetup(RefCountedPtr<HandshakingState> self) {
  return Race(
      TrySeq(
          []() {
            // TODO(ladynana): find a way to resolve SeqState to actual
            // value.
            return absl::OkStatus();
          },
          [self]() {
            MutexLock lock(&self->connection_->listener_->mu_);
            // NOTE(review): the find() result is dereferenced without an
            // end() check; this assumes the id is still registered (see
            // NewConnectionID) — confirm nothing can erase it first.
            auto latch = self->connection_->listener_->connectivity_map_
                             .find(self->connection_->connection_id_)
                             ->second;
            return latch->Wait();
          },
          [self](PromiseEndpoint ret) -> absl::Status {
            // `ret` is the data endpoint delivered through the latch.
            MutexLock lock(&self->connection_->listener_->mu_);
            if (grpc_chaotic_good_trace.enabled()) {
              gpr_log(
                  GPR_INFO, "%p Data endpoint setup done: shutdown=%s",
                  self->connection_.get(),
                  self->connection_->listener_->shutdown_ ? "true" : "false");
            }
            // Do not create a transport once the listener has shut down.
            if (self->connection_->listener_->shutdown_) {
              return absl::UnavailableError("Server shutdown");
            }
            // The control endpoint, HPACK parser and HPACK compressor are
            // moved out of the connection into the new transport.
            // NOTE(review): the connectivity_map_ entry for this id is not
            // erased on this path — confirm whether it is removed elsewhere.
            return self->connection_->listener_->server_->SetupTransport(
                new ChaoticGoodServerTransport(
                    self->connection_->args(),
                    std::move(self->connection_->endpoint_), std::move(ret),
                    self->connection_->listener_->event_engine_,
                    std::move(self->connection_->hpack_parser_),
                    std::move(self->connection_->hpack_compressor_)),
                nullptr, self->connection_->args(), nullptr);
          }),
      // Set timeout for waiting data endpoint connect.
      TrySeq(
          // []() {
          Sleep(Timestamp::Now() + kConnectionDeadline),
          [self]() mutable -> absl::Status {
            MutexLock lock(&self->connection_->listener_->mu_);
            // Delete connection id from map when timeout;
            self->connection_->listener_->connectivity_map_.erase(
                self->connection_->connection_id_);
            return absl::DeadlineExceededError("Deadline exceeded.");
          }));
}
334 
// Control-endpoint half of the settings exchange: assigns a new connection id
// to this connection, writes a settings frame carrying that id to the peer,
// and then waits for the matching data endpoint to be set up.
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
    ControlEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self) {
  // Sets connection_id_ and registers a latch for it in the listener's map.
  self->connection_->NewConnectionID();
  SettingsFrame frame;
  // Only the connection id (second field) is populated in the reply.
  frame.headers =
      SettingsMetadata{absl::nullopt, self->connection_->connection_id_,
                       absl::nullopt}
          .ToMetadataBatch();
  auto write_buffer = frame.Serialize(&self->connection_->hpack_compressor_);
  // Only the `control` part of the serialized frame is written.
  return TrySeq(
      self->connection_->endpoint_.Write(std::move(write_buffer.control)),
      WaitForDataEndpointSetup(self));
}
348 
349 auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
DataEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self)350     DataEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self) {
351   // Send data endpoint setting frame
352   SettingsFrame frame;
353   frame.headers =
354       SettingsMetadata{absl::nullopt, self->connection_->connection_id_,
355                        self->connection_->data_alignment_}
356           .ToMetadataBatch();
357   auto write_buffer = frame.Serialize(&self->connection_->hpack_compressor_);
358   return TrySeq(
359       self->connection_->endpoint_.Write(std::move(write_buffer.control)),
360       [self]() mutable {
361         MutexLock lock(&self->connection_->listener_->mu_);
362         // Set endpoint to latch
363         auto it = self->connection_->listener_->connectivity_map_.find(
364             self->connection_->connection_id_);
365         if (it == self->connection_->listener_->connectivity_map_.end()) {
366           return absl::InternalError(
367               absl::StrCat("Connection not in map: ",
368                            absl::CEscape(self->connection_->connection_id_)));
369         }
370         it->second->Set(std::move(self->connection_->endpoint_));
371         return absl::OkStatus();
372       });
373 }
374 
// Dispatches to the control- or data-endpoint settings reply depending on
// `is_control_endpoint`.
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
    EndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self,
                               bool is_control_endpoint) {
  // NOTE(review): both lambdas capture the by-value parameter `self` by
  // reference. This is only safe if If() invokes the selected factory before
  // this function returns — confirm against promise/if.h.
  return If(
      is_control_endpoint,
      [&self] { return ControlEndpointWriteSettingsFrame(self); },
      [&self] { return DataEndpointWriteSettingsFrame(self); });
}
383 
// Handshake completion callback registered by Start(). `arg` is the
// HandshakerArgs for the finished handshake; its user_data is the
// HandshakingState reference that Start() released.
// On success this wraps the resulting endpoint in a PromiseEndpoint and spawns
// an activity that reads the peer's settings frame (bounded by
// kConnectionDeadline) and writes the matching reply. Any failure, and the
// activity's completion, is reported through ActiveConnection::Done().
void ChaoticGoodServerListener::ActiveConnection::HandshakingState::
    OnHandshakeDone(void* arg, grpc_error_handle error) {
  auto* args = static_cast<HandshakerArgs*>(arg);
  GPR_ASSERT(args != nullptr);
  // Adopt (without adding a ref) the pointer released in Start().
  RefCountedPtr<HandshakingState> self(
      static_cast<HandshakingState*>(args->user_data));
  // The handshake read buffer is destroyed and freed unconditionally.
  grpc_slice_buffer_destroy(args->read_buffer);
  gpr_free(args->read_buffer);
  if (!error.ok()) {
    self->connection_->Done(
        absl::StrCat("Handshake failed: ", StatusToString(error)));
    return;
  }
  if (args->endpoint == nullptr) {
    self->connection_->Done("Server handshake done but has empty endpoint.");
    return;
  }
  GPR_ASSERT(grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
      args->endpoint));
  // Unwrap the EventEngine endpoint from the iomgr shim.
  auto ee_endpoint =
      grpc_event_engine::experimental::grpc_take_wrapped_event_engine_endpoint(
          args->endpoint);
  // Query the optional extension before ee_endpoint is moved away; the
  // result is null-checked before use below.
  auto* chaotic_good_ext = grpc_event_engine::experimental::QueryExtension<
      grpc_event_engine::experimental::ChaoticGoodExtension>(ee_endpoint.get());
  self->connection_->endpoint_ =
      PromiseEndpoint(std::move(ee_endpoint), SliceBuffer());
  auto activity = MakeActivity(
      [self, chaotic_good_ext]() {
        return TrySeq(
            // Read the settings frame, or give up after the deadline.
            Race(EndpointReadSettingsFrame(self),
                 TrySeq(Sleep(Timestamp::Now() + kConnectionDeadline),
                        []() -> absl::StatusOr<bool> {
                          return absl::DeadlineExceededError(
                              "Waiting for initial settings frame");
                        })),
            [self, chaotic_good_ext](bool is_control_endpoint) {
              if (chaotic_good_ext != nullptr) {
                chaotic_good_ext->EnableStatsCollection(is_control_endpoint);
                if (is_control_endpoint) {
                  // Control endpoint should use the default memory quota
                  chaotic_good_ext->UseMemoryQuota(
                      ResourceQuota::Default()->memory_quota());
                }
              }
              return EndpointWriteSettingsFrame(self, is_control_endpoint);
            });
      },
      EventEngineWakeupScheduler(self->connection_->listener_->event_engine_),
      // Completion callback: report the outcome to the connection.
      [self](absl::Status status) {
        if (!status.ok()) {
          self->connection_->Done(
              absl::StrCat("Server setting frame handling failed: ",
                           StatusToString(status)));
        } else {
          self->connection_->Done();
        }
      },
      self->connection_->arena_.get(),
      self->connection_->listener_->event_engine_.get());
  // Store the activity on the connection only if it has not been orphaned in
  // the meantime; otherwise the local `activity` is dropped on return.
  MutexLock lock(&self->connection_->mu_);
  if (self->connection_->orphaned_) return;
  self->connection_->receive_settings_activity_ = std::move(activity);
}
447 
448 Timestamp ChaoticGoodServerListener::ActiveConnection::HandshakingState::
GetConnectionDeadline()449     GetConnectionDeadline() {
450   if (connection_->args().Contains(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS)) {
451     return Timestamp::Now() +
452            connection_->args()
453                .GetDurationFromIntMillis(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS)
454                .value();
455   }
456   return Timestamp::Now() + kConnectionDeadline;
457 }
458 
Orphan()459 void ChaoticGoodServerListener::Orphan() {
460   if (grpc_chaotic_good_trace.enabled()) {
461     gpr_log(GPR_INFO, "ChaoticGoodServerListener::Orphan()");
462   }
463   {
464     absl::flat_hash_set<OrphanablePtr<ActiveConnection>> connection_list;
465     MutexLock lock(&mu_);
466     connection_list = std::move(connection_list_);
467     shutdown_ = true;
468   }
469   ee_listener_.reset();
470   Unref();
471 };
472 
473 }  // namespace chaotic_good
474 }  // namespace grpc_core
475 
grpc_server_add_chaotic_good_port(grpc_server * server,const char * addr)476 int grpc_server_add_chaotic_good_port(grpc_server* server, const char* addr) {
477   grpc_core::ExecCtx exec_ctx;
478   auto* const core_server = grpc_core::Server::FromC(server);
479   const std::string parsed_addr = grpc_core::URI::PercentDecode(addr);
480   const auto resolved_or = grpc_core::GetDNSResolver()->LookupHostnameBlocking(
481       parsed_addr, absl::StrCat(0xd20));
482   if (!resolved_or.ok()) {
483     gpr_log(GPR_ERROR, "Failed to resolve %s: %s", addr,
484             resolved_or.status().ToString().c_str());
485     return 0;
486   }
487   int port_num = 0;
488   for (const auto& resolved_addr : resolved_or.value()) {
489     auto listener = grpc_core::MakeOrphanable<
490         grpc_core::chaotic_good::ChaoticGoodServerListener>(
491         core_server, core_server->channel_args());
492     const auto ee_addr =
493         grpc_event_engine::experimental::CreateResolvedAddress(resolved_addr);
494     gpr_log(GPR_INFO, "BIND: %s",
495             grpc_event_engine::experimental::ResolvedAddressToString(ee_addr)
496                 ->c_str());
497     auto bind_result = listener->Bind(ee_addr);
498     if (!bind_result.ok()) {
499       gpr_log(GPR_ERROR, "Failed to bind to %s: %s", addr,
500               bind_result.status().ToString().c_str());
501       return 0;
502     }
503     if (port_num == 0) {
504       port_num = bind_result.value();
505     } else {
506       GPR_ASSERT(port_num == bind_result.value());
507     }
508     core_server->AddListener(std::move(listener));
509   }
510   return port_num;
511 }
512