1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 //
17
18 #include <grpc/support/port_platform.h>
19
20 #include <atomic>
21 #include <functional>
22 #include <memory>
23 #include <utility>
24
25 #include "absl/status/status.h"
26
27 #include <grpc/event_engine/event_engine.h>
28 #include <grpc/grpc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/sync.h>
31 #include <grpc/support/time.h>
32 #include <grpcpp/alarm.h>
33 #include <grpcpp/completion_queue.h>
34 #include <grpcpp/impl/completion_queue_tag.h>
35
36 #include "src/core/lib/event_engine/default_event_engine.h"
37 #include "src/core/lib/gprpp/time.h"
38 #include "src/core/lib/iomgr/error.h"
39 #include "src/core/lib/iomgr/exec_ctx.h"
40 #include "src/core/lib/surface/completion_queue.h"
41
42 namespace grpc {
43
44 namespace internal {
45
46 namespace {
47 using grpc_event_engine::experimental::EventEngine;
48 } // namespace
49
50 class AlarmImpl : public grpc::internal::CompletionQueueTag {
51 public:
AlarmImpl()52 AlarmImpl()
53 : event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()),
54 cq_(nullptr),
55 tag_(nullptr) {
56 gpr_ref_init(&refs_, 1);
57 }
~AlarmImpl()58 ~AlarmImpl() override {}
FinalizeResult(void ** tag,bool *)59 bool FinalizeResult(void** tag, bool* /*status*/) override {
60 *tag = tag_;
61 Unref();
62 return true;
63 }
Set(grpc::CompletionQueue * cq,gpr_timespec deadline,void * tag)64 void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
65 grpc_core::ExecCtx exec_ctx;
66 GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
67 cq_ = cq->cq();
68 tag_ = tag;
69 GPR_ASSERT(grpc_cq_begin_op(cq_, this));
70 Ref();
71 GPR_ASSERT(cq_armed_.exchange(true) == false);
72 GPR_ASSERT(!callback_armed_.load());
73 cq_timer_handle_ = event_engine_->RunAfter(
74 grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
75 grpc_core::ExecCtx::Get()->Now(),
76 [this] { OnCQAlarm(absl::OkStatus()); });
77 }
Set(gpr_timespec deadline,std::function<void (bool)> f)78 void Set(gpr_timespec deadline, std::function<void(bool)> f) {
79 grpc_core::ExecCtx exec_ctx;
80 // Don't use any CQ at all. Instead just use the timer to fire the function
81 callback_ = std::move(f);
82 Ref();
83 GPR_ASSERT(callback_armed_.exchange(true) == false);
84 GPR_ASSERT(!cq_armed_.load());
85 callback_timer_handle_ = event_engine_->RunAfter(
86 grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
87 grpc_core::ExecCtx::Get()->Now(),
88 [this] { OnCallbackAlarm(true); });
89 }
Cancel()90 void Cancel() {
91 grpc_core::ExecCtx exec_ctx;
92 if (callback_armed_.load() &&
93 event_engine_->Cancel(callback_timer_handle_)) {
94 event_engine_->Run([this] { OnCallbackAlarm(/*is_ok=*/false); });
95 }
96 if (cq_armed_.load() && event_engine_->Cancel(cq_timer_handle_)) {
97 event_engine_->Run(
98 [this] { OnCQAlarm(absl::CancelledError("cancelled")); });
99 }
100 }
Destroy()101 void Destroy() {
102 Cancel();
103 Unref();
104 }
105
106 private:
OnCQAlarm(grpc_error_handle error)107 void OnCQAlarm(grpc_error_handle error) {
108 cq_armed_.store(false);
109 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
110 grpc_core::ExecCtx exec_ctx;
111 // Preserve the cq and reset the cq_ so that the alarm
112 // can be reset when the alarm tag is delivered.
113 grpc_completion_queue* cq = cq_;
114 cq_ = nullptr;
115 grpc_cq_end_op(
116 cq, this, error,
117 [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr,
118 &completion_);
119 GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
120 }
121
OnCallbackAlarm(bool is_ok)122 void OnCallbackAlarm(bool is_ok) {
123 callback_armed_.store(false);
124 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
125 grpc_core::ExecCtx exec_ctx;
126 callback_(is_ok);
127 Unref();
128 }
129
Ref()130 void Ref() { gpr_ref(&refs_); }
Unref()131 void Unref() {
132 if (gpr_unref(&refs_)) {
133 delete this;
134 }
135 }
136
137 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
138 std::atomic<bool> cq_armed_{false};
139 EventEngine::TaskHandle cq_timer_handle_ = EventEngine::TaskHandle::kInvalid;
140 std::atomic<bool> callback_armed_{false};
141 EventEngine::TaskHandle callback_timer_handle_ =
142 EventEngine::TaskHandle::kInvalid;
143 gpr_refcount refs_;
144 grpc_cq_completion completion_;
145 // completion queue where events about this alarm will be posted
146 grpc_completion_queue* cq_;
147 void* tag_;
148 std::function<void(bool)> callback_;
149 };
150 } // namespace internal
151
Alarm()152 Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {}
153
SetInternal(grpc::CompletionQueue * cq,gpr_timespec deadline,void * tag)154 void Alarm::SetInternal(grpc::CompletionQueue* cq, gpr_timespec deadline,
155 void* tag) {
156 // Note that we know that alarm_ is actually an internal::AlarmImpl
157 // but we declared it as the base pointer to avoid a forward declaration
158 // or exposing core data structures in the C++ public headers.
159 // Thus it is safe to use a static_cast to the subclass here, and the
160 // C++ style guide allows us to do so in this case
161 static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
162 }
163
SetInternal(gpr_timespec deadline,std::function<void (bool)> f)164 void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
165 // Note that we know that alarm_ is actually an internal::AlarmImpl
166 // but we declared it as the base pointer to avoid a forward declaration
167 // or exposing core data structures in the C++ public headers.
168 // Thus it is safe to use a static_cast to the subclass here, and the
169 // C++ style guide allows us to do so in this case
170 static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
171 }
172
~Alarm()173 Alarm::~Alarm() {
174 if (alarm_ != nullptr) {
175 static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
176 }
177 }
178
Cancel()179 void Alarm::Cancel() { static_cast<internal::AlarmImpl*>(alarm_)->Cancel(); }
180 } // namespace grpc
181