1 /*
2  * Copyright 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "powerhal-libperfmgr"
18 #define ATRACE_TAG (ATRACE_TAG_POWER | ATRACE_TAG_HAL)
19 
#include "BackgroundWorker.h"

#include <utility>
21 
22 namespace aidl {
23 namespace google {
24 namespace hardware {
25 namespace power {
26 namespace impl {
27 namespace pixel {
28 
PriorityQueueWorkerPool(size_t threadCount,const std::string & threadNamePrefix)29 PriorityQueueWorkerPool::PriorityQueueWorkerPool(size_t threadCount,
30                                                  const std::string &threadNamePrefix) {
31     mRunning = true;
32     mThreadPool.reserve(threadCount);
33     for (size_t threadId = 0; threadId < threadCount; ++threadId) {
34         mThreadPool.push_back(std::thread([&, tid = threadId]() { loop(); }));
35 
36         if (!threadNamePrefix.empty()) {
37             const std::string fullThreadName = threadNamePrefix + std::to_string(threadId);
38             pthread_setname_np(mThreadPool.back().native_handle(), fullThreadName.c_str());
39         }
40     }
41 }
42 
~PriorityQueueWorkerPool()43 PriorityQueueWorkerPool::~PriorityQueueWorkerPool() {
44     {
45         std::lock_guard<std::mutex> lock(mMutex);
46         mRunning = false;
47         mCv.notify_all();
48     }
49     for (auto &t : mThreadPool) {
50         if (t.joinable()) {
51             t.join();
52         }
53     }
54 }
55 
addCallback(int64_t templateQueueWorkerId,std::function<void (int64_t)> callback)56 void PriorityQueueWorkerPool::addCallback(int64_t templateQueueWorkerId,
57                                           std::function<void(int64_t)> callback) {
58     if (!callback) {
59         // Don't add callback if it isn't callable to prevent having to check later
60         return;
61     }
62     std::unique_lock<std::shared_mutex> lock(mSharedMutex);
63     auto itr = mCallbackMap.find(templateQueueWorkerId);
64     if (itr != mCallbackMap.end()) {
65         return;
66     }
67     mCallbackMap[templateQueueWorkerId] = callback;
68 }
69 
removeCallback(int64_t templateQueueWorkerId)70 void PriorityQueueWorkerPool::removeCallback(int64_t templateQueueWorkerId) {
71     std::unique_lock<std::shared_mutex> lock(mSharedMutex);
72     auto itr = mCallbackMap.find(templateQueueWorkerId);
73     if (itr == mCallbackMap.end()) {
74         return;
75     }
76     mCallbackMap.erase(itr);
77 }
78 
schedule(int64_t templateQueueWorkerId,int64_t packageId,std::chrono::steady_clock::time_point deadline)79 void PriorityQueueWorkerPool::schedule(int64_t templateQueueWorkerId, int64_t packageId,
80                                        std::chrono::steady_clock::time_point deadline) {
81     std::unique_lock<std::mutex> lock(mMutex);
82     mPackageQueue.emplace(deadline, templateQueueWorkerId, packageId);
83     mCv.notify_all();
84 }
85 
loop()86 void PriorityQueueWorkerPool::loop() {
87     Package package;
88     while (mRunning) {
89         std::unique_lock<std::mutex> lock(mMutex);
90         // Default to longest wait possible without overflowing if there is
91         // nothing to work on in the queue
92         std::chrono::steady_clock::time_point deadline =
93                 std::chrono::steady_clock::time_point::max();
94 
95         // Use next item to work on deadline if available
96         if (!mPackageQueue.empty()) {
97             deadline = mPackageQueue.top().deadline;
98         }
99 
100         // Wait until signal or deadline
101         mCv.wait_until(lock, deadline, [&]() {
102             // Check if stop running requested, if so return now
103             if (!mRunning)
104                 return true;
105 
106             // Check if nothing in queue (e.g. spurious wakeup), wait as long as possible again
107             if (mPackageQueue.empty()) {
108                 deadline = std::chrono::steady_clock::time_point::max();
109                 return false;
110             }
111 
112             // Something in queue, use that as next deadline
113             deadline = mPackageQueue.top().deadline;
114             // Check if deadline is in the future still, continue waiting with updated deadline
115             if (deadline > std::chrono::steady_clock::now())
116                 return false;
117             // Next work entry's deadline is in the past or exactly now, time to work on it
118             return true;
119         });
120 
121         if (!mRunning)
122             break;
123         if (mPackageQueue.empty())
124             continue;
125 
126         // Copy work entry from queue and unlock
127         package = mPackageQueue.top();
128         mPackageQueue.pop();
129         lock.unlock();
130 
131         // Find callback based on package's callback id
132         {
133             std::shared_lock<std::shared_mutex> lockCb(mSharedMutex);
134             auto callbackItr = mCallbackMap.find(package.templateQueueWorkerId);
135             if (callbackItr == mCallbackMap.end()) {
136                 // Callback was removed before package could be worked on, that's ok just ignore
137                 continue;
138             }
139             // Exceptions disabled so no need to wrap this
140             callbackItr->second(package.packageId);
141         }
142     }
143 }
144 
145 }  // namespace pixel
146 }  // namespace impl
147 }  // namespace power
148 }  // namespace hardware
149 }  // namespace google
150 }  // namespace aidl
151