xref: /aosp_15_r20/external/angle/src/common/WorkerThread.cpp (revision 8975f5c5ed3d1c378011245431ada316dfb6f244)
1 //
2 // Copyright 2016 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // WorkerThread:
7 //   Task running thread for ANGLE, similar to a TaskRunner in Chromium.
8 //   Might be implemented differently depending on platform.
9 //
10 
11 #include "common/WorkerThread.h"
12 
13 #include "common/angleutils.h"
14 #include "common/system_utils.h"
15 
16 // Controls if our threading code uses std::async or falls back to single-threaded operations.
17 // Note that we can't easily use std::async in UWPs due to UWP threading restrictions.
18 #if !defined(ANGLE_STD_ASYNC_WORKERS) && !defined(ANGLE_ENABLE_WINDOWS_UWP)
19 #    define ANGLE_STD_ASYNC_WORKERS 1
20 #endif  // !defined(ANGLE_STD_ASYNC_WORKERS) && & !defined(ANGLE_ENABLE_WINDOWS_UWP)
21 
22 #if ANGLE_DELEGATE_WORKERS || ANGLE_STD_ASYNC_WORKERS
23 #    include <future>
24 #    include <queue>
25 #    include <thread>
26 #endif  // ANGLE_DELEGATE_WORKERS || ANGLE_STD_ASYNC_WORKERS
27 
28 namespace angle
29 {
30 
31 WaitableEvent::WaitableEvent()  = default;
32 WaitableEvent::~WaitableEvent() = default;
33 
wait()34 void WaitableEventDone::wait() {}
35 
isReady()36 bool WaitableEventDone::isReady()
37 {
38     return true;
39 }
40 
markAsReady()41 void AsyncWaitableEvent::markAsReady()
42 {
43     std::lock_guard<std::mutex> lock(mMutex);
44     mIsReady = true;
45     mCondition.notify_all();
46 }
47 
wait()48 void AsyncWaitableEvent::wait()
49 {
50     std::unique_lock<std::mutex> lock(mMutex);
51     mCondition.wait(lock, [this] { return mIsReady; });
52 }
53 
isReady()54 bool AsyncWaitableEvent::isReady()
55 {
56     std::lock_guard<std::mutex> lock(mMutex);
57     return mIsReady;
58 }
59 
60 WorkerThreadPool::WorkerThreadPool()  = default;
61 WorkerThreadPool::~WorkerThreadPool() = default;
62 
63 class SingleThreadedWorkerPool final : public WorkerThreadPool
64 {
65   public:
66     std::shared_ptr<WaitableEvent> postWorkerTask(const std::shared_ptr<Closure> &task) override;
67     bool isAsync() override;
68 };
69 
70 // SingleThreadedWorkerPool implementation.
postWorkerTask(const std::shared_ptr<Closure> & task)71 std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
72     const std::shared_ptr<Closure> &task)
73 {
74     // Thread safety: This function is thread-safe because the task is run on the calling thread
75     // itself.
76     (*task)();
77     return std::make_shared<WaitableEventDone>();
78 }
79 
isAsync()80 bool SingleThreadedWorkerPool::isAsync()
81 {
82     return false;
83 }
84 
85 #if ANGLE_STD_ASYNC_WORKERS
86 
87 class AsyncWorkerPool final : public WorkerThreadPool
88 {
89   public:
90     AsyncWorkerPool(size_t numThreads);
91 
92     ~AsyncWorkerPool() override;
93 
94     std::shared_ptr<WaitableEvent> postWorkerTask(const std::shared_ptr<Closure> &task) override;
95 
96     bool isAsync() override;
97 
98   private:
99     void createThreads();
100 
101     using Task = std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>;
102 
103     // Thread's main loop
104     void threadLoop();
105 
106     bool mTerminated = false;
107     std::mutex mMutex;                 // Protects access to the fields in this class
108     std::condition_variable mCondVar;  // Signals when work is available in the queue
109     std::queue<Task> mTaskQueue;
110     std::deque<std::thread> mThreads;
111     size_t mDesiredThreadCount;
112 };
113 
114 // AsyncWorkerPool implementation.
115 
AsyncWorkerPool(size_t numThreads)116 AsyncWorkerPool::AsyncWorkerPool(size_t numThreads) : mDesiredThreadCount(numThreads)
117 {
118     ASSERT(numThreads != 0);
119 }
120 
~AsyncWorkerPool()121 AsyncWorkerPool::~AsyncWorkerPool()
122 {
123     {
124         std::unique_lock<std::mutex> lock(mMutex);
125         mTerminated = true;
126     }
127     mCondVar.notify_all();
128     for (auto &thread : mThreads)
129     {
130         ASSERT(thread.get_id() != std::this_thread::get_id());
131         thread.join();
132     }
133 }
134 
createThreads()135 void AsyncWorkerPool::createThreads()
136 {
137     if (mDesiredThreadCount == mThreads.size())
138     {
139         return;
140     }
141     ASSERT(mThreads.empty());
142 
143     for (size_t i = 0; i < mDesiredThreadCount; ++i)
144     {
145         mThreads.emplace_back(&AsyncWorkerPool::threadLoop, this);
146     }
147 }
148 
postWorkerTask(const std::shared_ptr<Closure> & task)149 std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(const std::shared_ptr<Closure> &task)
150 {
151     // Thread safety: This function is thread-safe because access to |mTaskQueue| is protected by
152     // |mMutex|.
153     auto waitable = std::make_shared<AsyncWaitableEvent>();
154     {
155         std::lock_guard<std::mutex> lock(mMutex);
156 
157         // Lazily create the threads on first task
158         createThreads();
159 
160         mTaskQueue.push(std::make_pair(waitable, task));
161     }
162     mCondVar.notify_one();
163     return waitable;
164 }
165 
threadLoop()166 void AsyncWorkerPool::threadLoop()
167 {
168     angle::SetCurrentThreadName("ANGLE-Worker");
169 
170     while (true)
171     {
172         Task task;
173         {
174             std::unique_lock<std::mutex> lock(mMutex);
175             mCondVar.wait(lock, [this] { return !mTaskQueue.empty() || mTerminated; });
176             if (mTerminated)
177             {
178                 return;
179             }
180             task = mTaskQueue.front();
181             mTaskQueue.pop();
182         }
183 
184         auto &waitable = task.first;
185         auto &closure  = task.second;
186 
187         // Note: always add an ANGLE_TRACE_EVENT* macro in the closure.  Then the job will show up
188         // in traces.
189         (*closure)();
190         // Release shared_ptr<Closure> before notifying the event to allow for destructor based
191         // dependencies (example: anglebug.com/42267099)
192         task.second.reset();
193         waitable->markAsReady();
194     }
195 }
196 
isAsync()197 bool AsyncWorkerPool::isAsync()
198 {
199     return true;
200 }
201 
202 #endif  // ANGLE_STD_ASYNC_WORKERS
203 
204 #if ANGLE_DELEGATE_WORKERS
205 
206 class DelegateWorkerPool final : public WorkerThreadPool
207 {
208   public:
DelegateWorkerPool(PlatformMethods * platform)209     DelegateWorkerPool(PlatformMethods *platform) : mPlatform(platform) {}
210     ~DelegateWorkerPool() override = default;
211 
212     std::shared_ptr<WaitableEvent> postWorkerTask(const std::shared_ptr<Closure> &task) override;
213 
214     bool isAsync() override;
215 
216   private:
217     PlatformMethods *mPlatform;
218 };
219 
220 // A function wrapper to execute the closure and to notify the waitable
221 // event after the execution.
222 class DelegateWorkerTask
223 {
224   public:
DelegateWorkerTask(const std::shared_ptr<Closure> & task,std::shared_ptr<AsyncWaitableEvent> waitable)225     DelegateWorkerTask(const std::shared_ptr<Closure> &task,
226                        std::shared_ptr<AsyncWaitableEvent> waitable)
227         : mTask(task), mWaitable(waitable)
228     {}
229     DelegateWorkerTask()                     = delete;
230     DelegateWorkerTask(DelegateWorkerTask &) = delete;
231 
RunTask(void * userData)232     static void RunTask(void *userData)
233     {
234         DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData);
235         (*workerTask->mTask)();
236         workerTask->mWaitable->markAsReady();
237 
238         // Delete the task after its execution.
239         delete workerTask;
240     }
241 
242   private:
243     ~DelegateWorkerTask() = default;
244 
245     std::shared_ptr<Closure> mTask;
246     std::shared_ptr<AsyncWaitableEvent> mWaitable;
247 };
248 
249 ANGLE_NO_SANITIZE_CFI_ICALL
postWorkerTask(const std::shared_ptr<Closure> & task)250 std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(
251     const std::shared_ptr<Closure> &task)
252 {
253     if (mPlatform->postWorkerTask == nullptr)
254     {
255         // In the unexpected case where the platform methods have been changed during execution and
256         // postWorkerTask is no longer usable, simply run the task on the calling thread.
257         (*task)();
258         return std::make_shared<WaitableEventDone>();
259     }
260 
261     // Thread safety: This function is thread-safe because the |postWorkerTask| platform method is
262     // expected to be thread safe.  For Chromium, that forwards the call to the |TaskTracker| class
263     // in base/task/thread_pool/task_tracker.h which is thread-safe.
264     auto waitable = std::make_shared<AsyncWaitableEvent>();
265 
266     // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
267     DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
268     mPlatform->postWorkerTask(mPlatform, DelegateWorkerTask::RunTask, workerTask);
269 
270     return waitable;
271 }
272 
isAsync()273 bool DelegateWorkerPool::isAsync()
274 {
275     return mPlatform->postWorkerTask != nullptr;
276 }
277 #endif
278 
279 // static
Create(size_t numThreads,PlatformMethods * platform)280 std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(size_t numThreads,
281                                                            PlatformMethods *platform)
282 {
283     const bool multithreaded = numThreads != 1;
284     std::shared_ptr<WorkerThreadPool> pool(nullptr);
285 
286 #if ANGLE_DELEGATE_WORKERS
287     const bool hasPostWorkerTaskImpl = platform->postWorkerTask != nullptr;
288     if (hasPostWorkerTaskImpl && multithreaded)
289     {
290         pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool(platform));
291     }
292 #endif
293 #if ANGLE_STD_ASYNC_WORKERS
294     if (!pool && multithreaded)
295     {
296         pool = std::shared_ptr<WorkerThreadPool>(new AsyncWorkerPool(
297             numThreads == 0 ? std::thread::hardware_concurrency() : numThreads));
298     }
299 #endif
300     if (!pool)
301     {
302         return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool());
303     }
304     return pool;
305 }
306 }  // namespace angle
307