xref: /aosp_15_r20/external/federated-compute/fcp/base/clock.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1*14675a02SAndroid Build Coastguard Worker /*
2*14675a02SAndroid Build Coastguard Worker  * Copyright 2020 Google LLC
3*14675a02SAndroid Build Coastguard Worker  *
4*14675a02SAndroid Build Coastguard Worker  * Licensed under the Apache License, Version 2.0 (the "License");
5*14675a02SAndroid Build Coastguard Worker  * you may not use this file except in compliance with the License.
6*14675a02SAndroid Build Coastguard Worker  * You may obtain a copy of the License at
7*14675a02SAndroid Build Coastguard Worker  *
8*14675a02SAndroid Build Coastguard Worker  *      http://www.apache.org/licenses/LICENSE-2.0
9*14675a02SAndroid Build Coastguard Worker  *
10*14675a02SAndroid Build Coastguard Worker  * Unless required by applicable law or agreed to in writing, software
11*14675a02SAndroid Build Coastguard Worker  * distributed under the License is distributed on an "AS IS" BASIS,
12*14675a02SAndroid Build Coastguard Worker  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*14675a02SAndroid Build Coastguard Worker  * See the License for the specific language governing permissions and
14*14675a02SAndroid Build Coastguard Worker  * limitations under the License.
15*14675a02SAndroid Build Coastguard Worker  */
16*14675a02SAndroid Build Coastguard Worker 
#include "fcp/base/clock.h"

#include <algorithm>
#include <cmath>
#include <iterator>
#include <memory>
#include <thread>  // NOLINT(build/c++11)

#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "fcp/base/monitoring.h"
27*14675a02SAndroid Build Coastguard Worker 
28*14675a02SAndroid Build Coastguard Worker namespace fcp {
29*14675a02SAndroid Build Coastguard Worker 
30*14675a02SAndroid Build Coastguard Worker // Implements global realtime clock that uses timers to schedule wake-up of
31*14675a02SAndroid Build Coastguard Worker // waiters.
32*14675a02SAndroid Build Coastguard Worker class RealTimeClock : public Clock {
33*14675a02SAndroid Build Coastguard Worker  public:
34*14675a02SAndroid Build Coastguard Worker   RealTimeClock();
~RealTimeClock()35*14675a02SAndroid Build Coastguard Worker   ~RealTimeClock() override {
36*14675a02SAndroid Build Coastguard Worker     FCP_LOG(FATAL) << "RealTimeClock should never be destroyed";
37*14675a02SAndroid Build Coastguard Worker   }
38*14675a02SAndroid Build Coastguard Worker 
39*14675a02SAndroid Build Coastguard Worker   // Returns the current time.
Now()40*14675a02SAndroid Build Coastguard Worker   absl::Time Now() override { return absl::Now(); }
NowLocked()41*14675a02SAndroid Build Coastguard Worker   absl::Time NowLocked() override { return absl::Now(); }
42*14675a02SAndroid Build Coastguard Worker 
43*14675a02SAndroid Build Coastguard Worker   // Sleeps for the specified duration.
Sleep(absl::Duration d)44*14675a02SAndroid Build Coastguard Worker   void Sleep(absl::Duration d) override { absl::SleepFor(d); }
45*14675a02SAndroid Build Coastguard Worker 
46*14675a02SAndroid Build Coastguard Worker   // Schedules wakeup at the specified wakeup_time.
47*14675a02SAndroid Build Coastguard Worker   void ScheduleWakeup(absl::Time wakeup_time) override;
48*14675a02SAndroid Build Coastguard Worker 
49*14675a02SAndroid Build Coastguard Worker  private:
50*14675a02SAndroid Build Coastguard Worker   // The worker loop that performs the sleep and dispatches wake-up calls.
51*14675a02SAndroid Build Coastguard Worker   void WorkerLoop();
52*14675a02SAndroid Build Coastguard Worker 
53*14675a02SAndroid Build Coastguard Worker   // The currently scheduled wake-up time. There is at most one wake-up
54*14675a02SAndroid Build Coastguard Worker   // time per process.
55*14675a02SAndroid Build Coastguard Worker   absl::Time next_wakeup_ ABSL_GUARDED_BY(&wakeup_mu_) = absl::InfiniteFuture();
56*14675a02SAndroid Build Coastguard Worker   // Mutex that protects next_wakeup and used with the wake-up CondVar.
57*14675a02SAndroid Build Coastguard Worker   absl::Mutex wakeup_mu_;
58*14675a02SAndroid Build Coastguard Worker   // CondVar used to sleep until the next wake-up deadline.
59*14675a02SAndroid Build Coastguard Worker   absl::CondVar wakeup_condvar_;
60*14675a02SAndroid Build Coastguard Worker   // Worker thread that runs the worker loop function. Since this class
61*14675a02SAndroid Build Coastguard Worker   // is singleton, there is only one thread per process, and that thread is
62*14675a02SAndroid Build Coastguard Worker   // never terminated.
63*14675a02SAndroid Build Coastguard Worker   std::unique_ptr<std::thread> worker_thread_;
64*14675a02SAndroid Build Coastguard Worker };
65*14675a02SAndroid Build Coastguard Worker 
RealClock()66*14675a02SAndroid Build Coastguard Worker Clock* Clock::RealClock() {
67*14675a02SAndroid Build Coastguard Worker   static Clock* realtime_clock = new RealTimeClock;
68*14675a02SAndroid Build Coastguard Worker   return realtime_clock;
69*14675a02SAndroid Build Coastguard Worker }
70*14675a02SAndroid Build Coastguard Worker 
WakeupWithDeadline(absl::Time deadline,const std::shared_ptr<Clock::Waiter> & waiter)71*14675a02SAndroid Build Coastguard Worker void Clock::WakeupWithDeadline(absl::Time deadline,
72*14675a02SAndroid Build Coastguard Worker                                const std::shared_ptr<Clock::Waiter>& waiter) {
73*14675a02SAndroid Build Coastguard Worker   // Insert the new waiter into the map ordered by its deadline.
74*14675a02SAndroid Build Coastguard Worker   // Waiters with matching deadlines are inserted into the same bucket and
75*14675a02SAndroid Build Coastguard Worker   // their order within the bucket is preserved.
76*14675a02SAndroid Build Coastguard Worker   {
77*14675a02SAndroid Build Coastguard Worker     absl::MutexLock lock(mutex());
78*14675a02SAndroid Build Coastguard Worker     WaiterMap::iterator it;
79*14675a02SAndroid Build Coastguard Worker     if ((it = pending_waiters_.find(deadline)) != pending_waiters_.end()) {
80*14675a02SAndroid Build Coastguard Worker       it->second.push_back(waiter);
81*14675a02SAndroid Build Coastguard Worker     } else {
82*14675a02SAndroid Build Coastguard Worker       pending_waiters_.insert(std::make_pair(deadline, WaiterList{waiter}));
83*14675a02SAndroid Build Coastguard Worker     }
84*14675a02SAndroid Build Coastguard Worker   }
85*14675a02SAndroid Build Coastguard Worker 
86*14675a02SAndroid Build Coastguard Worker   // Inserting a new waiter may trigger an immediate wake-up if the deadline
87*14675a02SAndroid Build Coastguard Worker   // is due. Otherwise a new wake-up is scheduled at the end on the dispatch.
88*14675a02SAndroid Build Coastguard Worker   DispatchWakeups();
89*14675a02SAndroid Build Coastguard Worker }
90*14675a02SAndroid Build Coastguard Worker 
91*14675a02SAndroid Build Coastguard Worker // DispatchWakeup performs the following actions in the loop:
92*14675a02SAndroid Build Coastguard Worker // - Check for reentrancy to avoid more than one concurrent dispatch loop
93*14675a02SAndroid Build Coastguard Worker // - Take out all waiters that are due
94*14675a02SAndroid Build Coastguard Worker // - Make WakeUp calls on all of those waiters. This step is done outside
95*14675a02SAndroid Build Coastguard Worker //   of lock because it may potentially take longer time and new waiters may
96*14675a02SAndroid Build Coastguard Worker //   potentially be inserted during that step.
97*14675a02SAndroid Build Coastguard Worker // - If there are any waiters that are still due at that point (because the
98*14675a02SAndroid Build Coastguard Worker //   the previous step took too long and new waiters have expired or because
99*14675a02SAndroid Build Coastguard Worker //   there were any new waiters inserted during the previous steps), loop
100*14675a02SAndroid Build Coastguard Worker //   back and repeat the previous steps.
101*14675a02SAndroid Build Coastguard Worker // - Otherwise finish the dispatch by scheduling a new wakeup for the bucket
102*14675a02SAndroid Build Coastguard Worker //   that expires the soonest.
DispatchWakeups()103*14675a02SAndroid Build Coastguard Worker void Clock::DispatchWakeups() {
104*14675a02SAndroid Build Coastguard Worker   do {
105*14675a02SAndroid Build Coastguard Worker     if (CheckReentrancy()) {
106*14675a02SAndroid Build Coastguard Worker       // Avoid reentrancy. An ongoing DispatchWakeups() call will take care
107*14675a02SAndroid Build Coastguard Worker       // of dispatching any new due wakeups if necessary.
108*14675a02SAndroid Build Coastguard Worker       // If there is a race condition, only one of dispatch calls will go
109*14675a02SAndroid Build Coastguard Worker       // through and all other will just increment the dispatch_level and
110*14675a02SAndroid Build Coastguard Worker       // return.
111*14675a02SAndroid Build Coastguard Worker       return;
112*14675a02SAndroid Build Coastguard Worker     }
113*14675a02SAndroid Build Coastguard Worker 
114*14675a02SAndroid Build Coastguard Worker     // Collect waiters that are due.
115*14675a02SAndroid Build Coastguard Worker     WaiterList wakeup_calls = GetExpiredWaiters();
116*14675a02SAndroid Build Coastguard Worker 
117*14675a02SAndroid Build Coastguard Worker     // Dispatch WakeUp calls to those waiters.
118*14675a02SAndroid Build Coastguard Worker     for (const auto& waiter : wakeup_calls) {
119*14675a02SAndroid Build Coastguard Worker       waiter->WakeUp();
120*14675a02SAndroid Build Coastguard Worker     }
121*14675a02SAndroid Build Coastguard Worker     // One more dispatch loop may be needed if there were any reentrant calls
122*14675a02SAndroid Build Coastguard Worker     // or if WakeUp() calls took so long that more waiters have become due.
123*14675a02SAndroid Build Coastguard Worker   } while (!FinishDispatchAndScheduleNextWakeup());
124*14675a02SAndroid Build Coastguard Worker }
125*14675a02SAndroid Build Coastguard Worker 
126*14675a02SAndroid Build Coastguard Worker // Called at the beginning of dispatch loop.
127*14675a02SAndroid Build Coastguard Worker // Increments dispatch_level_ and returns true if there is already
128*14675a02SAndroid Build Coastguard Worker // another dispatch call in progress.
CheckReentrancy()129*14675a02SAndroid Build Coastguard Worker bool Clock::CheckReentrancy() {
130*14675a02SAndroid Build Coastguard Worker   absl::MutexLock lock(mutex());
131*14675a02SAndroid Build Coastguard Worker   return ++dispatch_level_ > 1;
132*14675a02SAndroid Build Coastguard Worker }
133*14675a02SAndroid Build Coastguard Worker 
134*14675a02SAndroid Build Coastguard Worker // Iterate through waiter buckets ordered by deadline time and take out all
135*14675a02SAndroid Build Coastguard Worker // waiters that are due.
GetExpiredWaiters()136*14675a02SAndroid Build Coastguard Worker Clock::WaiterList Clock::GetExpiredWaiters() {
137*14675a02SAndroid Build Coastguard Worker   absl::MutexLock lock(mutex());
138*14675a02SAndroid Build Coastguard Worker   absl::Time now = NowLocked();
139*14675a02SAndroid Build Coastguard Worker   std::vector<std::shared_ptr<Waiter>> wakeup_calls;
140*14675a02SAndroid Build Coastguard Worker   WaiterMap::iterator iter;
141*14675a02SAndroid Build Coastguard Worker 
142*14675a02SAndroid Build Coastguard Worker   while ((iter = pending_waiters_.begin()) != pending_waiters_.end() &&
143*14675a02SAndroid Build Coastguard Worker          iter->first <= now) {
144*14675a02SAndroid Build Coastguard Worker     std::move(iter->second.begin(), iter->second.end(),
145*14675a02SAndroid Build Coastguard Worker               std::back_inserter(wakeup_calls));
146*14675a02SAndroid Build Coastguard Worker     pending_waiters_.erase(iter);
147*14675a02SAndroid Build Coastguard Worker   }
148*14675a02SAndroid Build Coastguard Worker   return wakeup_calls;
149*14675a02SAndroid Build Coastguard Worker }
150*14675a02SAndroid Build Coastguard Worker 
151*14675a02SAndroid Build Coastguard Worker // Called at the end of dispatch loop to check post-dispatch conditions,
152*14675a02SAndroid Build Coastguard Worker // reset re-entracy level, and schedule a next dispatch if needed.
153*14675a02SAndroid Build Coastguard Worker // Returns true if the dispatch loop has ended.
154*14675a02SAndroid Build Coastguard Worker // Returns false if more the dispatch loop needs to be repeated.
FinishDispatchAndScheduleNextWakeup()155*14675a02SAndroid Build Coastguard Worker bool Clock::FinishDispatchAndScheduleNextWakeup() {
156*14675a02SAndroid Build Coastguard Worker   absl::MutexLock lock(mutex());
157*14675a02SAndroid Build Coastguard Worker   int current_dispatch_level = dispatch_level_;
158*14675a02SAndroid Build Coastguard Worker   dispatch_level_ = 0;
159*14675a02SAndroid Build Coastguard Worker 
160*14675a02SAndroid Build Coastguard Worker   if (!pending_waiters_.empty()) {
161*14675a02SAndroid Build Coastguard Worker     if (current_dispatch_level > 1) {
162*14675a02SAndroid Build Coastguard Worker       // There was another dispatch call while this one was in progress.
163*14675a02SAndroid Build Coastguard Worker       // One more dispatch loop is needed.
164*14675a02SAndroid Build Coastguard Worker       return false;
165*14675a02SAndroid Build Coastguard Worker     }
166*14675a02SAndroid Build Coastguard Worker 
167*14675a02SAndroid Build Coastguard Worker     absl::Time next_wakeup = pending_waiters_.begin()->first;
168*14675a02SAndroid Build Coastguard Worker     if (next_wakeup <= NowLocked()) {
169*14675a02SAndroid Build Coastguard Worker       // One more dispatch loop is needed because a new waiter has become due
170*14675a02SAndroid Build Coastguard Worker       // while the wake-ups were dispatched.
171*14675a02SAndroid Build Coastguard Worker       return false;
172*14675a02SAndroid Build Coastguard Worker     }
173*14675a02SAndroid Build Coastguard Worker 
174*14675a02SAndroid Build Coastguard Worker     // Schedule DispatchWakeups() to be called at a future next_wakeup time.
175*14675a02SAndroid Build Coastguard Worker     ScheduleWakeup(next_wakeup);
176*14675a02SAndroid Build Coastguard Worker   }
177*14675a02SAndroid Build Coastguard Worker 
178*14675a02SAndroid Build Coastguard Worker   return true;
179*14675a02SAndroid Build Coastguard Worker }
180*14675a02SAndroid Build Coastguard Worker 
RealTimeClock()181*14675a02SAndroid Build Coastguard Worker RealTimeClock::RealTimeClock() {
182*14675a02SAndroid Build Coastguard Worker   worker_thread_ =
183*14675a02SAndroid Build Coastguard Worker       std::make_unique<std::thread>([this] { this->WorkerLoop(); });
184*14675a02SAndroid Build Coastguard Worker }
185*14675a02SAndroid Build Coastguard Worker 
WorkerLoop()186*14675a02SAndroid Build Coastguard Worker void RealTimeClock::WorkerLoop() {
187*14675a02SAndroid Build Coastguard Worker   for (;;) {
188*14675a02SAndroid Build Coastguard Worker     bool dispatch = false;
189*14675a02SAndroid Build Coastguard Worker 
190*14675a02SAndroid Build Coastguard Worker     {
191*14675a02SAndroid Build Coastguard Worker       absl::MutexLock lock(&wakeup_mu_);
192*14675a02SAndroid Build Coastguard Worker       wakeup_condvar_.WaitWithDeadline(&wakeup_mu_, next_wakeup_);
193*14675a02SAndroid Build Coastguard Worker       if (Now() >= next_wakeup_) {
194*14675a02SAndroid Build Coastguard Worker         dispatch = true;
195*14675a02SAndroid Build Coastguard Worker         next_wakeup_ = absl::InfiniteFuture();
196*14675a02SAndroid Build Coastguard Worker       }
197*14675a02SAndroid Build Coastguard Worker     }
198*14675a02SAndroid Build Coastguard Worker 
199*14675a02SAndroid Build Coastguard Worker     if (dispatch) {
200*14675a02SAndroid Build Coastguard Worker       DispatchWakeups();
201*14675a02SAndroid Build Coastguard Worker     }
202*14675a02SAndroid Build Coastguard Worker   }
203*14675a02SAndroid Build Coastguard Worker }
204*14675a02SAndroid Build Coastguard Worker 
205*14675a02SAndroid Build Coastguard Worker // RealTimeClock implementation of ScheduleWakeup.
ScheduleWakeup(absl::Time wakeup_time)206*14675a02SAndroid Build Coastguard Worker void RealTimeClock::ScheduleWakeup(absl::Time wakeup_time) {
207*14675a02SAndroid Build Coastguard Worker   absl::MutexLock lock(&wakeup_mu_);
208*14675a02SAndroid Build Coastguard Worker 
209*14675a02SAndroid Build Coastguard Worker   // Optimization: round wakeup_time up to whole milliseconds.
210*14675a02SAndroid Build Coastguard Worker   wakeup_time = absl::FromUDate(ceil(absl::ToUDate(wakeup_time)));
211*14675a02SAndroid Build Coastguard Worker 
212*14675a02SAndroid Build Coastguard Worker   // ScheduleWakeup may be called repeatedly with the same time if a new timer
213*14675a02SAndroid Build Coastguard Worker   // is created in the future after already existing timer. In that case
214*14675a02SAndroid Build Coastguard Worker   // this function continues relying on already scheduled wake-up time.
215*14675a02SAndroid Build Coastguard Worker   // A new ScheduleWakeup call will be made from within DispatchWakeups() once
216*14675a02SAndroid Build Coastguard Worker   // the current timer expires.
217*14675a02SAndroid Build Coastguard Worker   if (wakeup_time == next_wakeup_) {
218*14675a02SAndroid Build Coastguard Worker     return;
219*14675a02SAndroid Build Coastguard Worker   }
220*14675a02SAndroid Build Coastguard Worker 
221*14675a02SAndroid Build Coastguard Worker   next_wakeup_ = wakeup_time;
222*14675a02SAndroid Build Coastguard Worker   wakeup_condvar_.Signal();
223*14675a02SAndroid Build Coastguard Worker }
224*14675a02SAndroid Build Coastguard Worker 
225*14675a02SAndroid Build Coastguard Worker }  // namespace fcp
226