1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stddef.h>
21 
22 #include <algorithm>
23 #include <new>
24 #include <string>
25 #include <utility>
26 
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/str_format.h"
29 #include "absl/types/optional.h"
30 
31 #include <grpc/support/log.h>
32 
33 #include "src/core/lib/debug/trace.h"
34 #include "src/core/lib/gprpp/construct_destruct.h"
35 #include "src/core/lib/gprpp/debug_location.h"
36 #include "src/core/lib/promise/context.h"
37 #include "src/core/lib/promise/detail/promise_factory.h"
38 #include "src/core/lib/promise/poll.h"
39 #include "src/core/lib/promise/trace.h"
40 #include "src/core/lib/resource_quota/arena.h"
41 
42 namespace grpc_core {
43 
44 // Tracks a list of maps of T -> optional<T> via promises.
45 // When Run, runs each transformation in order, and resolves to the ulimate
46 // result.
47 // If a map resolves to nullopt, the chain is terminated and the result is
48 // nullopt.
49 // Maps can also have synchronous cleanup functions, which are guaranteed to be
50 // called at the termination of each run through the chain.
51 template <typename T>
52 class InterceptorList {
53  private:
54   // A map of T -> T via promises.
55   class Map {
56    public:
Map(DebugLocation from)57     explicit Map(DebugLocation from) : from_(from) {}
58     // Construct a promise to transform x into some other value at memory.
59     virtual void MakePromise(T x, void* memory) = 0;
60     // Destroy a promise constructed at memory.
61     virtual void Destroy(void* memory) = 0;
62     // Poll a promise constructed at memory.
63     // Resolves to an optional<T> -- if nullopt it means terminate the chain and
64     // resolve.
65     virtual Poll<absl::optional<T>> PollOnce(void* memory) = 0;
66     virtual ~Map() = default;
67 
68     // Update the next pointer stored with this map.
69     // This is only valid to call once, and only before the map is used.
SetNext(Map * next)70     void SetNext(Map* next) {
71       GPR_DEBUG_ASSERT(next_ == nullptr);
72       next_ = next;
73     }
74 
75     // Access the creation location for this map (for debug tracing).
from()76     DebugLocation from() const { return from_; }
77 
78     // Access the next map in the chain (or nullptr if this is the last map).
next()79     Map* next() const { return next_; }
80 
81    private:
82     GPR_NO_UNIQUE_ADDRESS const DebugLocation from_;
83     Map* next_ = nullptr;
84   };
85 
86  public:
87   // The result of Run: a promise that will execute the entire chain.
88   class RunPromise {
89    public:
RunPromise(size_t memory_required,Map * factory,absl::optional<T> value)90     RunPromise(size_t memory_required, Map* factory, absl::optional<T> value) {
91       if (!value.has_value() || factory == nullptr) {
92         if (grpc_trace_promise_primitives.enabled()) {
93           gpr_log(GPR_DEBUG,
94                   "InterceptorList::RunPromise[%p]: create immediate", this);
95         }
96         is_immediately_resolved_ = true;
97         Construct(&result_, std::move(value));
98       } else {
99         is_immediately_resolved_ = false;
100         Construct(&async_resolution_, memory_required);
101         factory->MakePromise(std::move(*value), async_resolution_.space.get());
102         async_resolution_.current_factory = factory;
103         if (grpc_trace_promise_primitives.enabled()) {
104           gpr_log(GPR_DEBUG,
105                   "InterceptorList::RunPromise[%p]: create async; mem=%p", this,
106                   async_resolution_.space.get());
107         }
108       }
109     }
110 
~RunPromise()111     ~RunPromise() {
112       if (grpc_trace_promise_primitives.enabled()) {
113         gpr_log(GPR_DEBUG, "InterceptorList::RunPromise[%p]: destroy", this);
114       }
115       if (is_immediately_resolved_) {
116         Destruct(&result_);
117       } else {
118         if (async_resolution_.current_factory != nullptr) {
119           async_resolution_.current_factory->Destroy(
120               async_resolution_.space.get());
121         }
122         Destruct(&async_resolution_);
123       }
124     }
125 
126     RunPromise(const RunPromise&) = delete;
127     RunPromise& operator=(const RunPromise&) = delete;
128 
RunPromise(RunPromise && other)129     RunPromise(RunPromise&& other) noexcept
130         : is_immediately_resolved_(other.is_immediately_resolved_) {
131       if (grpc_trace_promise_primitives.enabled()) {
132         gpr_log(GPR_DEBUG, "InterceptorList::RunPromise[%p]: move from %p",
133                 this, &other);
134       }
135       if (is_immediately_resolved_) {
136         Construct(&result_, std::move(other.result_));
137       } else {
138         Construct(&async_resolution_, std::move(other.async_resolution_));
139       }
140     }
141 
142     RunPromise& operator=(RunPromise&& other) noexcept = delete;
143 
operator()144     Poll<absl::optional<T>> operator()() {
145       if (grpc_trace_promise_primitives.enabled()) {
146         gpr_log(GPR_DEBUG, "InterceptorList::RunPromise[%p]: %s", this,
147                 DebugString().c_str());
148       }
149       if (is_immediately_resolved_) return std::move(result_);
150       while (true) {
151         auto r = async_resolution_.current_factory->PollOnce(
152             async_resolution_.space.get());
153         if (auto* p = r.value_if_ready()) {
154           async_resolution_.current_factory->Destroy(
155               async_resolution_.space.get());
156           async_resolution_.current_factory =
157               async_resolution_.current_factory->next();
158           if (!p->has_value()) async_resolution_.current_factory = nullptr;
159           if (grpc_trace_promise_primitives.enabled()) {
160             gpr_log(GPR_DEBUG, "InterceptorList::RunPromise[%p]: %s", this,
161                     DebugString().c_str());
162           }
163           if (async_resolution_.current_factory == nullptr) {
164             return std::move(*p);
165           }
166           async_resolution_.current_factory->MakePromise(
167               std::move(**p), async_resolution_.space.get());
168           continue;
169         }
170         return Pending{};
171       }
172     }
173 
174    private:
DebugString()175     std::string DebugString() const {
176       if (is_immediately_resolved_) {
177         return absl::StrFormat("Result:has_value:%d", result_.has_value());
178       } else {
179         return absl::StrCat(
180             "Running:",
181             async_resolution_.current_factory == nullptr
182                 ? "END"
183                 : ([p = async_resolution_.current_factory->from()]() {
184                     return absl::StrCat(p.file(), ":", p.line());
185                   })()
186                       .c_str());
187       }
188     }
189     struct AsyncResolution {
AsyncResolutionAsyncResolution190       explicit AsyncResolution(size_t max_size)
191           : space(GetContext<Arena>()->MakePooledArray<char>(max_size)) {}
192       AsyncResolution(const AsyncResolution&) = delete;
193       AsyncResolution& operator=(const AsyncResolution&) = delete;
AsyncResolutionAsyncResolution194       AsyncResolution(AsyncResolution&& other) noexcept
195           : current_factory(std::exchange(other.current_factory, nullptr)),
196             space(std::move(other.space)) {}
197       Map* current_factory;
198       Arena::PoolPtr<char[]> space;
199     };
200     union {
201       AsyncResolution async_resolution_;
202       absl::optional<T> result_;
203     };
204     // If true, the result_ union is valid, otherwise async_resolution_ is.
205     // Indicates whether the promise resolved immediately at construction or if
206     // additional steps were needed.
207     bool is_immediately_resolved_;
208   };
209 
210   InterceptorList() = default;
211   InterceptorList(const InterceptorList&) = delete;
212   InterceptorList& operator=(const InterceptorList&) = delete;
~InterceptorList()213   ~InterceptorList() { DeleteFactories(); }
214 
Run(absl::optional<T> initial_value)215   RunPromise Run(absl::optional<T> initial_value) {
216     return RunPromise(promise_memory_required_, first_map_,
217                       std::move(initial_value));
218   }
219 
220   // Append a new map to the end of the chain.
221   template <typename Fn>
AppendMap(Fn fn,DebugLocation from)222   void AppendMap(Fn fn, DebugLocation from) {
223     Append(MakeMapToAdd(
224         std::move(fn), [] {}, from));
225   }
226 
227   // Prepend a new map to the beginning of the chain.
228   template <typename Fn>
PrependMap(Fn fn,DebugLocation from)229   void PrependMap(Fn fn, DebugLocation from) {
230     Prepend(MakeMapToAdd(
231         std::move(fn), [] {}, from));
232   }
233 
234   // Append a new map to the end of the chain, with a cleanup function to be
235   // called at the end of run promise execution.
236   template <typename Fn, typename CleanupFn>
AppendMapWithCleanup(Fn fn,CleanupFn cleanup_fn,DebugLocation from)237   void AppendMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) {
238     Append(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from));
239   }
240 
241   // Prepend a new map to the beginning of the chain, with a cleanup function to
242   // be called at the end of run promise execution.
243   template <typename Fn, typename CleanupFn>
PrependMapWithCleanup(Fn fn,CleanupFn cleanup_fn,DebugLocation from)244   void PrependMapWithCleanup(Fn fn, CleanupFn cleanup_fn, DebugLocation from) {
245     Prepend(MakeMapToAdd(std::move(fn), std::move(cleanup_fn), from));
246   }
247 
248  protected:
249   // Clear the interceptor list
ResetInterceptorList()250   void ResetInterceptorList() {
251     DeleteFactories();
252     first_map_ = nullptr;
253     last_map_ = nullptr;
254     promise_memory_required_ = 0;
255   }
256 
257  private:
258   template <typename Fn, typename CleanupFn>
259   class MapImpl final : public Map {
260    public:
261     using PromiseFactory = promise_detail::RepeatedPromiseFactory<T, Fn>;
262     using Promise = typename PromiseFactory::Promise;
263 
MapImpl(Fn fn,CleanupFn cleanup_fn,DebugLocation from)264     explicit MapImpl(Fn fn, CleanupFn cleanup_fn, DebugLocation from)
265         : Map(from), fn_(std::move(fn)), cleanup_fn_(std::move(cleanup_fn)) {}
~MapImpl()266     ~MapImpl() override { cleanup_fn_(); }
MakePromise(T x,void * memory)267     void MakePromise(T x, void* memory) override {
268       new (memory) Promise(fn_.Make(std::move(x)));
269     }
Destroy(void * memory)270     void Destroy(void* memory) override {
271       static_cast<Promise*>(memory)->~Promise();
272     }
PollOnce(void * memory)273     Poll<absl::optional<T>> PollOnce(void* memory) override {
274       return poll_cast<absl::optional<T>>((*static_cast<Promise*>(memory))());
275     }
276 
277    private:
278     GPR_NO_UNIQUE_ADDRESS PromiseFactory fn_;
279     GPR_NO_UNIQUE_ADDRESS CleanupFn cleanup_fn_;
280   };
281 
282   template <typename Fn, typename CleanupFn>
MakeMapToAdd(Fn fn,CleanupFn cleanup_fn,DebugLocation from)283   Map* MakeMapToAdd(Fn fn, CleanupFn cleanup_fn, DebugLocation from) {
284     using FactoryType = MapImpl<Fn, CleanupFn>;
285     promise_memory_required_ = std::max(promise_memory_required_,
286                                         sizeof(typename FactoryType::Promise));
287     return GetContext<Arena>()->New<FactoryType>(std::move(fn),
288                                                  std::move(cleanup_fn), from);
289   }
290 
Append(Map * f)291   void Append(Map* f) {
292     if (first_map_ == nullptr) {
293       first_map_ = f;
294       last_map_ = f;
295     } else {
296       last_map_->SetNext(f);
297       last_map_ = f;
298     }
299   }
300 
Prepend(Map * f)301   void Prepend(Map* f) {
302     if (first_map_ == nullptr) {
303       first_map_ = f;
304       last_map_ = f;
305     } else {
306       f->SetNext(first_map_);
307       first_map_ = f;
308     }
309   }
310 
DeleteFactories()311   void DeleteFactories() {
312     for (auto* f = first_map_; f != nullptr;) {
313       auto* next = f->next();
314       f->~Map();
315       f = next;
316     }
317   }
318 
319   // The first map in the chain.
320   Map* first_map_ = nullptr;
321   // The last map in the chain.
322   Map* last_map_ = nullptr;
323   // The amount of memory required to store the largest promise in the chain.
324   size_t promise_memory_required_ = 0;
325 };
326 
327 }  // namespace grpc_core
328 
329 #endif  // GRPC_SRC_CORE_LIB_PROMISE_INTERCEPTOR_LIST_H
330