1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stdint.h>
21 #include <stdlib.h>
22 
23 #include <memory>
24 #include <string>
25 #include <type_traits>
26 #include <utility>
27 
28 #include "absl/strings/str_cat.h"
29 #include "absl/types/optional.h"
30 #include "absl/types/variant.h"
31 
32 #include <grpc/support/log.h>
33 
34 #include "src/core/lib/debug/trace.h"
35 #include "src/core/lib/gprpp/debug_location.h"
36 #include "src/core/lib/gprpp/ref_counted_ptr.h"
37 #include "src/core/lib/promise/activity.h"
38 #include "src/core/lib/promise/context.h"
39 #include "src/core/lib/promise/if.h"
40 #include "src/core/lib/promise/interceptor_list.h"
41 #include "src/core/lib/promise/map.h"
42 #include "src/core/lib/promise/poll.h"
43 #include "src/core/lib/promise/seq.h"
44 #include "src/core/lib/promise/trace.h"
45 #include "src/core/lib/resource_quota/arena.h"
46 
47 namespace grpc_core {
48 
49 namespace pipe_detail {
50 template <typename T>
51 class Center;
52 }
53 
54 template <typename T>
55 struct Pipe;
56 
57 // Result of Pipe::Next - represents a received value.
58 // If has_value() is false, the pipe was closed by the time we polled for the
59 // next value. No value was received, nor will there ever be.
60 // This type is movable but not copyable.
61 // Once the final move is destroyed the pipe will ack the read and unblock the
62 // send.
63 template <typename T>
64 class NextResult final {
65  public:
NextResult()66   NextResult() : center_(nullptr) {}
NextResult(RefCountedPtr<pipe_detail::Center<T>> center)67   explicit NextResult(RefCountedPtr<pipe_detail::Center<T>> center)
68       : center_(std::move(center)) {
69     GPR_ASSERT(center_ != nullptr);
70   }
NextResult(bool cancelled)71   explicit NextResult(bool cancelled)
72       : center_(nullptr), cancelled_(cancelled) {}
73   ~NextResult();
74   NextResult(const NextResult&) = delete;
75   NextResult& operator=(const NextResult&) = delete;
76   NextResult(NextResult&& other) noexcept = default;
77   NextResult& operator=(NextResult&& other) noexcept = default;
78 
79   using value_type = T;
80 
81   void reset();
82   bool has_value() const;
83   // Only valid if has_value()
value()84   const T& value() const {
85     GPR_ASSERT(has_value());
86     return **this;
87   }
value()88   T& value() {
89     GPR_ASSERT(has_value());
90     return **this;
91   }
92   const T& operator*() const;
93   T& operator*();
94   // Only valid if !has_value()
cancelled()95   bool cancelled() { return cancelled_; }
96 
97  private:
98   RefCountedPtr<pipe_detail::Center<T>> center_;
99   bool cancelled_;
100 };
101 
102 namespace pipe_detail {
103 
104 template <typename T>
105 class Push;
106 template <typename T>
107 class Next;
108 
109 // Center sits between a sender and a receiver to provide a one-deep buffer of
110 // Ts
111 template <typename T>
112 class Center : public InterceptorList<T> {
113  public:
114   // Initialize with one send ref (held by PipeSender) and one recv ref (held by
115   // PipeReceiver)
Center()116   Center() {
117     refs_ = 2;
118     value_state_ = ValueState::kEmpty;
119   }
120 
121   // Add one ref to this object, and return this.
IncrementRefCount()122   void IncrementRefCount() {
123     if (grpc_trace_promise_primitives.enabled()) {
124       gpr_log(GPR_DEBUG, "%s", DebugOpString("IncrementRefCount").c_str());
125     }
126     refs_++;
127     GPR_DEBUG_ASSERT(refs_ != 0);
128   }
129 
Ref()130   RefCountedPtr<Center> Ref() {
131     IncrementRefCount();
132     return RefCountedPtr<Center>(this);
133   }
134 
135   // Drop a ref
136   // If no refs remain, destroy this object
Unref()137   void Unref() {
138     if (grpc_trace_promise_primitives.enabled()) {
139       gpr_log(GPR_DEBUG, "%s", DebugOpString("Unref").c_str());
140     }
141     GPR_DEBUG_ASSERT(refs_ > 0);
142     refs_--;
143     if (0 == refs_) {
144       this->~Center();
145     }
146   }
147 
148   // Try to push *value into the pipe.
149   // Return Pending if there is no space.
150   // Return true if the value was pushed.
151   // Return false if the recv end is closed.
Push(T * value)152   Poll<bool> Push(T* value) {
153     if (grpc_trace_promise_primitives.enabled()) {
154       gpr_log(GPR_INFO, "%s", DebugOpString("Push").c_str());
155     }
156     GPR_DEBUG_ASSERT(refs_ != 0);
157     switch (value_state_) {
158       case ValueState::kClosed:
159       case ValueState::kReadyClosed:
160       case ValueState::kCancelled:
161       case ValueState::kWaitingForAckAndClosed:
162         return false;
163       case ValueState::kReady:
164       case ValueState::kAcked:
165       case ValueState::kWaitingForAck:
166         return on_empty_.pending();
167       case ValueState::kEmpty:
168         value_state_ = ValueState::kReady;
169         value_ = std::move(*value);
170         on_full_.Wake();
171         return true;
172     }
173     GPR_UNREACHABLE_CODE(return false);
174   }
175 
PollAck()176   Poll<bool> PollAck() {
177     if (grpc_trace_promise_primitives.enabled()) {
178       gpr_log(GPR_INFO, "%s", DebugOpString("PollAck").c_str());
179     }
180     GPR_DEBUG_ASSERT(refs_ != 0);
181     switch (value_state_) {
182       case ValueState::kClosed:
183         return true;
184       case ValueState::kCancelled:
185         return false;
186       case ValueState::kReady:
187       case ValueState::kReadyClosed:
188       case ValueState::kEmpty:
189       case ValueState::kWaitingForAck:
190       case ValueState::kWaitingForAckAndClosed:
191         return on_empty_.pending();
192       case ValueState::kAcked:
193         value_state_ = ValueState::kEmpty;
194         on_empty_.Wake();
195         return true;
196     }
197     return true;
198   }
199 
200   // Try to receive a value from the pipe.
201   // Return Pending if there is no value.
202   // Return the value if one was retrieved.
203   // Return nullopt if the send end is closed and no value had been pushed.
Next()204   Poll<absl::optional<T>> Next() {
205     if (grpc_trace_promise_primitives.enabled()) {
206       gpr_log(GPR_INFO, "%s", DebugOpString("Next").c_str());
207     }
208     GPR_DEBUG_ASSERT(refs_ != 0);
209     switch (value_state_) {
210       case ValueState::kEmpty:
211       case ValueState::kAcked:
212       case ValueState::kWaitingForAck:
213       case ValueState::kWaitingForAckAndClosed:
214         return on_full_.pending();
215       case ValueState::kReadyClosed:
216         value_state_ = ValueState::kWaitingForAckAndClosed;
217         return std::move(value_);
218       case ValueState::kReady:
219         value_state_ = ValueState::kWaitingForAck;
220         return std::move(value_);
221       case ValueState::kClosed:
222       case ValueState::kCancelled:
223         return absl::nullopt;
224     }
225     GPR_UNREACHABLE_CODE(return absl::nullopt);
226   }
227 
228   // Check if the pipe is closed for sending (if there is a value still queued
229   // but the pipe is closed, reports closed).
PollClosedForSender()230   Poll<bool> PollClosedForSender() {
231     if (grpc_trace_promise_primitives.enabled()) {
232       gpr_log(GPR_INFO, "%s", DebugOpString("PollClosedForSender").c_str());
233     }
234     GPR_DEBUG_ASSERT(refs_ != 0);
235     switch (value_state_) {
236       case ValueState::kEmpty:
237       case ValueState::kAcked:
238       case ValueState::kReady:
239       case ValueState::kWaitingForAck:
240         return on_closed_.pending();
241       case ValueState::kWaitingForAckAndClosed:
242       case ValueState::kReadyClosed:
243       case ValueState::kClosed:
244         return false;
245       case ValueState::kCancelled:
246         return true;
247     }
248     GPR_UNREACHABLE_CODE(return true);
249   }
250 
251   // Check if the pipe is closed for receiving (if there is a value still queued
252   // but the pipe is closed, reports open).
PollClosedForReceiver()253   Poll<bool> PollClosedForReceiver() {
254     if (grpc_trace_promise_primitives.enabled()) {
255       gpr_log(GPR_INFO, "%s", DebugOpString("PollClosedForReceiver").c_str());
256     }
257     GPR_DEBUG_ASSERT(refs_ != 0);
258     switch (value_state_) {
259       case ValueState::kEmpty:
260       case ValueState::kAcked:
261       case ValueState::kReady:
262       case ValueState::kReadyClosed:
263       case ValueState::kWaitingForAck:
264       case ValueState::kWaitingForAckAndClosed:
265         return on_closed_.pending();
266       case ValueState::kClosed:
267         return false;
268       case ValueState::kCancelled:
269         return true;
270     }
271     GPR_UNREACHABLE_CODE(return true);
272   }
273 
PollEmpty()274   Poll<Empty> PollEmpty() {
275     if (grpc_trace_promise_primitives.enabled()) {
276       gpr_log(GPR_INFO, "%s", DebugOpString("PollEmpty").c_str());
277     }
278     GPR_DEBUG_ASSERT(refs_ != 0);
279     switch (value_state_) {
280       case ValueState::kReady:
281       case ValueState::kReadyClosed:
282         return on_empty_.pending();
283       case ValueState::kWaitingForAck:
284       case ValueState::kWaitingForAckAndClosed:
285       case ValueState::kAcked:
286       case ValueState::kEmpty:
287       case ValueState::kClosed:
288       case ValueState::kCancelled:
289         return Empty{};
290     }
291     GPR_UNREACHABLE_CODE(return Empty{});
292   }
293 
AckNext()294   void AckNext() {
295     if (grpc_trace_promise_primitives.enabled()) {
296       gpr_log(GPR_INFO, "%s", DebugOpString("AckNext").c_str());
297     }
298     switch (value_state_) {
299       case ValueState::kReady:
300       case ValueState::kWaitingForAck:
301         value_state_ = ValueState::kAcked;
302         on_empty_.Wake();
303         break;
304       case ValueState::kReadyClosed:
305       case ValueState::kWaitingForAckAndClosed:
306         this->ResetInterceptorList();
307         value_state_ = ValueState::kClosed;
308         on_closed_.Wake();
309         on_empty_.Wake();
310         on_full_.Wake();
311         break;
312       case ValueState::kClosed:
313       case ValueState::kCancelled:
314         break;
315       case ValueState::kEmpty:
316       case ValueState::kAcked:
317         abort();
318     }
319   }
320 
MarkClosed()321   void MarkClosed() {
322     if (grpc_trace_promise_primitives.enabled()) {
323       gpr_log(GPR_INFO, "%s", DebugOpString("MarkClosed").c_str());
324     }
325     switch (value_state_) {
326       case ValueState::kEmpty:
327       case ValueState::kAcked:
328         this->ResetInterceptorList();
329         value_state_ = ValueState::kClosed;
330         on_empty_.Wake();
331         on_full_.Wake();
332         on_closed_.Wake();
333         break;
334       case ValueState::kReady:
335         value_state_ = ValueState::kReadyClosed;
336         on_closed_.Wake();
337         break;
338       case ValueState::kWaitingForAck:
339         value_state_ = ValueState::kWaitingForAckAndClosed;
340         on_closed_.Wake();
341         break;
342       case ValueState::kReadyClosed:
343       case ValueState::kClosed:
344       case ValueState::kCancelled:
345       case ValueState::kWaitingForAckAndClosed:
346         break;
347     }
348   }
349 
MarkCancelled()350   void MarkCancelled() {
351     if (grpc_trace_promise_primitives.enabled()) {
352       gpr_log(GPR_INFO, "%s", DebugOpString("MarkCancelled").c_str());
353     }
354     switch (value_state_) {
355       case ValueState::kEmpty:
356       case ValueState::kAcked:
357       case ValueState::kReady:
358       case ValueState::kReadyClosed:
359       case ValueState::kWaitingForAck:
360       case ValueState::kWaitingForAckAndClosed:
361         this->ResetInterceptorList();
362         value_state_ = ValueState::kCancelled;
363         on_empty_.Wake();
364         on_full_.Wake();
365         on_closed_.Wake();
366         break;
367       case ValueState::kClosed:
368       case ValueState::kCancelled:
369         break;
370     }
371   }
372 
cancelled()373   bool cancelled() { return value_state_ == ValueState::kCancelled; }
374 
value()375   T& value() { return value_; }
value()376   const T& value() const { return value_; }
377 
DebugTag()378   std::string DebugTag() {
379     if (auto* activity = Activity::current()) {
380       return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this),
381                           "]: ");
382     } else {
383       return absl::StrCat("PIPE[0x", reinterpret_cast<uintptr_t>(this), "]: ");
384     }
385   }
386 
387  private:
388   // State of value_.
389   enum class ValueState : uint8_t {
390     // No value is set, it's possible to send.
391     kEmpty,
392     // Value has been pushed but not acked, it's possible to receive.
393     kReady,
394     // Value has been read and not acked, both send/receive blocked until ack.
395     kWaitingForAck,
396     // Value has been received and acked, we can unblock senders and transition
397     // to empty.
398     kAcked,
399     // Pipe is closed successfully, no more values can be sent
400     kClosed,
401     // Pipe is closed successfully, no more values can be sent
402     // (but one value is queued and ready to be received)
403     kReadyClosed,
404     // Pipe is closed successfully, no more values can be sent
405     // (but one value is queued and waiting to be acked)
406     kWaitingForAckAndClosed,
407     // Pipe is closed unsuccessfully, no more values can be sent
408     kCancelled,
409   };
410 
DebugOpString(std::string op)411   std::string DebugOpString(std::string op) {
412     return absl::StrCat(DebugTag(), op, " refs=", refs_,
413                         " value_state=", ValueStateName(value_state_),
414                         " on_empty=", on_empty_.DebugString().c_str(),
415                         " on_full=", on_full_.DebugString().c_str(),
416                         " on_closed=", on_closed_.DebugString().c_str());
417   }
418 
ValueStateName(ValueState state)419   static const char* ValueStateName(ValueState state) {
420     switch (state) {
421       case ValueState::kEmpty:
422         return "Empty";
423       case ValueState::kReady:
424         return "Ready";
425       case ValueState::kAcked:
426         return "Acked";
427       case ValueState::kClosed:
428         return "Closed";
429       case ValueState::kReadyClosed:
430         return "ReadyClosed";
431       case ValueState::kWaitingForAck:
432         return "WaitingForAck";
433       case ValueState::kWaitingForAckAndClosed:
434         return "WaitingForAckAndClosed";
435       case ValueState::kCancelled:
436         return "Cancelled";
437     }
438     GPR_UNREACHABLE_CODE(return "unknown");
439   }
440 
441   T value_;
442   // Number of refs
443   uint8_t refs_;
444   // Current state of the value.
445   ValueState value_state_;
446   IntraActivityWaiter on_empty_;
447   IntraActivityWaiter on_full_;
448   IntraActivityWaiter on_closed_;
449 
450   // Make failure to destruct show up in ASAN builds.
451 #ifndef NDEBUG
452   std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
453 #endif
454 };
455 
456 }  // namespace pipe_detail
457 
458 // Send end of a Pipe.
459 template <typename T>
460 class PipeSender {
461  public:
462   using PushType = pipe_detail::Push<T>;
463 
464   PipeSender(const PipeSender&) = delete;
465   PipeSender& operator=(const PipeSender&) = delete;
466   PipeSender(PipeSender&& other) noexcept = default;
467   PipeSender& operator=(PipeSender&& other) noexcept = default;
468 
~PipeSender()469   ~PipeSender() {
470     if (center_ != nullptr) center_->MarkClosed();
471   }
472 
Close()473   void Close() {
474     if (center_ != nullptr) {
475       center_->MarkClosed();
476       center_.reset();
477     }
478   }
479 
CloseWithError()480   void CloseWithError() {
481     if (center_ != nullptr) {
482       center_->MarkCancelled();
483       center_.reset();
484     }
485   }
486 
Swap(PipeSender<T> * other)487   void Swap(PipeSender<T>* other) { std::swap(center_, other->center_); }
488 
489   // Send a single message along the pipe.
490   // Returns a promise that will resolve to a bool - true if the message was
491   // sent, false if it could never be sent. Blocks the promise until the
492   // receiver is either closed or able to receive another message.
493   PushType Push(T value);
494 
495   // Return a promise that resolves when the receiver is closed.
496   // The resolved value is a bool - true if the pipe was cancelled, false if it
497   // was closed successfully.
498   // Checks closed from the senders perspective: that is, if there is a value in
499   // the pipe but the pipe is closed, reports closed.
AwaitClosed()500   auto AwaitClosed() {
501     return [center = center_]() { return center->PollClosedForSender(); };
502   }
503 
504   // Interject PromiseFactory f into the pipeline.
505   // f will be called with the current value traversing the pipe, and should
506   // return a value to replace it with.
507   // Interjects at the Push end of the pipe.
508   template <typename Fn>
509   void InterceptAndMap(Fn f, DebugLocation from = {}) {
510     center_->PrependMap(std::move(f), from);
511   }
512 
513   // Per above, but calls cleanup_fn when the pipe is closed.
514   template <typename Fn, typename OnHalfClose>
515   void InterceptAndMap(Fn f, OnHalfClose cleanup_fn, DebugLocation from = {}) {
516     center_->PrependMapWithCleanup(std::move(f), std::move(cleanup_fn), from);
517   }
518 
519  private:
520   friend struct Pipe<T>;
521   explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
522   RefCountedPtr<pipe_detail::Center<T>> center_;
523 
524   // Make failure to destruct show up in ASAN builds.
525 #ifndef NDEBUG
526   std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
527 #endif
528 };
529 
530 template <typename T>
531 class PipeReceiver;
532 
533 namespace pipe_detail {
534 
535 // Implementation of PipeReceiver::Next promise.
536 template <typename T>
537 class Next {
538  public:
539   Next(const Next&) = delete;
540   Next& operator=(const Next&) = delete;
541   Next(Next&& other) noexcept = default;
542   Next& operator=(Next&& other) noexcept = default;
543 
544   Poll<absl::optional<T>> operator()() { return center_->Next(); }
545 
546  private:
547   friend class PipeReceiver<T>;
548   explicit Next(RefCountedPtr<Center<T>> center) : center_(std::move(center)) {}
549 
550   RefCountedPtr<Center<T>> center_;
551 };
552 
553 }  // namespace pipe_detail
554 
555 // Receive end of a Pipe.
556 template <typename T>
557 class PipeReceiver {
558  public:
559   PipeReceiver(const PipeReceiver&) = delete;
560   PipeReceiver& operator=(const PipeReceiver&) = delete;
561   PipeReceiver(PipeReceiver&& other) noexcept = default;
562   PipeReceiver& operator=(PipeReceiver&& other) noexcept = default;
563   ~PipeReceiver() {
564     if (center_ != nullptr) center_->MarkCancelled();
565   }
566 
567   void Swap(PipeReceiver<T>* other) { std::swap(center_, other->center_); }
568 
569   // Receive a single message from the pipe.
570   // Returns a promise that will resolve to an optional<T> - with a value if a
571   // message was received, or no value if the other end of the pipe was closed.
572   // Blocks the promise until the receiver is either closed or a message is
573   // available.
574   auto Next() {
575     return Seq(
576         pipe_detail::Next<T>(center_->Ref()),
577         [center = center_->Ref()](absl::optional<T> value) {
578           bool open = value.has_value();
579           bool cancelled = center->cancelled();
580           return If(
581               open,
582               [center = std::move(center), value = std::move(value)]() mutable {
583                 auto run = center->Run(std::move(value));
584                 return Map(std::move(run),
585                            [center = std::move(center)](
586                                absl::optional<T> value) mutable {
587                              if (value.has_value()) {
588                                center->value() = std::move(*value);
589                                return NextResult<T>(std::move(center));
590                              } else {
591                                center->MarkCancelled();
592                                return NextResult<T>(true);
593                              }
594                            });
595               },
596               [cancelled]() { return NextResult<T>(cancelled); });
597         });
598   }
599 
600   // Return a promise that resolves when the receiver is closed.
601   // The resolved value is a bool - true if the pipe was cancelled, false if it
602   // was closed successfully.
603   // Checks closed from the receivers perspective: that is, if there is a value
604   // in the pipe but the pipe is closed, reports open until that value is read.
605   auto AwaitClosed() {
606     return [center = center_]() { return center->PollClosedForReceiver(); };
607   }
608 
609   auto AwaitEmpty() {
610     return [center = center_]() { return center->PollEmpty(); };
611   }
612 
613   void CloseWithError() {
614     if (center_ != nullptr) {
615       center_->MarkCancelled();
616       center_.reset();
617     }
618   }
619 
620   // Interject PromiseFactory f into the pipeline.
621   // f will be called with the current value traversing the pipe, and should
622   // return a value to replace it with.
623   // Interjects at the Next end of the pipe.
624   template <typename Fn>
625   void InterceptAndMap(Fn f, DebugLocation from = {}) {
626     center_->AppendMap(std::move(f), from);
627   }
628 
629   // Per above, but calls cleanup_fn when the pipe is closed.
630   template <typename Fn, typename OnHalfClose>
631   void InterceptAndMapWithHalfClose(Fn f, OnHalfClose cleanup_fn,
632                                     DebugLocation from = {}) {
633     center_->AppendMapWithCleanup(std::move(f), std::move(cleanup_fn), from);
634   }
635 
636  private:
637   friend struct Pipe<T>;
638   explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
639   RefCountedPtr<pipe_detail::Center<T>> center_;
640 
641   // Make failure to destruct show up in ASAN builds.
642 #ifndef NDEBUG
643   std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
644 #endif
645 };
646 
647 namespace pipe_detail {
648 
649 // Implementation of PipeSender::Push promise.
650 template <typename T>
651 class Push {
652  public:
653   Push(const Push&) = delete;
654 
655   Push& operator=(const Push&) = delete;
656   Push(Push&& other) noexcept = default;
657   Push& operator=(Push&& other) noexcept = default;
658 
659   Poll<bool> operator()() {
660     if (center_ == nullptr) {
661       if (grpc_trace_promise_primitives.enabled()) {
662         gpr_log(GPR_DEBUG, "%s Pipe push has a null center",
663                 Activity::current()->DebugTag().c_str());
664       }
665       return false;
666     }
667     if (auto* p = absl::get_if<T>(&state_)) {
668       auto r = center_->Push(p);
669       if (auto* ok = r.value_if_ready()) {
670         state_.template emplace<AwaitingAck>();
671         if (!*ok) return false;
672       } else {
673         return Pending{};
674       }
675     }
676     GPR_DEBUG_ASSERT(absl::holds_alternative<AwaitingAck>(state_));
677     return center_->PollAck();
678   }
679 
680  private:
681   struct AwaitingAck {};
682 
683   friend class PipeSender<T>;
684   explicit Push(RefCountedPtr<pipe_detail::Center<T>> center, T push)
685       : center_(std::move(center)), state_(std::move(push)) {}
686 
687   RefCountedPtr<Center<T>> center_;
688   absl::variant<T, AwaitingAck> state_;
689 };
690 
691 }  // namespace pipe_detail
692 
693 template <typename T>
694 pipe_detail::Push<T> PipeSender<T>::Push(T value) {
695   return pipe_detail::Push<T>(center_ == nullptr ? nullptr : center_->Ref(),
696                               std::move(value));
697 }
698 
699 template <typename T>
700 using PipeReceiverNextType = decltype(std::declval<PipeReceiver<T>>().Next());
701 
702 template <typename T>
703 bool NextResult<T>::has_value() const {
704   return center_ != nullptr;
705 }
706 
707 template <typename T>
708 T& NextResult<T>::operator*() {
709   return center_->value();
710 }
711 
712 template <typename T>
713 const T& NextResult<T>::operator*() const {
714   return center_->value();
715 }
716 
717 template <typename T>
718 NextResult<T>::~NextResult() {
719   if (center_ != nullptr) center_->AckNext();
720 }
721 
722 template <typename T>
723 void NextResult<T>::reset() {
724   if (center_ != nullptr) {
725     center_->AckNext();
726     center_.reset();
727   }
728 }
729 
730 // A Pipe is an intra-Activity communications channel that transmits T's from
731 // one end to the other.
732 // It is only safe to use a Pipe within the context of a single Activity.
733 // No synchronization is performed internally.
734 // The primary Pipe data structure is allocated from an arena, so the activity
735 // must have an arena as part of its context.
736 // By performing that allocation we can ensure stable pointer to shared data
737 // allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
738 // implementation.
739 // This type has been optimized with the expectation that there are relatively
740 // few pipes per activity. If this assumption does not hold then a design
741 // allowing inline filtering of pipe contents (instead of connecting pipes with
742 // polling code) would likely be more appropriate.
743 template <typename T>
744 struct Pipe {
745   Pipe() : Pipe(GetContext<Arena>()) {}
746   explicit Pipe(Arena* arena) : Pipe(arena->New<pipe_detail::Center<T>>()) {}
747   Pipe(const Pipe&) = delete;
748   Pipe& operator=(const Pipe&) = delete;
749   Pipe(Pipe&&) noexcept = default;
750   Pipe& operator=(Pipe&&) noexcept = default;
751 
752   PipeSender<T> sender;
753   PipeReceiver<T> receiver;
754 
755  private:
756   explicit Pipe(pipe_detail::Center<T>* center)
757       : sender(center), receiver(center) {}
758 };
759 
760 }  // namespace grpc_core
761 
762 #endif  // GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
763