xref: /aosp_15_r20/external/pigweed/pw_async_basic/dispatcher.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #include "pw_async_basic/dispatcher.h"
15 
16 #include <mutex>
17 #include <utility>
18 
19 #include "pw_chrono/system_clock.h"
20 
21 using namespace std::chrono_literals;
22 
23 namespace pw::async {
24 
~BasicDispatcher()25 BasicDispatcher::~BasicDispatcher() {
26   RequestStop();
27   lock_.lock();
28   DrainTaskQueue();
29   lock_.unlock();
30 }
31 
Run()32 void BasicDispatcher::Run() {
33   lock_.lock();
34   while (!stop_requested_) {
35     MaybeSleep();
36     ExecuteDueTasks();
37   }
38   DrainTaskQueue();
39   lock_.unlock();
40 }
41 
RunUntilIdle()42 void BasicDispatcher::RunUntilIdle() {
43   lock_.lock();
44   ExecuteDueTasks();
45   if (stop_requested_) {
46     DrainTaskQueue();
47   }
48   lock_.unlock();
49 }
50 
RunUntil(chrono::SystemClock::time_point end_time)51 void BasicDispatcher::RunUntil(chrono::SystemClock::time_point end_time) {
52   lock_.lock();
53   while (end_time < now() && !stop_requested_) {
54     MaybeSleep();
55     ExecuteDueTasks();
56   }
57   if (stop_requested_) {
58     DrainTaskQueue();
59   }
60   lock_.unlock();
61 }
62 
RunFor(chrono::SystemClock::duration duration)63 void BasicDispatcher::RunFor(chrono::SystemClock::duration duration) {
64   RunUntil(now() + duration);
65 }
66 
MaybeSleep()67 void BasicDispatcher::MaybeSleep() {
68   if (task_queue_.empty() || task_queue_.front().due_time_ > now()) {
69     // Sleep until a notification is received or until the due time of the
70     // next task. Notifications are sent when tasks are posted or 'stop' is
71     // requested.
72     std::optional<chrono::SystemClock::time_point> wake_time = std::nullopt;
73     if (!task_queue_.empty()) {
74       wake_time = task_queue_.front().due_time_;
75     }
76     lock_.unlock();
77     if (wake_time.has_value()) {
78       std::ignore = timed_notification_.try_acquire_until(*wake_time);
79     } else {
80       timed_notification_.acquire();
81     }
82     lock_.lock();
83   }
84 }
85 
ExecuteDueTasks()86 void BasicDispatcher::ExecuteDueTasks() {
87   while (!task_queue_.empty() && task_queue_.front().due_time_ <= now() &&
88          !stop_requested_) {
89     backend::NativeTask& task = task_queue_.front();
90     task_queue_.pop_front();
91 
92     lock_.unlock();
93     Context ctx{this, &task.task_};
94     task(ctx, OkStatus());
95     lock_.lock();
96   }
97 }
98 
RequestStop()99 void BasicDispatcher::RequestStop() {
100   {
101     std::lock_guard lock(lock_);
102     stop_requested_ = true;
103   }
104   timed_notification_.release();
105 }
106 
DrainTaskQueue()107 void BasicDispatcher::DrainTaskQueue() {
108   while (!task_queue_.empty()) {
109     backend::NativeTask& task = task_queue_.front();
110     task_queue_.pop_front();
111 
112     lock_.unlock();
113     Context ctx{this, &task.task_};
114     task(ctx, Status::Cancelled());
115     lock_.lock();
116   }
117 }
118 
PostAt(Task & task,chrono::SystemClock::time_point time)119 void BasicDispatcher::PostAt(Task& task, chrono::SystemClock::time_point time) {
120   PostTaskInternal(task.native_type(), time);
121 }
122 
Cancel(Task & task)123 bool BasicDispatcher::Cancel(Task& task) {
124   std::lock_guard lock(lock_);
125   return task_queue_.remove(task.native_type());
126 }
127 
PostTaskInternal(backend::NativeTask & task,chrono::SystemClock::time_point time_due)128 void BasicDispatcher::PostTaskInternal(
129     backend::NativeTask& task, chrono::SystemClock::time_point time_due) {
130   lock_.lock();
131   task.due_time_ = time_due;
132   // Insert the new task in the queue after all tasks with the same or earlier
133   // deadline to ensure FIFO execution order.
134   auto it_front = task_queue_.begin();
135   auto it_behind = task_queue_.before_begin();
136   while (it_front != task_queue_.end() && time_due >= it_front->due_time_) {
137     ++it_front;
138     ++it_behind;
139   }
140   task_queue_.insert_after(it_behind, task);
141   lock_.unlock();
142   timed_notification_.release();
143 }
144 
145 }  // namespace pw::async
146