1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #ifndef GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
20 #define GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
21
22 //////////////////////////////////////////////////////////////////////////////
23 // IMPORTANT NOTE:
24 //
25 // When you update this API, please make the corresponding changes to
26 // the C++ API in src/cpp/common/channel_filter.{h,cc}
27 //////////////////////////////////////////////////////////////////////////////
28
29 // A channel filter defines how operations on a channel are implemented.
30 // Channel filters are chained together to create full channels, and if those
31 // chains are linear, then channel stacks provide a mechanism to minimize
32 // allocations for that chain.
33 // Call stacks are created by channel stacks and represent the per-call data
34 // for that stack.
35
36 // Implementations should take care of the following details for a batch -
37 // 1. Synchronization is achieved with a CallCombiner. View
38 // src/core/lib/iomgr/call_combiner.h for more details.
39 // 2. If the filter wants to inject an error on the way down, it needs to call
40 // grpc_transport_stream_op_batch_finish_with_failure from within the call
41 // combiner. This will cause any batch callbacks to be called with that error.
42 // 3. If the filter wants to inject an error on the way up (from a callback), it
43 // should also inject that error in the recv_trailing_metadata callback so that
44 // it can have an effect on the call status.
45 //
46
47 #include <grpc/support/port_platform.h>
48
49 #include <stddef.h>
50
51 #include <functional>
52 #include <memory>
53
54 #include <grpc/event_engine/event_engine.h>
55 #include <grpc/grpc.h>
56 #include <grpc/slice.h>
57 #include <grpc/status.h>
58 #include <grpc/support/log.h>
59 #include <grpc/support/time.h>
60
61 #include "src/core/lib/channel/channel_args.h"
62 #include "src/core/lib/channel/channel_fwd.h"
63 #include "src/core/lib/channel/context.h"
64 #include "src/core/lib/debug/trace.h"
65 #include "src/core/lib/gpr/time_precise.h"
66 #include "src/core/lib/gprpp/manual_constructor.h"
67 #include "src/core/lib/gprpp/ref_counted_ptr.h"
68 #include "src/core/lib/gprpp/time.h"
69 #include "src/core/lib/iomgr/call_combiner.h"
70 #include "src/core/lib/iomgr/closure.h"
71 #include "src/core/lib/iomgr/error.h"
72 #include "src/core/lib/iomgr/polling_entity.h"
73 #include "src/core/lib/promise/arena_promise.h"
74 #include "src/core/lib/resource_quota/arena.h"
75 #include "src/core/lib/transport/transport.h"
76
77 struct grpc_channel_element_args {
78 grpc_channel_stack* channel_stack;
79 grpc_core::ChannelArgs channel_args;
80 int is_first;
81 int is_last;
82 };
83 struct grpc_call_element_args {
84 grpc_call_stack* call_stack;
85 const void* server_transport_data;
86 grpc_call_context_element* context;
87 const grpc_slice& path;
88 gpr_cycle_counter start_time; // Note: not populated in subchannel stack.
89 grpc_core::Timestamp deadline;
90 grpc_core::Arena* arena;
91 grpc_core::CallCombiner* call_combiner;
92 };
93 struct grpc_call_stats {
94 grpc_transport_stream_stats transport_stream_stats;
95 gpr_timespec latency; // From call creating to enqueing of received status
96 };
97 /// Information about the call upon completion.
98 struct grpc_call_final_info {
99 grpc_call_stats stats;
100 grpc_status_code final_status = GRPC_STATUS_OK;
101 const char* error_string = nullptr;
102 };
103
104 // Channel filters specify:
105 // 1. the amount of memory needed in the channel & call (via the sizeof_XXX
106 // members)
107 // 2. functions to initialize and destroy channel & call data
108 // (init_XXX, destroy_XXX)
109 // 3. functions to implement call operations and channel operations (call_op,
110 // channel_op)
111 // 4. a name, which is useful when debugging
112
113 // Members are laid out in approximate frequency of use order.
114 struct grpc_channel_filter {
115 // Called to eg. send/receive data on a call.
116 // See grpc_call_next_op on how to call the next element in the stack
117 void (*start_transport_stream_op_batch)(grpc_call_element* elem,
118 grpc_transport_stream_op_batch* op);
119 // Create a promise to execute one call.
120 // If this is non-null, it may be used in preference to
121 // start_transport_stream_op_batch.
122 // If this is used in preference to start_transport_stream_op_batch, the
123 // following can be omitted also:
124 // - calling init_call_elem, destroy_call_elem, set_pollset_or_pollset_set
125 // - allocation of memory for call data
126 // There is an on-going migration to move all filters to providing this, and
127 // then to drop start_transport_stream_op_batch.
128 grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> (*make_call_promise)(
129 grpc_channel_element* elem, grpc_core::CallArgs call_args,
130 grpc_core::NextPromiseFactory next_promise_factory);
131 // Called to handle channel level operations - e.g. new calls, or transport
132 // closure.
133 // See grpc_channel_next_op on how to call the next element in the stack
134 void (*start_transport_op)(grpc_channel_element* elem, grpc_transport_op* op);
135
136 // sizeof(per call data)
137 size_t sizeof_call_data;
138 // Initialize per call data.
139 // elem is initialized at the start of the call, and elem->call_data is what
140 // needs initializing.
141 // The filter does not need to do any chaining.
142 // server_transport_data is an opaque pointer. If it is NULL, this call is
143 // on a client; if it is non-NULL, then it points to memory owned by the
144 // transport and is on the server. Most filters want to ignore this
145 // argument.
146 // Implementations may assume that elem->call_data is all zeros.
147 grpc_error_handle (*init_call_elem)(grpc_call_element* elem,
148 const grpc_call_element_args* args);
149 void (*set_pollset_or_pollset_set)(grpc_call_element* elem,
150 grpc_polling_entity* pollent);
151 // Destroy per call data.
152 // The filter does not need to do any chaining.
153 // The bottom filter of a stack will be passed a non-NULL pointer to
154 // \a then_schedule_closure that should be passed to GRPC_CLOSURE_SCHED when
155 // destruction is complete. \a final_info contains data about the completed
156 // call, mainly for reporting purposes.
157 void (*destroy_call_elem)(grpc_call_element* elem,
158 const grpc_call_final_info* final_info,
159 grpc_closure* then_schedule_closure);
160
161 // sizeof(per channel data)
162 size_t sizeof_channel_data;
163 // Initialize per-channel data.
164 // elem is initialized at the creating of the channel, and elem->channel_data
165 // is what needs initializing.
166 // is_first, is_last designate this elements position in the stack, and are
167 // useful for asserting correct configuration by upper layer code.
168 // The filter does not need to do any chaining.
169 // Implementations may assume that elem->channel_data is all zeros.
170 grpc_error_handle (*init_channel_elem)(grpc_channel_element* elem,
171 grpc_channel_element_args* args);
172 // Post init per-channel data.
173 // Called after all channel elements have been successfully created.
174 void (*post_init_channel_elem)(grpc_channel_stack* stk,
175 grpc_channel_element* elem);
176 // Destroy per channel data.
177 // The filter does not need to do any chaining
178 void (*destroy_channel_elem)(grpc_channel_element* elem);
179
180 // Implement grpc_channel_get_info()
181 void (*get_channel_info)(grpc_channel_element* elem,
182 const grpc_channel_info* channel_info);
183
184 // The name of this filter
185 const char* name;
186 };
187 // A channel_element tracks its filter and the filter requested memory within
188 // a channel allocation
189 struct grpc_channel_element {
190 const grpc_channel_filter* filter;
191 void* channel_data;
192 };
193
194 // A call_element tracks its filter, the filter requested memory within
195 // a channel allocation, and the filter requested memory within a call
196 // allocation
197 struct grpc_call_element {
198 const grpc_channel_filter* filter;
199 void* channel_data;
200 void* call_data;
201 };
202
203 // A channel stack tracks a set of related filters for one channel, and
204 // guarantees they live within a single malloc() allocation
205 struct grpc_channel_stack {
206 grpc_stream_refcount refcount;
207 size_t count;
208 // Memory required for a call stack (computed at channel stack
209 // initialization)
210 size_t call_stack_size;
211 // TODO(ctiller): remove this mechanism... it's a hack to allow
212 // Channel to be separated from grpc_channel_stack's allocation. As the
213 // promise conversion continues, we'll reconsider what grpc_channel_stack
214 // should look like and this can go.
215 grpc_core::ManualConstructor<std::function<void()>> on_destroy;
216
217 grpc_core::ManualConstructor<
218 std::shared_ptr<grpc_event_engine::experimental::EventEngine>>
219 event_engine;
220
EventEnginegrpc_channel_stack221 grpc_event_engine::experimental::EventEngine* EventEngine() const {
222 return event_engine->get();
223 }
224
225 // Minimal infrastructure to act like a RefCounted thing without converting
226 // everything.
227 // It's likely that we'll want to replace grpc_channel_stack with something
228 // less regimented once the promise conversion completes, so avoiding doing a
229 // full C++-ification for now.
230 void IncrementRefCount();
231 void Unref();
Refgrpc_channel_stack232 grpc_core::RefCountedPtr<grpc_channel_stack> Ref() {
233 IncrementRefCount();
234 return grpc_core::RefCountedPtr<grpc_channel_stack>(this);
235 }
236
237 grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
238 MakeClientCallPromise(grpc_core::CallArgs call_args);
239 grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
240 MakeServerCallPromise(grpc_core::CallArgs call_args);
241 };
242
243 // A call stack tracks a set of related filters for one call, and guarantees
244 // they live within a single malloc() allocation
245 struct grpc_call_stack {
246 // shared refcount for this channel stack.
247 // MUST be the first element: the underlying code calls destroy
248 // with the address of the refcount, but higher layers prefer to think
249 // about the address of the call stack itself.
250 grpc_stream_refcount refcount;
251 size_t count;
252
253 // Minimal infrastructure to act like a RefCounted thing without converting
254 // everything.
255 // grpc_call_stack will be eliminated once the promise conversion completes.
256 void IncrementRefCount();
257 void Unref();
Refgrpc_call_stack258 grpc_core::RefCountedPtr<grpc_call_stack> Ref() {
259 IncrementRefCount();
260 return grpc_core::RefCountedPtr<grpc_call_stack>(this);
261 }
262 };
263
264 // Get a channel element given a channel stack and its index
265 grpc_channel_element* grpc_channel_stack_element(grpc_channel_stack* stack,
266 size_t i);
267 // Get the last channel element in a channel stack
268 grpc_channel_element* grpc_channel_stack_last_element(
269 grpc_channel_stack* stack);
270
271 // A utility function for a filter to determine how many other instances
272 // of the same filter exist above it in the same stack. Intended to be
273 // used in the filter's init_channel_elem() method.
274 size_t grpc_channel_stack_filter_instance_number(
275 grpc_channel_stack* channel_stack, grpc_channel_element* elem);
276
277 // Get a call stack element given a call stack and an index
278 grpc_call_element* grpc_call_stack_element(grpc_call_stack* stack, size_t i);
279
280 // Determine memory required for a channel stack containing a set of filters
281 size_t grpc_channel_stack_size(const grpc_channel_filter** filters,
282 size_t filter_count);
283 // Initialize a channel stack given some filters
284 grpc_error_handle grpc_channel_stack_init(
285 int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
286 const grpc_channel_filter** filters, size_t filter_count,
287 const grpc_core::ChannelArgs& args, const char* name,
288 grpc_channel_stack* stack);
289 // Destroy a channel stack
290 void grpc_channel_stack_destroy(grpc_channel_stack* stack);
291
292 // Initialize a call stack given a channel stack. transport_server_data is
293 // expected to be NULL on a client, or an opaque transport owned pointer on the
294 // server.
295 grpc_error_handle grpc_call_stack_init(grpc_channel_stack* channel_stack,
296 int initial_refs,
297 grpc_iomgr_cb_func destroy,
298 void* destroy_arg,
299 const grpc_call_element_args* elem_args);
300 // Set a pollset or a pollset_set for a call stack: must occur before the first
301 // op is started
302 void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack,
303 grpc_polling_entity* pollent);
304
// Ref/unref helpers for call stacks and channel stacks.
//
// In debug builds (NDEBUG not defined) the reason string is forwarded to
// grpc_stream_ref / grpc_stream_unref. In NDEBUG builds the single-argument
// form is called and the reason is evaluated and discarded.
//
// Every variant expands to a single statement that needs a trailing semicolon
// at the call site. The do/while(0) bodies deliberately do NOT end in a
// semicolon, so `if (c) GRPC_CALL_STACK_REF(s, "r"); else ...` compiles the
// same way in debug and NDEBUG builds.
#ifndef NDEBUG
#define GRPC_CALL_STACK_REF(call_stack, reason) \
  grpc_stream_ref(&(call_stack)->refcount, reason)
#define GRPC_CALL_STACK_UNREF(call_stack, reason) \
  grpc_stream_unref(&(call_stack)->refcount, reason)
#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
  grpc_stream_ref(&(channel_stack)->refcount, reason)
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \
  grpc_stream_unref(&(channel_stack)->refcount, reason)
#else
#define GRPC_CALL_STACK_REF(call_stack, reason) \
  do {                                          \
    grpc_stream_ref(&(call_stack)->refcount);   \
    (void)(reason);                             \
  } while (0)
#define GRPC_CALL_STACK_UNREF(call_stack, reason) \
  do {                                            \
    grpc_stream_unref(&(call_stack)->refcount);   \
    (void)(reason);                               \
  } while (0)
#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
  do {                                                \
    grpc_stream_ref(&(channel_stack)->refcount);      \
    (void)(reason);                                   \
  } while (0)
#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \
  do {                                                  \
    grpc_stream_unref(&(channel_stack)->refcount);      \
    (void)(reason);                                     \
  } while (0)
#endif
336
IncrementRefCount()337 inline void grpc_channel_stack::IncrementRefCount() {
338 GRPC_CHANNEL_STACK_REF(this, "smart_pointer");
339 }
340
Unref()341 inline void grpc_channel_stack::Unref() {
342 GRPC_CHANNEL_STACK_UNREF(this, "smart_pointer");
343 }
344
IncrementRefCount()345 inline void grpc_call_stack::IncrementRefCount() {
346 GRPC_CALL_STACK_REF(this, "smart_pointer");
347 }
348
Unref()349 inline void grpc_call_stack::Unref() {
350 GRPC_CALL_STACK_UNREF(this, "smart_pointer");
351 }
352
353 // Destroy a call stack
354 void grpc_call_stack_destroy(grpc_call_stack* stack,
355 const grpc_call_final_info* final_info,
356 grpc_closure* then_schedule_closure);
357
358 // Ignore set pollset{_set} - used by filters if they don't care about pollsets
359 // at all. Does nothing.
360 void grpc_call_stack_ignore_set_pollset_or_pollset_set(
361 grpc_call_element* elem, grpc_polling_entity* pollent);
362 // Call the next operation in a call stack
363 void grpc_call_next_op(grpc_call_element* elem,
364 grpc_transport_stream_op_batch* op);
365 // Call the next operation (depending on call directionality) in a channel
366 // stack
367 void grpc_channel_next_op(grpc_channel_element* elem, grpc_transport_op* op);
368 // Pass through a request to get_channel_info() to the next child element
369 void grpc_channel_next_get_info(grpc_channel_element* elem,
370 const grpc_channel_info* channel_info);
371
372 // Given the top element of a channel stack, get the channel stack itself
373 grpc_channel_stack* grpc_channel_stack_from_top_element(
374 grpc_channel_element* elem);
375 // Given the top element of a call stack, get the call stack itself
376 grpc_call_stack* grpc_call_stack_from_top_element(grpc_call_element* elem);
377
378 void grpc_call_log_op(const char* file, int line, gpr_log_severity severity,
379 grpc_call_element* elem,
380 grpc_transport_stream_op_batch* op);
381
382 void grpc_channel_stack_no_post_init(grpc_channel_stack* stk,
383 grpc_channel_element* elem);
384
385 extern grpc_core::TraceFlag grpc_trace_channel;
386
// Logs `op` for `elem` through grpc_call_log_op(), but only when the
// grpc_trace_channel trace flag is enabled.
//
// `sev` is forwarded unparenthesized on purpose: grpc_call_log_op() takes
// (file, line, severity, elem, op), so `sev` must expand to the three leading
// arguments (presumably one of the GPR_INFO / GPR_DEBUG / GPR_ERROR macros
// from grpc/support/log.h -- confirm at call sites).
//
// Expands to a single statement; the call site supplies the semicolon.
#define GRPC_CALL_LOG_OP(sev, elem, op)                \
  do {                                                 \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) { \
      grpc_call_log_op(sev, elem, op);                 \
    }                                                  \
  } while (0)
393
394 #endif // GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
395