xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/chaotic_good/server/chaotic_good_server.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_CHAOTIC_GOOD_SERVER_H
16 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_CHAOTIC_GOOD_SERVER_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <cstddef>
21 #include <cstdint>
22 #include <memory>
23 #include <string>
24 #include <vector>
25 
26 #include "absl/container/flat_hash_map.h"
27 #include "absl/random/random.h"
28 #include "absl/status/status.h"
29 #include "absl/status/statusor.h"
30 
31 #include <grpc/event_engine/event_engine.h>
32 
33 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
34 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/channel/channelz.h"
37 #include "src/core/lib/gprpp/sync.h"
38 #include "src/core/lib/gprpp/time.h"
39 #include "src/core/lib/iomgr/closure.h"
40 #include "src/core/lib/iomgr/error.h"
41 #include "src/core/lib/iomgr/iomgr_fwd.h"
42 #include "src/core/lib/promise/activity.h"
43 #include "src/core/lib/promise/inter_activity_latch.h"
44 #include "src/core/lib/resource_quota/memory_quota.h"
45 #include "src/core/lib/resource_quota/resource_quota.h"
46 #include "src/core/lib/slice/slice.h"
47 #include "src/core/lib/surface/server.h"
48 #include "src/core/lib/transport/handshaker.h"
49 #include "src/core/lib/transport/promise_endpoint.h"
50 
51 namespace grpc_core {
52 namespace chaotic_good {
53 class ChaoticGoodServerListener final
54     : public Server::ListenerInterface,
55       public RefCounted<ChaoticGoodServerListener> {
56  public:
DefaultConnectionIDGenerator()57   static absl::AnyInvocable<std::string()> DefaultConnectionIDGenerator() {
58     return [bitgen = absl::BitGen()]() mutable {
59       return absl::StrCat(absl::Hex(absl::Uniform<uint64_t>(bitgen)));
60     };
61   }
62 
63   ChaoticGoodServerListener(
64       Server* server, const ChannelArgs& args,
65       absl::AnyInvocable<std::string()> connection_id_generator =
66           DefaultConnectionIDGenerator());
67   ~ChaoticGoodServerListener() override;
68   // Bind address to EventEngine listener.
69   absl::StatusOr<int> Bind(
70       grpc_event_engine::experimental::EventEngine::ResolvedAddress addr);
71   absl::Status StartListening();
args()72   const ChannelArgs& args() const { return args_; }
73   void Orphan() override;
74 
75   class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
76    public:
77     ActiveConnection(
78         RefCountedPtr<ChaoticGoodServerListener> listener,
79         std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
80             endpoint);
81     ~ActiveConnection() override;
args()82     const ChannelArgs& args() const { return listener_->args(); }
83 
84     void Orphan() override;
85 
86     class HandshakingState : public RefCounted<HandshakingState> {
87      public:
88       explicit HandshakingState(RefCountedPtr<ActiveConnection> connection);
~HandshakingState()89       ~HandshakingState() override{};
90       void Start(std::unique_ptr<
91                  grpc_event_engine::experimental::EventEngine::Endpoint>
92                      endpoint);
93 
Shutdown()94       void Shutdown() {
95         handshake_mgr_->Shutdown(absl::CancelledError("Shutdown"));
96       }
97 
98      private:
99       static auto EndpointReadSettingsFrame(
100           RefCountedPtr<HandshakingState> self);
101       static auto EndpointWriteSettingsFrame(
102           RefCountedPtr<HandshakingState> self, bool is_control_endpoint);
103       static auto WaitForDataEndpointSetup(
104           RefCountedPtr<HandshakingState> self);
105       static auto ControlEndpointWriteSettingsFrame(
106           RefCountedPtr<HandshakingState> self);
107       static auto DataEndpointWriteSettingsFrame(
108           RefCountedPtr<HandshakingState> self);
109 
110       static void OnHandshakeDone(void* arg, grpc_error_handle error);
111       Timestamp GetConnectionDeadline();
112       const std::shared_ptr<grpc_event_engine::experimental::MemoryAllocator>
113           memory_allocator_;
114       const RefCountedPtr<ActiveConnection> connection_;
115       const RefCountedPtr<HandshakeManager> handshake_mgr_;
116     };
117 
118    private:
119     void Done(absl::optional<absl::string_view> error = absl::nullopt);
120     void NewConnectionID();
121     const std::shared_ptr<grpc_event_engine::experimental::MemoryAllocator>
122         memory_allocator_;
123     ScopedArenaPtr arena_ = MakeScopedArena(1024, memory_allocator_.get());
124     const RefCountedPtr<ChaoticGoodServerListener> listener_;
125     RefCountedPtr<HandshakingState> handshaking_state_;
126     Mutex mu_;
127     ActivityPtr receive_settings_activity_ ABSL_GUARDED_BY(mu_);
128     bool orphaned_ ABSL_GUARDED_BY(mu_) = false;
129     PromiseEndpoint endpoint_;
130     HPackCompressor hpack_compressor_;
131     HPackParser hpack_parser_;
132     absl::BitGen bitgen_;
133     std::string connection_id_;
134     int32_t data_alignment_;
135   };
136 
Start(Server *,const std::vector<grpc_pollset * > *)137   void Start(Server*, const std::vector<grpc_pollset*>*) override {
138     StartListening().IgnoreError();
139   };
140 
channelz_listen_socket_node()141   channelz::ListenSocketNode* channelz_listen_socket_node() const override {
142     return nullptr;
143   }
144 
SetOnDestroyDone(grpc_closure * closure)145   void SetOnDestroyDone(grpc_closure* closure) override {
146     MutexLock lock(&mu_);
147     on_destroy_done_ = closure;
148   };
149 
150  private:
151   Server* const server_;
152   ChannelArgs args_;
153   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
154   std::unique_ptr<grpc_event_engine::experimental::EventEngine::Listener>
155       ee_listener_;
156   Mutex mu_;
157   bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
158   // Map of connection id to endpoints connectivity.
159   absl::flat_hash_map<std::string,
160                       std::shared_ptr<InterActivityLatch<PromiseEndpoint>>>
161       connectivity_map_ ABSL_GUARDED_BY(mu_);
162   absl::flat_hash_set<OrphanablePtr<ActiveConnection>> connection_list_
163       ABSL_GUARDED_BY(mu_);
164   absl::AnyInvocable<std::string()> connection_id_generator_
165       ABSL_GUARDED_BY(mu_);
166   grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
167   std::shared_ptr<grpc_event_engine::experimental::MemoryAllocator>
168       memory_allocator_ =
169           std::make_shared<grpc_event_engine::experimental::MemoryAllocator>(
170               ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
171                   "server_connection"));
172 };
173 
174 }  // namespace chaotic_good
175 }  // namespace grpc_core
176 
177 int grpc_server_add_chaotic_good_port(grpc_server* server, const char* addr);
178 
179 #endif  // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_CHAOTIC_GOOD_SERVER_H
180