1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #include "pw_async_basic/dispatcher.h"
15
16 #include <mutex>
17 #include <utility>
18
19 #include "pw_chrono/system_clock.h"
20
21 using namespace std::chrono_literals;
22
23 namespace pw::async {
24
~BasicDispatcher()25 BasicDispatcher::~BasicDispatcher() {
26 RequestStop();
27 lock_.lock();
28 DrainTaskQueue();
29 lock_.unlock();
30 }
31
Run()32 void BasicDispatcher::Run() {
33 lock_.lock();
34 while (!stop_requested_) {
35 MaybeSleep();
36 ExecuteDueTasks();
37 }
38 DrainTaskQueue();
39 lock_.unlock();
40 }
41
RunUntilIdle()42 void BasicDispatcher::RunUntilIdle() {
43 lock_.lock();
44 ExecuteDueTasks();
45 if (stop_requested_) {
46 DrainTaskQueue();
47 }
48 lock_.unlock();
49 }
50
RunUntil(chrono::SystemClock::time_point end_time)51 void BasicDispatcher::RunUntil(chrono::SystemClock::time_point end_time) {
52 lock_.lock();
53 while (end_time < now() && !stop_requested_) {
54 MaybeSleep();
55 ExecuteDueTasks();
56 }
57 if (stop_requested_) {
58 DrainTaskQueue();
59 }
60 lock_.unlock();
61 }
62
RunFor(chrono::SystemClock::duration duration)63 void BasicDispatcher::RunFor(chrono::SystemClock::duration duration) {
64 RunUntil(now() + duration);
65 }
66
MaybeSleep()67 void BasicDispatcher::MaybeSleep() {
68 if (task_queue_.empty() || task_queue_.front().due_time_ > now()) {
69 // Sleep until a notification is received or until the due time of the
70 // next task. Notifications are sent when tasks are posted or 'stop' is
71 // requested.
72 std::optional<chrono::SystemClock::time_point> wake_time = std::nullopt;
73 if (!task_queue_.empty()) {
74 wake_time = task_queue_.front().due_time_;
75 }
76 lock_.unlock();
77 if (wake_time.has_value()) {
78 std::ignore = timed_notification_.try_acquire_until(*wake_time);
79 } else {
80 timed_notification_.acquire();
81 }
82 lock_.lock();
83 }
84 }
85
ExecuteDueTasks()86 void BasicDispatcher::ExecuteDueTasks() {
87 while (!task_queue_.empty() && task_queue_.front().due_time_ <= now() &&
88 !stop_requested_) {
89 backend::NativeTask& task = task_queue_.front();
90 task_queue_.pop_front();
91
92 lock_.unlock();
93 Context ctx{this, &task.task_};
94 task(ctx, OkStatus());
95 lock_.lock();
96 }
97 }
98
RequestStop()99 void BasicDispatcher::RequestStop() {
100 {
101 std::lock_guard lock(lock_);
102 stop_requested_ = true;
103 }
104 timed_notification_.release();
105 }
106
DrainTaskQueue()107 void BasicDispatcher::DrainTaskQueue() {
108 while (!task_queue_.empty()) {
109 backend::NativeTask& task = task_queue_.front();
110 task_queue_.pop_front();
111
112 lock_.unlock();
113 Context ctx{this, &task.task_};
114 task(ctx, Status::Cancelled());
115 lock_.lock();
116 }
117 }
118
PostAt(Task & task,chrono::SystemClock::time_point time)119 void BasicDispatcher::PostAt(Task& task, chrono::SystemClock::time_point time) {
120 PostTaskInternal(task.native_type(), time);
121 }
122
Cancel(Task & task)123 bool BasicDispatcher::Cancel(Task& task) {
124 std::lock_guard lock(lock_);
125 return task_queue_.remove(task.native_type());
126 }
127
PostTaskInternal(backend::NativeTask & task,chrono::SystemClock::time_point time_due)128 void BasicDispatcher::PostTaskInternal(
129 backend::NativeTask& task, chrono::SystemClock::time_point time_due) {
130 lock_.lock();
131 task.due_time_ = time_due;
132 // Insert the new task in the queue after all tasks with the same or earlier
133 // deadline to ensure FIFO execution order.
134 auto it_front = task_queue_.begin();
135 auto it_behind = task_queue_.before_begin();
136 while (it_front != task_queue_.end() && time_due >= it_front->due_time_) {
137 ++it_front;
138 ++it_behind;
139 }
140 task_queue_.insert_after(it_behind, task);
141 lock_.unlock();
142 timed_notification_.release();
143 }
144
145 } // namespace pw::async
146