1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 //
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <functional>
21 #include <utility>
22 
23 #include <grpc/grpc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/sync.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/alarm.h>
28 #include <grpcpp/completion_queue.h>
29 #include <grpcpp/impl/completion_queue_tag.h>
30 
31 #include "src/core/lib/gprpp/time.h"
32 #include "src/core/lib/iomgr/closure.h"
33 #include "src/core/lib/iomgr/error.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "src/core/lib/iomgr/executor.h"
36 #include "src/core/lib/iomgr/timer.h"
37 #include "src/core/lib/surface/completion_queue.h"
38 
39 namespace grpc {
40 
41 namespace internal {
42 class AlarmImpl : public grpc::internal::CompletionQueueTag {
43  public:
AlarmImpl()44   AlarmImpl() : cq_(nullptr), tag_(nullptr) {
45     gpr_ref_init(&refs_, 1);
46     grpc_timer_init_unset(&timer_);
47   }
~AlarmImpl()48   ~AlarmImpl() override {}
FinalizeResult(void ** tag,bool *)49   bool FinalizeResult(void** tag, bool* /*status*/) override {
50     *tag = tag_;
51     Unref();
52     return true;
53   }
Set(grpc::CompletionQueue * cq,gpr_timespec deadline,void * tag)54   void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
55     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
56     grpc_core::ExecCtx exec_ctx;
57     GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
58     cq_ = cq->cq();
59     tag_ = tag;
60     GPR_ASSERT(grpc_cq_begin_op(cq_, this));
61     GRPC_CLOSURE_INIT(
62         &on_alarm_,
63         [](void* arg, grpc_error_handle error) {
64           // queue the op on the completion queue
65           AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
66           alarm->Ref();
67           // Preserve the cq and reset the cq_ so that the alarm
68           // can be reset when the alarm tag is delivered.
69           grpc_completion_queue* cq = alarm->cq_;
70           alarm->cq_ = nullptr;
71           grpc_cq_end_op(
72               cq, alarm, error,
73               [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, arg,
74               &alarm->completion_);
75           GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
76         },
77         this, grpc_schedule_on_exec_ctx);
78     grpc_timer_init(&timer_,
79                     grpc_core::Timestamp::FromTimespecRoundUp(deadline),
80                     &on_alarm_);
81   }
Set(gpr_timespec deadline,std::function<void (bool)> f)82   void Set(gpr_timespec deadline, std::function<void(bool)> f) {
83     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
84     grpc_core::ExecCtx exec_ctx;
85     // Don't use any CQ at all. Instead just use the timer to fire the function
86     callback_ = std::move(f);
87     Ref();
88     GRPC_CLOSURE_INIT(
89         &on_alarm_,
90         [](void* arg, grpc_error_handle error) {
91           grpc_core::Executor::Run(GRPC_CLOSURE_CREATE(
92                                        [](void* arg, grpc_error_handle error) {
93                                          AlarmImpl* alarm =
94                                              static_cast<AlarmImpl*>(arg);
95                                          alarm->callback_(error.ok());
96                                          alarm->Unref();
97                                        },
98                                        arg, nullptr),
99                                    error);
100         },
101         this, grpc_schedule_on_exec_ctx);
102     grpc_timer_init(&timer_,
103                     grpc_core::Timestamp::FromTimespecRoundUp(deadline),
104                     &on_alarm_);
105   }
Cancel()106   void Cancel() {
107     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
108     grpc_core::ExecCtx exec_ctx;
109     grpc_timer_cancel(&timer_);
110   }
Destroy()111   void Destroy() {
112     Cancel();
113     Unref();
114   }
115 
116  private:
Ref()117   void Ref() { gpr_ref(&refs_); }
Unref()118   void Unref() {
119     if (gpr_unref(&refs_)) {
120       delete this;
121     }
122   }
123 
124   grpc_timer timer_;
125   gpr_refcount refs_;
126   grpc_closure on_alarm_;
127   grpc_cq_completion completion_;
128   // completion queue where events about this alarm will be posted
129   grpc_completion_queue* cq_;
130   void* tag_;
131   std::function<void(bool)> callback_;
132 };
133 }  // namespace internal
134 
Alarm()135 Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {}
136 
SetInternal(grpc::CompletionQueue * cq,gpr_timespec deadline,void * tag)137 void Alarm::SetInternal(grpc::CompletionQueue* cq, gpr_timespec deadline,
138                         void* tag) {
139   // Note that we know that alarm_ is actually an internal::AlarmImpl
140   // but we declared it as the base pointer to avoid a forward declaration
141   // or exposing core data structures in the C++ public headers.
142   // Thus it is safe to use a static_cast to the subclass here, and the
143   // C++ style guide allows us to do so in this case
144   static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
145 }
146 
SetInternal(gpr_timespec deadline,std::function<void (bool)> f)147 void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
148   // Note that we know that alarm_ is actually an internal::AlarmImpl
149   // but we declared it as the base pointer to avoid a forward declaration
150   // or exposing core data structures in the C++ public headers.
151   // Thus it is safe to use a static_cast to the subclass here, and the
152   // C++ style guide allows us to do so in this case
153   static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
154 }
155 
~Alarm()156 Alarm::~Alarm() {
157   if (alarm_ != nullptr) {
158     static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
159   }
160 }
161 
Cancel()162 void Alarm::Cancel() { static_cast<internal::AlarmImpl*>(alarm_)->Cancel(); }
163 }  // namespace grpc
164