// Copyright 2023 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. #include "pw_async2/dispatcher_base.h" #include #include "pw_assert/check.h" #include "pw_sync/lock_annotations.h" namespace pw::async2 { void Context::ReEnqueue() { Waker waker; waker_->InternalCloneInto(waker); std::move(waker).Wake(); } void Context::InternalStoreWaker(Waker& waker_out) { waker_->InternalCloneInto(waker_out); } void Task::RemoveAllWakersLocked() { while (wakers_ != nullptr) { Waker* current = wakers_; wakers_ = current->next_; current->task_ = nullptr; current->next_ = nullptr; } } void Task::AddWakerLocked(Waker& waker) { waker.task_ = this; waker.next_ = wakers_; wakers_ = &waker; } void Task::RemoveWakerLocked(Waker& waker) { if (&waker == wakers_) { wakers_ = wakers_->next_; } else { Waker* current = wakers_; while (current->next_ != &waker) { current = current->next_; } current->next_ = current->next_->next_; } waker.task_ = nullptr; waker.next_ = nullptr; } bool Task::IsRegistered() const { std::lock_guard lock(dispatcher_lock()); return state_ != Task::State::kUnposted; } void Task::Deregister() { pw::sync::Mutex* task_execution_lock; { // Fast path: the task is not running. std::lock_guard lock(dispatcher_lock()); if (TryDeregister()) { return; } // The task was running, so we have to wait for the task to stop being // run by acquiring the `task_lock`. task_execution_lock = &dispatcher_->task_execution_lock_; } // NOTE: there is a race here where `task_execution_lock_` may be // invalidated by concurrent destruction of the dispatcher. // // This restriction is documented above, but is still fairly footgun-y. std::lock_guard task_lock(*task_execution_lock); std::lock_guard lock(dispatcher_lock()); PW_CHECK(TryDeregister()); } bool Task::TryDeregister() { switch (state_) { case Task::State::kUnposted: return true; case Task::State::kSleeping: dispatcher_->RemoveSleepingTaskLocked(*this); break; case Task::State::kRunning: return false; case Task::State::kWoken: dispatcher_->RemoveWokenTaskLocked(*this); break; } state_ = Task::State::kUnposted; RemoveAllWakersLocked(); // Wake the dispatcher up if this was the last task so that it can see that // all tasks have completed. if (dispatcher_->first_woken_ == nullptr && dispatcher_->sleeping_ == nullptr && dispatcher_->wants_wake_) { dispatcher_->DoWake(); } dispatcher_ = nullptr; return true; } Waker::Waker(Waker&& other) noexcept { std::lock_guard lock(dispatcher_lock()); if (other.task_ == nullptr) { return; } Task& task = *other.task_; task.RemoveWakerLocked(other); task.AddWakerLocked(*this); } Waker& Waker::operator=(Waker&& other) noexcept { std::lock_guard lock(dispatcher_lock()); RemoveFromTaskWakerListLocked(); if (other.task_ == nullptr) { return *this; } Task& task = *other.task_; task.RemoveWakerLocked(other); task.AddWakerLocked(*this); return *this; } void Waker::Wake() && { std::lock_guard lock(dispatcher_lock()); if (task_ != nullptr) { task_->dispatcher_->WakeTask(*task_); RemoveFromTaskWakerListLocked(); } } void Waker::InternalCloneInto(Waker& out) & { std::lock_guard lock(dispatcher_lock()); // The `out` waker already points to this task, so no work is necessary. if (out.task_ == task_) { return; } // Remove the output waker from its existing task's list. out.RemoveFromTaskWakerListLocked(); out.task_ = task_; // Only add if the waker being cloned is actually associated with a task. if (task_ != nullptr) { task_->AddWakerLocked(out); } } bool Waker::IsEmpty() const { std::lock_guard lock(dispatcher_lock()); return task_ == nullptr; } void Waker::InsertIntoTaskWakerList() { std::lock_guard lock(dispatcher_lock()); InsertIntoTaskWakerListLocked(); } void Waker::InsertIntoTaskWakerListLocked() { if (task_ != nullptr) { task_->AddWakerLocked(*this); } } void Waker::RemoveFromTaskWakerList() { std::lock_guard lock(dispatcher_lock()); RemoveFromTaskWakerListLocked(); } void Waker::RemoveFromTaskWakerListLocked() { if (task_ != nullptr) { task_->RemoveWakerLocked(*this); } } void NativeDispatcherBase::Deregister() { std::lock_guard lock(dispatcher_lock()); UnpostTaskList(first_woken_); first_woken_ = nullptr; last_woken_ = nullptr; UnpostTaskList(sleeping_); sleeping_ = nullptr; } void NativeDispatcherBase::Post(Task& task) { bool wake_dispatcher = false; { std::lock_guard lock(dispatcher_lock()); PW_DASSERT(task.state_ == Task::State::kUnposted); PW_DASSERT(task.dispatcher_ == nullptr); task.state_ = Task::State::kWoken; task.dispatcher_ = this; AddTaskToWokenList(task); if (wants_wake_) { wake_dispatcher = true; wants_wake_ = false; } } // Note: unlike in ``WakeTask``, here we know that the ``Dispatcher`` will // not be destroyed out from under our feet because we're in a method being // called on the ``Dispatcher`` by a user. if (wake_dispatcher) { DoWake(); } } NativeDispatcherBase::SleepInfo NativeDispatcherBase::AttemptRequestWake( bool allow_empty) { std::lock_guard lock(dispatcher_lock()); // Don't allow sleeping if there are already tasks waiting to be run. if (first_woken_ != nullptr) { return SleepInfo::DontSleep(); } if (!allow_empty && sleeping_ == nullptr) { return SleepInfo::DontSleep(); } /// Indicate that the ``Dispatcher`` is sleeping and will need a ``DoWake`` /// call once more work can be done. wants_wake_ = true; // Once timers are added, this should check them. return SleepInfo::Indefinitely(); } NativeDispatcherBase::RunOneTaskResult NativeDispatcherBase::RunOneTask( Dispatcher& dispatcher, Task* task_to_look_for) { std::lock_guard task_lock(task_execution_lock_); Task* task; { std::lock_guard lock(dispatcher_lock()); task = PopWokenTask(); if (task == nullptr) { bool all_complete = first_woken_ == nullptr && sleeping_ == nullptr; return RunOneTaskResult( /*completed_all_tasks=*/all_complete, /*completed_main_task=*/false, /*ran_a_task=*/false); } task->state_ = Task::State::kRunning; } bool complete; { Waker waker(*task); Context context(dispatcher, waker); complete = task->Pend(context).IsReady(); } if (complete) { bool all_complete; { std::lock_guard lock(dispatcher_lock()); switch (task->state_) { case Task::State::kUnposted: case Task::State::kSleeping: PW_DASSERT(false); PW_UNREACHABLE; case Task::State::kRunning: break; case Task::State::kWoken: RemoveWokenTaskLocked(*task); break; } task->state_ = Task::State::kUnposted; task->dispatcher_ = nullptr; task->RemoveAllWakersLocked(); all_complete = first_woken_ == nullptr && sleeping_ == nullptr; } task->DoDestroy(); return RunOneTaskResult( /*completed_all_tasks=*/all_complete, /*completed_main_task=*/task == task_to_look_for, /*ran_a_task=*/true); } else { std::lock_guard lock(dispatcher_lock()); if (task->state_ == Task::State::kRunning) { task->state_ = Task::State::kSleeping; AddTaskToSleepingList(*task); } return RunOneTaskResult( /*completed_all_tasks=*/false, /*completed_main_task=*/false, /*ran_a_task=*/true); } } void NativeDispatcherBase::UnpostTaskList(Task* task) { while (task != nullptr) { task->state_ = Task::State::kUnposted; task->dispatcher_ = nullptr; task->prev_ = nullptr; Task* next = task->next_; task->next_ = nullptr; task->RemoveAllWakersLocked(); task = next; } } void NativeDispatcherBase::RemoveTaskFromList(Task& task) { if (task.prev_ != nullptr) { task.prev_->next_ = task.next_; } if (task.next_ != nullptr) { task.next_->prev_ = task.prev_; } task.prev_ = nullptr; task.next_ = nullptr; } void NativeDispatcherBase::RemoveWokenTaskLocked(Task& task) { if (first_woken_ == &task) { first_woken_ = task.next_; } if (last_woken_ == &task) { last_woken_ = task.prev_; } RemoveTaskFromList(task); } void NativeDispatcherBase::RemoveSleepingTaskLocked(Task& task) { if (sleeping_ == &task) { sleeping_ = task.next_; } RemoveTaskFromList(task); } void NativeDispatcherBase::AddTaskToWokenList(Task& task) { if (first_woken_ == nullptr) { first_woken_ = &task; } else { last_woken_->next_ = &task; task.prev_ = last_woken_; } last_woken_ = &task; } void NativeDispatcherBase::AddTaskToSleepingList(Task& task) { if (sleeping_ != nullptr) { sleeping_->prev_ = &task; } task.next_ = sleeping_; sleeping_ = &task; } void NativeDispatcherBase::WakeTask(Task& task) { switch (task.state_) { case Task::State::kWoken: // Do nothing-- this has already been woken. return; case Task::State::kUnposted: // This should be unreachable. PW_CHECK(false); case Task::State::kRunning: // Wake again to indicate that this task should be run once more, // as the state of the world may have changed since the task // started running. break; case Task::State::kSleeping: RemoveSleepingTaskLocked(task); // Wake away! break; } task.state_ = Task::State::kWoken; AddTaskToWokenList(task); if (wants_wake_) { // Note: it's quite annoying to make this call under the lock, as it can // result in extra thread wakeup/sleep cycles. // // However, releasing the lock first would allow for the possibility that // the ``Dispatcher`` has been destroyed, making the call invalid. DoWake(); } } Task* NativeDispatcherBase::PopWokenTask() { if (first_woken_ == nullptr) { return nullptr; } Task& task = *first_woken_; if (task.next_ != nullptr) { task.next_->prev_ = nullptr; } else { last_woken_ = nullptr; } first_woken_ = task.next_; task.prev_ = nullptr; task.next_ = nullptr; return &task; } } // namespace pw::async2