1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_LIB_SURFACE_CHANNEL_H
20 #define GRPC_SRC_CORE_LIB_SURFACE_CHANNEL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <stddef.h>
25 #include <stdint.h>
26 
27 #include <atomic>
28 #include <map>
29 #include <string>
30 #include <utility>
31 
32 #include "absl/base/thread_annotations.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36 
37 #include <grpc/event_engine/event_engine.h>
38 #include <grpc/event_engine/memory_allocator.h>
39 #include <grpc/grpc.h>
40 #include <grpc/impl/compression_types.h>
41 #include <grpc/slice.h>
42 
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/channel/channel_fwd.h"
45 #include "src/core/lib/channel/channel_stack.h"  // IWYU pragma: keep
46 #include "src/core/lib/channel/channel_stack_builder.h"
47 #include "src/core/lib/channel/channelz.h"
48 #include "src/core/lib/gprpp/cpp_impl_of.h"
49 #include "src/core/lib/gprpp/debug_location.h"
50 #include "src/core/lib/gprpp/ref_counted.h"
51 #include "src/core/lib/gprpp/ref_counted_ptr.h"
52 #include "src/core/lib/gprpp/sync.h"
53 #include "src/core/lib/gprpp/time.h"
54 #include "src/core/lib/iomgr/iomgr_fwd.h"
55 #include "src/core/lib/resource_quota/memory_quota.h"
56 #include "src/core/lib/slice/slice.h"
57 #include "src/core/lib/surface/channel_stack_type.h"
58 #include "src/core/lib/transport/transport_fwd.h"
59 
/// The same as grpc_channel_destroy, but doesn't create an ExecCtx, and so
/// is safe to use from within core.
void grpc_channel_destroy_internal(grpc_channel* channel);

/// Create a call given a grpc_channel, in order to call \a method.
/// Progress is tied to activity on \a pollset_set. The returned call object is
/// meant to be used with \a grpc_call_start_batch_and_execute, which relies on
/// callbacks to signal completions. \a method and \a host need
/// only live through the invocation of this function. If \a parent_call is
/// non-NULL, it must be a server-side call. It will be used to propagate
/// properties from the server call to this new client call, depending on the
/// value of \a propagation_mask (see propagation_bits.h for possible values)
grpc_call* grpc_channel_create_pollset_set_call(
    grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
    grpc_pollset_set* pollset_set, const grpc_slice& method,
    const grpc_slice* host, grpc_core::Timestamp deadline, void* reserved);

/// Get a (borrowed) pointer to this channel's underlying channel stack
grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel);

/// Get a (borrowed) pointer to the channel's channelz node, if any.
grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
    grpc_channel* channel);

/// Accessors for the channel's rolling estimate of per-call arena size; the
/// estimate is fed back via update calls as calls are destroyed.
size_t grpc_channel_get_call_size_estimate(grpc_channel* channel);
void grpc_channel_update_call_size_estimate(grpc_channel* channel, size_t size);
85 
86 namespace grpc_core {
87 
// A pre-parsed (path, optional authority) pair handed out by
// Channel::RegisterCall so repeated calls to the same method can reuse it.
struct RegisteredCall {
  // Fully-qualified method path slice.
  Slice path;
  // Optional :authority override; unset when no host was supplied.
  absl::optional<Slice> authority;

  explicit RegisteredCall(const char* method_arg, const char* host_arg);
  RegisteredCall(const RegisteredCall& other);
  // Copy-assignment is disallowed; entries are copy-constructed into the
  // registration map and never reassigned afterwards.
  RegisteredCall& operator=(const RegisteredCall&) = delete;

  ~RegisteredCall();
};
98 
// Mutex-guarded table of calls registered on a channel, keyed by
// (method, host).
struct CallRegistrationTable {
  // Guards both fields below.
  Mutex mu;
  // The map key should be owned strings rather than unowned char*'s to
  // guarantee that it outlives calls on the core channel (which may outlast the
  // C++ or other wrapped language Channel that registered these calls).
  std::map<std::pair<std::string, std::string>, RegisteredCall> map
      ABSL_GUARDED_BY(mu);
  // Count of registration attempts (including ones that hit an existing
  // entry); exposed for tests via Channel::TestOnlyRegistrationAttempts.
  int method_registration_attempts ABSL_GUARDED_BY(mu) = 0;
};
108 
109 class Channel : public RefCounted<Channel>,
110                 public CppImplOf<Channel, grpc_channel> {
111  public:
112   static absl::StatusOr<RefCountedPtr<Channel>> Create(
113       const char* target, ChannelArgs args,
114       grpc_channel_stack_type channel_stack_type,
115       grpc_transport* optional_transport);
116 
117   static absl::StatusOr<RefCountedPtr<Channel>> CreateWithBuilder(
118       ChannelStackBuilder* builder);
119 
channel_stack()120   grpc_channel_stack* channel_stack() const { return channel_stack_.get(); }
121 
compression_options()122   grpc_compression_options compression_options() const {
123     return compression_options_;
124   }
125 
channelz_node()126   channelz::ChannelNode* channelz_node() const { return channelz_node_.get(); }
127 
CallSizeEstimate()128   size_t CallSizeEstimate() {
129     // We round up our current estimate to the NEXT value of kRoundUpSize.
130     // This ensures:
131     //  1. a consistent size allocation when our estimate is drifting slowly
132     //     (which is common) - which tends to help most allocators reuse memory
133     //  2. a small amount of allowed growth over the estimate without hitting
134     //     the arena size doubling case, reducing overall memory usage
135     static constexpr size_t kRoundUpSize = 256;
136     return (call_size_estimate_.load(std::memory_order_relaxed) +
137             2 * kRoundUpSize) &
138            ~(kRoundUpSize - 1);
139   }
140 
141   void UpdateCallSizeEstimate(size_t size);
target()142   absl::string_view target() const { return target_; }
allocator()143   MemoryAllocator* allocator() { return &allocator_; }
is_client()144   bool is_client() const { return is_client_; }
is_promising()145   bool is_promising() const { return is_promising_; }
146   RegisteredCall* RegisterCall(const char* method, const char* host);
147 
TestOnlyRegisteredCalls()148   int TestOnlyRegisteredCalls() {
149     MutexLock lock(&registration_table_.mu);
150     return registration_table_.map.size();
151   }
152 
TestOnlyRegistrationAttempts()153   int TestOnlyRegistrationAttempts() {
154     MutexLock lock(&registration_table_.mu);
155     return registration_table_.method_registration_attempts;
156   }
157 
event_engine()158   grpc_event_engine::experimental::EventEngine* event_engine() const {
159     return channel_stack_->EventEngine();
160   }
161 
162  private:
163   Channel(bool is_client, bool is_promising, std::string target,
164           const ChannelArgs& channel_args,
165           grpc_compression_options compression_options,
166           RefCountedPtr<grpc_channel_stack> channel_stack);
167 
168   const bool is_client_;
169   const bool is_promising_;
170   const grpc_compression_options compression_options_;
171   std::atomic<size_t> call_size_estimate_;
172   CallRegistrationTable registration_table_;
173   RefCountedPtr<channelz::ChannelNode> channelz_node_;
174   MemoryAllocator allocator_;
175   std::string target_;
176   const RefCountedPtr<grpc_channel_stack> channel_stack_;
177 };
178 
179 }  // namespace grpc_core
180 
grpc_channel_compression_options(const grpc_channel * channel)181 inline grpc_compression_options grpc_channel_compression_options(
182     const grpc_channel* channel) {
183   return grpc_core::Channel::FromC(channel)->compression_options();
184 }
185 
grpc_channel_get_channel_stack(grpc_channel * channel)186 inline grpc_channel_stack* grpc_channel_get_channel_stack(
187     grpc_channel* channel) {
188   return grpc_core::Channel::FromC(channel)->channel_stack();
189 }
190 
grpc_channel_get_channelz_node(grpc_channel * channel)191 inline grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
192     grpc_channel* channel) {
193   return grpc_core::Channel::FromC(channel)->channelz_node();
194 }
195 
grpc_channel_internal_ref(grpc_channel * channel,const char * reason)196 inline void grpc_channel_internal_ref(grpc_channel* channel,
197                                       const char* reason) {
198   grpc_core::Channel::FromC(channel)->Ref(DEBUG_LOCATION, reason).release();
199 }
grpc_channel_internal_unref(grpc_channel * channel,const char * reason)200 inline void grpc_channel_internal_unref(grpc_channel* channel,
201                                         const char* reason) {
202   grpc_core::Channel::FromC(channel)->Unref(DEBUG_LOCATION, reason);
203 }
204 
// Return the channel's compression options.
grpc_compression_options grpc_channel_compression_options(
    const grpc_channel* channel);

// Ping the channel's peer (load balanced channels will select one sub-channel
// to ping); if the channel is not connected, posts a failed ping result
// (under \a tag on \a cq).
void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq,
                       void* tag, void* reserved);
213 
214 #endif  // GRPC_SRC_CORE_LIB_SURFACE_CHANNEL_H
215