/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16*14675a02SAndroid Build Coastguard Worker
#include "fcp/base/clock.h"

#include <algorithm>
#include <cmath>
#include <iterator>
#include <memory>
#include <thread>  // NOLINT(build/c++11)

#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "fcp/base/monitoring.h"
27*14675a02SAndroid Build Coastguard Worker
28*14675a02SAndroid Build Coastguard Worker namespace fcp {
29*14675a02SAndroid Build Coastguard Worker
30*14675a02SAndroid Build Coastguard Worker // Implements global realtime clock that uses timers to schedule wake-up of
31*14675a02SAndroid Build Coastguard Worker // waiters.
32*14675a02SAndroid Build Coastguard Worker class RealTimeClock : public Clock {
33*14675a02SAndroid Build Coastguard Worker public:
34*14675a02SAndroid Build Coastguard Worker RealTimeClock();
~RealTimeClock()35*14675a02SAndroid Build Coastguard Worker ~RealTimeClock() override {
36*14675a02SAndroid Build Coastguard Worker FCP_LOG(FATAL) << "RealTimeClock should never be destroyed";
37*14675a02SAndroid Build Coastguard Worker }
38*14675a02SAndroid Build Coastguard Worker
39*14675a02SAndroid Build Coastguard Worker // Returns the current time.
Now()40*14675a02SAndroid Build Coastguard Worker absl::Time Now() override { return absl::Now(); }
NowLocked()41*14675a02SAndroid Build Coastguard Worker absl::Time NowLocked() override { return absl::Now(); }
42*14675a02SAndroid Build Coastguard Worker
43*14675a02SAndroid Build Coastguard Worker // Sleeps for the specified duration.
Sleep(absl::Duration d)44*14675a02SAndroid Build Coastguard Worker void Sleep(absl::Duration d) override { absl::SleepFor(d); }
45*14675a02SAndroid Build Coastguard Worker
46*14675a02SAndroid Build Coastguard Worker // Schedules wakeup at the specified wakeup_time.
47*14675a02SAndroid Build Coastguard Worker void ScheduleWakeup(absl::Time wakeup_time) override;
48*14675a02SAndroid Build Coastguard Worker
49*14675a02SAndroid Build Coastguard Worker private:
50*14675a02SAndroid Build Coastguard Worker // The worker loop that performs the sleep and dispatches wake-up calls.
51*14675a02SAndroid Build Coastguard Worker void WorkerLoop();
52*14675a02SAndroid Build Coastguard Worker
53*14675a02SAndroid Build Coastguard Worker // The currently scheduled wake-up time. There is at most one wake-up
54*14675a02SAndroid Build Coastguard Worker // time per process.
55*14675a02SAndroid Build Coastguard Worker absl::Time next_wakeup_ ABSL_GUARDED_BY(&wakeup_mu_) = absl::InfiniteFuture();
56*14675a02SAndroid Build Coastguard Worker // Mutex that protects next_wakeup and used with the wake-up CondVar.
57*14675a02SAndroid Build Coastguard Worker absl::Mutex wakeup_mu_;
58*14675a02SAndroid Build Coastguard Worker // CondVar used to sleep until the next wake-up deadline.
59*14675a02SAndroid Build Coastguard Worker absl::CondVar wakeup_condvar_;
60*14675a02SAndroid Build Coastguard Worker // Worker thread that runs the worker loop function. Since this class
61*14675a02SAndroid Build Coastguard Worker // is singleton, there is only one thread per process, and that thread is
62*14675a02SAndroid Build Coastguard Worker // never terminated.
63*14675a02SAndroid Build Coastguard Worker std::unique_ptr<std::thread> worker_thread_;
64*14675a02SAndroid Build Coastguard Worker };
65*14675a02SAndroid Build Coastguard Worker
RealClock()66*14675a02SAndroid Build Coastguard Worker Clock* Clock::RealClock() {
67*14675a02SAndroid Build Coastguard Worker static Clock* realtime_clock = new RealTimeClock;
68*14675a02SAndroid Build Coastguard Worker return realtime_clock;
69*14675a02SAndroid Build Coastguard Worker }
70*14675a02SAndroid Build Coastguard Worker
WakeupWithDeadline(absl::Time deadline,const std::shared_ptr<Clock::Waiter> & waiter)71*14675a02SAndroid Build Coastguard Worker void Clock::WakeupWithDeadline(absl::Time deadline,
72*14675a02SAndroid Build Coastguard Worker const std::shared_ptr<Clock::Waiter>& waiter) {
73*14675a02SAndroid Build Coastguard Worker // Insert the new waiter into the map ordered by its deadline.
74*14675a02SAndroid Build Coastguard Worker // Waiters with matching deadlines are inserted into the same bucket and
75*14675a02SAndroid Build Coastguard Worker // their order within the bucket is preserved.
76*14675a02SAndroid Build Coastguard Worker {
77*14675a02SAndroid Build Coastguard Worker absl::MutexLock lock(mutex());
78*14675a02SAndroid Build Coastguard Worker WaiterMap::iterator it;
79*14675a02SAndroid Build Coastguard Worker if ((it = pending_waiters_.find(deadline)) != pending_waiters_.end()) {
80*14675a02SAndroid Build Coastguard Worker it->second.push_back(waiter);
81*14675a02SAndroid Build Coastguard Worker } else {
82*14675a02SAndroid Build Coastguard Worker pending_waiters_.insert(std::make_pair(deadline, WaiterList{waiter}));
83*14675a02SAndroid Build Coastguard Worker }
84*14675a02SAndroid Build Coastguard Worker }
85*14675a02SAndroid Build Coastguard Worker
86*14675a02SAndroid Build Coastguard Worker // Inserting a new waiter may trigger an immediate wake-up if the deadline
87*14675a02SAndroid Build Coastguard Worker // is due. Otherwise a new wake-up is scheduled at the end on the dispatch.
88*14675a02SAndroid Build Coastguard Worker DispatchWakeups();
89*14675a02SAndroid Build Coastguard Worker }
90*14675a02SAndroid Build Coastguard Worker
91*14675a02SAndroid Build Coastguard Worker // DispatchWakeup performs the following actions in the loop:
92*14675a02SAndroid Build Coastguard Worker // - Check for reentrancy to avoid more than one concurrent dispatch loop
93*14675a02SAndroid Build Coastguard Worker // - Take out all waiters that are due
94*14675a02SAndroid Build Coastguard Worker // - Make WakeUp calls on all of those waiters. This step is done outside
95*14675a02SAndroid Build Coastguard Worker // of lock because it may potentially take longer time and new waiters may
96*14675a02SAndroid Build Coastguard Worker // potentially be inserted during that step.
97*14675a02SAndroid Build Coastguard Worker // - If there are any waiters that are still due at that point (because the
98*14675a02SAndroid Build Coastguard Worker // the previous step took too long and new waiters have expired or because
99*14675a02SAndroid Build Coastguard Worker // there were any new waiters inserted during the previous steps), loop
100*14675a02SAndroid Build Coastguard Worker // back and repeat the previous steps.
101*14675a02SAndroid Build Coastguard Worker // - Otherwise finish the dispatch by scheduling a new wakeup for the bucket
102*14675a02SAndroid Build Coastguard Worker // that expires the soonest.
DispatchWakeups()103*14675a02SAndroid Build Coastguard Worker void Clock::DispatchWakeups() {
104*14675a02SAndroid Build Coastguard Worker do {
105*14675a02SAndroid Build Coastguard Worker if (CheckReentrancy()) {
106*14675a02SAndroid Build Coastguard Worker // Avoid reentrancy. An ongoing DispatchWakeups() call will take care
107*14675a02SAndroid Build Coastguard Worker // of dispatching any new due wakeups if necessary.
108*14675a02SAndroid Build Coastguard Worker // If there is a race condition, only one of dispatch calls will go
109*14675a02SAndroid Build Coastguard Worker // through and all other will just increment the dispatch_level and
110*14675a02SAndroid Build Coastguard Worker // return.
111*14675a02SAndroid Build Coastguard Worker return;
112*14675a02SAndroid Build Coastguard Worker }
113*14675a02SAndroid Build Coastguard Worker
114*14675a02SAndroid Build Coastguard Worker // Collect waiters that are due.
115*14675a02SAndroid Build Coastguard Worker WaiterList wakeup_calls = GetExpiredWaiters();
116*14675a02SAndroid Build Coastguard Worker
117*14675a02SAndroid Build Coastguard Worker // Dispatch WakeUp calls to those waiters.
118*14675a02SAndroid Build Coastguard Worker for (const auto& waiter : wakeup_calls) {
119*14675a02SAndroid Build Coastguard Worker waiter->WakeUp();
120*14675a02SAndroid Build Coastguard Worker }
121*14675a02SAndroid Build Coastguard Worker // One more dispatch loop may be needed if there were any reentrant calls
122*14675a02SAndroid Build Coastguard Worker // or if WakeUp() calls took so long that more waiters have become due.
123*14675a02SAndroid Build Coastguard Worker } while (!FinishDispatchAndScheduleNextWakeup());
124*14675a02SAndroid Build Coastguard Worker }
125*14675a02SAndroid Build Coastguard Worker
126*14675a02SAndroid Build Coastguard Worker // Called at the beginning of dispatch loop.
127*14675a02SAndroid Build Coastguard Worker // Increments dispatch_level_ and returns true if there is already
128*14675a02SAndroid Build Coastguard Worker // another dispatch call in progress.
CheckReentrancy()129*14675a02SAndroid Build Coastguard Worker bool Clock::CheckReentrancy() {
130*14675a02SAndroid Build Coastguard Worker absl::MutexLock lock(mutex());
131*14675a02SAndroid Build Coastguard Worker return ++dispatch_level_ > 1;
132*14675a02SAndroid Build Coastguard Worker }
133*14675a02SAndroid Build Coastguard Worker
134*14675a02SAndroid Build Coastguard Worker // Iterate through waiter buckets ordered by deadline time and take out all
135*14675a02SAndroid Build Coastguard Worker // waiters that are due.
GetExpiredWaiters()136*14675a02SAndroid Build Coastguard Worker Clock::WaiterList Clock::GetExpiredWaiters() {
137*14675a02SAndroid Build Coastguard Worker absl::MutexLock lock(mutex());
138*14675a02SAndroid Build Coastguard Worker absl::Time now = NowLocked();
139*14675a02SAndroid Build Coastguard Worker std::vector<std::shared_ptr<Waiter>> wakeup_calls;
140*14675a02SAndroid Build Coastguard Worker WaiterMap::iterator iter;
141*14675a02SAndroid Build Coastguard Worker
142*14675a02SAndroid Build Coastguard Worker while ((iter = pending_waiters_.begin()) != pending_waiters_.end() &&
143*14675a02SAndroid Build Coastguard Worker iter->first <= now) {
144*14675a02SAndroid Build Coastguard Worker std::move(iter->second.begin(), iter->second.end(),
145*14675a02SAndroid Build Coastguard Worker std::back_inserter(wakeup_calls));
146*14675a02SAndroid Build Coastguard Worker pending_waiters_.erase(iter);
147*14675a02SAndroid Build Coastguard Worker }
148*14675a02SAndroid Build Coastguard Worker return wakeup_calls;
149*14675a02SAndroid Build Coastguard Worker }
150*14675a02SAndroid Build Coastguard Worker
151*14675a02SAndroid Build Coastguard Worker // Called at the end of dispatch loop to check post-dispatch conditions,
152*14675a02SAndroid Build Coastguard Worker // reset re-entracy level, and schedule a next dispatch if needed.
153*14675a02SAndroid Build Coastguard Worker // Returns true if the dispatch loop has ended.
154*14675a02SAndroid Build Coastguard Worker // Returns false if more the dispatch loop needs to be repeated.
FinishDispatchAndScheduleNextWakeup()155*14675a02SAndroid Build Coastguard Worker bool Clock::FinishDispatchAndScheduleNextWakeup() {
156*14675a02SAndroid Build Coastguard Worker absl::MutexLock lock(mutex());
157*14675a02SAndroid Build Coastguard Worker int current_dispatch_level = dispatch_level_;
158*14675a02SAndroid Build Coastguard Worker dispatch_level_ = 0;
159*14675a02SAndroid Build Coastguard Worker
160*14675a02SAndroid Build Coastguard Worker if (!pending_waiters_.empty()) {
161*14675a02SAndroid Build Coastguard Worker if (current_dispatch_level > 1) {
162*14675a02SAndroid Build Coastguard Worker // There was another dispatch call while this one was in progress.
163*14675a02SAndroid Build Coastguard Worker // One more dispatch loop is needed.
164*14675a02SAndroid Build Coastguard Worker return false;
165*14675a02SAndroid Build Coastguard Worker }
166*14675a02SAndroid Build Coastguard Worker
167*14675a02SAndroid Build Coastguard Worker absl::Time next_wakeup = pending_waiters_.begin()->first;
168*14675a02SAndroid Build Coastguard Worker if (next_wakeup <= NowLocked()) {
169*14675a02SAndroid Build Coastguard Worker // One more dispatch loop is needed because a new waiter has become due
170*14675a02SAndroid Build Coastguard Worker // while the wake-ups were dispatched.
171*14675a02SAndroid Build Coastguard Worker return false;
172*14675a02SAndroid Build Coastguard Worker }
173*14675a02SAndroid Build Coastguard Worker
174*14675a02SAndroid Build Coastguard Worker // Schedule DispatchWakeups() to be called at a future next_wakeup time.
175*14675a02SAndroid Build Coastguard Worker ScheduleWakeup(next_wakeup);
176*14675a02SAndroid Build Coastguard Worker }
177*14675a02SAndroid Build Coastguard Worker
178*14675a02SAndroid Build Coastguard Worker return true;
179*14675a02SAndroid Build Coastguard Worker }
180*14675a02SAndroid Build Coastguard Worker
RealTimeClock()181*14675a02SAndroid Build Coastguard Worker RealTimeClock::RealTimeClock() {
182*14675a02SAndroid Build Coastguard Worker worker_thread_ =
183*14675a02SAndroid Build Coastguard Worker std::make_unique<std::thread>([this] { this->WorkerLoop(); });
184*14675a02SAndroid Build Coastguard Worker }
185*14675a02SAndroid Build Coastguard Worker
WorkerLoop()186*14675a02SAndroid Build Coastguard Worker void RealTimeClock::WorkerLoop() {
187*14675a02SAndroid Build Coastguard Worker for (;;) {
188*14675a02SAndroid Build Coastguard Worker bool dispatch = false;
189*14675a02SAndroid Build Coastguard Worker
190*14675a02SAndroid Build Coastguard Worker {
191*14675a02SAndroid Build Coastguard Worker absl::MutexLock lock(&wakeup_mu_);
192*14675a02SAndroid Build Coastguard Worker wakeup_condvar_.WaitWithDeadline(&wakeup_mu_, next_wakeup_);
193*14675a02SAndroid Build Coastguard Worker if (Now() >= next_wakeup_) {
194*14675a02SAndroid Build Coastguard Worker dispatch = true;
195*14675a02SAndroid Build Coastguard Worker next_wakeup_ = absl::InfiniteFuture();
196*14675a02SAndroid Build Coastguard Worker }
197*14675a02SAndroid Build Coastguard Worker }
198*14675a02SAndroid Build Coastguard Worker
199*14675a02SAndroid Build Coastguard Worker if (dispatch) {
200*14675a02SAndroid Build Coastguard Worker DispatchWakeups();
201*14675a02SAndroid Build Coastguard Worker }
202*14675a02SAndroid Build Coastguard Worker }
203*14675a02SAndroid Build Coastguard Worker }
204*14675a02SAndroid Build Coastguard Worker
205*14675a02SAndroid Build Coastguard Worker // RealTimeClock implementation of ScheduleWakeup.
ScheduleWakeup(absl::Time wakeup_time)206*14675a02SAndroid Build Coastguard Worker void RealTimeClock::ScheduleWakeup(absl::Time wakeup_time) {
207*14675a02SAndroid Build Coastguard Worker absl::MutexLock lock(&wakeup_mu_);
208*14675a02SAndroid Build Coastguard Worker
209*14675a02SAndroid Build Coastguard Worker // Optimization: round wakeup_time up to whole milliseconds.
210*14675a02SAndroid Build Coastguard Worker wakeup_time = absl::FromUDate(ceil(absl::ToUDate(wakeup_time)));
211*14675a02SAndroid Build Coastguard Worker
212*14675a02SAndroid Build Coastguard Worker // ScheduleWakeup may be called repeatedly with the same time if a new timer
213*14675a02SAndroid Build Coastguard Worker // is created in the future after already existing timer. In that case
214*14675a02SAndroid Build Coastguard Worker // this function continues relying on already scheduled wake-up time.
215*14675a02SAndroid Build Coastguard Worker // A new ScheduleWakeup call will be made from within DispatchWakeups() once
216*14675a02SAndroid Build Coastguard Worker // the current timer expires.
217*14675a02SAndroid Build Coastguard Worker if (wakeup_time == next_wakeup_) {
218*14675a02SAndroid Build Coastguard Worker return;
219*14675a02SAndroid Build Coastguard Worker }
220*14675a02SAndroid Build Coastguard Worker
221*14675a02SAndroid Build Coastguard Worker next_wakeup_ = wakeup_time;
222*14675a02SAndroid Build Coastguard Worker wakeup_condvar_.Signal();
223*14675a02SAndroid Build Coastguard Worker }
224*14675a02SAndroid Build Coastguard Worker
225*14675a02SAndroid Build Coastguard Worker } // namespace fcp
226