1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #include <grpc/support/port_platform.h> 16 17 #include "src/core/lib/promise/sleep.h" 18 19 #include <utility> 20 21 #include <grpc/event_engine/event_engine.h> 22 23 #include "src/core/lib/event_engine/default_event_engine.h" // IWYU pragma: keep 24 #include "src/core/lib/gprpp/time.h" 25 #include "src/core/lib/iomgr/exec_ctx.h" 26 #include "src/core/lib/promise/activity.h" 27 #include "src/core/lib/promise/context.h" 28 #include "src/core/lib/promise/poll.h" 29 30 namespace grpc_core { 31 32 using ::grpc_event_engine::experimental::EventEngine; 33 Sleep(Timestamp deadline)34Sleep::Sleep(Timestamp deadline) : deadline_(deadline) {} 35 ~Sleep()36Sleep::~Sleep() { 37 if (closure_ != nullptr) closure_->Cancel(); 38 } 39 operator ()()40Poll<absl::Status> Sleep::operator()() { 41 // Invalidate now so that we see a fresh version of the time. 42 // TODO(ctiller): the following can be safely removed when we remove ExecCtx. 43 ExecCtx::Get()->InvalidateNow(); 44 const auto now = Timestamp::Now(); 45 // If the deadline is earlier than now we can just return. 46 if (deadline_ <= now) return absl::OkStatus(); 47 if (closure_ == nullptr) { 48 // TODO(ctiller): it's likely we'll want a pool of closures - probably per 49 // cpu? - to avoid allocating/deallocating on fast paths. 50 closure_ = new ActiveClosure(deadline_); 51 } 52 if (closure_->HasRun()) return absl::OkStatus(); 53 return Pending{}; 54 } 55 ActiveClosure(Timestamp deadline)56Sleep::ActiveClosure::ActiveClosure(Timestamp deadline) 57 : waker_(Activity::current()->MakeOwningWaker()), 58 timer_handle_(GetContext<EventEngine>()->RunAfter( 59 deadline - Timestamp::Now(), this)) {} 60 Run()61void Sleep::ActiveClosure::Run() { 62 ApplicationCallbackExecCtx callback_exec_ctx; 63 ExecCtx exec_ctx; 64 auto waker = std::move(waker_); 65 if (Unref()) { 66 delete this; 67 } else { 68 waker.Wakeup(); 69 } 70 } 71 Cancel()72void Sleep::ActiveClosure::Cancel() { 73 // If we cancel correctly then we must own both refs still and can simply 74 // delete without unreffing twice, otherwise try unreffing since this may be 75 // the last owned ref. 76 if (HasRun() || GetContext<EventEngine>()->Cancel(timer_handle_) || Unref()) { 77 delete this; 78 } 79 } 80 Unref()81bool Sleep::ActiveClosure::Unref() { 82 return (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1); 83 } 84 HasRun() const85bool Sleep::ActiveClosure::HasRun() const { 86 // If the closure has run (ie woken up the activity) then it will have 87 // decremented this ref count once. 88 return refs_.load(std::memory_order_acquire) == 1; 89 } 90 91 } // namespace grpc_core 92