1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_CHAOTIC_GOOD_SERVER_H 16 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_CHAOTIC_GOOD_SERVER_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <cstddef> 21 #include <cstdint> 22 #include <memory> 23 #include <string> 24 #include <vector> 25 26 #include "absl/container/flat_hash_map.h" 27 #include "absl/random/random.h" 28 #include "absl/status/status.h" 29 #include "absl/status/statusor.h" 30 31 #include <grpc/event_engine/event_engine.h> 32 33 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" 34 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" 35 #include "src/core/lib/channel/channel_args.h" 36 #include "src/core/lib/channel/channelz.h" 37 #include "src/core/lib/gprpp/sync.h" 38 #include "src/core/lib/gprpp/time.h" 39 #include "src/core/lib/iomgr/closure.h" 40 #include "src/core/lib/iomgr/error.h" 41 #include "src/core/lib/iomgr/iomgr_fwd.h" 42 #include "src/core/lib/promise/activity.h" 43 #include "src/core/lib/promise/inter_activity_latch.h" 44 #include "src/core/lib/resource_quota/memory_quota.h" 45 #include "src/core/lib/resource_quota/resource_quota.h" 46 #include "src/core/lib/slice/slice.h" 47 #include "src/core/lib/surface/server.h" 48 #include "src/core/lib/transport/handshaker.h" 49 #include "src/core/lib/transport/promise_endpoint.h" 50 51 namespace grpc_core { 52 namespace chaotic_good { 53 class ChaoticGoodServerListener final 54 : public Server::ListenerInterface, 55 public RefCounted<ChaoticGoodServerListener> { 56 public: DefaultConnectionIDGenerator()57 static absl::AnyInvocable<std::string()> DefaultConnectionIDGenerator() { 58 return [bitgen = absl::BitGen()]() mutable { 59 return absl::StrCat(absl::Hex(absl::Uniform<uint64_t>(bitgen))); 60 }; 61 } 62 63 ChaoticGoodServerListener( 64 Server* server, const ChannelArgs& args, 65 absl::AnyInvocable<std::string()> connection_id_generator = 66 DefaultConnectionIDGenerator()); 67 ~ChaoticGoodServerListener() override; 68 // Bind address to EventEngine listener. 69 absl::StatusOr<int> Bind( 70 grpc_event_engine::experimental::EventEngine::ResolvedAddress addr); 71 absl::Status StartListening(); args()72 const ChannelArgs& args() const { return args_; } 73 void Orphan() override; 74 75 class ActiveConnection : public InternallyRefCounted<ActiveConnection> { 76 public: 77 ActiveConnection( 78 RefCountedPtr<ChaoticGoodServerListener> listener, 79 std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> 80 endpoint); 81 ~ActiveConnection() override; args()82 const ChannelArgs& args() const { return listener_->args(); } 83 84 void Orphan() override; 85 86 class HandshakingState : public RefCounted<HandshakingState> { 87 public: 88 explicit HandshakingState(RefCountedPtr<ActiveConnection> connection); ~HandshakingState()89 ~HandshakingState() override{}; 90 void Start(std::unique_ptr< 91 grpc_event_engine::experimental::EventEngine::Endpoint> 92 endpoint); 93 Shutdown()94 void Shutdown() { 95 handshake_mgr_->Shutdown(absl::CancelledError("Shutdown")); 96 } 97 98 private: 99 static auto EndpointReadSettingsFrame( 100 RefCountedPtr<HandshakingState> self); 101 static auto EndpointWriteSettingsFrame( 102 RefCountedPtr<HandshakingState> self, bool is_control_endpoint); 103 static auto WaitForDataEndpointSetup( 104 RefCountedPtr<HandshakingState> self); 105 static auto ControlEndpointWriteSettingsFrame( 106 RefCountedPtr<HandshakingState> self); 107 static auto DataEndpointWriteSettingsFrame( 108 RefCountedPtr<HandshakingState> self); 109 110 static void OnHandshakeDone(void* arg, grpc_error_handle error); 111 Timestamp GetConnectionDeadline(); 112 const std::shared_ptr<grpc_event_engine::experimental::MemoryAllocator> 113 memory_allocator_; 114 const RefCountedPtr<ActiveConnection> connection_; 115 const RefCountedPtr<HandshakeManager> handshake_mgr_; 116 }; 117 118 private: 119 void Done(absl::optional<absl::string_view> error = absl::nullopt); 120 void NewConnectionID(); 121 const std::shared_ptr<grpc_event_engine::experimental::MemoryAllocator> 122 memory_allocator_; 123 ScopedArenaPtr arena_ = MakeScopedArena(1024, memory_allocator_.get()); 124 const RefCountedPtr<ChaoticGoodServerListener> listener_; 125 RefCountedPtr<HandshakingState> handshaking_state_; 126 Mutex mu_; 127 ActivityPtr receive_settings_activity_ ABSL_GUARDED_BY(mu_); 128 bool orphaned_ ABSL_GUARDED_BY(mu_) = false; 129 PromiseEndpoint endpoint_; 130 HPackCompressor hpack_compressor_; 131 HPackParser hpack_parser_; 132 absl::BitGen bitgen_; 133 std::string connection_id_; 134 int32_t data_alignment_; 135 }; 136 Start(Server *,const std::vector<grpc_pollset * > *)137 void Start(Server*, const std::vector<grpc_pollset*>*) override { 138 StartListening().IgnoreError(); 139 }; 140 channelz_listen_socket_node()141 channelz::ListenSocketNode* channelz_listen_socket_node() const override { 142 return nullptr; 143 } 144 SetOnDestroyDone(grpc_closure * closure)145 void SetOnDestroyDone(grpc_closure* closure) override { 146 MutexLock lock(&mu_); 147 on_destroy_done_ = closure; 148 }; 149 150 private: 151 Server* const server_; 152 ChannelArgs args_; 153 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_; 154 std::unique_ptr<grpc_event_engine::experimental::EventEngine::Listener> 155 ee_listener_; 156 Mutex mu_; 157 bool shutdown_ ABSL_GUARDED_BY(mu_) = false; 158 // Map of connection id to endpoints connectivity. 159 absl::flat_hash_map<std::string, 160 std::shared_ptr<InterActivityLatch<PromiseEndpoint>>> 161 connectivity_map_ ABSL_GUARDED_BY(mu_); 162 absl::flat_hash_set<OrphanablePtr<ActiveConnection>> connection_list_ 163 ABSL_GUARDED_BY(mu_); 164 absl::AnyInvocable<std::string()> connection_id_generator_ 165 ABSL_GUARDED_BY(mu_); 166 grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr; 167 std::shared_ptr<grpc_event_engine::experimental::MemoryAllocator> 168 memory_allocator_ = 169 std::make_shared<grpc_event_engine::experimental::MemoryAllocator>( 170 ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( 171 "server_connection")); 172 }; 173 174 } // namespace chaotic_good 175 } // namespace grpc_core 176 177 int grpc_server_add_chaotic_good_port(grpc_server* server, const char* addr); 178 179 #endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_CHAOTIC_GOOD_SERVER_H 180