xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/promise/inter_activity_pipe.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stdint.h>
21 
22 #include <array>
23 #include <utility>
24 
25 #include "absl/base/thread_annotations.h"
26 #include "absl/types/optional.h"
27 
28 #include "src/core/lib/gprpp/ref_counted.h"
29 #include "src/core/lib/gprpp/ref_counted_ptr.h"
30 #include "src/core/lib/gprpp/sync.h"
31 #include "src/core/lib/promise/activity.h"
32 #include "src/core/lib/promise/poll.h"
33 
34 namespace grpc_core {
35 
36 template <typename T, uint8_t kQueueSize>
37 class InterActivityPipe {
38  public:
39   class NextResult {
40    public:
41     template <typename... Args>
NextResult(Args &&...args)42     explicit NextResult(Args&&... args) : value_(std::forward<Args>(args)...) {}
43     using value_type = T;
reset()44     void reset() { value_.reset(); }
cancelled()45     bool cancelled() const { return false; }
has_value()46     bool has_value() const { return value_.has_value(); }
value()47     const T& value() const { return value_.value(); }
value()48     T& value() { return value_.value(); }
49     const T& operator*() const { return *value_; }
50     T& operator*() { return *value_; }
51 
52    private:
53     absl::optional<T> value_;
54   };
55 
56  private:
57   class Center : public RefCounted<Center, NonPolymorphicRefCount> {
58    public:
Push(T & value)59     Poll<bool> Push(T& value) {
60       ReleasableMutexLock lock(&mu_);
61       if (closed_) return false;
62       if (count_ == kQueueSize) {
63         on_available_ = GetContext<Activity>()->MakeNonOwningWaker();
64         return Pending{};
65       }
66       queue_[(first_ + count_) % kQueueSize] = std::move(value);
67       ++count_;
68       if (count_ == 1) {
69         auto on_occupied = std::move(on_occupied_);
70         lock.Release();
71         on_occupied.Wakeup();
72       }
73       return true;
74     }
75 
Next()76     Poll<NextResult> Next() {
77       ReleasableMutexLock lock(&mu_);
78       if (count_ == 0) {
79         if (closed_) return absl::nullopt;
80         on_occupied_ = GetContext<Activity>()->MakeNonOwningWaker();
81         return Pending{};
82       }
83       auto value = std::move(queue_[first_]);
84       first_ = (first_ + 1) % kQueueSize;
85       --count_;
86       if (count_ == kQueueSize - 1) {
87         auto on_available = std::move(on_available_);
88         lock.Release();
89         on_available.Wakeup();
90       }
91       return std::move(value);
92     }
93 
MarkClosed()94     void MarkClosed() {
95       ReleasableMutexLock lock(&mu_);
96       if (std::exchange(closed_, true)) return;
97       auto on_occupied = std::move(on_occupied_);
98       auto on_available = std::move(on_available_);
99       lock.Release();
100       on_occupied.Wakeup();
101       on_available.Wakeup();
102     }
103 
IsClosed()104     bool IsClosed() {
105       MutexLock lock(&mu_);
106       return closed_;
107     }
108 
109    private:
110     Mutex mu_;
111     std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_);
112     bool closed_ ABSL_GUARDED_BY(mu_) = false;
113     uint8_t first_ ABSL_GUARDED_BY(mu_) = 0;
114     uint8_t count_ ABSL_GUARDED_BY(mu_) = 0;
115     Waker on_occupied_ ABSL_GUARDED_BY(mu_);
116     Waker on_available_ ABSL_GUARDED_BY(mu_);
117   };
118   RefCountedPtr<Center> center_{MakeRefCounted<Center>()};
119 
120  public:
121   class Sender {
122    public:
Sender(RefCountedPtr<Center> center)123     explicit Sender(RefCountedPtr<Center> center)
124         : center_(std::move(center)) {}
125     Sender(const Sender&) = delete;
126     Sender& operator=(const Sender&) = delete;
127     Sender(Sender&&) noexcept = default;
128     Sender& operator=(Sender&&) noexcept = default;
129 
~Sender()130     ~Sender() {
131       if (center_ != nullptr) center_->MarkClosed();
132     }
133 
IsClosed()134     bool IsClosed() { return center_->IsClosed(); }
135 
MarkClosed()136     void MarkClosed() {
137       if (center_ != nullptr) center_->MarkClosed();
138     }
139 
Push(T value)140     auto Push(T value) {
141       return [center = center_, value = std::move(value)]() mutable {
142         return center->Push(value);
143       };
144     }
145 
146    private:
147     RefCountedPtr<Center> center_;
148   };
149 
150   class Receiver {
151    public:
Receiver(RefCountedPtr<Center> center)152     explicit Receiver(RefCountedPtr<Center> center)
153         : center_(std::move(center)) {}
154     Receiver(const Receiver&) = delete;
155     Receiver& operator=(const Receiver&) = delete;
156     Receiver(Receiver&&) noexcept = default;
157     Receiver& operator=(Receiver&&) noexcept = default;
158 
~Receiver()159     ~Receiver() {
160       if (center_ != nullptr) center_->MarkClosed();
161     }
162 
Next()163     auto Next() {
164       return [center = center_]() { return center->Next(); };
165     }
166 
IsClose()167     bool IsClose() { return center_->IsClosed(); }
168 
MarkClose()169     void MarkClose() {
170       if (center_ != nullptr) center_->MarkClosed();
171     }
172 
173    private:
174     RefCountedPtr<Center> center_;
175   };
176 
177   Sender sender{center_};
178   Receiver receiver{center_};
179 };
180 
181 }  // namespace grpc_core
182 
183 #endif  // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H
184