1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 //
17
18 #include <grpc/support/port_platform.h>
19
20 #include <functional>
21 #include <utility>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/sync.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/alarm.h>
28 #include <grpcpp/completion_queue.h>
29 #include <grpcpp/impl/completion_queue_tag.h>
30
31 #include "src/core/lib/gprpp/time.h"
32 #include "src/core/lib/iomgr/closure.h"
33 #include "src/core/lib/iomgr/error.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "src/core/lib/iomgr/executor.h"
36 #include "src/core/lib/iomgr/timer.h"
37 #include "src/core/lib/surface/completion_queue.h"
38
39 namespace grpc {
40
41 namespace internal {
42 class AlarmImpl : public grpc::internal::CompletionQueueTag {
43 public:
AlarmImpl()44 AlarmImpl() : cq_(nullptr), tag_(nullptr) {
45 gpr_ref_init(&refs_, 1);
46 grpc_timer_init_unset(&timer_);
47 }
~AlarmImpl()48 ~AlarmImpl() override {}
FinalizeResult(void ** tag,bool *)49 bool FinalizeResult(void** tag, bool* /*status*/) override {
50 *tag = tag_;
51 Unref();
52 return true;
53 }
Set(grpc::CompletionQueue * cq,gpr_timespec deadline,void * tag)54 void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
55 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
56 grpc_core::ExecCtx exec_ctx;
57 GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
58 cq_ = cq->cq();
59 tag_ = tag;
60 GPR_ASSERT(grpc_cq_begin_op(cq_, this));
61 GRPC_CLOSURE_INIT(
62 &on_alarm_,
63 [](void* arg, grpc_error_handle error) {
64 // queue the op on the completion queue
65 AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
66 alarm->Ref();
67 // Preserve the cq and reset the cq_ so that the alarm
68 // can be reset when the alarm tag is delivered.
69 grpc_completion_queue* cq = alarm->cq_;
70 alarm->cq_ = nullptr;
71 grpc_cq_end_op(
72 cq, alarm, error,
73 [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg,
74 &alarm->completion_);
75 GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
76 },
77 this, grpc_schedule_on_exec_ctx);
78 grpc_timer_init(&timer_,
79 grpc_core::Timestamp::FromTimespecRoundUp(deadline),
80 &on_alarm_);
81 }
Set(gpr_timespec deadline,std::function<void (bool)> f)82 void Set(gpr_timespec deadline, std::function<void(bool)> f) {
83 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
84 grpc_core::ExecCtx exec_ctx;
85 // Don't use any CQ at all. Instead just use the timer to fire the function
86 callback_ = std::move(f);
87 Ref();
88 GRPC_CLOSURE_INIT(
89 &on_alarm_,
90 [](void* arg, grpc_error_handle error) {
91 grpc_core::Executor::Run(GRPC_CLOSURE_CREATE(
92 [](void* arg, grpc_error_handle error) {
93 AlarmImpl* alarm =
94 static_cast<AlarmImpl*>(arg);
95 alarm->callback_(error.ok());
96 alarm->Unref();
97 },
98 arg, nullptr),
99 error);
100 },
101 this, grpc_schedule_on_exec_ctx);
102 grpc_timer_init(&timer_,
103 grpc_core::Timestamp::FromTimespecRoundUp(deadline),
104 &on_alarm_);
105 }
Cancel()106 void Cancel() {
107 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
108 grpc_core::ExecCtx exec_ctx;
109 grpc_timer_cancel(&timer_);
110 }
Destroy()111 void Destroy() {
112 Cancel();
113 Unref();
114 }
115
116 private:
Ref()117 void Ref() { gpr_ref(&refs_); }
Unref()118 void Unref() {
119 if (gpr_unref(&refs_)) {
120 delete this;
121 }
122 }
123
124 grpc_timer timer_;
125 gpr_refcount refs_;
126 grpc_closure on_alarm_;
127 grpc_cq_completion completion_;
128 // completion queue where events about this alarm will be posted
129 grpc_completion_queue* cq_;
130 void* tag_;
131 std::function<void(bool)> callback_;
132 };
133 } // namespace internal
134
Alarm()135 Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {}
136
SetInternal(grpc::CompletionQueue * cq,gpr_timespec deadline,void * tag)137 void Alarm::SetInternal(grpc::CompletionQueue* cq, gpr_timespec deadline,
138 void* tag) {
139 // Note that we know that alarm_ is actually an internal::AlarmImpl
140 // but we declared it as the base pointer to avoid a forward declaration
141 // or exposing core data structures in the C++ public headers.
142 // Thus it is safe to use a static_cast to the subclass here, and the
143 // C++ style guide allows us to do so in this case
144 static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
145 }
146
SetInternal(gpr_timespec deadline,std::function<void (bool)> f)147 void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
148 // Note that we know that alarm_ is actually an internal::AlarmImpl
149 // but we declared it as the base pointer to avoid a forward declaration
150 // or exposing core data structures in the C++ public headers.
151 // Thus it is safe to use a static_cast to the subclass here, and the
152 // C++ style guide allows us to do so in this case
153 static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
154 }
155
~Alarm()156 Alarm::~Alarm() {
157 if (alarm_ != nullptr) {
158 static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
159 }
160 }
161
Cancel()162 void Alarm::Cancel() { static_cast<internal::AlarmImpl*>(alarm_)->Cancel(); }
163 } // namespace grpc
164