xref: /aosp_15_r20/external/pigweed/pw_work_queue/work_queue.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_work_queue/work_queue.h"
16 
17 #include <mutex>
18 
19 #include "pw_assert/check.h"
20 
21 namespace pw::work_queue {
22 
RequestStop()23 void WorkQueue::RequestStop() {
24   {
25     std::lock_guard lock(lock_);
26     stop_requested_ = true;
27   }  // Release lock before calling .release() on the semaphore.
28   work_notification_.release();
29 }
30 
Run()31 void WorkQueue::Run() {
32   while (true) {
33     work_notification_.acquire();
34 
35     // Drain the work queue.
36     bool stop_requested;
37     bool work_remaining;
38     do {
39       std::optional<WorkItem> possible_work_item;
40       {
41         std::lock_guard lock(lock_);
42         if (!queue_.empty()) {
43           possible_work_item.emplace(std::move(queue_.front()));
44           queue_.pop();
45         }
46         work_remaining = !queue_.empty();
47         stop_requested = stop_requested_;
48       }
49       if (!possible_work_item.has_value()) {
50         continue;  // No work item to process.
51       }
52       WorkItem& work_item = possible_work_item.value();
53       PW_CHECK(work_item != nullptr);
54       work_item();
55     } while (work_remaining);
56 
57     // Queue was drained, return if we've been requested to stop.
58     if (stop_requested) {
59       return;
60     }
61   }
62 }
63 
CheckPushWork(WorkItem && work_item)64 void WorkQueue::CheckPushWork(WorkItem&& work_item) {
65   PW_CHECK_OK(InternalPushWork(std::move(work_item)),
66               "Failed to push work item into the work queue");
67 }
68 
InternalPushWork(WorkItem && work_item)69 Status WorkQueue::InternalPushWork(WorkItem&& work_item) {
70   {
71     std::lock_guard lock(lock_);
72 
73     if (stop_requested_) {
74       // Entries are not permitted to be enqueued once stop has been requested.
75       return Status::FailedPrecondition();
76     }
77 
78     if (queue_.full()) {
79       return Status::ResourceExhausted();
80     }
81 
82     queue_.emplace(std::move(work_item));
83 
84     // Update the watermarks for the queue.
85     const uint32_t queue_entries = queue_.size();
86     if (queue_entries > max_queue_used_.value()) {
87       max_queue_used_.Set(queue_entries);
88     }
89     const uint32_t queue_remaining = queue_.capacity() - queue_entries;
90     if (queue_remaining < min_queue_remaining_.value()) {
91       min_queue_remaining_.Set(queue_entries);
92     }
93   }  // Release lock before calling .release() on the semaphore.
94   work_notification_.release();
95   return OkStatus();
96 }
97 
98 }  // namespace pw::work_queue
99