1 // Copyright 2021 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #include "src/core/lib/event_engine/default_event_engine.h"
17 
18 #include <atomic>
19 #include <memory>
20 #include <utility>
21 
22 #include "absl/functional/any_invocable.h"
23 
24 #include <grpc/event_engine/event_engine.h>
25 
26 #include "src/core/lib/channel/channel_args.h"
27 #include "src/core/lib/config/core_configuration.h"
28 #include "src/core/lib/debug/trace.h"
29 #include "src/core/lib/event_engine/default_event_engine_factory.h"
30 #include "src/core/lib/event_engine/trace.h"
31 #include "src/core/lib/gprpp/debug_location.h"
32 #include "src/core/lib/gprpp/no_destruct.h"
33 #include "src/core/lib/gprpp/sync.h"
34 
35 #ifdef GRPC_MAXIMIZE_THREADYNESS
36 #include "src/core/lib/event_engine/thready_event_engine/thready_event_engine.h"  // IWYU pragma: keep
37 #endif
38 
39 namespace grpc_event_engine {
40 namespace experimental {
41 
42 namespace {
43 std::atomic<absl::AnyInvocable<std::unique_ptr<EventEngine>()>*>
44     g_event_engine_factory{nullptr};
45 grpc_core::NoDestruct<grpc_core::Mutex> g_mu;
46 grpc_core::NoDestruct<std::weak_ptr<EventEngine>> g_event_engine;
47 }  // namespace
48 
SetEventEngineFactory(absl::AnyInvocable<std::unique_ptr<EventEngine> ()> factory)49 void SetEventEngineFactory(
50     absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory) {
51   delete g_event_engine_factory.exchange(
52       new absl::AnyInvocable<std::unique_ptr<EventEngine>()>(
53           std::move(factory)));
54   // Forget any previous EventEngines
55   grpc_core::MutexLock lock(&*g_mu);
56   g_event_engine->reset();
57 }
58 
EventEngineFactoryReset()59 void EventEngineFactoryReset() {
60   delete g_event_engine_factory.exchange(nullptr);
61   g_event_engine->reset();
62 }
63 
CreateEventEngineInner()64 std::unique_ptr<EventEngine> CreateEventEngineInner() {
65   if (auto* factory = g_event_engine_factory.load()) {
66     return (*factory)();
67   }
68   return DefaultEventEngineFactory();
69 }
70 
CreateEventEngine()71 std::unique_ptr<EventEngine> CreateEventEngine() {
72 #ifdef GRPC_MAXIMIZE_THREADYNESS
73   return std::make_unique<ThreadyEventEngine>(CreateEventEngineInner());
74 #else
75   return CreateEventEngineInner();
76 #endif
77 }
78 
GetDefaultEventEngine(grpc_core::SourceLocation location)79 std::shared_ptr<EventEngine> GetDefaultEventEngine(
80     grpc_core::SourceLocation location) {
81   grpc_core::MutexLock lock(&*g_mu);
82   if (std::shared_ptr<EventEngine> engine = g_event_engine->lock()) {
83     GRPC_EVENT_ENGINE_TRACE(
84         "Returning existing EventEngine::%p. use_count:%ld. Called from "
85         "[%s:%d]",
86         engine.get(), engine.use_count(), location.file(), location.line());
87     return engine;
88   }
89   std::shared_ptr<EventEngine> engine{CreateEventEngine()};
90   GRPC_EVENT_ENGINE_TRACE("Created DefaultEventEngine::%p. Called from [%s:%d]",
91                           engine.get(), location.file(), location.line());
92   *g_event_engine = engine;
93   return engine;
94 }
95 
96 namespace {
EnsureEventEngineInChannelArgs(grpc_core::ChannelArgs args)97 grpc_core::ChannelArgs EnsureEventEngineInChannelArgs(
98     grpc_core::ChannelArgs args) {
99   if (args.ContainsObject<EventEngine>()) return args;
100   return args.SetObject<EventEngine>(GetDefaultEventEngine());
101 }
102 }  // namespace
103 
RegisterEventEngineChannelArgPreconditioning(grpc_core::CoreConfiguration::Builder * builder)104 void RegisterEventEngineChannelArgPreconditioning(
105     grpc_core::CoreConfiguration::Builder* builder) {
106   builder->channel_args_preconditioning()->RegisterStage(
107       grpc_event_engine::experimental::EnsureEventEngineInChannelArgs);
108 }
109 
110 }  // namespace experimental
111 }  // namespace grpc_event_engine
112