1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/surface/channel_init.h"
22
23 #include <string.h>
24
25 #include <algorithm>
26 #include <map>
27 #include <set>
28 #include <string>
29 #include <type_traits>
30
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/str_join.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/optional.h"
35
36 #include <grpc/support/log.h>
37
38 #include "src/core/lib/channel/channel_stack_trace.h"
39 #include "src/core/lib/debug/trace.h"
40 #include "src/core/lib/gprpp/crash.h"
41 #include "src/core/lib/gprpp/sync.h"
42 #include "src/core/lib/surface/channel_stack_type.h"
43
44 namespace grpc_core {
45
// Hook used throughout this file to obtain a printable, comparable name for a
// channel filter. It is a plain function pointer with no initializer here, so
// it starts out null.
// NOTE(review): presumably assigned elsewhere before any ChannelInit is built
// or used - confirm against the code that installs it.
const char* (*NameFromChannelFilter)(const grpc_channel_filter*);
47
48 namespace {
49 struct CompareChannelFiltersByName {
operator ()grpc_core::__anona21a48c80111::CompareChannelFiltersByName50 bool operator()(const grpc_channel_filter* a,
51 const grpc_channel_filter* b) const {
52 return strcmp(NameFromChannelFilter(a), NameFromChannelFilter(b)) < 0;
53 }
54 };
55 } // namespace
56
After(std::initializer_list<const grpc_channel_filter * > filters)57 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::After(
58 std::initializer_list<const grpc_channel_filter*> filters) {
59 for (auto filter : filters) {
60 after_.push_back(filter);
61 }
62 return *this;
63 }
64
Before(std::initializer_list<const grpc_channel_filter * > filters)65 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::Before(
66 std::initializer_list<const grpc_channel_filter*> filters) {
67 for (auto filter : filters) {
68 before_.push_back(filter);
69 }
70 return *this;
71 }
72
If(InclusionPredicate predicate)73 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::If(
74 InclusionPredicate predicate) {
75 predicates_.emplace_back(std::move(predicate));
76 return *this;
77 }
78
IfNot(InclusionPredicate predicate)79 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::IfNot(
80 InclusionPredicate predicate) {
81 predicates_.emplace_back(
82 [predicate = std::move(predicate)](const ChannelArgs& args) {
83 return !predicate(args);
84 });
85 return *this;
86 }
87
88 ChannelInit::FilterRegistration&
IfHasChannelArg(const char * arg)89 ChannelInit::FilterRegistration::IfHasChannelArg(const char* arg) {
90 return If([arg](const ChannelArgs& args) { return args.Contains(arg); });
91 }
92
IfChannelArg(const char * arg,bool default_value)93 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::IfChannelArg(
94 const char* arg, bool default_value) {
95 return If([arg, default_value](const ChannelArgs& args) {
96 return args.GetBool(arg).value_or(default_value);
97 });
98 }
99
100 ChannelInit::FilterRegistration&
ExcludeFromMinimalStack()101 ChannelInit::FilterRegistration::ExcludeFromMinimalStack() {
102 return If([](const ChannelArgs& args) { return !args.WantMinimalStack(); });
103 }
104
RegisterFilter(grpc_channel_stack_type type,const grpc_channel_filter * filter,const ChannelFilterVtable * vtable,SourceLocation registration_source)105 ChannelInit::FilterRegistration& ChannelInit::Builder::RegisterFilter(
106 grpc_channel_stack_type type, const grpc_channel_filter* filter,
107 const ChannelFilterVtable* vtable, SourceLocation registration_source) {
108 filters_[type].emplace_back(std::make_unique<FilterRegistration>(
109 filter, vtable, registration_source));
110 return *filters_[type].back();
111 }
112
// Builds the StackConfig for a single channel stack type.
//
// Parameters:
//   registrations   - every FilterRegistration recorded for this stack type.
//   post_processors - array indexed by PostProcessorSlot; entries
//                     [0, PostProcessorSlot::kCount) are read and null entries
//                     are skipped.
//   type            - the stack type being built; used for log messages and
//                     for the "no terminal filters" check.
//
// Returns a StackConfig holding the dependency-ordered non-terminal filters,
// the terminal filters, and the non-null post processors in slot order.
//
// Side effects: predicates_ is moved out of every registration, and each
// non-null post processor is moved out of post_processors.
// Calls Crash() if a filter is registered twice, or if at some point no
// remaining filter has an empty dependency set (the ordering constraints
// cannot be satisfied).
ChannelInit::StackConfig ChannelInit::BuildStackConfig(
    const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
        registrations,
    PostProcessor* post_processors, grpc_channel_stack_type type) {
  // Phase 1: Build a map from filter to the set of filters that must be
  // initialized before it.
  // We order this map (and the set of dependent filters) by filter name to
  // ensure algorithm ordering stability is deterministic for a given build.
  // We should not require this, but at the time of writing it's expected that
  // this will help overall stability.
  using F = const grpc_channel_filter*;
  std::map<F, FilterRegistration*> filter_to_registration;
  using DependencyMap = std::map<F, std::set<F, CompareChannelFiltersByName>,
                                 CompareChannelFiltersByName>;
  DependencyMap dependencies;
  std::vector<Filter> terminal_filters;
  for (const auto& registration : registrations) {
    // A filter may be registered at most once per stack type; on a duplicate,
    // report both registration sites and abort.
    if (filter_to_registration.count(registration->filter_) > 0) {
      const auto first =
          filter_to_registration[registration->filter_]->registration_source_;
      const auto second = registration->registration_source_;
      Crash(absl::StrCat("Duplicate registration of channel filter ",
                         NameFromChannelFilter(registration->filter_),
                         "\nfirst: ", first.file(), ":", first.line(),
                         "\nsecond: ", second.file(), ":", second.line()));
    }
    filter_to_registration[registration->filter_] = registration.get();
    if (registration->terminal_) {
      // Terminal filters may not carry ordering constraints; they are kept in
      // their own list (with nullptr in place of a vtable) and never enter the
      // dependency map in this loop.
      GPR_ASSERT(registration->after_.empty());
      GPR_ASSERT(registration->before_.empty());
      GPR_ASSERT(!registration->before_all_);
      terminal_filters.emplace_back(
          registration->filter_, nullptr, std::move(registration->predicates_),
          registration->skip_v3_, registration->registration_source_);
    } else {
      dependencies[registration->filter_];  // Ensure it's in the map.
    }
  }
  // Second pass: translate the After/Before/before_all_ constraints of each
  // non-terminal registration into entries of `dependencies`, where
  // dependencies[X] is the set of filters that must come before X.
  for (const auto& registration : registrations) {
    if (registration->terminal_) continue;
    GPR_ASSERT(filter_to_registration.count(registration->filter_) > 0);
    for (F after : registration->after_) {
      // A constraint naming a filter that was never registered for this stack
      // type is logged at debug level and otherwise ignored.
      if (filter_to_registration.count(after) == 0) {
        gpr_log(
            GPR_DEBUG, "%s",
            absl::StrCat(
                "Filter ", NameFromChannelFilter(after),
                " not registered, but is referenced in the after clause of ",
                NameFromChannelFilter(registration->filter_),
                " when building channel stack ",
                grpc_channel_stack_type_string(type))
                .c_str());
        continue;
      }
      dependencies[registration->filter_].insert(after);
    }
    for (F before : registration->before_) {
      if (filter_to_registration.count(before) == 0) {
        gpr_log(
            GPR_DEBUG, "%s",
            absl::StrCat(
                "Filter ", NameFromChannelFilter(before),
                " not registered, but is referenced in the before clause of ",
                NameFromChannelFilter(registration->filter_),
                " when building channel stack ",
                grpc_channel_stack_type_string(type))
                .c_str());
        continue;
      }
      // "This filter before X" is stored as "X after this filter".
      // NOTE(review): filter_to_registration also contains terminal filters,
      // so if `before` names a terminal filter this operator[] creates a
      // dependency-map entry for it - confirm whether that is intended.
      dependencies[before].insert(registration->filter_);
    }
    if (registration->before_all_) {
      // Every other non-terminal filter must come after this one.
      for (const auto& other : registrations) {
        if (other.get() == registration.get()) continue;
        if (other->terminal_) continue;
        dependencies[other->filter_].insert(registration->filter_);
      }
    }
  }
  // Phase 2: Build a list of filters in dependency order.
  // We can simply iterate through and add anything with no dependency.
  // We then remove that filter from the dependency list of all other filters.
  // We repeat until we have no more filters to add.
  // Renders a dependency map as one "name -> dep dep ..." line per filter;
  // used only for the crash message below.
  auto build_remaining_dependency_graph =
      [](const DependencyMap& dependencies) {
        std::string result;
        for (const auto& p : dependencies) {
          absl::StrAppend(&result, NameFromChannelFilter(p.first), " ->");
          for (const auto& d : p.second) {
            absl::StrAppend(&result, " ", NameFromChannelFilter(d));
          }
          absl::StrAppend(&result, "\n");
        }
        return result;
      };
  // Keep an untouched copy: `dependencies` is consumed below, while the
  // original graph is still needed for the crash message and the trace output.
  const DependencyMap original = dependencies;
  // Removes and returns the first filter (in map order) whose dependency set
  // is empty; crashes with a dump of both graphs if there is none.
  auto take_ready_dependency = [&]() {
    for (auto it = dependencies.begin(); it != dependencies.end(); ++it) {
      if (it->second.empty()) {
        auto r = it->first;
        dependencies.erase(it);
        return r;
      }
    }
    Crash(absl::StrCat(
        "Unresolvable graph of channel filters - remaining graph:\n",
        build_remaining_dependency_graph(dependencies), "original:\n",
        build_remaining_dependency_graph(original)));
  };
  std::vector<Filter> filters;
  while (!dependencies.empty()) {
    auto filter = take_ready_dependency();
    auto* registration = filter_to_registration[filter];
    filters.emplace_back(
        filter, registration->vtable_, std::move(registration->predicates_),
        registration->skip_v3_, registration->registration_source_);
    // The emitted filter no longer blocks anything that was waiting on it.
    for (auto& p : dependencies) {
      p.second.erase(filter);
    }
  }
  // Collect post processors that need to be applied.
  // We've already ensured the one-per-slot constraint, so now we can just
  // collect everything up into a vector and run it in order.
  std::vector<PostProcessor> post_processor_functions;
  for (int i = 0; i < static_cast<int>(PostProcessorSlot::kCount); i++) {
    if (post_processors[i] == nullptr) continue;
    post_processor_functions.emplace_back(std::move(post_processors[i]));
  }
  // Log out the graph we built if that's been requested.
  if (grpc_trace_channel_stack.enabled()) {
    // It can happen that multiple threads attempt to construct a core config at
    // once.
    // This is benign - the first one wins and others are discarded.
    // However, it messes up our logging and makes it harder to reason about the
    // graph, so we add some protection here.
    static Mutex* const m = new Mutex();
    MutexLock lock(m);
    // List the channel stack type (since we'll be repeatedly printing graphs in
    // this loop).
    gpr_log(GPR_INFO,
            "ORDERED CHANNEL STACK %s:", grpc_channel_stack_type_string(type));
    // First build up a map of filter -> file:line: strings, because it helps
    // the readability of this log to get later fields aligned vertically.
    std::map<const grpc_channel_filter*, std::string> loc_strs;
    size_t max_loc_str_len = 0;
    size_t max_filter_name_len = 0;
    // Records the "file:line:" string for `filter` (directory part of the path
    // stripped) and keeps the running maxima used for column alignment.
    auto add_loc_str = [&max_loc_str_len, &loc_strs, &filter_to_registration,
                        &max_filter_name_len](
                           const grpc_channel_filter* filter) {
      max_filter_name_len =
          std::max(strlen(NameFromChannelFilter(filter)), max_filter_name_len);
      const auto registration =
          filter_to_registration[filter]->registration_source_;
      absl::string_view file = registration.file();
      auto slash_pos = file.rfind('/');
      if (slash_pos != file.npos) {
        file = file.substr(slash_pos + 1);
      }
      auto loc_str = absl::StrCat(file, ":", registration.line(), ":");
      max_loc_str_len = std::max(max_loc_str_len, loc_str.length());
      loc_strs.emplace(filter, std::move(loc_str));
    };
    for (const auto& filter : filters) {
      add_loc_str(filter.filter);
    }
    for (const auto& terminal : terminal_filters) {
      add_loc_str(terminal.filter);
    }
    // Pad every location string to a common width (longest + 2).
    for (auto& loc_str : loc_strs) {
      loc_str.second = absl::StrCat(
          loc_str.second,
          std::string(max_loc_str_len + 2 - loc_str.second.length(), ' '));
    }
    // For each regular filter, print the location registered, the name of the
    // filter, and if it needed to occur after some other filters list those
    // filters too.
    // Note that we use the processed after list here - earlier we turned Before
    // registrations into After registrations and we used those converted
    // registrations to build the final ordering.
    // If you're trying to track down why 'A' is listed as after 'B', look at
    // the following:
    // - If A is registered with .After({B}), then A will be 'after' B here.
    // - If B is registered with .Before({A}), then A will be 'after' B here.
    // - If B is registered as BeforeAll, then A will be 'after' B here.
    for (const auto& filter : filters) {
      auto dep_it = original.find(filter.filter);
      std::string after_str;
      if (dep_it != original.end() && !dep_it->second.empty()) {
        after_str = absl::StrCat(
            std::string(max_filter_name_len + 1 -
                            strlen(NameFromChannelFilter(filter.filter)),
                        ' '),
            "after ",
            absl::StrJoin(
                dep_it->second, ", ",
                [](std::string* out, const grpc_channel_filter* filter) {
                  out->append(NameFromChannelFilter(filter));
                }));
      }
      const auto filter_str =
          absl::StrCat(" ", loc_strs[filter.filter],
                       NameFromChannelFilter(filter.filter), after_str);
      gpr_log(GPR_INFO, "%s", filter_str.c_str());
    }
    // Finally list out the terminal filters and where they were registered
    // from.
    for (const auto& terminal : terminal_filters) {
      const auto filter_str = absl::StrCat(
          " ", loc_strs[terminal.filter],
          NameFromChannelFilter(terminal.filter),
          std::string(max_filter_name_len + 1 -
                          strlen(NameFromChannelFilter(terminal.filter)),
                      ' '),
          "[terminal]");
      gpr_log(GPR_INFO, "%s", filter_str.c_str());
    }
  }
  // Check if there are no terminal filters: this would be an error.
  // GRPC_CLIENT_DYNAMIC stacks don't use this mechanism, so we don't check that
  // condition here.
  // Right now we only log: many tests end up with a core configuration that
  // is invalid.
  // TODO(ctiller): evaluate if we can turn this into a crash one day.
  // Right now it forces too many tests to know about channel initialization,
  // either by supplying a valid configuration or by including an opt-out flag.
  if (terminal_filters.empty() && type != GRPC_CLIENT_DYNAMIC) {
    gpr_log(
        GPR_ERROR,
        "No terminal filters registered for channel stack type %s; this is "
        "common for unit tests messing with CoreConfiguration, but will result "
        "in a ChannelInit::CreateStack that never completes successfully.",
        grpc_channel_stack_type_string(type));
  }
  return StackConfig{std::move(filters), std::move(terminal_filters),
                     std::move(post_processor_functions)};
};
349
Build()350 ChannelInit ChannelInit::Builder::Build() {
351 ChannelInit result;
352 for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
353 result.stack_configs_[i] =
354 BuildStackConfig(filters_[i], post_processors_[i],
355 static_cast<grpc_channel_stack_type>(i));
356 }
357 return result;
358 }
359
CheckPredicates(const ChannelArgs & args) const360 bool ChannelInit::Filter::CheckPredicates(const ChannelArgs& args) const {
361 for (const auto& predicate : predicates) {
362 if (!predicate(args)) return false;
363 }
364 return true;
365 }
366
CreateStack(ChannelStackBuilder * builder) const367 bool ChannelInit::CreateStack(ChannelStackBuilder* builder) const {
368 const auto& stack_config = stack_configs_[builder->channel_stack_type()];
369 for (const auto& filter : stack_config.filters) {
370 if (!filter.CheckPredicates(builder->channel_args())) continue;
371 builder->AppendFilter(filter.filter);
372 }
373 int found_terminators = 0;
374 for (const auto& terminator : stack_config.terminators) {
375 if (!terminator.CheckPredicates(builder->channel_args())) continue;
376 builder->AppendFilter(terminator.filter);
377 ++found_terminators;
378 }
379 if (found_terminators != 1) {
380 std::string error = absl::StrCat(
381 found_terminators,
382 " terminating filters found creating a channel of type ",
383 grpc_channel_stack_type_string(builder->channel_stack_type()),
384 " with arguments ", builder->channel_args().ToString(),
385 " (we insist upon one and only one terminating "
386 "filter)\n");
387 if (stack_config.terminators.empty()) {
388 absl::StrAppend(&error, " No terminal filters were registered");
389 } else {
390 for (const auto& terminator : stack_config.terminators) {
391 absl::StrAppend(
392 &error, " ", NameFromChannelFilter(terminator.filter),
393 " registered @ ", terminator.registration_source.file(), ":",
394 terminator.registration_source.line(), ": enabled = ",
395 terminator.CheckPredicates(builder->channel_args()) ? "true"
396 : "false",
397 "\n");
398 }
399 }
400 gpr_log(GPR_ERROR, "%s", error.c_str());
401 return false;
402 }
403 for (const auto& post_processor : stack_config.post_processors) {
404 post_processor(*builder);
405 }
406 return true;
407 }
408
CreateStackSegment(grpc_channel_stack_type type,const ChannelArgs & args) const409 absl::StatusOr<ChannelInit::StackSegment> ChannelInit::CreateStackSegment(
410 grpc_channel_stack_type type, const ChannelArgs& args) const {
411 const auto& stack_config = stack_configs_[type];
412 std::vector<StackSegment::ChannelFilter> filters;
413 size_t channel_data_size = 0;
414 size_t channel_data_alignment = 0;
415 // Based on predicates build a list of filters to include in this segment.
416 for (const auto& filter : stack_config.filters) {
417 if (filter.skip_v3) continue;
418 if (!filter.CheckPredicates(args)) continue;
419 if (filter.vtable == nullptr) {
420 return absl::InvalidArgumentError(
421 absl::StrCat("Filter ", NameFromChannelFilter(filter.filter),
422 " has no v3-callstack vtable"));
423 }
424 channel_data_alignment =
425 std::max(channel_data_alignment, filter.vtable->alignment);
426 if (channel_data_size % filter.vtable->alignment != 0) {
427 channel_data_size += filter.vtable->alignment -
428 (channel_data_size % filter.vtable->alignment);
429 }
430 filters.push_back({channel_data_size, filter.vtable});
431 channel_data_size += filter.vtable->size;
432 }
433 // Shortcut for empty segments.
434 if (filters.empty()) return StackSegment();
435 // Allocate memory for the channel data, initialize channel filters into it.
436 uint8_t* p = static_cast<uint8_t*>(
437 gpr_malloc_aligned(channel_data_size, channel_data_alignment));
438 for (size_t i = 0; i < filters.size(); i++) {
439 auto r = filters[i].vtable->init(p + filters[i].offset, args);
440 if (!r.ok()) {
441 for (size_t j = 0; j < i; j++) {
442 filters[j].vtable->destroy(p + filters[j].offset);
443 }
444 gpr_free_aligned(p);
445 return r;
446 }
447 }
448 return StackSegment(std::move(filters), p);
449 }
450
451 ///////////////////////////////////////////////////////////////////////////////
452 // ChannelInit::StackSegment
453
StackSegment(std::vector<ChannelFilter> filters,uint8_t * channel_data)454 ChannelInit::StackSegment::StackSegment(std::vector<ChannelFilter> filters,
455 uint8_t* channel_data)
456 : data_(MakeRefCounted<ChannelData>(std::move(filters), channel_data)) {}
457
AddToCallFilterStack(CallFilters::StackBuilder & builder)458 void ChannelInit::StackSegment::AddToCallFilterStack(
459 CallFilters::StackBuilder& builder) {
460 if (data_ == nullptr) return;
461 data_->AddToCallFilterStack(builder);
462 builder.AddOwnedObject(data_);
463 };
464
ChannelData(std::vector<ChannelFilter> filters,uint8_t * channel_data)465 ChannelInit::StackSegment::ChannelData::ChannelData(
466 std::vector<ChannelFilter> filters, uint8_t* channel_data)
467 : filters_(std::move(filters)), channel_data_(channel_data) {}
468
AddToCallFilterStack(CallFilters::StackBuilder & builder)469 void ChannelInit::StackSegment::ChannelData::AddToCallFilterStack(
470 CallFilters::StackBuilder& builder) {
471 for (const auto& filter : filters_) {
472 filter.vtable->add_to_stack_builder(channel_data_ + filter.offset, builder);
473 }
474 }
475
~ChannelData()476 ChannelInit::StackSegment::ChannelData::~ChannelData() {
477 for (const auto& filter : filters_) {
478 filter.vtable->destroy(channel_data_ + filter.offset);
479 }
480 gpr_free_aligned(channel_data_);
481 }
482
483 } // namespace grpc_core
484