1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_DETAIL_BASIC_SEQ_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_DETAIL_BASIC_SEQ_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <array>
21 #include <cassert>
22 #include <new>
23 #include <tuple>
24 #include <utility>
25 
26 #include "absl/meta/type_traits.h"
27 #include "absl/utility/utility.h"
28 
29 #include "src/core/lib/gprpp/construct_destruct.h"
30 #include "src/core/lib/promise/detail/promise_factory.h"
31 #include "src/core/lib/promise/detail/promise_like.h"
32 #include "src/core/lib/promise/detail/switch.h"
33 #include "src/core/lib/promise/poll.h"
34 
35 namespace grpc_core {
36 namespace promise_detail {
37 
38 // Helper for SeqState to evaluate some common types to all partial
39 // specializations.
40 template <template <typename> class Traits, typename FPromise, typename FNext>
41 struct SeqStateTypes {
42   // Our current promise.
43   using Promise = FPromise;
44   // The result of our current promise.
45   using PromiseResult = typename Promise::Result;
46   // Traits around the result of our promise.
47   using PromiseResultTraits = Traits<PromiseResult>;
48   // Wrap the factory callable in our factory wrapper to deal with common edge
49   // cases. We use the 'unwrapped type' from the traits, so for instance, TrySeq
50   // can pass back a T from a StatusOr<T>.
51   using Next = promise_detail::OncePromiseFactory<
52       typename PromiseResultTraits::UnwrappedType, FNext>;
53 };
54 
55 // One state in a sequence.
56 // A state contains the current promise, and the promise factory to turn the
57 // result of the current promise into the next state's promise. We play a shell
58 // game such that the prior state and our current promise are kept in a union,
59 // and the next promise factory is kept alongside in the state struct.
60 // Recursively this guarantees that the next functions get initialized once, and
61 // destroyed once, and don't need to be moved around in between, which avoids a
62 // potential O(n**2) loop of next factory moves had we used a variant of states
63 // here. The very first state does not have a prior state, and so that state has
64 // a partial specialization below. The final state does not have a next state;
65 // that state is inlined in BasicSeq since that was simpler to type.
66 template <template <typename> class Traits, char I, typename... Fs>
67 struct SeqState {
68   // The state evaluated before this state.
69   using PriorState = SeqState<Traits, I - 1, Fs...>;
70   // Initialization from callables.
SeqStateSeqState71   explicit SeqState(std::tuple<Fs*...> fs)
72       : next_factory(std::move(*std::get<I + 1>(fs))) {
73     new (&prior) PriorState(fs);
74   }
75   // Move constructor - assumes we're in the initial state (move prior) as it's
76   // illegal to move a promise after polling it.
SeqStateSeqState77   SeqState(SeqState&& other) noexcept
78       : next_factory(std::move(other.next_factory)) {
79     new (&prior) PriorState(std::move(other.prior));
80   }
81   // Copy constructor - assumes we're in the initial state (move prior) as it's
82   // illegal to move a promise after polling it.
SeqStateSeqState83   SeqState(const SeqState& other) : next_factory(other.next_factory) {
84     new (&prior) PriorState(std::move(other.prior));
85   }
86   // Empty destructor - we instead destruct the innards in BasicSeq manually
87   // depending on state.
~SeqStateSeqState88   ~SeqState() {}
89   // Evaluate the current promise, next promise factory types for this state.
90   // The current promise is the next promise from the prior state.
91   // The next factory callable is from the callables passed in:
92   // Fs[0] is the initial promise, Fs[1] is the state 0 next factory, Fs[2] is
93   // the state 1 next factory, etc...
94   using Types = SeqStateTypes<
95       Traits, typename PriorState::Types::Next::Promise,
96       typename std::tuple_element<I + 1, std::tuple<Fs...>>::type>;
97   // Storage for either the current promise or the prior state.
98   union {
99     // If we're in the prior state.
100     GPR_NO_UNIQUE_ADDRESS PriorState prior;
101     // The callables representing our promise.
102     GPR_NO_UNIQUE_ADDRESS typename Types::Promise current_promise;
103   };
104   // Storage for the next promise factory.
105   GPR_NO_UNIQUE_ADDRESS typename Types::Next next_factory;
106 };
107 
108 // Partial specialization of SeqState above for the first state - it has no
109 // prior state, so we take the first callable from the template arg list and use
110 // it as a promise.
111 template <template <typename> class Traits, typename... Fs>
112 struct SeqState<Traits, 0, Fs...> {
113   // Initialization from callables.
114   explicit SeqState(std::tuple<Fs*...> args)
115       : current_promise(std::move(*std::get<0>(args))),
116         next_factory(std::move(*std::get<1>(args))) {}
117   // Move constructor - it's assumed we're in this state (see above).
118   SeqState(SeqState&& other) noexcept
119       : current_promise(std::move(other.current_promise)),
120         next_factory(std::move(other.next_factory)) {}
121   // Copy constructor - it's assumed we're in this state (see above).
122   SeqState(const SeqState& other)
123       : current_promise(other.current_promise),
124         next_factory(other.next_factory) {}
125   // Empty destructor - we instead destruct the innards in BasicSeq manually
126   // depending on state.
127   ~SeqState() {}
128   // Evaluate the current promise, next promise factory types for this state.
129   // Our callable is the first element of Fs, wrapped in PromiseLike to handle
130   // some common edge cases. The next factory is the second element.
131   using Types = SeqStateTypes<
132       Traits,
133       PromiseLike<typename std::tuple_element<0, std::tuple<Fs...>>::type>,
134       typename std::tuple_element<1, std::tuple<Fs...>>::type>;
135   GPR_NO_UNIQUE_ADDRESS typename Types::Promise current_promise;
136   GPR_NO_UNIQUE_ADDRESS typename Types::Next next_factory;
137 };
138 
139 // Helper to get a specific state index.
140 // Calls the prior state, unless it's this state, in which case it returns
141 // that.
142 template <char I, template <typename> class Traits, char J, typename... Fs>
143 struct GetSeqStateInner {
144   static SeqState<Traits, I, Fs...>* f(SeqState<Traits, J, Fs...>* p) {
145     return GetSeqStateInner<I, Traits, J - 1, Fs...>::f(&p->prior);
146   }
147 };
148 
149 template <char I, template <typename> class Traits, typename... Fs>
150 struct GetSeqStateInner<I, Traits, I, Fs...> {
151   static SeqState<Traits, I, Fs...>* f(SeqState<Traits, I, Fs...>* p) {
152     return p;
153   }
154 };
155 
156 template <char I, template <typename> class Traits, char J, typename... Fs>
157 absl::enable_if_t<I <= J, SeqState<Traits, I, Fs...>*> GetSeqState(
158     SeqState<Traits, J, Fs...>* p) {
159   return GetSeqStateInner<I, Traits, J, Fs...>::f(p);
160 }
161 
162 template <template <typename> class Traits, char I, typename... Fs, typename T>
163 auto CallNext(SeqState<Traits, I, Fs...>* state, T&& arg)
164     -> decltype(SeqState<Traits, I, Fs...>::Types::PromiseResultTraits::
165                     CallFactory(&state->next_factory, std::forward<T>(arg))) {
166   return SeqState<Traits, I, Fs...>::Types::PromiseResultTraits::CallFactory(
167       &state->next_factory, std::forward<T>(arg));
168 }
169 
170 // A sequence under some traits for some set of callables Fs.
171 // Fs[0] should be a promise-like object that yields a value.
172 // Fs[1..] should be promise-factory-like objects that take the value from the
173 // previous step and yield a promise. Note that most of the machinery in
174 // PromiseFactory exists to make it possible for those promise-factory-like
175 // objects to be anything that's convenient. Traits defines how we move from one
176 // step to the next. Traits sets up the wrapping and escape handling for the
177 // sequence. Promises return wrapped values that the trait can inspect and
178 // unwrap before passing them to the next element of the sequence. The trait can
179 // also interpret a wrapped value as an escape value, which terminates
180 // evaluation of the sequence immediately yielding a result. Traits for type T
181 // have the members:
182 //  * type UnwrappedType - the type after removing wrapping from T (i.e. for
183 //    TrySeq, T=StatusOr<U> yields UnwrappedType=U).
184 //  * type WrappedType - the type after adding wrapping if it doesn't already
185 //    exist (i.e. for TrySeq if T is not Status/StatusOr/void, then
186 //    WrappedType=StatusOr<T>; if T is Status then WrappedType=Status (it's
187 //    already wrapped!))
188 //  * template <typename Next> void CallFactory(Next* next_factory, T&& value) -
189 //    call promise factory next_factory with the result of unwrapping value, and
190 //    return the resulting promise.
191 //  * template <typename Result, typename RunNext> Poll<Result>
192 //    CheckResultAndRunNext(T prior, RunNext run_next) - examine the value of
193 //    prior, and decide to escape or continue. If escaping, return the final
194 //    sequence value of type Poll<Result>. If continuing, return the value of
195 //    run_next(std::move(prior)).
196 template <template <typename> class Traits, typename... Fs>
197 class BasicSeq {
198  private:
199   // Number of states in the sequence - we'll refer to this some!
200   static constexpr char N = sizeof...(Fs);
201 
202   // Current state.
203   static_assert(N < 128, "Long sequence... please revisit BasicSeq");
204   char state_ = 0;
205   // The penultimate state contains all the preceding states too.
206   using PenultimateState = SeqState<Traits, N - 2, Fs...>;
207   // The final state is simply the final promise, which is the next promise from
208   // the penultimate state.
209   using FinalPromise = typename PenultimateState::Types::Next::Promise;
210   union {
211     GPR_NO_UNIQUE_ADDRESS PenultimateState penultimate_state_;
212     GPR_NO_UNIQUE_ADDRESS FinalPromise final_promise_;
213   };
214   using FinalPromiseResult = typename FinalPromise::Result;
215   using Result = typename Traits<FinalPromiseResult>::WrappedType;
216 
217   // Get a state by index.
218   template <char I>
219       absl::enable_if_t < I<N - 2, SeqState<Traits, I, Fs...>*> state() {
220     return GetSeqState<I>(&penultimate_state_);
221   }
222 
223   template <char I>
224   absl::enable_if_t<I == N - 2, PenultimateState*> state() {
225     return &penultimate_state_;
226   }
227 
228   // Get the next state's promise.
229   template <char I>
230   auto next_promise() -> absl::enable_if_t<
231       I != N - 2,
232       decltype(&GetSeqState<I + 1>(static_cast<PenultimateState*>(nullptr))
233                     ->current_promise)> {
234     return &GetSeqState<I + 1>(&penultimate_state_)->current_promise;
235   }
236 
237   template <char I>
238   absl::enable_if_t<I == N - 2, FinalPromise*> next_promise() {
239     return &final_promise_;
240   }
241 
242   // Callable to advance the state to the next one after I given the result from
243   // state I.
244   template <char I>
245   struct RunNext {
246     BasicSeq* s;
247     template <typename T>
248     Poll<Result> operator()(T&& value) {
249       auto* prior = s->state<I>();
250       using StateType = absl::remove_reference_t<decltype(*prior)>;
251       // Destroy the promise that just completed.
252       Destruct(&prior->current_promise);
253       // Construct the next promise by calling the next promise factory.
254       // We need to ask the traits to do this to deal with value
255       // wrapping/unwrapping.
256       auto n = StateType::Types::PromiseResultTraits::CallFactory(
257           &prior->next_factory, std::forward<T>(value));
258       // Now we also no longer need the factory, so we can destroy that.
259       Destruct(&prior->next_factory);
260       // Constructing the promise for the new state will use the memory
261       // previously occupied by the promise & next factory of the old state.
262       Construct(s->next_promise<I>(), std::move(n));
263       // Store the state counter.
264       s->state_ = I + 1;
265       // Recursively poll the new current state.
266       return s->RunState<I + 1>();
267     }
268   };
269 
270   // Poll the current state, advance it if necessary.
271   template <char I>
272   absl::enable_if_t<I != N - 1, Poll<Result>> RunState() {
273     // Get a pointer to the state object.
274     auto* s = state<I>();
275     // Poll the current promise in this state.
276     auto r = s->current_promise();
277     // If we are still pending, say so by returning.
278     if (r.pending()) return Pending();
279     // Current promise is ready, as the traits to do the next thing.
280     // That may be returning - eg if TrySeq sees an error.
281     // Or it may be by calling the callable we hand down - RunNext - which
282     // will advance the state and call the next promise.
283     return Traits<
284         typename absl::remove_reference_t<decltype(*s)>::Types::PromiseResult>::
285         template CheckResultAndRunNext<Result>(std::move(r.value()),
286                                                RunNext<I>{this});
287   }
288 
289   // Specialization of RunState to run the final state.
290   template <char I>
291   absl::enable_if_t<I == N - 1, Poll<Result>> RunState() {
292     // Poll the final promise.
293     auto r = final_promise_();
294     // If we are still pending, say so by returning.
295     if (r.pending()) return Pending();
296     // We are complete, return the (wrapped) result.
297     return Result(std::move(r.value()));
298   }
299 
300   // For state numbered I, destruct the current promise and the next promise
301   // factory, and recursively destruct the next promise factories for future
302   // states (since they also still exist).
303   template <char I>
304   absl::enable_if_t<I != N - 1, void>
305   DestructCurrentPromiseAndSubsequentFactories() {
306     Destruct(&GetSeqState<I>(&penultimate_state_)->current_promise);
307     DestructSubsequentFactories<I>();
308   }
309 
310   template <char I>
311   absl::enable_if_t<I == N - 1, void>
312   DestructCurrentPromiseAndSubsequentFactories() {
313     Destruct(&final_promise_);
314   }
315 
316   // For state I, destruct the next promise factory, and recursively the next
317   // promise factories after.
318   template <char I>
319   absl::enable_if_t<I != N - 1, void> DestructSubsequentFactories() {
320     Destruct(&GetSeqState<I>(&penultimate_state_)->next_factory);
321     DestructSubsequentFactories<I + 1>();
322   }
323 
324   template <char I>
325   absl::enable_if_t<I == N - 1, void> DestructSubsequentFactories() {}
326 
327   // Placate older compilers by wrapping RunState in a struct so that their
328   // parameter unpacking can work.
329   template <char I>
330   struct RunStateStruct {
331     BasicSeq* s;
332     Poll<Result> operator()() { return s->RunState<I>(); }
333   };
334 
335   // Similarly placate those compilers for
336   // DestructCurrentPromiseAndSubsequentFactories
337   template <char I>
338   struct DestructCurrentPromiseAndSubsequentFactoriesStruct {
339     BasicSeq* s;
340     void operator()() {
341       return s->DestructCurrentPromiseAndSubsequentFactories<I>();
342     }
343   };
344 
345   // Run the current state (and possibly later states if that one finishes).
346   // Single argument is a type that encodes the integer sequence 0, 1, 2, ...,
347   // N-1 as a type, but which uses no memory. This is used to expand out
348   // RunState instances using a template unpack to pass to Switch, which encodes
349   // a switch statement over the various cases. This ultimately gives us a
350   // Duff's device like mechanic for evaluating sequences.
351   template <char... I>
352   Poll<Result> Run(absl::integer_sequence<char, I...>) {
353     return Switch<Poll<Result>>(state_, RunStateStruct<I>{this}...);
354   }
355 
356   // Run the appropriate destructors for a given state.
357   // Single argument is a type that encodes the integer sequence 0, 1, 2, ...,
358   // N-1 as a type, but which uses no memory. This is used to expand out
359   // DestructCurrentPromiseAndSubsequentFactories instances to pass to Switch,
360   // which can choose the correct instance at runtime to destroy everything.
361   template <char... I>
362   void RunDestruct(absl::integer_sequence<char, I...>) {
363     Switch<void>(
364         state_, DestructCurrentPromiseAndSubsequentFactoriesStruct<I>{this}...);
365   }
366 
367  public:
368   // Construct a sequence given the callables that will control it.
369   explicit BasicSeq(Fs... fs) : penultimate_state_(std::make_tuple(&fs...)) {}
370   // No assignment... we don't need it (but if we ever find a good case, then
371   // it's ok to implement).
372   BasicSeq& operator=(const BasicSeq&) = delete;
373   // Copy construction - only for state 0.
374   // It's illegal to copy a Promise after polling it - if we are in state>0 we
375   // *must* have been polled.
376   BasicSeq(const BasicSeq& other) {
377     assert(other.state_ == 0);
378     new (&penultimate_state_) PenultimateState(other.penultimate_state_);
379   }
380   // Move construction - only for state 0.
381   // It's illegal to copy a Promise after polling it - if we are in state>0 we
382   // *must* have been polled.
383   BasicSeq(BasicSeq&& other) noexcept {
384     assert(other.state_ == 0);
385     new (&penultimate_state_)
386         PenultimateState(std::move(other.penultimate_state_));
387   }
388   // Destruct based on current state.
389   ~BasicSeq() { RunDestruct(absl::make_integer_sequence<char, N>()); }
390 
391   // Poll the sequence once.
392   Poll<Result> operator()() {
393     return Run(absl::make_integer_sequence<char, N>());
394   }
395 };
396 
397 // As above, but models a sequence of unknown size
398 // At each element, the accumulator A and the current value V is passed to some
399 // function of type IterTraits::Factory as f(V, IterTraits::Argument); f is
400 // expected to return a promise that resolves to IterTraits::Wrapped.
401 template <class IterTraits>
402 class BasicSeqIter {
403  private:
404   using Traits = typename IterTraits::Traits;
405   using Iter = typename IterTraits::Iter;
406   using Factory = typename IterTraits::Factory;
407   using Argument = typename IterTraits::Argument;
408   using IterValue = typename IterTraits::IterValue;
409   using StateCreated = typename IterTraits::StateCreated;
410   using State = typename IterTraits::State;
411   using Wrapped = typename IterTraits::Wrapped;
412 
413  public:
414   BasicSeqIter(Iter begin, Iter end, Factory f, Argument arg)
415       : cur_(begin), end_(end), f_(std::move(f)) {
416     if (cur_ == end_) {
417       Construct(&result_, std::move(arg));
418     } else {
419       Construct(&state_, f_(*cur_, std::move(arg)));
420     }
421   }
422 
423   ~BasicSeqIter() {
424     if (cur_ == end_) {
425       Destruct(&result_);
426     } else {
427       Destruct(&state_);
428     }
429   }
430 
431   BasicSeqIter(const BasicSeqIter& other) = delete;
432   BasicSeqIter& operator=(const BasicSeqIter&) = delete;
433 
434   BasicSeqIter(BasicSeqIter&& other) noexcept
435       : cur_(other.cur_), end_(other.end_), f_(std::move(other.f_)) {
436     if (cur_ == end_) {
437       Construct(&result_, std::move(other.result_));
438     } else {
439       Construct(&state_, std::move(other.state_));
440     }
441   }
442   BasicSeqIter& operator=(BasicSeqIter&& other) noexcept {
443     cur_ = other.cur_;
444     end_ = other.end_;
445     if (cur_ == end_) {
446       Construct(&result_, std::move(other.result_));
447     } else {
448       Construct(&state_, std::move(other.state_));
449     }
450     return *this;
451   }
452 
453   Poll<Wrapped> operator()() {
454     if (cur_ == end_) {
455       return std::move(result_);
456     }
457     return PollNonEmpty();
458   }
459 
460  private:
461   Poll<Wrapped> PollNonEmpty() {
462     Poll<Wrapped> r = state_();
463     if (r.pending()) return r;
464     return Traits::template CheckResultAndRunNext<Wrapped>(
465         std::move(r.value()), [this](Wrapped arg) -> Poll<Wrapped> {
466           auto next = cur_;
467           ++next;
468           if (next == end_) {
469             return std::move(arg);
470           }
471           cur_ = next;
472           state_.~State();
473           Construct(&state_,
474                     Traits::template CallSeqFactory(f_, *cur_, std::move(arg)));
475           return PollNonEmpty();
476         });
477   }
478 
479   Iter cur_;
480   const Iter end_;
481   GPR_NO_UNIQUE_ADDRESS Factory f_;
482   union {
483     GPR_NO_UNIQUE_ADDRESS State state_;
484     GPR_NO_UNIQUE_ADDRESS Argument result_;
485   };
486 };
487 
488 }  // namespace promise_detail
489 }  // namespace grpc_core
490 
491 #endif  // GRPC_SRC_CORE_LIB_PROMISE_DETAIL_BASIC_SEQ_H
492