1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #include "pw_work_queue/work_queue.h" 16 17 #include <mutex> 18 19 #include "pw_assert/check.h" 20 21 namespace pw::work_queue { 22 RequestStop()23void WorkQueue::RequestStop() { 24 { 25 std::lock_guard lock(lock_); 26 stop_requested_ = true; 27 } // Release lock before calling .release() on the semaphore. 28 work_notification_.release(); 29 } 30 Run()31void WorkQueue::Run() { 32 while (true) { 33 work_notification_.acquire(); 34 35 // Drain the work queue. 36 bool stop_requested; 37 bool work_remaining; 38 do { 39 std::optional<WorkItem> possible_work_item; 40 { 41 std::lock_guard lock(lock_); 42 if (!queue_.empty()) { 43 possible_work_item.emplace(std::move(queue_.front())); 44 queue_.pop(); 45 } 46 work_remaining = !queue_.empty(); 47 stop_requested = stop_requested_; 48 } 49 if (!possible_work_item.has_value()) { 50 continue; // No work item to process. 51 } 52 WorkItem& work_item = possible_work_item.value(); 53 PW_CHECK(work_item != nullptr); 54 work_item(); 55 } while (work_remaining); 56 57 // Queue was drained, return if we've been requested to stop. 58 if (stop_requested) { 59 return; 60 } 61 } 62 } 63 CheckPushWork(WorkItem && work_item)64void WorkQueue::CheckPushWork(WorkItem&& work_item) { 65 PW_CHECK_OK(InternalPushWork(std::move(work_item)), 66 "Failed to push work item into the work queue"); 67 } 68 InternalPushWork(WorkItem && work_item)69Status WorkQueue::InternalPushWork(WorkItem&& work_item) { 70 { 71 std::lock_guard lock(lock_); 72 73 if (stop_requested_) { 74 // Entries are not permitted to be enqueued once stop has been requested. 75 return Status::FailedPrecondition(); 76 } 77 78 if (queue_.full()) { 79 return Status::ResourceExhausted(); 80 } 81 82 queue_.emplace(std::move(work_item)); 83 84 // Update the watermarks for the queue. 85 const uint32_t queue_entries = queue_.size(); 86 if (queue_entries > max_queue_used_.value()) { 87 max_queue_used_.Set(queue_entries); 88 } 89 const uint32_t queue_remaining = queue_.capacity() - queue_entries; 90 if (queue_remaining < min_queue_remaining_.value()) { 91 min_queue_remaining_.Set(queue_entries); 92 } 93 } // Release lock before calling .release() on the semaphore. 94 work_notification_.release(); 95 return OkStatus(); 96 } 97 98 } // namespace pw::work_queue 99