1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_PIPE_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_PIPE_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <stdint.h> 21 #include <stdlib.h> 22 23 #include <memory> 24 #include <string> 25 #include <type_traits> 26 #include <utility> 27 28 #include "absl/strings/str_cat.h" 29 #include "absl/types/optional.h" 30 #include "absl/types/variant.h" 31 32 #include <grpc/support/log.h> 33 34 #include "src/core/lib/debug/trace.h" 35 #include "src/core/lib/gprpp/debug_location.h" 36 #include "src/core/lib/gprpp/ref_counted_ptr.h" 37 #include "src/core/lib/promise/activity.h" 38 #include "src/core/lib/promise/context.h" 39 #include "src/core/lib/promise/if.h" 40 #include "src/core/lib/promise/interceptor_list.h" 41 #include "src/core/lib/promise/map.h" 42 #include "src/core/lib/promise/poll.h" 43 #include "src/core/lib/promise/seq.h" 44 #include "src/core/lib/promise/trace.h" 45 #include "src/core/lib/resource_quota/arena.h" 46 47 namespace grpc_core { 48 49 namespace pipe_detail { 50 template <typename T> 51 class Center; 52 } 53 54 template <typename T> 55 struct Pipe; 56 57 // Result of Pipe::Next - represents a received value. 58 // If has_value() is false, the pipe was closed by the time we polled for the 59 // next value. No value was received, nor will there ever be. 60 // This type is movable but not copyable. 61 // Once the final move is destroyed the pipe will ack the read and unblock the 62 // send. 63 template <typename T> 64 class NextResult final { 65 public: NextResult()66 NextResult() : center_(nullptr) {} NextResult(RefCountedPtr<pipe_detail::Center<T>> center)67 explicit NextResult(RefCountedPtr<pipe_detail::Center<T>> center) 68 : center_(std::move(center)) { 69 GPR_ASSERT(center_ != nullptr); 70 } NextResult(bool cancelled)71 explicit NextResult(bool cancelled) 72 : center_(nullptr), cancelled_(cancelled) {} 73 ~NextResult(); 74 NextResult(const NextResult&) = delete; 75 NextResult& operator=(const NextResult&) = delete; 76 NextResult(NextResult&& other) noexcept = default; 77 NextResult& operator=(NextResult&& other) noexcept = default; 78 79 using value_type = T; 80 81 void reset(); 82 bool has_value() const; 83 // Only valid if has_value() value()84 const T& value() const { 85 GPR_ASSERT(has_value()); 86 return **this; 87 } value()88 T& value() { 89 GPR_ASSERT(has_value()); 90 return **this; 91 } 92 const T& operator*() const; 93 T& operator*(); 94 // Only valid if !has_value() cancelled()95 bool cancelled() { return cancelled_; } 96 97 private: 98 RefCountedPtr<pipe_detail::Center<T>> center_; 99 bool cancelled_; 100 }; 101 102 namespace pipe_detail { 103 104 template <typename T> 105 class Push; 106 template <typename T> 107 class Next; 108 109 // Center sits between a sender and a receiver to provide a one-deep buffer of 110 // Ts 111 template <typename T> 112 class Center : public InterceptorList<T> { 113 public: 114 // Initialize with one send ref (held by PipeSender) and one recv ref (held by 115 // PipeReceiver) Center()116 Center() { 117 refs_ = 2; 118 value_state_ = ValueState::kEmpty; 119 } 120 121 // Add one ref to this object, and return this. IncrementRefCount()122 void IncrementRefCount() { 123 if (grpc_trace_promise_primitives.enabled()) { 124 gpr_log(GPR_DEBUG, "%s", DebugOpString("IncrementRefCount").c_str()); 125 } 126 refs_++; 127 GPR_DEBUG_ASSERT(refs_ != 0); 128 } 129 Ref()130 RefCountedPtr<Center> Ref() { 131 IncrementRefCount(); 132 return RefCountedPtr<Center>(this); 133 } 134 135 // Drop a ref 136 // If no refs remain, destroy this object Unref()137 void Unref() { 138 if (grpc_trace_promise_primitives.enabled()) { 139 gpr_log(GPR_DEBUG, "%s", DebugOpString("Unref").c_str()); 140 } 141 GPR_DEBUG_ASSERT(refs_ > 0); 142 refs_--; 143 if (0 == refs_) { 144 this->~Center(); 145 } 146 } 147 148 // Try to push *value into the pipe. 149 // Return Pending if there is no space. 150 // Return true if the value was pushed. 151 // Return false if the recv end is closed. Push(T * value)152 Poll<bool> Push(T* value) { 153 if (grpc_trace_promise_primitives.enabled()) { 154 gpr_log(GPR_INFO, "%s", DebugOpString("Push").c_str()); 155 } 156 GPR_DEBUG_ASSERT(refs_ != 0); 157 switch (value_state_) { 158 case ValueState::kClosed: 159 case ValueState::kReadyClosed: 160 case ValueState::kCancelled: 161 case ValueState::kWaitingForAckAndClosed: 162 return false; 163 case ValueState::kReady: 164 case ValueState::kAcked: 165 case ValueState::kWaitingForAck: 166 return on_empty_.pending(); 167 case ValueState::kEmpty: 168 value_state_ = ValueState::kReady; 169 value_ = std::move(*value); 170 on_full_.Wake(); 171 return true; 172 } 173 GPR_UNREACHABLE_CODE(return false); 174 } 175 PollAck()176 Poll<bool> PollAck() { 177 if (grpc_trace_promise_primitives.enabled()) { 178 gpr_log(GPR_INFO, "%s", DebugOpString("PollAck").c_str()); 179 } 180 GPR_DEBUG_ASSERT(refs_ != 0); 181 switch (value_state_) { 182 case ValueState::kClosed: 183 return true; 184 case ValueState::kCancelled: 185 return false; 186 case ValueState::kReady: 187 case ValueState::kReadyClosed: 188 case ValueState::kEmpty: 189 case ValueState::kWaitingForAck: 190 case ValueState::kWaitingForAckAndClosed: 191 return on_empty_.pending(); 192 case ValueState::kAcked: 193 value_state_ = ValueState::kEmpty; 194 on_empty_.Wake(); 195 return true; 196 } 197 return true; 198 } 199 200 // Try to receive a value from the pipe. 201 // Return Pending if there is no value. 202 // Return the value if one was retrieved. 203 // Return nullopt if the send end is closed and no value had been pushed. Next()204 Poll<absl::optional<T>> Next() { 205 if (grpc_trace_promise_primitives.enabled()) { 206 gpr_log(GPR_INFO, "%s", DebugOpString("Next").c_str()); 207 } 208 GPR_DEBUG_ASSERT(refs_ != 0); 209 switch (value_state_) { 210 case ValueState::kEmpty: 211 case ValueState::kAcked: 212 case ValueState::kWaitingForAck: 213 case ValueState::kWaitingForAckAndClosed: 214 return on_full_.pending(); 215 case ValueState::kReadyClosed: 216 value_state_ = ValueState::kWaitingForAckAndClosed; 217 return std::move(value_); 218 case ValueState::kReady: 219 value_state_ = ValueState::kWaitingForAck; 220 return std::move(value_); 221 case ValueState::kClosed: 222 case ValueState::kCancelled: 223 return absl::nullopt; 224 } 225 GPR_UNREACHABLE_CODE(return absl::nullopt); 226 } 227 228 // Check if the pipe is closed for sending (if there is a value still queued 229 // but the pipe is closed, reports closed). PollClosedForSender()230 Poll<bool> PollClosedForSender() { 231 if (grpc_trace_promise_primitives.enabled()) { 232 gpr_log(GPR_INFO, "%s", DebugOpString("PollClosedForSender").c_str()); 233 } 234 GPR_DEBUG_ASSERT(refs_ != 0); 235 switch (value_state_) { 236 case ValueState::kEmpty: 237 case ValueState::kAcked: 238 case ValueState::kReady: 239 case ValueState::kWaitingForAck: 240 return on_closed_.pending(); 241 case ValueState::kWaitingForAckAndClosed: 242 case ValueState::kReadyClosed: 243 case ValueState::kClosed: 244 return false; 245 case ValueState::kCancelled: 246 return true; 247 } 248 GPR_UNREACHABLE_CODE(return true); 249 } 250 251 // Check if the pipe is closed for receiving (if there is a value still queued 252 // but the pipe is closed, reports open). PollClosedForReceiver()253 Poll<bool> PollClosedForReceiver() { 254 if (grpc_trace_promise_primitives.enabled()) { 255 gpr_log(GPR_INFO, "%s", DebugOpString("PollClosedForReceiver").c_str()); 256 } 257 GPR_DEBUG_ASSERT(refs_ != 0); 258 switch (value_state_) { 259 case ValueState::kEmpty: 260 case ValueState::kAcked: 261 case ValueState::kReady: 262 case ValueState::kReadyClosed: 263 case ValueState::kWaitingForAck: 264 case ValueState::kWaitingForAckAndClosed: 265 return on_closed_.pending(); 266 case ValueState::kClosed: 267 return false; 268 case ValueState::kCancelled: 269 return true; 270 } 271 GPR_UNREACHABLE_CODE(return true); 272 } 273 PollEmpty()274 Poll<Empty> PollEmpty() { 275 if (grpc_trace_promise_primitives.enabled()) { 276 gpr_log(GPR_INFO, "%s", DebugOpString("PollEmpty").c_str()); 277 } 278 GPR_DEBUG_ASSERT(refs_ != 0); 279 switch (value_state_) { 280 case ValueState::kReady: 281 case ValueState::kReadyClosed: 282 return on_empty_.pending(); 283 case ValueState::kWaitingForAck: 284 case ValueState::kWaitingForAckAndClosed: 285 case ValueState::kAcked: 286 case ValueState::kEmpty: 287 case ValueState::kClosed: 288 case ValueState::kCancelled: 289 return Empty{}; 290 } 291 GPR_UNREACHABLE_CODE(return Empty{}); 292 } 293 AckNext()294 void AckNext() { 295 if (grpc_trace_promise_primitives.enabled()) { 296 gpr_log(GPR_INFO, "%s", DebugOpString("AckNext").c_str()); 297 } 298 switch (value_state_) { 299 case ValueState::kReady: 300 case ValueState::kWaitingForAck: 301 value_state_ = ValueState::kAcked; 302 on_empty_.Wake(); 303 break; 304 case ValueState::kReadyClosed: 305 case ValueState::kWaitingForAckAndClosed: 306 this->ResetInterceptorList(); 307 value_state_ = ValueState::kClosed; 308 on_closed_.Wake(); 309 on_empty_.Wake(); 310 on_full_.Wake(); 311 break; 312 case ValueState::kClosed: 313 case ValueState::kCancelled: 314 break; 315 case ValueState::kEmpty: 316 case ValueState::kAcked: 317 abort(); 318 } 319 } 320 MarkClosed()321 void MarkClosed() { 322 if (grpc_trace_promise_primitives.enabled()) { 323 gpr_log(GPR_INFO, "%s", DebugOpString("MarkClosed").c_str()); 324 } 325 switch (value_state_) { 326 case ValueState::kEmpty: 327 case ValueState::kAcked: 328 this->ResetInterceptorList(); 329 value_state_ = ValueState::kClosed; 330 on_empty_.Wake(); 331 on_full_.Wake(); 332 on_closed_.Wake(); 333 break; 334 case ValueState::kReady: 335 value_state_ = ValueState::kReadyClosed; 336 on_closed_.Wake(); 337 break; 338 case ValueState::kWaitingForAck: 339 value_state_ = ValueState::kWaitingForAckAndClosed; 340 on_closed_.Wake(); 341 break; 342 case ValueState::kReadyClosed: 343 case ValueState::kClosed: 344 case ValueState::kCancelled: 345 case ValueState::kWaitingForAckAndClosed: 346 break; 347 } 348 } 349 MarkCancelled()350 void MarkCancelled() { 351 if (grpc_trace_promise_primitives.enabled()) { 352 gpr_log(GPR_INFO, "%s", DebugOpString("MarkCancelled").c_str()); 353 } 354 switch (value_state_) { 355 case ValueState::kEmpty: 356 case ValueState::kAcked: 357 case ValueState::kReady: 358 case ValueState::kReadyClosed: 359 case ValueState::kWaitingForAck: 360 case ValueState::kWaitingForAckAndClosed: 361 this->ResetInterceptorList(); 362 value_state_ = ValueState::kCancelled; 363 on_empty_.Wake(); 364 on_full_.Wake(); 365 on_closed_.Wake(); 366 break; 367 case ValueState::kClosed: 368 case ValueState::kCancelled: 369 break; 370 } 371 } 372 cancelled()373 bool cancelled() { return value_state_ == ValueState::kCancelled; } 374 value()375 T& value() { return value_; } value()376 const T& value() const { return value_; } 377 DebugTag()378 std::string DebugTag() { 379 if (auto* activity = Activity::current()) { 380 return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this), 381 "]: "); 382 } else { 383 return absl::StrCat("PIPE[0x", reinterpret_cast<uintptr_t>(this), "]: "); 384 } 385 } 386 387 private: 388 // State of value_. 389 enum class ValueState : uint8_t { 390 // No value is set, it's possible to send. 391 kEmpty, 392 // Value has been pushed but not acked, it's possible to receive. 393 kReady, 394 // Value has been read and not acked, both send/receive blocked until ack. 395 kWaitingForAck, 396 // Value has been received and acked, we can unblock senders and transition 397 // to empty. 398 kAcked, 399 // Pipe is closed successfully, no more values can be sent 400 kClosed, 401 // Pipe is closed successfully, no more values can be sent 402 // (but one value is queued and ready to be received) 403 kReadyClosed, 404 // Pipe is closed successfully, no more values can be sent 405 // (but one value is queued and waiting to be acked) 406 kWaitingForAckAndClosed, 407 // Pipe is closed unsuccessfully, no more values can be sent 408 kCancelled, 409 }; 410 DebugOpString(std::string op)411 std::string DebugOpString(std::string op) { 412 return absl::StrCat(DebugTag(), op, " refs=", refs_, 413 " value_state=", ValueStateName(value_state_), 414 " on_empty=", on_empty_.DebugString().c_str(), 415 " on_full=", on_full_.DebugString().c_str(), 416 " on_closed=", on_closed_.DebugString().c_str()); 417 } 418 ValueStateName(ValueState state)419 static const char* ValueStateName(ValueState state) { 420 switch (state) { 421 case ValueState::kEmpty: 422 return "Empty"; 423 case ValueState::kReady: 424 return "Ready"; 425 case ValueState::kAcked: 426 return "Acked"; 427 case ValueState::kClosed: 428 return "Closed"; 429 case ValueState::kReadyClosed: 430 return "ReadyClosed"; 431 case ValueState::kWaitingForAck: 432 return "WaitingForAck"; 433 case ValueState::kWaitingForAckAndClosed: 434 return "WaitingForAckAndClosed"; 435 case ValueState::kCancelled: 436 return "Cancelled"; 437 } 438 GPR_UNREACHABLE_CODE(return "unknown"); 439 } 440 441 T value_; 442 // Number of refs 443 uint8_t refs_; 444 // Current state of the value. 445 ValueState value_state_; 446 IntraActivityWaiter on_empty_; 447 IntraActivityWaiter on_full_; 448 IntraActivityWaiter on_closed_; 449 450 // Make failure to destruct show up in ASAN builds. 451 #ifndef NDEBUG 452 std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0); 453 #endif 454 }; 455 456 } // namespace pipe_detail 457 458 // Send end of a Pipe. 459 template <typename T> 460 class PipeSender { 461 public: 462 using PushType = pipe_detail::Push<T>; 463 464 PipeSender(const PipeSender&) = delete; 465 PipeSender& operator=(const PipeSender&) = delete; 466 PipeSender(PipeSender&& other) noexcept = default; 467 PipeSender& operator=(PipeSender&& other) noexcept = default; 468 ~PipeSender()469 ~PipeSender() { 470 if (center_ != nullptr) center_->MarkClosed(); 471 } 472 Close()473 void Close() { 474 if (center_ != nullptr) { 475 center_->MarkClosed(); 476 center_.reset(); 477 } 478 } 479 CloseWithError()480 void CloseWithError() { 481 if (center_ != nullptr) { 482 center_->MarkCancelled(); 483 center_.reset(); 484 } 485 } 486 Swap(PipeSender<T> * other)487 void Swap(PipeSender<T>* other) { std::swap(center_, other->center_); } 488 489 // Send a single message along the pipe. 490 // Returns a promise that will resolve to a bool - true if the message was 491 // sent, false if it could never be sent. Blocks the promise until the 492 // receiver is either closed or able to receive another message. 493 PushType Push(T value); 494 495 // Return a promise that resolves when the receiver is closed. 496 // The resolved value is a bool - true if the pipe was cancelled, false if it 497 // was closed successfully. 498 // Checks closed from the senders perspective: that is, if there is a value in 499 // the pipe but the pipe is closed, reports closed. AwaitClosed()500 auto AwaitClosed() { 501 return [center = center_]() { return center->PollClosedForSender(); }; 502 } 503 504 // Interject PromiseFactory f into the pipeline. 505 // f will be called with the current value traversing the pipe, and should 506 // return a value to replace it with. 507 // Interjects at the Push end of the pipe. 508 template <typename Fn> 509 void InterceptAndMap(Fn f, DebugLocation from = {}) { 510 center_->PrependMap(std::move(f), from); 511 } 512 513 // Per above, but calls cleanup_fn when the pipe is closed. 514 template <typename Fn, typename OnHalfClose> 515 void InterceptAndMap(Fn f, OnHalfClose cleanup_fn, DebugLocation from = {}) { 516 center_->PrependMapWithCleanup(std::move(f), std::move(cleanup_fn), from); 517 } 518 519 private: 520 friend struct Pipe<T>; 521 explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {} 522 RefCountedPtr<pipe_detail::Center<T>> center_; 523 524 // Make failure to destruct show up in ASAN builds. 525 #ifndef NDEBUG 526 std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0); 527 #endif 528 }; 529 530 template <typename T> 531 class PipeReceiver; 532 533 namespace pipe_detail { 534 535 // Implementation of PipeReceiver::Next promise. 536 template <typename T> 537 class Next { 538 public: 539 Next(const Next&) = delete; 540 Next& operator=(const Next&) = delete; 541 Next(Next&& other) noexcept = default; 542 Next& operator=(Next&& other) noexcept = default; 543 544 Poll<absl::optional<T>> operator()() { return center_->Next(); } 545 546 private: 547 friend class PipeReceiver<T>; 548 explicit Next(RefCountedPtr<Center<T>> center) : center_(std::move(center)) {} 549 550 RefCountedPtr<Center<T>> center_; 551 }; 552 553 } // namespace pipe_detail 554 555 // Receive end of a Pipe. 556 template <typename T> 557 class PipeReceiver { 558 public: 559 PipeReceiver(const PipeReceiver&) = delete; 560 PipeReceiver& operator=(const PipeReceiver&) = delete; 561 PipeReceiver(PipeReceiver&& other) noexcept = default; 562 PipeReceiver& operator=(PipeReceiver&& other) noexcept = default; 563 ~PipeReceiver() { 564 if (center_ != nullptr) center_->MarkCancelled(); 565 } 566 567 void Swap(PipeReceiver<T>* other) { std::swap(center_, other->center_); } 568 569 // Receive a single message from the pipe. 570 // Returns a promise that will resolve to an optional<T> - with a value if a 571 // message was received, or no value if the other end of the pipe was closed. 572 // Blocks the promise until the receiver is either closed or a message is 573 // available. 574 auto Next() { 575 return Seq( 576 pipe_detail::Next<T>(center_->Ref()), 577 [center = center_->Ref()](absl::optional<T> value) { 578 bool open = value.has_value(); 579 bool cancelled = center->cancelled(); 580 return If( 581 open, 582 [center = std::move(center), value = std::move(value)]() mutable { 583 auto run = center->Run(std::move(value)); 584 return Map(std::move(run), 585 [center = std::move(center)]( 586 absl::optional<T> value) mutable { 587 if (value.has_value()) { 588 center->value() = std::move(*value); 589 return NextResult<T>(std::move(center)); 590 } else { 591 center->MarkCancelled(); 592 return NextResult<T>(true); 593 } 594 }); 595 }, 596 [cancelled]() { return NextResult<T>(cancelled); }); 597 }); 598 } 599 600 // Return a promise that resolves when the receiver is closed. 601 // The resolved value is a bool - true if the pipe was cancelled, false if it 602 // was closed successfully. 603 // Checks closed from the receivers perspective: that is, if there is a value 604 // in the pipe but the pipe is closed, reports open until that value is read. 605 auto AwaitClosed() { 606 return [center = center_]() { return center->PollClosedForReceiver(); }; 607 } 608 609 auto AwaitEmpty() { 610 return [center = center_]() { return center->PollEmpty(); }; 611 } 612 613 void CloseWithError() { 614 if (center_ != nullptr) { 615 center_->MarkCancelled(); 616 center_.reset(); 617 } 618 } 619 620 // Interject PromiseFactory f into the pipeline. 621 // f will be called with the current value traversing the pipe, and should 622 // return a value to replace it with. 623 // Interjects at the Next end of the pipe. 624 template <typename Fn> 625 void InterceptAndMap(Fn f, DebugLocation from = {}) { 626 center_->AppendMap(std::move(f), from); 627 } 628 629 // Per above, but calls cleanup_fn when the pipe is closed. 630 template <typename Fn, typename OnHalfClose> 631 void InterceptAndMapWithHalfClose(Fn f, OnHalfClose cleanup_fn, 632 DebugLocation from = {}) { 633 center_->AppendMapWithCleanup(std::move(f), std::move(cleanup_fn), from); 634 } 635 636 private: 637 friend struct Pipe<T>; 638 explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {} 639 RefCountedPtr<pipe_detail::Center<T>> center_; 640 641 // Make failure to destruct show up in ASAN builds. 642 #ifndef NDEBUG 643 std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0); 644 #endif 645 }; 646 647 namespace pipe_detail { 648 649 // Implementation of PipeSender::Push promise. 650 template <typename T> 651 class Push { 652 public: 653 Push(const Push&) = delete; 654 655 Push& operator=(const Push&) = delete; 656 Push(Push&& other) noexcept = default; 657 Push& operator=(Push&& other) noexcept = default; 658 659 Poll<bool> operator()() { 660 if (center_ == nullptr) { 661 if (grpc_trace_promise_primitives.enabled()) { 662 gpr_log(GPR_DEBUG, "%s Pipe push has a null center", 663 Activity::current()->DebugTag().c_str()); 664 } 665 return false; 666 } 667 if (auto* p = absl::get_if<T>(&state_)) { 668 auto r = center_->Push(p); 669 if (auto* ok = r.value_if_ready()) { 670 state_.template emplace<AwaitingAck>(); 671 if (!*ok) return false; 672 } else { 673 return Pending{}; 674 } 675 } 676 GPR_DEBUG_ASSERT(absl::holds_alternative<AwaitingAck>(state_)); 677 return center_->PollAck(); 678 } 679 680 private: 681 struct AwaitingAck {}; 682 683 friend class PipeSender<T>; 684 explicit Push(RefCountedPtr<pipe_detail::Center<T>> center, T push) 685 : center_(std::move(center)), state_(std::move(push)) {} 686 687 RefCountedPtr<Center<T>> center_; 688 absl::variant<T, AwaitingAck> state_; 689 }; 690 691 } // namespace pipe_detail 692 693 template <typename T> 694 pipe_detail::Push<T> PipeSender<T>::Push(T value) { 695 return pipe_detail::Push<T>(center_ == nullptr ? nullptr : center_->Ref(), 696 std::move(value)); 697 } 698 699 template <typename T> 700 using PipeReceiverNextType = decltype(std::declval<PipeReceiver<T>>().Next()); 701 702 template <typename T> 703 bool NextResult<T>::has_value() const { 704 return center_ != nullptr; 705 } 706 707 template <typename T> 708 T& NextResult<T>::operator*() { 709 return center_->value(); 710 } 711 712 template <typename T> 713 const T& NextResult<T>::operator*() const { 714 return center_->value(); 715 } 716 717 template <typename T> 718 NextResult<T>::~NextResult() { 719 if (center_ != nullptr) center_->AckNext(); 720 } 721 722 template <typename T> 723 void NextResult<T>::reset() { 724 if (center_ != nullptr) { 725 center_->AckNext(); 726 center_.reset(); 727 } 728 } 729 730 // A Pipe is an intra-Activity communications channel that transmits T's from 731 // one end to the other. 732 // It is only safe to use a Pipe within the context of a single Activity. 733 // No synchronization is performed internally. 734 // The primary Pipe data structure is allocated from an arena, so the activity 735 // must have an arena as part of its context. 736 // By performing that allocation we can ensure stable pointer to shared data 737 // allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their 738 // implementation. 739 // This type has been optimized with the expectation that there are relatively 740 // few pipes per activity. If this assumption does not hold then a design 741 // allowing inline filtering of pipe contents (instead of connecting pipes with 742 // polling code) would likely be more appropriate. 743 template <typename T> 744 struct Pipe { 745 Pipe() : Pipe(GetContext<Arena>()) {} 746 explicit Pipe(Arena* arena) : Pipe(arena->New<pipe_detail::Center<T>>()) {} 747 Pipe(const Pipe&) = delete; 748 Pipe& operator=(const Pipe&) = delete; 749 Pipe(Pipe&&) noexcept = default; 750 Pipe& operator=(Pipe&&) noexcept = default; 751 752 PipeSender<T> sender; 753 PipeReceiver<T> receiver; 754 755 private: 756 explicit Pipe(pipe_detail::Center<T>* center) 757 : sender(center), receiver(center) {} 758 }; 759 760 } // namespace grpc_core 761 762 #endif // GRPC_SRC_CORE_LIB_PROMISE_PIPE_H 763