1 // Copyright 2023 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <stdint.h> 21 22 #include <array> 23 #include <utility> 24 25 #include "absl/base/thread_annotations.h" 26 #include "absl/types/optional.h" 27 28 #include "src/core/lib/gprpp/ref_counted.h" 29 #include "src/core/lib/gprpp/ref_counted_ptr.h" 30 #include "src/core/lib/gprpp/sync.h" 31 #include "src/core/lib/promise/activity.h" 32 #include "src/core/lib/promise/poll.h" 33 34 namespace grpc_core { 35 36 template <typename T, uint8_t kQueueSize> 37 class InterActivityPipe { 38 public: 39 class NextResult { 40 public: 41 template <typename... Args> NextResult(Args &&...args)42 explicit NextResult(Args&&... args) : value_(std::forward<Args>(args)...) {} 43 using value_type = T; reset()44 void reset() { value_.reset(); } cancelled()45 bool cancelled() const { return false; } has_value()46 bool has_value() const { return value_.has_value(); } value()47 const T& value() const { return value_.value(); } value()48 T& value() { return value_.value(); } 49 const T& operator*() const { return *value_; } 50 T& operator*() { return *value_; } 51 52 private: 53 absl::optional<T> value_; 54 }; 55 56 private: 57 class Center : public RefCounted<Center, NonPolymorphicRefCount> { 58 public: Push(T & value)59 Poll<bool> Push(T& value) { 60 ReleasableMutexLock lock(&mu_); 61 if (closed_) return false; 62 if (count_ == kQueueSize) { 63 on_available_ = GetContext<Activity>()->MakeNonOwningWaker(); 64 return Pending{}; 65 } 66 queue_[(first_ + count_) % kQueueSize] = std::move(value); 67 ++count_; 68 if (count_ == 1) { 69 auto on_occupied = std::move(on_occupied_); 70 lock.Release(); 71 on_occupied.Wakeup(); 72 } 73 return true; 74 } 75 Next()76 Poll<NextResult> Next() { 77 ReleasableMutexLock lock(&mu_); 78 if (count_ == 0) { 79 if (closed_) return absl::nullopt; 80 on_occupied_ = GetContext<Activity>()->MakeNonOwningWaker(); 81 return Pending{}; 82 } 83 auto value = std::move(queue_[first_]); 84 first_ = (first_ + 1) % kQueueSize; 85 --count_; 86 if (count_ == kQueueSize - 1) { 87 auto on_available = std::move(on_available_); 88 lock.Release(); 89 on_available.Wakeup(); 90 } 91 return std::move(value); 92 } 93 MarkClosed()94 void MarkClosed() { 95 ReleasableMutexLock lock(&mu_); 96 if (std::exchange(closed_, true)) return; 97 auto on_occupied = std::move(on_occupied_); 98 auto on_available = std::move(on_available_); 99 lock.Release(); 100 on_occupied.Wakeup(); 101 on_available.Wakeup(); 102 } 103 IsClosed()104 bool IsClosed() { 105 MutexLock lock(&mu_); 106 return closed_; 107 } 108 109 private: 110 Mutex mu_; 111 std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_); 112 bool closed_ ABSL_GUARDED_BY(mu_) = false; 113 uint8_t first_ ABSL_GUARDED_BY(mu_) = 0; 114 uint8_t count_ ABSL_GUARDED_BY(mu_) = 0; 115 Waker on_occupied_ ABSL_GUARDED_BY(mu_); 116 Waker on_available_ ABSL_GUARDED_BY(mu_); 117 }; 118 RefCountedPtr<Center> center_{MakeRefCounted<Center>()}; 119 120 public: 121 class Sender { 122 public: Sender(RefCountedPtr<Center> center)123 explicit Sender(RefCountedPtr<Center> center) 124 : center_(std::move(center)) {} 125 Sender(const Sender&) = delete; 126 Sender& operator=(const Sender&) = delete; 127 Sender(Sender&&) noexcept = default; 128 Sender& operator=(Sender&&) noexcept = default; 129 ~Sender()130 ~Sender() { 131 if (center_ != nullptr) center_->MarkClosed(); 132 } 133 IsClosed()134 bool IsClosed() { return center_->IsClosed(); } 135 MarkClosed()136 void MarkClosed() { 137 if (center_ != nullptr) center_->MarkClosed(); 138 } 139 Push(T value)140 auto Push(T value) { 141 return [center = center_, value = std::move(value)]() mutable { 142 return center->Push(value); 143 }; 144 } 145 146 private: 147 RefCountedPtr<Center> center_; 148 }; 149 150 class Receiver { 151 public: Receiver(RefCountedPtr<Center> center)152 explicit Receiver(RefCountedPtr<Center> center) 153 : center_(std::move(center)) {} 154 Receiver(const Receiver&) = delete; 155 Receiver& operator=(const Receiver&) = delete; 156 Receiver(Receiver&&) noexcept = default; 157 Receiver& operator=(Receiver&&) noexcept = default; 158 ~Receiver()159 ~Receiver() { 160 if (center_ != nullptr) center_->MarkClosed(); 161 } 162 Next()163 auto Next() { 164 return [center = center_]() { return center->Next(); }; 165 } 166 IsClose()167 bool IsClose() { return center_->IsClosed(); } 168 MarkClose()169 void MarkClose() { 170 if (center_ != nullptr) center_->MarkClosed(); 171 } 172 173 private: 174 RefCountedPtr<Center> center_; 175 }; 176 177 Sender sender{center_}; 178 Receiver receiver{center_}; 179 }; 180 181 } // namespace grpc_core 182 183 #endif // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H 184