1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <stddef.h> 21 22 #include <algorithm> 23 #include <new> 24 #include <string> 25 #include <utility> 26 27 #include "absl/strings/str_cat.h" 28 #include "absl/strings/str_format.h" 29 #include "absl/types/optional.h" 30 31 #include <grpc/support/log.h> 32 33 #include "src/core/lib/debug/trace.h" 34 #include "src/core/lib/gprpp/construct_destruct.h" 35 #include "src/core/lib/gprpp/debug_location.h" 36 #include "src/core/lib/promise/context.h" 37 #include "src/core/lib/promise/detail/promise_factory.h" 38 #include "src/core/lib/promise/poll.h" 39 #include "src/core/lib/promise/trace.h" 40 #include "src/core/lib/resource_quota/arena.h" 41 42 namespace grpc_core { 43 44 // Tracks a list of maps of T -> optional<T> via promises. 45 // When Run, runs each transformation in order, and resolves to the ulimate 46 // result. 47 // If a map resolves to nullopt, the chain is terminated and the result is 48 // nullopt. 49 // Maps can also have synchronous cleanup functions, which are guaranteed to be 50 // called at the termination of each run through the chain. 51 template <typename T> 52 class InterceptorList { 53 private: 54 // A map of T -> T via promises. 55 class Map { 56 public: Map(DebugLocation from)57 explicit Map(DebugLocation from) : from_(from) {} 58 // Construct a promise to transform x into some other value at memory. 59 virtual void MakePromise(T x, void* memory) = 0; 60 // Destroy a promise constructed at memory. 61 virtual void Destroy(void* memory) = 0; 62 // Poll a promise constructed at memory. 63 // Resolves to an optional<T> -- if nullopt it means terminate the chain and 64 // resolve. 65 virtual Poll<absl::optional<T>> PollOnce(void* memory) = 0; 66 virtual ~Map() = default; 67 68 // Update the next pointer stored with this map. 69 // This is only valid to call once, and only before the map is used. SetNext(Map * next)70 void SetNext(Map* next) { 71 GPR_DEBUG_ASSERT(next_ == nullptr); 72 next_ = next; 73 } 74 75 // Access the creation location for this map (for debug tracing). from()76 DebugLocation from() const { return from_; } 77 78 // Access the next map in the chain (or nullptr if this is the last map). next()79 Map* next() const { return next_; } 80 81 private: 82 GPR_NO_UNIQUE_ADDRESS const DebugLocation from_; 83 Map* next_ = nullptr; 84 }; 85 86 public: 87 // The result of Run: a promise that will execute the entire chain. 88 class RunPromise { 89 public: RunPromise(size_t memory_required,Map * factory,absl::optional<T> value)90 RunPromise(size_t memory_required, Map* factory, absl::optional<T> value) { 91 if (!value.has_value() || factory == nullptr) { 92 if (grpc_trace_promise_primitives.enabled()) { 93 gpr_log(GPR_DEBUG, 94 "InterceptorList::RunPromise[%p]: create immediate", this); 95 } 96 is_immediately_resolved_ = true; 97 Construct(&result_, std::move(value)); 98 } else { 99 is_immediately_resolved_ = false; 100 Construct(&async_resolution_, memory_required); 101 factory->MakePromise(std::move(*value), async_resolution_.space.get()); 102 async_resolution_.current_factory = factory; 103 if (grpc_trace_promise_primitives.enabled()) { 104 gpr_log(GPR_DEBUG, 105 "InterceptorList::RunPromise[%p]: create async; mem=%p", this, 106 async_resolution_.space.get()); 107 } 108 } 109 } 110 ~RunPromise()111 ~RunPromise() { 112 if (grpc_trace_promise_primitives.enabled()) { 113 gpr_log(GPR_DEBUG, "InterceptorList::RunPromise[%p]: destroy", this); 114 } 115 if (is_immediately_resolved_) { 116 Destruct(&result_); 117 } else { 118 if (async_resolution_.current_factory != nullptr) { 119 async_resolution_.current_factory->Destroy( 120 async_resolution_.space.get()); 121 } 122 Destruct(&async_resolution_); 123 } 124 } 125 126 RunPromise(const RunPromise&) = delete; 127 RunPromise& operator=(const RunPromise&) = delete; 128 RunPromise(RunPromise && other)129 RunPromise(RunPromise&& other) noexcept 130 : is_immediately_resolved_(other.is_immediately_resolved_) { 131 if (grpc_trace_promise_primitives.enabled()) { 132 gpr_log(GPR_DEBUG, "InterceptorList::RunPromise[%p]: move from %p", 133 this, &other); 134 } 135 if (is_immediately_resolved_) { 136 Construct(&result_, std::move(other.result_)); 137 } else { 138 Construct(&async_resolution_, std::move(other.async_resolution_)); 139 } 140 } 141 142 RunPromise& operator=(RunPromise&& other) noexcept = delete; 143 operator()144 Poll<absl::optional<T>> operator()() { 145 if (grpc_trace_promise_primitives.enabled()) { 146 gpr_log(GPR_DEBUG, "InterceptorList::RunPromise[%p]: %s", this, 147 DebugString().c_str()); 148 } 149 if (is_immediately_resolved_) return std::move(result_); 150 while (true) { 151 auto r = async_resolution_.current_factory->PollOnce( 152 async_resolution_.space.get()); 153 if (auto* p = r.value_if_ready()) { 154 async_resolution_.current_factory->Destroy( 155 async_resolution_.space.get()); 156 async_resolution_.current_factory = 157 async_resolution_.current_factory->next(); 158 if (!p->has_value()) async_resolution_.current_factory = nullptr; 159 if (grpc_trace_promise_primitives.enabled()) { 160 gpr_log(GPR_DEBUG, "InterceptorList::RunPromise[%p]: %s", this, 161 DebugString().c_str()); 162 } 163 if (async_resolution_.current_factory == nullptr) { 164 return std::move(*p); 165 } 166 async_resolution_.current_factory->MakePromise( 167 std::move(**p), async_resolution_.space.get()); 168 continue; 169 } 170 return Pending{}; 171 } 172 } 173 174 private: DebugString()175 std::string DebugString() const { 176 if (is_immediately_resolved_) { 177 return absl::StrFormat("Result:has_value:%d", result_.has_value()); 178 } else { 179 return absl::StrCat( 180 "Running:", 181 async_resolution_.current_factory == nullptr 182 ? "END" 183 : ([p = async_resolution_.current_factory->from()]() { 184 return absl::StrCat(p.file(), ":", p.line()); 185 })() 186 .c_str()); 187 } 188 } 189 struct AsyncResolution { AsyncResolutionAsyncResolution190 explicit AsyncResolution(size_t max_size) 191 : space(GetContext<Arena>()->MakePooledArray<char>(max_size)) {} 192 AsyncResolution(const AsyncResolution&) = delete; 193 AsyncResolution& operator=(const AsyncResolution&) = delete; AsyncResolutionAsyncResolution194 AsyncResolution(AsyncResolution&& other) noexcept 195 : current_factory(std::exchange(other.current_factory, nullptr)), 196 space(std::move(other.space)) {} 197 Map* current_factory; 198 Arena::PoolPtr<char[]> space; 199 }; 200 union { 201 AsyncResolution async_resolution_; 202 absl::optional<T> result_; 203 }; 204 // If true, the result_ union is valid, otherwise async_resolution_ is. 205 // Indicates whether the promise resolved immediately at construction or if 206 // additional steps were needed. 207 bool is_immediately_resolved_; 208 }; 209 210 InterceptorList() = default; 211 InterceptorList(const InterceptorList&) = delete; 212 InterceptorList& operator=(const InterceptorList&) = delete; ~InterceptorList()213 ~InterceptorList() { DeleteFactories(); } 214 Run(absl::optional<T> initial_value)215 RunPromise Run(absl::optional<T> initial_value) { 216 return RunPromise(promise_memory_required_, first_map_, 217 std::move(initial_value)); 218 } 219 220 // Append a new map to the end of the chain. 221 template <typename Fn> AppendMap(Fn fn,DebugLocation from)222 void AppendMap(Fn fn, DebugLocation from) { 223 Append(MakeMapToAdd( 224 std::move(fn), [] {}, from)); 225 } 226 227 // Prepend a new map to the beginning of the chain. 228 template <typename Fn> PrependMap(Fn fn,DebugLocation from)229 void PrependMap(Fn fn, DebugLocation from) { 230 Prepend(MakeMapToAdd( 231 std::move(fn), [] {}, from)); 232 } 233 234 // Append a new map to the end of the chain, with a cleanup function to be 235 // called at the end of run promise execution. 236 template <typename Fn, typename CleanupFn> AppendMapWithCleanup(Fn fn,CleanupFn cleanup_fn,DebugLocation from)237 void AppendMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { 238 Append(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from)); 239 } 240 241 // Prepend a new map to the beginning of the chain, with a cleanup function to 242 // be called at the end of run promise execution. 243 template <typename Fn, typename CleanupFn> PrependMapWithCleanup(Fn fn,CleanupFn cleanup_fn,DebugLocation from)244 void PrependMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { 245 Prepend(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from)); 246 } 247 248 protected: 249 // Clear the interceptor list ResetInterceptorList()250 void ResetInterceptorList() { 251 DeleteFactories(); 252 first_map_ = nullptr; 253 last_map_ = nullptr; 254 promise_memory_required_ = 0; 255 } 256 257 private: 258 template <typename Fn, typename CleanupFn> 259 class MapImpl final : public Map { 260 public: 261 using PromiseFactory = promise_detail::RepeatedPromiseFactory<T, Fn>; 262 using Promise = typename PromiseFactory::Promise; 263 MapImpl(Fn fn,CleanupFn cleanup_fn,DebugLocation from)264 explicit MapImpl(Fn fn, CleanupFn cleanup_fn, DebugLocation from) 265 : Map(from), fn_(std::move(fn)), cleanup_fn_(std::move(cleanup_fn)) {} ~MapImpl()266 ~MapImpl() override { cleanup_fn_(); } MakePromise(T x,void * memory)267 void MakePromise(T x, void* memory) override { 268 new (memory) Promise(fn_.Make(std::move(x))); 269 } Destroy(void * memory)270 void Destroy(void* memory) override { 271 static_cast<Promise*>(memory)->~Promise(); 272 } PollOnce(void * memory)273 Poll<absl::optional<T>> PollOnce(void* memory) override { 274 return poll_cast<absl::optional<T>>((*static_cast<Promise*>(memory))()); 275 } 276 277 private: 278 GPR_NO_UNIQUE_ADDRESS PromiseFactory fn_; 279 GPR_NO_UNIQUE_ADDRESS CleanupFn cleanup_fn_; 280 }; 281 282 template <typename Fn, typename CleanupFn> MakeMapToAdd(Fn fn,CleanupFn cleanup_fn,DebugLocation from)283 Map* MakeMapToAdd(Fn fn, CleanupFn cleanup_fn, DebugLocation from) { 284 using FactoryType = MapImpl<Fn, CleanupFn>; 285 promise_memory_required_ = std::max(promise_memory_required_, 286 sizeof(typename FactoryType::Promise)); 287 return GetContext<Arena>()->New<FactoryType>(std::move(fn), 288 std::move(cleanup_fn), from); 289 } 290 Append(Map * f)291 void Append(Map* f) { 292 if (first_map_ == nullptr) { 293 first_map_ = f; 294 last_map_ = f; 295 } else { 296 last_map_->SetNext(f); 297 last_map_ = f; 298 } 299 } 300 Prepend(Map * f)301 void Prepend(Map* f) { 302 if (first_map_ == nullptr) { 303 first_map_ = f; 304 last_map_ = f; 305 } else { 306 f->SetNext(first_map_); 307 first_map_ = f; 308 } 309 } 310 DeleteFactories()311 void DeleteFactories() { 312 for (auto* f = first_map_; f != nullptr;) { 313 auto* next = f->next(); 314 f->~Map(); 315 f = next; 316 } 317 } 318 319 // The first map in the chain. 320 Map* first_map_ = nullptr; 321 // The last map in the chain. 322 Map* last_map_ = nullptr; 323 // The amount of memory required to store the largest promise in the chain. 324 size_t promise_memory_required_ = 0; 325 }; 326 327 } // namespace grpc_core 328 329 #endif // GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H 330