xref: /aosp_15_r20/external/federated-compute/fcp/base/clock.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2020 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "fcp/base/clock.h"
18 
#include <algorithm>
#include <cmath>
#include <iterator>
#include <memory>
#include <thread>  // NOLINT(build/c++11)
23 
24 #include "absl/synchronization/mutex.h"
25 #include "absl/time/time.h"
26 #include "fcp/base/monitoring.h"
27 
28 namespace fcp {
29 
30 // Implements global realtime clock that uses timers to schedule wake-up of
31 // waiters.
32 class RealTimeClock : public Clock {
33  public:
34   RealTimeClock();
~RealTimeClock()35   ~RealTimeClock() override {
36     FCP_LOG(FATAL) << "RealTimeClock should never be destroyed";
37   }
38 
39   // Returns the current time.
Now()40   absl::Time Now() override { return absl::Now(); }
NowLocked()41   absl::Time NowLocked() override { return absl::Now(); }
42 
43   // Sleeps for the specified duration.
Sleep(absl::Duration d)44   void Sleep(absl::Duration d) override { absl::SleepFor(d); }
45 
46   // Schedules wakeup at the specified wakeup_time.
47   void ScheduleWakeup(absl::Time wakeup_time) override;
48 
49  private:
50   // The worker loop that performs the sleep and dispatches wake-up calls.
51   void WorkerLoop();
52 
53   // The currently scheduled wake-up time. There is at most one wake-up
54   // time per process.
55   absl::Time next_wakeup_ ABSL_GUARDED_BY(&wakeup_mu_) = absl::InfiniteFuture();
56   // Mutex that protects next_wakeup and used with the wake-up CondVar.
57   absl::Mutex wakeup_mu_;
58   // CondVar used to sleep until the next wake-up deadline.
59   absl::CondVar wakeup_condvar_;
60   // Worker thread that runs the worker loop function. Since this class
61   // is singleton, there is only one thread per process, and that thread is
62   // never terminated.
63   std::unique_ptr<std::thread> worker_thread_;
64 };
65 
RealClock()66 Clock* Clock::RealClock() {
67   static Clock* realtime_clock = new RealTimeClock;
68   return realtime_clock;
69 }
70 
WakeupWithDeadline(absl::Time deadline,const std::shared_ptr<Clock::Waiter> & waiter)71 void Clock::WakeupWithDeadline(absl::Time deadline,
72                                const std::shared_ptr<Clock::Waiter>& waiter) {
73   // Insert the new waiter into the map ordered by its deadline.
74   // Waiters with matching deadlines are inserted into the same bucket and
75   // their order within the bucket is preserved.
76   {
77     absl::MutexLock lock(mutex());
78     WaiterMap::iterator it;
79     if ((it = pending_waiters_.find(deadline)) != pending_waiters_.end()) {
80       it->second.push_back(waiter);
81     } else {
82       pending_waiters_.insert(std::make_pair(deadline, WaiterList{waiter}));
83     }
84   }
85 
86   // Inserting a new waiter may trigger an immediate wake-up if the deadline
87   // is due. Otherwise a new wake-up is scheduled at the end on the dispatch.
88   DispatchWakeups();
89 }
90 
91 // DispatchWakeup performs the following actions in the loop:
92 // - Check for reentrancy to avoid more than one concurrent dispatch loop
93 // - Take out all waiters that are due
94 // - Make WakeUp calls on all of those waiters. This step is done outside
95 //   of lock because it may potentially take longer time and new waiters may
96 //   potentially be inserted during that step.
97 // - If there are any waiters that are still due at that point (because the
98 //   the previous step took too long and new waiters have expired or because
99 //   there were any new waiters inserted during the previous steps), loop
100 //   back and repeat the previous steps.
101 // - Otherwise finish the dispatch by scheduling a new wakeup for the bucket
102 //   that expires the soonest.
DispatchWakeups()103 void Clock::DispatchWakeups() {
104   do {
105     if (CheckReentrancy()) {
106       // Avoid reentrancy. An ongoing DispatchWakeups() call will take care
107       // of dispatching any new due wakeups if necessary.
108       // If there is a race condition, only one of dispatch calls will go
109       // through and all other will just increment the dispatch_level and
110       // return.
111       return;
112     }
113 
114     // Collect waiters that are due.
115     WaiterList wakeup_calls = GetExpiredWaiters();
116 
117     // Dispatch WakeUp calls to those waiters.
118     for (const auto& waiter : wakeup_calls) {
119       waiter->WakeUp();
120     }
121     // One more dispatch loop may be needed if there were any reentrant calls
122     // or if WakeUp() calls took so long that more waiters have become due.
123   } while (!FinishDispatchAndScheduleNextWakeup());
124 }
125 
126 // Called at the beginning of dispatch loop.
127 // Increments dispatch_level_ and returns true if there is already
128 // another dispatch call in progress.
CheckReentrancy()129 bool Clock::CheckReentrancy() {
130   absl::MutexLock lock(mutex());
131   return ++dispatch_level_ > 1;
132 }
133 
134 // Iterate through waiter buckets ordered by deadline time and take out all
135 // waiters that are due.
GetExpiredWaiters()136 Clock::WaiterList Clock::GetExpiredWaiters() {
137   absl::MutexLock lock(mutex());
138   absl::Time now = NowLocked();
139   std::vector<std::shared_ptr<Waiter>> wakeup_calls;
140   WaiterMap::iterator iter;
141 
142   while ((iter = pending_waiters_.begin()) != pending_waiters_.end() &&
143          iter->first <= now) {
144     std::move(iter->second.begin(), iter->second.end(),
145               std::back_inserter(wakeup_calls));
146     pending_waiters_.erase(iter);
147   }
148   return wakeup_calls;
149 }
150 
151 // Called at the end of dispatch loop to check post-dispatch conditions,
152 // reset re-entracy level, and schedule a next dispatch if needed.
153 // Returns true if the dispatch loop has ended.
154 // Returns false if more the dispatch loop needs to be repeated.
FinishDispatchAndScheduleNextWakeup()155 bool Clock::FinishDispatchAndScheduleNextWakeup() {
156   absl::MutexLock lock(mutex());
157   int current_dispatch_level = dispatch_level_;
158   dispatch_level_ = 0;
159 
160   if (!pending_waiters_.empty()) {
161     if (current_dispatch_level > 1) {
162       // There was another dispatch call while this one was in progress.
163       // One more dispatch loop is needed.
164       return false;
165     }
166 
167     absl::Time next_wakeup = pending_waiters_.begin()->first;
168     if (next_wakeup <= NowLocked()) {
169       // One more dispatch loop is needed because a new waiter has become due
170       // while the wake-ups were dispatched.
171       return false;
172     }
173 
174     // Schedule DispatchWakeups() to be called at a future next_wakeup time.
175     ScheduleWakeup(next_wakeup);
176   }
177 
178   return true;
179 }
180 
RealTimeClock()181 RealTimeClock::RealTimeClock() {
182   worker_thread_ =
183       std::make_unique<std::thread>([this] { this->WorkerLoop(); });
184 }
185 
WorkerLoop()186 void RealTimeClock::WorkerLoop() {
187   for (;;) {
188     bool dispatch = false;
189 
190     {
191       absl::MutexLock lock(&wakeup_mu_);
192       wakeup_condvar_.WaitWithDeadline(&wakeup_mu_, next_wakeup_);
193       if (Now() >= next_wakeup_) {
194         dispatch = true;
195         next_wakeup_ = absl::InfiniteFuture();
196       }
197     }
198 
199     if (dispatch) {
200       DispatchWakeups();
201     }
202   }
203 }
204 
205 // RealTimeClock implementation of ScheduleWakeup.
ScheduleWakeup(absl::Time wakeup_time)206 void RealTimeClock::ScheduleWakeup(absl::Time wakeup_time) {
207   absl::MutexLock lock(&wakeup_mu_);
208 
209   // Optimization: round wakeup_time up to whole milliseconds.
210   wakeup_time = absl::FromUDate(ceil(absl::ToUDate(wakeup_time)));
211 
212   // ScheduleWakeup may be called repeatedly with the same time if a new timer
213   // is created in the future after already existing timer. In that case
214   // this function continues relying on already scheduled wake-up time.
215   // A new ScheduleWakeup call will be made from within DispatchWakeups() once
216   // the current timer expires.
217   if (wakeup_time == next_wakeup_) {
218     return;
219   }
220 
221   next_wakeup_ = wakeup_time;
222   wakeup_condvar_.Signal();
223 }
224 
225 }  // namespace fcp
226