1 /*
2 * Copyright 2020 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "fcp/base/clock.h"
18
19 #include <algorithm>
20 #include <iterator>
21 #include <memory>
22 #include <thread> // NOLINT(build/c++11)
23
24 #include "absl/synchronization/mutex.h"
25 #include "absl/time/time.h"
26 #include "fcp/base/monitoring.h"
27
28 namespace fcp {
29
30 // Implements global realtime clock that uses timers to schedule wake-up of
31 // waiters.
32 class RealTimeClock : public Clock {
33 public:
34 RealTimeClock();
~RealTimeClock()35 ~RealTimeClock() override {
36 FCP_LOG(FATAL) << "RealTimeClock should never be destroyed";
37 }
38
39 // Returns the current time.
Now()40 absl::Time Now() override { return absl::Now(); }
NowLocked()41 absl::Time NowLocked() override { return absl::Now(); }
42
43 // Sleeps for the specified duration.
Sleep(absl::Duration d)44 void Sleep(absl::Duration d) override { absl::SleepFor(d); }
45
46 // Schedules wakeup at the specified wakeup_time.
47 void ScheduleWakeup(absl::Time wakeup_time) override;
48
49 private:
50 // The worker loop that performs the sleep and dispatches wake-up calls.
51 void WorkerLoop();
52
53 // The currently scheduled wake-up time. There is at most one wake-up
54 // time per process.
55 absl::Time next_wakeup_ ABSL_GUARDED_BY(&wakeup_mu_) = absl::InfiniteFuture();
56 // Mutex that protects next_wakeup and used with the wake-up CondVar.
57 absl::Mutex wakeup_mu_;
58 // CondVar used to sleep until the next wake-up deadline.
59 absl::CondVar wakeup_condvar_;
60 // Worker thread that runs the worker loop function. Since this class
61 // is singleton, there is only one thread per process, and that thread is
62 // never terminated.
63 std::unique_ptr<std::thread> worker_thread_;
64 };
65
RealClock()66 Clock* Clock::RealClock() {
67 static Clock* realtime_clock = new RealTimeClock;
68 return realtime_clock;
69 }
70
WakeupWithDeadline(absl::Time deadline,const std::shared_ptr<Clock::Waiter> & waiter)71 void Clock::WakeupWithDeadline(absl::Time deadline,
72 const std::shared_ptr<Clock::Waiter>& waiter) {
73 // Insert the new waiter into the map ordered by its deadline.
74 // Waiters with matching deadlines are inserted into the same bucket and
75 // their order within the bucket is preserved.
76 {
77 absl::MutexLock lock(mutex());
78 WaiterMap::iterator it;
79 if ((it = pending_waiters_.find(deadline)) != pending_waiters_.end()) {
80 it->second.push_back(waiter);
81 } else {
82 pending_waiters_.insert(std::make_pair(deadline, WaiterList{waiter}));
83 }
84 }
85
86 // Inserting a new waiter may trigger an immediate wake-up if the deadline
87 // is due. Otherwise a new wake-up is scheduled at the end on the dispatch.
88 DispatchWakeups();
89 }
90
91 // DispatchWakeup performs the following actions in the loop:
92 // - Check for reentrancy to avoid more than one concurrent dispatch loop
93 // - Take out all waiters that are due
94 // - Make WakeUp calls on all of those waiters. This step is done outside
95 // of lock because it may potentially take longer time and new waiters may
96 // potentially be inserted during that step.
97 // - If there are any waiters that are still due at that point (because the
98 // the previous step took too long and new waiters have expired or because
99 // there were any new waiters inserted during the previous steps), loop
100 // back and repeat the previous steps.
101 // - Otherwise finish the dispatch by scheduling a new wakeup for the bucket
102 // that expires the soonest.
DispatchWakeups()103 void Clock::DispatchWakeups() {
104 do {
105 if (CheckReentrancy()) {
106 // Avoid reentrancy. An ongoing DispatchWakeups() call will take care
107 // of dispatching any new due wakeups if necessary.
108 // If there is a race condition, only one of dispatch calls will go
109 // through and all other will just increment the dispatch_level and
110 // return.
111 return;
112 }
113
114 // Collect waiters that are due.
115 WaiterList wakeup_calls = GetExpiredWaiters();
116
117 // Dispatch WakeUp calls to those waiters.
118 for (const auto& waiter : wakeup_calls) {
119 waiter->WakeUp();
120 }
121 // One more dispatch loop may be needed if there were any reentrant calls
122 // or if WakeUp() calls took so long that more waiters have become due.
123 } while (!FinishDispatchAndScheduleNextWakeup());
124 }
125
126 // Called at the beginning of dispatch loop.
127 // Increments dispatch_level_ and returns true if there is already
128 // another dispatch call in progress.
CheckReentrancy()129 bool Clock::CheckReentrancy() {
130 absl::MutexLock lock(mutex());
131 return ++dispatch_level_ > 1;
132 }
133
134 // Iterate through waiter buckets ordered by deadline time and take out all
135 // waiters that are due.
GetExpiredWaiters()136 Clock::WaiterList Clock::GetExpiredWaiters() {
137 absl::MutexLock lock(mutex());
138 absl::Time now = NowLocked();
139 std::vector<std::shared_ptr<Waiter>> wakeup_calls;
140 WaiterMap::iterator iter;
141
142 while ((iter = pending_waiters_.begin()) != pending_waiters_.end() &&
143 iter->first <= now) {
144 std::move(iter->second.begin(), iter->second.end(),
145 std::back_inserter(wakeup_calls));
146 pending_waiters_.erase(iter);
147 }
148 return wakeup_calls;
149 }
150
151 // Called at the end of dispatch loop to check post-dispatch conditions,
152 // reset re-entracy level, and schedule a next dispatch if needed.
153 // Returns true if the dispatch loop has ended.
154 // Returns false if more the dispatch loop needs to be repeated.
FinishDispatchAndScheduleNextWakeup()155 bool Clock::FinishDispatchAndScheduleNextWakeup() {
156 absl::MutexLock lock(mutex());
157 int current_dispatch_level = dispatch_level_;
158 dispatch_level_ = 0;
159
160 if (!pending_waiters_.empty()) {
161 if (current_dispatch_level > 1) {
162 // There was another dispatch call while this one was in progress.
163 // One more dispatch loop is needed.
164 return false;
165 }
166
167 absl::Time next_wakeup = pending_waiters_.begin()->first;
168 if (next_wakeup <= NowLocked()) {
169 // One more dispatch loop is needed because a new waiter has become due
170 // while the wake-ups were dispatched.
171 return false;
172 }
173
174 // Schedule DispatchWakeups() to be called at a future next_wakeup time.
175 ScheduleWakeup(next_wakeup);
176 }
177
178 return true;
179 }
180
RealTimeClock()181 RealTimeClock::RealTimeClock() {
182 worker_thread_ =
183 std::make_unique<std::thread>([this] { this->WorkerLoop(); });
184 }
185
WorkerLoop()186 void RealTimeClock::WorkerLoop() {
187 for (;;) {
188 bool dispatch = false;
189
190 {
191 absl::MutexLock lock(&wakeup_mu_);
192 wakeup_condvar_.WaitWithDeadline(&wakeup_mu_, next_wakeup_);
193 if (Now() >= next_wakeup_) {
194 dispatch = true;
195 next_wakeup_ = absl::InfiniteFuture();
196 }
197 }
198
199 if (dispatch) {
200 DispatchWakeups();
201 }
202 }
203 }
204
205 // RealTimeClock implementation of ScheduleWakeup.
ScheduleWakeup(absl::Time wakeup_time)206 void RealTimeClock::ScheduleWakeup(absl::Time wakeup_time) {
207 absl::MutexLock lock(&wakeup_mu_);
208
209 // Optimization: round wakeup_time up to whole milliseconds.
210 wakeup_time = absl::FromUDate(ceil(absl::ToUDate(wakeup_time)));
211
212 // ScheduleWakeup may be called repeatedly with the same time if a new timer
213 // is created in the future after already existing timer. In that case
214 // this function continues relying on already scheduled wake-up time.
215 // A new ScheduleWakeup call will be made from within DispatchWakeups() once
216 // the current timer expires.
217 if (wakeup_time == next_wakeup_) {
218 return;
219 }
220
221 next_wakeup_ = wakeup_time;
222 wakeup_condvar_.Signal();
223 }
224
225 } // namespace fcp
226