1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_MAP_PIPE_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_MAP_PIPE_H
17
18 #include <grpc/support/port_platform.h>
19
20 #include "absl/status/status.h"
21
22 #include <grpc/support/log.h>
23
24 #include "src/core/lib/debug/trace.h"
25 #include "src/core/lib/promise/detail/promise_factory.h"
26 #include "src/core/lib/promise/for_each.h"
27 #include "src/core/lib/promise/map.h"
28 #include "src/core/lib/promise/pipe.h"
29 #include "src/core/lib/promise/poll.h"
30 #include "src/core/lib/promise/trace.h"
31 #include "src/core/lib/promise/try_seq.h"
32
33 namespace grpc_core {
34
35 // Apply a (possibly async) mapping function to src, and output into dst.
36 //
37 // In psuedo-code:
38 // for each element in wait_for src.Next:
39 // x = wait_for filter_factory(element)
40 // wait_for dst.Push(x)
41 template <typename T, typename Filter>
MapPipe(PipeReceiver<T> src,PipeSender<T> dst,Filter filter_factory)42 auto MapPipe(PipeReceiver<T> src, PipeSender<T> dst, Filter filter_factory) {
43 return ForEach(
44 std::move(src),
45 [filter_factory = promise_detail::RepeatedPromiseFactory<T, Filter>(
46 std::move(filter_factory)),
47 dst = std::move(dst)](T t) mutable {
48 return TrySeq(
49 [] {
50 if (grpc_trace_promise_primitives.enabled()) {
51 gpr_log(GPR_DEBUG, "MapPipe: start map");
52 }
53 return Empty{};
54 },
55 filter_factory.Make(std::move(t)),
56 [&dst](T t) {
57 if (grpc_trace_promise_primitives.enabled()) {
58 gpr_log(GPR_DEBUG, "MapPipe: start push");
59 }
60 return Map(dst.Push(std::move(t)), [](bool successful_push) {
61 if (successful_push) {
62 return absl::OkStatus();
63 }
64 return absl::CancelledError();
65 });
66 });
67 });
68 }
69
70 // Helper to intecept a pipe and apply a mapping function.
71 // Each of the `Intercept` constructors will take a PipeSender or PipeReceiver,
72 // construct a new pipe, and then replace the passed in pipe with its new end.
73 // In this way it can interject logic per-element.
74 // Next, the TakeAndRun function will return a promise that can be run to apply
75 // a mapping promise to each element of the pipe.
76 template <typename T>
77 class PipeMapper {
78 public:
Intercept(PipeSender<T> & intercept_sender)79 static PipeMapper Intercept(PipeSender<T>& intercept_sender) {
80 PipeMapper<T> r;
81 r.interceptor_.sender.Swap(&intercept_sender);
82 return r;
83 }
84
Intercept(PipeReceiver<T> & intercept_receiver)85 static PipeMapper Intercept(PipeReceiver<T>& intercept_receiver) {
86 PipeMapper<T> r;
87 r.interceptor_.receiver.Swap(&intercept_receiver);
88 return r;
89 }
90
91 template <typename Filter>
TakeAndRun(Filter filter)92 auto TakeAndRun(Filter filter) {
93 return MapPipe(std::move(interceptor_.receiver),
94 std::move(interceptor_.sender), std::move(filter));
95 }
96
97 private:
98 PipeMapper() = default;
99 Pipe<T> interceptor_;
100 };
101
102 } // namespace grpc_core
103
104 #endif // GRPC_SRC_CORE_LIB_PROMISE_MAP_PIPE_H
105