xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/channel/channel_stack.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
20 #define GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
21 
22 //////////////////////////////////////////////////////////////////////////////
23 // IMPORTANT NOTE:
24 //
25 // When you update this API, please make the corresponding changes to
26 // the C++ API in src/cpp/common/channel_filter.{h,cc}
27 //////////////////////////////////////////////////////////////////////////////
28 
29 // A channel filter defines how operations on a channel are implemented.
30 // Channel filters are chained together to create full channels, and if those
31 // chains are linear, then channel stacks provide a mechanism to minimize
32 // allocations for that chain.
33 // Call stacks are created by channel stacks and represent the per-call data
34 // for that stack.
35 
36 // Implementations should take care of the following details for a batch -
37 // 1. Synchronization is achieved with a CallCombiner. View
38 // src/core/lib/iomgr/call_combiner.h for more details.
39 // 2. If the filter wants to inject an error on the way down, it needs to call
40 // grpc_transport_stream_op_batch_finish_with_failure from within the call
41 // combiner. This will cause any batch callbacks to be called with that error.
42 // 3. If the filter wants to inject an error on the way up (from a callback), it
43 // should also inject that error in the recv_trailing_metadata callback so that
44 // it can have an effect on the call status.
45 //
46 
47 #include <grpc/support/port_platform.h>
48 
49 #include <stddef.h>
50 
51 #include <functional>
52 #include <memory>
53 
54 #include <grpc/event_engine/event_engine.h>
55 #include <grpc/grpc.h>
56 #include <grpc/slice.h>
57 #include <grpc/status.h>
58 #include <grpc/support/log.h>
59 #include <grpc/support/time.h>
60 
61 #include "src/core/lib/channel/channel_args.h"
62 #include "src/core/lib/channel/channel_fwd.h"
63 #include "src/core/lib/channel/context.h"
64 #include "src/core/lib/channel/metrics.h"
65 #include "src/core/lib/debug/trace.h"
66 #include "src/core/lib/gpr/time_precise.h"
67 #include "src/core/lib/gprpp/manual_constructor.h"
68 #include "src/core/lib/gprpp/ref_counted_ptr.h"
69 #include "src/core/lib/gprpp/time.h"
70 #include "src/core/lib/iomgr/call_combiner.h"
71 #include "src/core/lib/iomgr/closure.h"
72 #include "src/core/lib/iomgr/error.h"
73 #include "src/core/lib/iomgr/polling_entity.h"
74 #include "src/core/lib/promise/arena_promise.h"
75 #include "src/core/lib/resource_quota/arena.h"
76 #include "src/core/lib/transport/call_final_info.h"
77 #include "src/core/lib/transport/transport.h"
78 
79 struct grpc_channel_element_args {
80   grpc_channel_stack* channel_stack;
81   grpc_core::ChannelArgs channel_args;
82   int is_first;
83   int is_last;
84 };
85 struct grpc_call_element_args {
86   grpc_call_stack* call_stack;
87   const void* server_transport_data;
88   grpc_call_context_element* context;
89   const grpc_slice& path;
90   gpr_cycle_counter start_time;  // Note: not populated in subchannel stack.
91   grpc_core::Timestamp deadline;
92   grpc_core::Arena* arena;
93   grpc_core::CallCombiner* call_combiner;
94 };
95 
96 // Channel filters specify:
97 // 1. the amount of memory needed in the channel & call (via the sizeof_XXX
98 //    members)
99 // 2. functions to initialize and destroy channel & call data
100 //    (init_XXX, destroy_XXX)
101 // 3. functions to implement call operations and channel operations (call_op,
102 //    channel_op)
103 // 4. a name, which is useful when debugging
104 
105 // Members are laid out in approximate frequency of use order.
106 struct grpc_channel_filter {
107   // Called to eg. send/receive data on a call.
108   // See grpc_call_next_op on how to call the next element in the stack
109   void (*start_transport_stream_op_batch)(grpc_call_element* elem,
110                                           grpc_transport_stream_op_batch* op);
111   // Create a promise to execute one call.
112   // If this is non-null, it may be used in preference to
113   // start_transport_stream_op_batch.
114   // If this is used in preference to start_transport_stream_op_batch, the
115   // following can be omitted also:
116   //   - calling init_call_elem, destroy_call_elem, set_pollset_or_pollset_set
117   //   - allocation of memory for call data
118   // There is an on-going migration to move all filters to providing this, and
119   // then to drop start_transport_stream_op_batch.
120   grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> (*make_call_promise)(
121       grpc_channel_element* elem, grpc_core::CallArgs call_args,
122       grpc_core::NextPromiseFactory next_promise_factory);
123   // Register interceptors into a call.
124   // If this is non-null it may be used in preference to make_call_promise.
125   // There is an on-going migration to move all filters to providing this, and
126   // then to drop start_transport_stream_op_batch.
127   void (*init_call)(grpc_channel_element* elem,
128                     grpc_core::CallSpineInterface* call_spine);
129   // Called to handle channel level operations - e.g. new calls, or transport
130   // closure.
131   // See grpc_channel_next_op on how to call the next element in the stack
132   void (*start_transport_op)(grpc_channel_element* elem, grpc_transport_op* op);
133 
134   // sizeof(per call data)
135   size_t sizeof_call_data;
136   // Initialize per call data.
137   // elem is initialized at the start of the call, and elem->call_data is what
138   // needs initializing.
139   // The filter does not need to do any chaining.
140   // server_transport_data is an opaque pointer. If it is NULL, this call is
141   // on a client; if it is non-NULL, then it points to memory owned by the
142   // transport and is on the server. Most filters want to ignore this
143   // argument.
144   // Implementations may assume that elem->call_data is all zeros.
145   grpc_error_handle (*init_call_elem)(grpc_call_element* elem,
146                                       const grpc_call_element_args* args);
147   void (*set_pollset_or_pollset_set)(grpc_call_element* elem,
148                                      grpc_polling_entity* pollent);
149   // Destroy per call data.
150   // The filter does not need to do any chaining.
151   // The bottom filter of a stack will be passed a non-NULL pointer to
152   // \a then_schedule_closure that should be passed to GRPC_CLOSURE_SCHED when
153   // destruction is complete. \a final_info contains data about the completed
154   // call, mainly for reporting purposes.
155   void (*destroy_call_elem)(grpc_call_element* elem,
156                             const grpc_call_final_info* final_info,
157                             grpc_closure* then_schedule_closure);
158 
159   // sizeof(per channel data)
160   size_t sizeof_channel_data;
161   // Initialize per-channel data.
162   // elem is initialized at the creating of the channel, and elem->channel_data
163   // is what needs initializing.
164   // is_first, is_last designate this elements position in the stack, and are
165   // useful for asserting correct configuration by upper layer code.
166   // The filter does not need to do any chaining.
167   // Implementations may assume that elem->channel_data is all zeros.
168   grpc_error_handle (*init_channel_elem)(grpc_channel_element* elem,
169                                          grpc_channel_element_args* args);
170   // Post init per-channel data.
171   // Called after all channel elements have been successfully created.
172   void (*post_init_channel_elem)(grpc_channel_stack* stk,
173                                  grpc_channel_element* elem);
174   // Destroy per channel data.
175   // The filter does not need to do any chaining
176   void (*destroy_channel_elem)(grpc_channel_element* elem);
177 
178   // Implement grpc_channel_get_info()
179   void (*get_channel_info)(grpc_channel_element* elem,
180                            const grpc_channel_info* channel_info);
181 
182   // The name of this filter
183   const char* name;
184 };
185 // A channel_element tracks its filter and the filter requested memory within
186 // a channel allocation
187 struct grpc_channel_element {
188   const grpc_channel_filter* filter;
189   void* channel_data;
190 };
191 
192 // A call_element tracks its filter, the filter requested memory within
193 // a channel allocation, and the filter requested memory within a call
194 // allocation
195 struct grpc_call_element {
196   const grpc_channel_filter* filter;
197   void* channel_data;
198   void* call_data;
199 };
200 
201 // A channel stack tracks a set of related filters for one channel, and
202 // guarantees they live within a single malloc() allocation
203 struct grpc_channel_stack {
204   grpc_stream_refcount refcount;
205   size_t count;
206   // Memory required for a call stack (computed at channel stack
207   // initialization)
208   size_t call_stack_size;
209   // TODO(ctiller): remove this mechanism... it's a hack to allow
210   // Channel to be separated from grpc_channel_stack's allocation. As the
211   // promise conversion continues, we'll reconsider what grpc_channel_stack
212   // should look like and this can go.
213   grpc_core::ManualConstructor<std::function<void()>> on_destroy;
214 
215   grpc_core::ManualConstructor<
216       std::shared_ptr<grpc_event_engine::experimental::EventEngine>>
217       event_engine;
218 
EventEnginegrpc_channel_stack219   grpc_event_engine::experimental::EventEngine* EventEngine() const {
220     return event_engine->get();
221   }
222 
223   grpc_core::ManualConstructor<
224       grpc_core::GlobalStatsPluginRegistry::StatsPluginGroup>
225       stats_plugin_group;
226 
227   // Minimal infrastructure to act like a RefCounted thing without converting
228   // everything.
229   // It's likely that we'll want to replace grpc_channel_stack with something
230   // less regimented once the promise conversion completes, so avoiding doing a
231   // full C++-ification for now.
232   void IncrementRefCount();
233   void Unref();
Refgrpc_channel_stack234   grpc_core::RefCountedPtr<grpc_channel_stack> Ref() {
235     IncrementRefCount();
236     return grpc_core::RefCountedPtr<grpc_channel_stack>(this);
237   }
238 
239   grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
240   MakeClientCallPromise(grpc_core::CallArgs call_args);
241   grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
242   MakeServerCallPromise(grpc_core::CallArgs call_args);
243 
244   void InitClientCallSpine(grpc_core::CallSpineInterface* call);
245   void InitServerCallSpine(grpc_core::CallSpineInterface* call);
246 };
247 
248 // A call stack tracks a set of related filters for one call, and guarantees
249 // they live within a single malloc() allocation
250 struct grpc_call_stack {
251   // shared refcount for this channel stack.
252   // MUST be the first element: the underlying code calls destroy
253   // with the address of the refcount, but higher layers prefer to think
254   // about the address of the call stack itself.
255   grpc_stream_refcount refcount;
256   size_t count;
257 
258   // Minimal infrastructure to act like a RefCounted thing without converting
259   // everything.
260   // grpc_call_stack will be eliminated once the promise conversion completes.
261   void IncrementRefCount();
262   void Unref();
Refgrpc_call_stack263   grpc_core::RefCountedPtr<grpc_call_stack> Ref() {
264     IncrementRefCount();
265     return grpc_core::RefCountedPtr<grpc_call_stack>(this);
266   }
267 };
268 
269 // Get a channel element given a channel stack and its index
270 grpc_channel_element* grpc_channel_stack_element(grpc_channel_stack* stack,
271                                                  size_t i);
272 // Get the last channel element in a channel stack
273 grpc_channel_element* grpc_channel_stack_last_element(
274     grpc_channel_stack* stack);
275 
276 // A utility function for a filter to determine how many other instances
277 // of the same filter exist above it in the same stack.  Intended to be
278 // used in the filter's init_channel_elem() method.
279 size_t grpc_channel_stack_filter_instance_number(
280     grpc_channel_stack* channel_stack, grpc_channel_element* elem);
281 
282 // Get a call stack element given a call stack and an index
283 grpc_call_element* grpc_call_stack_element(grpc_call_stack* stack, size_t i);
284 
285 // Determine memory required for a channel stack containing a set of filters
286 size_t grpc_channel_stack_size(const grpc_channel_filter** filters,
287                                size_t filter_count);
288 // Initialize a channel stack given some filters
289 grpc_error_handle grpc_channel_stack_init(
290     int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
291     const grpc_channel_filter** filters, size_t filter_count,
292     const grpc_core::ChannelArgs& args, const char* name,
293     grpc_channel_stack* stack);
294 // Destroy a channel stack
295 void grpc_channel_stack_destroy(grpc_channel_stack* stack);
296 
297 // Initialize a call stack given a channel stack. transport_server_data is
298 // expected to be NULL on a client, or an opaque transport owned pointer on the
299 // server.
300 grpc_error_handle grpc_call_stack_init(grpc_channel_stack* channel_stack,
301                                        int initial_refs,
302                                        grpc_iomgr_cb_func destroy,
303                                        void* destroy_arg,
304                                        const grpc_call_element_args* elem_args);
305 // Set a pollset or a pollset_set for a call stack: must occur before the first
306 // op is started
307 void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack,
308                                                 grpc_polling_entity* pollent);
309 
// Reference-counting helpers for call stacks and channel stacks.
//
// Each macro forwards to grpc_stream_ref() / grpc_stream_unref() on the
// stack's `refcount` member. In debug builds (NDEBUG not defined) `reason` is
// passed through to the refcounting call; in NDEBUG builds it is evaluated and
// discarded.
//
// Every variant requires a terminating semicolon at the call site. The NDEBUG
// variants deliberately do NOT end in a semicolon themselves: a trailing `;`
// after `while (0)` would defeat the do/while(0) idiom and break unbraced
//   if (cond) GRPC_CALL_STACK_REF(s, "r"); else ...
// in release builds only.
#ifndef NDEBUG
#define GRPC_CALL_STACK_REF(call_stack, reason) \
  grpc_stream_ref(&(call_stack)->refcount, reason)
#define GRPC_CALL_STACK_UNREF(call_stack, reason) \
  grpc_stream_unref(&(call_stack)->refcount, reason)
#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
  grpc_stream_ref(&(channel_stack)->refcount, reason)
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \
  grpc_stream_unref(&(channel_stack)->refcount, reason)
#else
#define GRPC_CALL_STACK_REF(call_stack, reason) \
  do {                                          \
    grpc_stream_ref(&(call_stack)->refcount);   \
    (void)(reason);                             \
  } while (0)
#define GRPC_CALL_STACK_UNREF(call_stack, reason) \
  do {                                            \
    grpc_stream_unref(&(call_stack)->refcount);   \
    (void)(reason);                               \
  } while (0)
#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
  do {                                                \
    grpc_stream_ref(&(channel_stack)->refcount);      \
    (void)(reason);                                   \
  } while (0)
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \
  do {                                                  \
    grpc_stream_unref(&(channel_stack)->refcount);      \
    (void)(reason);                                     \
  } while (0)
#endif
341 
IncrementRefCount()342 inline void grpc_channel_stack::IncrementRefCount() {
343   GRPC_CHANNEL_STACK_REF(this, "smart_pointer");
344 }
345 
Unref()346 inline void grpc_channel_stack::Unref() {
347   GRPC_CHANNEL_STACK_UNREF(this, "smart_pointer");
348 }
349 
IncrementRefCount()350 inline void grpc_call_stack::IncrementRefCount() {
351   GRPC_CALL_STACK_REF(this, "smart_pointer");
352 }
353 
Unref()354 inline void grpc_call_stack::Unref() {
355   GRPC_CALL_STACK_UNREF(this, "smart_pointer");
356 }
357 
358 // Destroy a call stack
359 void grpc_call_stack_destroy(grpc_call_stack* stack,
360                              const grpc_call_final_info* final_info,
361                              grpc_closure* then_schedule_closure);
362 
363 // Ignore set pollset{_set} - used by filters if they don't care about pollsets
364 // at all. Does nothing.
365 void grpc_call_stack_ignore_set_pollset_or_pollset_set(
366     grpc_call_element* elem, grpc_polling_entity* pollent);
367 // Call the next operation in a call stack
368 void grpc_call_next_op(grpc_call_element* elem,
369                        grpc_transport_stream_op_batch* op);
370 // Call the next operation (depending on call directionality) in a channel
371 // stack
372 void grpc_channel_next_op(grpc_channel_element* elem, grpc_transport_op* op);
373 // Pass through a request to get_channel_info() to the next child element
374 void grpc_channel_next_get_info(grpc_channel_element* elem,
375                                 const grpc_channel_info* channel_info);
376 
377 // Given the top element of a channel stack, get the channel stack itself
378 grpc_channel_stack* grpc_channel_stack_from_top_element(
379     grpc_channel_element* elem);
380 // Given the top element of a call stack, get the call stack itself
381 grpc_call_stack* grpc_call_stack_from_top_element(grpc_call_element* elem);
382 
383 void grpc_call_log_op(const char* file, int line, gpr_log_severity severity,
384                       grpc_call_element* elem,
385                       grpc_transport_stream_op_batch* op);
386 
387 void grpc_channel_stack_no_post_init(grpc_channel_stack* stk,
388                                      grpc_channel_element* elem);
389 
390 extern grpc_core::TraceFlag grpc_trace_channel;
391 
392 #define GRPC_CALL_LOG_OP(sev, elem, op)                \
393   do {                                                 \
394     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) { \
395       grpc_call_log_op(sev, elem, op);                 \
396     }                                                  \
397   } while (0)
398 
399 #endif  // GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
400