1 //
2 // Copyright 2016 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // WorkerThread:
7 // Task running thread for ANGLE, similar to a TaskRunner in Chromium.
8 // Might be implemented differently depending on platform.
9 //
10
11 #include "common/WorkerThread.h"
12
13 #include "common/angleutils.h"
14 #include "common/system_utils.h"
15
16 // Controls if our threading code uses std::async or falls back to single-threaded operations.
17 // Note that we can't easily use std::async in UWPs due to UWP threading restrictions.
18 #if !defined(ANGLE_STD_ASYNC_WORKERS) && !defined(ANGLE_ENABLE_WINDOWS_UWP)
19 # define ANGLE_STD_ASYNC_WORKERS 1
20 #endif // !defined(ANGLE_STD_ASYNC_WORKERS) && & !defined(ANGLE_ENABLE_WINDOWS_UWP)
21
22 #if ANGLE_DELEGATE_WORKERS || ANGLE_STD_ASYNC_WORKERS
23 # include <future>
24 # include <queue>
25 # include <thread>
26 #endif // ANGLE_DELEGATE_WORKERS || ANGLE_STD_ASYNC_WORKERS
27
28 namespace angle
29 {
30
31 WaitableEvent::WaitableEvent() = default;
32 WaitableEvent::~WaitableEvent() = default;
33
wait()34 void WaitableEventDone::wait() {}
35
isReady()36 bool WaitableEventDone::isReady()
37 {
38 return true;
39 }
40
markAsReady()41 void AsyncWaitableEvent::markAsReady()
42 {
43 std::lock_guard<std::mutex> lock(mMutex);
44 mIsReady = true;
45 mCondition.notify_all();
46 }
47
wait()48 void AsyncWaitableEvent::wait()
49 {
50 std::unique_lock<std::mutex> lock(mMutex);
51 mCondition.wait(lock, [this] { return mIsReady; });
52 }
53
isReady()54 bool AsyncWaitableEvent::isReady()
55 {
56 std::lock_guard<std::mutex> lock(mMutex);
57 return mIsReady;
58 }
59
60 WorkerThreadPool::WorkerThreadPool() = default;
61 WorkerThreadPool::~WorkerThreadPool() = default;
62
63 class SingleThreadedWorkerPool final : public WorkerThreadPool
64 {
65 public:
66 std::shared_ptr<WaitableEvent> postWorkerTask(const std::shared_ptr<Closure> &task) override;
67 bool isAsync() override;
68 };
69
70 // SingleThreadedWorkerPool implementation.
postWorkerTask(const std::shared_ptr<Closure> & task)71 std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
72 const std::shared_ptr<Closure> &task)
73 {
74 // Thread safety: This function is thread-safe because the task is run on the calling thread
75 // itself.
76 (*task)();
77 return std::make_shared<WaitableEventDone>();
78 }
79
isAsync()80 bool SingleThreadedWorkerPool::isAsync()
81 {
82 return false;
83 }
84
85 #if ANGLE_STD_ASYNC_WORKERS
86
87 class AsyncWorkerPool final : public WorkerThreadPool
88 {
89 public:
90 AsyncWorkerPool(size_t numThreads);
91
92 ~AsyncWorkerPool() override;
93
94 std::shared_ptr<WaitableEvent> postWorkerTask(const std::shared_ptr<Closure> &task) override;
95
96 bool isAsync() override;
97
98 private:
99 void createThreads();
100
101 using Task = std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>;
102
103 // Thread's main loop
104 void threadLoop();
105
106 bool mTerminated = false;
107 std::mutex mMutex; // Protects access to the fields in this class
108 std::condition_variable mCondVar; // Signals when work is available in the queue
109 std::queue<Task> mTaskQueue;
110 std::deque<std::thread> mThreads;
111 size_t mDesiredThreadCount;
112 };
113
114 // AsyncWorkerPool implementation.
115
AsyncWorkerPool(size_t numThreads)116 AsyncWorkerPool::AsyncWorkerPool(size_t numThreads) : mDesiredThreadCount(numThreads)
117 {
118 ASSERT(numThreads != 0);
119 }
120
~AsyncWorkerPool()121 AsyncWorkerPool::~AsyncWorkerPool()
122 {
123 {
124 std::unique_lock<std::mutex> lock(mMutex);
125 mTerminated = true;
126 }
127 mCondVar.notify_all();
128 for (auto &thread : mThreads)
129 {
130 ASSERT(thread.get_id() != std::this_thread::get_id());
131 thread.join();
132 }
133 }
134
createThreads()135 void AsyncWorkerPool::createThreads()
136 {
137 if (mDesiredThreadCount == mThreads.size())
138 {
139 return;
140 }
141 ASSERT(mThreads.empty());
142
143 for (size_t i = 0; i < mDesiredThreadCount; ++i)
144 {
145 mThreads.emplace_back(&AsyncWorkerPool::threadLoop, this);
146 }
147 }
148
postWorkerTask(const std::shared_ptr<Closure> & task)149 std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(const std::shared_ptr<Closure> &task)
150 {
151 // Thread safety: This function is thread-safe because access to |mTaskQueue| is protected by
152 // |mMutex|.
153 auto waitable = std::make_shared<AsyncWaitableEvent>();
154 {
155 std::lock_guard<std::mutex> lock(mMutex);
156
157 // Lazily create the threads on first task
158 createThreads();
159
160 mTaskQueue.push(std::make_pair(waitable, task));
161 }
162 mCondVar.notify_one();
163 return waitable;
164 }
165
threadLoop()166 void AsyncWorkerPool::threadLoop()
167 {
168 angle::SetCurrentThreadName("ANGLE-Worker");
169
170 while (true)
171 {
172 Task task;
173 {
174 std::unique_lock<std::mutex> lock(mMutex);
175 mCondVar.wait(lock, [this] { return !mTaskQueue.empty() || mTerminated; });
176 if (mTerminated)
177 {
178 return;
179 }
180 task = mTaskQueue.front();
181 mTaskQueue.pop();
182 }
183
184 auto &waitable = task.first;
185 auto &closure = task.second;
186
187 // Note: always add an ANGLE_TRACE_EVENT* macro in the closure. Then the job will show up
188 // in traces.
189 (*closure)();
190 // Release shared_ptr<Closure> before notifying the event to allow for destructor based
191 // dependencies (example: anglebug.com/42267099)
192 task.second.reset();
193 waitable->markAsReady();
194 }
195 }
196
isAsync()197 bool AsyncWorkerPool::isAsync()
198 {
199 return true;
200 }
201
202 #endif // ANGLE_STD_ASYNC_WORKERS
203
204 #if ANGLE_DELEGATE_WORKERS
205
206 class DelegateWorkerPool final : public WorkerThreadPool
207 {
208 public:
DelegateWorkerPool(PlatformMethods * platform)209 DelegateWorkerPool(PlatformMethods *platform) : mPlatform(platform) {}
210 ~DelegateWorkerPool() override = default;
211
212 std::shared_ptr<WaitableEvent> postWorkerTask(const std::shared_ptr<Closure> &task) override;
213
214 bool isAsync() override;
215
216 private:
217 PlatformMethods *mPlatform;
218 };
219
220 // A function wrapper to execute the closure and to notify the waitable
221 // event after the execution.
222 class DelegateWorkerTask
223 {
224 public:
DelegateWorkerTask(const std::shared_ptr<Closure> & task,std::shared_ptr<AsyncWaitableEvent> waitable)225 DelegateWorkerTask(const std::shared_ptr<Closure> &task,
226 std::shared_ptr<AsyncWaitableEvent> waitable)
227 : mTask(task), mWaitable(waitable)
228 {}
229 DelegateWorkerTask() = delete;
230 DelegateWorkerTask(DelegateWorkerTask &) = delete;
231
RunTask(void * userData)232 static void RunTask(void *userData)
233 {
234 DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData);
235 (*workerTask->mTask)();
236 workerTask->mWaitable->markAsReady();
237
238 // Delete the task after its execution.
239 delete workerTask;
240 }
241
242 private:
243 ~DelegateWorkerTask() = default;
244
245 std::shared_ptr<Closure> mTask;
246 std::shared_ptr<AsyncWaitableEvent> mWaitable;
247 };
248
249 ANGLE_NO_SANITIZE_CFI_ICALL
postWorkerTask(const std::shared_ptr<Closure> & task)250 std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(
251 const std::shared_ptr<Closure> &task)
252 {
253 if (mPlatform->postWorkerTask == nullptr)
254 {
255 // In the unexpected case where the platform methods have been changed during execution and
256 // postWorkerTask is no longer usable, simply run the task on the calling thread.
257 (*task)();
258 return std::make_shared<WaitableEventDone>();
259 }
260
261 // Thread safety: This function is thread-safe because the |postWorkerTask| platform method is
262 // expected to be thread safe. For Chromium, that forwards the call to the |TaskTracker| class
263 // in base/task/thread_pool/task_tracker.h which is thread-safe.
264 auto waitable = std::make_shared<AsyncWaitableEvent>();
265
266 // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
267 DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
268 mPlatform->postWorkerTask(mPlatform, DelegateWorkerTask::RunTask, workerTask);
269
270 return waitable;
271 }
272
isAsync()273 bool DelegateWorkerPool::isAsync()
274 {
275 return mPlatform->postWorkerTask != nullptr;
276 }
277 #endif
278
279 // static
Create(size_t numThreads,PlatformMethods * platform)280 std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(size_t numThreads,
281 PlatformMethods *platform)
282 {
283 const bool multithreaded = numThreads != 1;
284 std::shared_ptr<WorkerThreadPool> pool(nullptr);
285
286 #if ANGLE_DELEGATE_WORKERS
287 const bool hasPostWorkerTaskImpl = platform->postWorkerTask != nullptr;
288 if (hasPostWorkerTaskImpl && multithreaded)
289 {
290 pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool(platform));
291 }
292 #endif
293 #if ANGLE_STD_ASYNC_WORKERS
294 if (!pool && multithreaded)
295 {
296 pool = std::shared_ptr<WorkerThreadPool>(new AsyncWorkerPool(
297 numThreads == 0 ? std::thread::hardware_concurrency() : numThreads));
298 }
299 #endif
300 if (!pool)
301 {
302 return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool());
303 }
304 return pool;
305 }
306 } // namespace angle
307