1 // Copyright 2019 The Marl Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "marl/thread.h"
16
17 #include "marl/debug.h"
18 #include "marl/defer.h"
19 #include "marl/trace.h"
20
21 #include <algorithm> // std::sort
22
23 #include <cstdarg>
24 #include <cstdio>
25
26 #if defined(_WIN32)
27 #define WIN32_LEAN_AND_MEAN 1
28 #include <windows.h>
29 #include <array>
30 #include <cstdlib> // mbstowcs
31 #include <limits> // std::numeric_limits
32 #include <vector>
33 #undef max
34 #elif defined(__APPLE__)
35 #include <mach/thread_act.h>
36 #include <pthread.h>
37 #include <unistd.h>
38 #include <thread>
39 #elif defined(__FreeBSD__)
40 #include <pthread.h>
41 #include <pthread_np.h>
42 #include <unistd.h>
43 #include <thread>
44 #else
45 #include <pthread.h>
46 #include <unistd.h>
47 #include <thread>
48 #endif
49
50 namespace {
51
52 struct CoreHasher {
operator ()__anon9f7b20ce0111::CoreHasher53 inline uint64_t operator()(marl::Thread::Core core) const {
54 return core.pthread.index;
55 }
56 };
57
58 } // anonymous namespace
59
60 namespace marl {
61
62 #if defined(_WIN32)
// Number of distinct values representable by Thread::Core::windows.index.
static constexpr size_t MaxCoreCount =
    std::numeric_limits<decltype(Thread::Core::windows.index)>::max() + 1ULL;
// Number of distinct values representable by Thread::Core::windows.group.
// This is also the capacity of ProcessorGroups::groups.
static constexpr size_t MaxGroupCount =
    std::numeric_limits<decltype(Thread::Core::windows.group)>::max() + 1ULL;
// Every bit position of a KAFFINITY mask must be expressible as a core index.
static_assert(sizeof(KAFFINITY) * 8ULL <= MaxCoreCount,
              "Thread::Core::windows.index is too small");
69
70 namespace {
// CHECK_WIN32 evaluates expr exactly once and asserts (through MARL_ASSERT)
// that the result equals TRUE. The failure message contains the stringified
// expression and the value of GetLastError().
#define CHECK_WIN32(expr)                                    \
  do {                                                       \
    auto res = expr;                                         \
    (void)res;                                               \
    MARL_ASSERT(res == TRUE, #expr " failed with error: %d", \
                (int)GetLastError());                        \
  } while (false)
78
// Describes a single processor group, as filled in by getProcessorGroups().
struct ProcessorGroup {
  unsigned int count;  // number of logical processors in this group.
  KAFFINITY affinity;  // affinity mask.
};
83
// Fixed-capacity list of processor groups.
struct ProcessorGroups {
  // Storage for up to MaxGroupCount groups; getProcessorGroups() fills the
  // entries [0, count).
  std::array<ProcessorGroup, MaxGroupCount> groups;
  // Number of entries of `groups` that have been filled in.
  size_t count;
};
88
getProcessorGroups()89 const ProcessorGroups& getProcessorGroups() {
90 static ProcessorGroups groups = [] {
91 ProcessorGroups out = {};
92 SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX info[32] = {};
93 DWORD size = sizeof(info);
94 CHECK_WIN32(GetLogicalProcessorInformationEx(RelationGroup, info, &size));
95 DWORD count = size / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX);
96 for (DWORD i = 0; i < count; i++) {
97 if (info[i].Relationship == RelationGroup) {
98 auto groupCount = info[i].Group.ActiveGroupCount;
99 for (WORD groupIdx = 0; groupIdx < groupCount; groupIdx++) {
100 auto const& groupInfo = info[i].Group.GroupInfo[groupIdx];
101 out.groups[out.count++] = ProcessorGroup{
102 groupInfo.ActiveProcessorCount, groupInfo.ActiveProcessorMask};
103 MARL_ASSERT(out.count <= MaxGroupCount, "Group index overflow");
104 }
105 }
106 }
107 return out;
108 }();
109 return groups;
110 }
111 } // namespace
112 #endif // defined(_WIN32)
113
114 ////////////////////////////////////////////////////////////////////////////////
// Thread::Affinity
116 ////////////////////////////////////////////////////////////////////////////////
117
Affinity(Allocator * allocator)118 Thread::Affinity::Affinity(Allocator* allocator) : cores(allocator) {}
Affinity(Affinity && other)119 Thread::Affinity::Affinity(Affinity&& other) : cores(std::move(other.cores)) {}
operator =(Affinity && other)120 Thread::Affinity& Thread::Affinity::operator=(Affinity&& other) {
121 cores = std::move(other.cores);
122 return *this;
123 }
Affinity(const Affinity & other,Allocator * allocator)124 Thread::Affinity::Affinity(const Affinity& other, Allocator* allocator)
125 : cores(other.cores, allocator) {}
126
Affinity(std::initializer_list<Core> list,Allocator * allocator)127 Thread::Affinity::Affinity(std::initializer_list<Core> list,
128 Allocator* allocator)
129 : cores(allocator) {
130 cores.reserve(list.size());
131 for (auto core : list) {
132 cores.push_back(core);
133 }
134 }
135
Affinity(const containers::vector<Core,32> & coreList,Allocator * allocator)136 Thread::Affinity::Affinity(const containers::vector<Core, 32>& coreList,
137 Allocator* allocator)
138 : cores(coreList, allocator) {}
139
all(Allocator * allocator)140 Thread::Affinity Thread::Affinity::all(
141 Allocator* allocator /* = Allocator::Default */) {
142 Thread::Affinity affinity(allocator);
143
144 #if defined(_WIN32)
145 const auto& groups = getProcessorGroups();
146 for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) {
147 const auto& group = groups.groups[groupIdx];
148 Core core;
149 core.windows.group = static_cast<decltype(Core::windows.group)>(groupIdx);
150 for (unsigned int coreIdx = 0; coreIdx < group.count; coreIdx++) {
151 if ((group.affinity >> coreIdx) & 1) {
152 core.windows.index = static_cast<decltype(core.windows.index)>(coreIdx);
153 affinity.cores.emplace_back(std::move(core));
154 }
155 }
156 }
157 #elif defined(__linux__) && !defined(__ANDROID__) && !defined(__BIONIC__)
158 auto thread = pthread_self();
159 cpu_set_t cpuset;
160 CPU_ZERO(&cpuset);
161 if (pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset) == 0) {
162 int count = CPU_COUNT(&cpuset);
163 for (int i = 0; i < count; i++) {
164 Core core;
165 core.pthread.index = static_cast<uint16_t>(i);
166 affinity.cores.emplace_back(std::move(core));
167 }
168 }
169 #elif defined(__FreeBSD__)
170 auto thread = pthread_self();
171 cpuset_t cpuset;
172 CPU_ZERO(&cpuset);
173 if (pthread_getaffinity_np(thread, sizeof(cpuset_t), &cpuset) == 0) {
174 int count = CPU_COUNT(&cpuset);
175 for (int i = 0; i < count; i++) {
176 Core core;
177 core.pthread.index = static_cast<uint16_t>(i);
178 affinity.cores.emplace_back(std::move(core));
179 }
180 }
181 #else
182 static_assert(!supported,
183 "marl::Thread::Affinity::supported is true, but "
184 "Thread::Affinity::all() is not implemented for this platform");
185 #endif
186
187 return affinity;
188 }
189
anyOf(Affinity && affinity,Allocator * allocator)190 std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::anyOf(
191 Affinity&& affinity,
192 Allocator* allocator /* = Allocator::Default */) {
193 struct Policy : public Thread::Affinity::Policy {
194 Affinity affinity;
195 Policy(Affinity&& affinity) : affinity(std::move(affinity)) {}
196
197 Affinity get(uint32_t threadId, Allocator* allocator) const override {
198 #if defined(_WIN32)
199 auto count = affinity.count();
200 if (count == 0) {
201 return Affinity(affinity, allocator);
202 }
203 auto group = affinity[threadId % affinity.count()].windows.group;
204 Affinity out(allocator);
205 out.cores.reserve(count);
206 for (auto core : affinity.cores) {
207 if (core.windows.group == group) {
208 out.cores.push_back(core);
209 }
210 }
211 return out;
212 #else
213 return Affinity(affinity, allocator);
214 #endif
215 }
216 };
217
218 return allocator->make_shared<Policy>(std::move(affinity));
219 }
220
oneOf(Affinity && affinity,Allocator * allocator)221 std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::oneOf(
222 Affinity&& affinity,
223 Allocator* allocator /* = Allocator::Default */) {
224 struct Policy : public Thread::Affinity::Policy {
225 Affinity affinity;
226 Policy(Affinity&& affinity) : affinity(std::move(affinity)) {}
227
228 Affinity get(uint32_t threadId, Allocator* allocator) const override {
229 auto count = affinity.count();
230 if (count == 0) {
231 return Affinity(affinity, allocator);
232 }
233 return Affinity({affinity[threadId % affinity.count()]}, allocator);
234 }
235 };
236
237 return allocator->make_shared<Policy>(std::move(affinity));
238 }
239
count() const240 size_t Thread::Affinity::count() const {
241 return cores.size();
242 }
243
operator [](size_t index) const244 Thread::Core Thread::Affinity::operator[](size_t index) const {
245 return cores[index];
246 }
247
// Appends to this affinity every core of `other` that it does not already
// hold, then sorts the core list. Returns *this.
// A core that appears several times in `other` is appended only once.
Thread::Affinity& Thread::Affinity::add(const Thread::Affinity& other) {
  // Cores known to be present in `cores`.
  containers::unordered_set<Core, CoreHasher> set(cores.allocator);
  for (auto core : cores) {
    set.emplace(core);
  }
  for (auto core : other.cores) {
    if (set.count(core) == 0) {
      // Record the core as present so that a repeat of it later in `other`
      // is not appended a second time.
      set.emplace(core);
      cores.push_back(core);
    }
  }
  std::sort(cores.begin(), cores.end());
  return *this;
}
261
// Removes from this affinity every core that is present in `other`, then
// sorts the remaining cores. Returns *this.
Thread::Affinity& Thread::Affinity::remove(const Thread::Affinity& other) {
  // Cores to be removed.
  containers::unordered_set<Core, CoreHasher> set(cores.allocator);
  for (auto core : other.cores) {
    set.emplace(core);
  }
  size_t i = 0;
  while (i < cores.size()) {
    if (set.count(cores[i]) != 0) {
      // Swap-and-pop: overwrite the removed core with the last one and
      // shrink. Do NOT advance `i`: the core just moved into this slot has
      // not been examined yet and may itself need removing.
      cores[i] = cores.back();
      cores.resize(cores.size() - 1);
    } else {
      i++;
    }
  }
  std::sort(cores.begin(), cores.end());
  return *this;
}
276
277 #if defined(_WIN32)
278
279 class Thread::Impl {
280 public:
Impl(Func && func,_PROC_THREAD_ATTRIBUTE_LIST * attributes)281 Impl(Func&& func, _PROC_THREAD_ATTRIBUTE_LIST* attributes)
282 : func(std::move(func)),
283 handle(CreateRemoteThreadEx(GetCurrentProcess(),
284 nullptr,
285 0,
286 &Impl::run,
287 this,
288 0,
289 attributes,
290 nullptr)) {}
~Impl()291 ~Impl() { CloseHandle(handle); }
292
293 Impl(const Impl&) = delete;
294 Impl(Impl&&) = delete;
295 Impl& operator=(const Impl&) = delete;
296 Impl& operator=(Impl&&) = delete;
297
Join() const298 void Join() const { WaitForSingleObject(handle, INFINITE); }
299
run(void * self)300 static DWORD WINAPI run(void* self) {
301 reinterpret_cast<Impl*>(self)->func();
302 return 0;
303 }
304
305 private:
306 const Func func;
307 const HANDLE handle;
308 };
309
Thread(Affinity && affinity,Func && func)310 Thread::Thread(Affinity&& affinity, Func&& func) {
311 SIZE_T size = 0;
312 InitializeProcThreadAttributeList(nullptr, 1, 0, &size);
313 MARL_ASSERT(size > 0,
314 "InitializeProcThreadAttributeList() did not give a size");
315
316 std::vector<uint8_t> buffer(size);
317 LPPROC_THREAD_ATTRIBUTE_LIST attributes =
318 reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(buffer.data());
319 CHECK_WIN32(InitializeProcThreadAttributeList(attributes, 1, 0, &size));
320 defer(DeleteProcThreadAttributeList(attributes));
321
322 GROUP_AFFINITY groupAffinity = {};
323
324 auto count = affinity.count();
325 if (count > 0) {
326 groupAffinity.Group = affinity[0].windows.group;
327 for (size_t i = 0; i < count; i++) {
328 auto core = affinity[i];
329 MARL_ASSERT(groupAffinity.Group == core.windows.group,
330 "Cannot create thread that uses multiple affinity groups");
331 groupAffinity.Mask |= (1ULL << core.windows.index);
332 }
333 CHECK_WIN32(UpdateProcThreadAttribute(
334 attributes, 0, PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY, &groupAffinity,
335 sizeof(groupAffinity), nullptr, nullptr));
336 }
337
338 impl = new Impl(std::move(func), attributes);
339 }
340
~Thread()341 Thread::~Thread() {
342 delete impl;
343 }
344
join()345 void Thread::join() {
346 MARL_ASSERT(impl != nullptr, "join() called on unjoinable thread");
347 impl->Join();
348 }
349
setName(const char * fmt,...)350 void Thread::setName(const char* fmt, ...) {
351 static auto setThreadDescription =
352 reinterpret_cast<HRESULT(WINAPI*)(HANDLE, PCWSTR)>(GetProcAddress(
353 GetModuleHandleA("kernelbase.dll"), "SetThreadDescription"));
354 if (setThreadDescription == nullptr) {
355 return;
356 }
357
358 char name[1024];
359 va_list vararg;
360 va_start(vararg, fmt);
361 vsnprintf(name, sizeof(name), fmt, vararg);
362 va_end(vararg);
363
364 wchar_t wname[1024];
365 mbstowcs(wname, name, 1024);
366 setThreadDescription(GetCurrentThread(), wname);
367 MARL_NAME_THREAD("%s", name);
368 }
369
numLogicalCPUs()370 unsigned int Thread::numLogicalCPUs() {
371 unsigned int count = 0;
372 const auto& groups = getProcessorGroups();
373 for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) {
374 const auto& group = groups.groups[groupIdx];
375 count += group.count;
376 }
377 return count;
378 }
379
380 #else
381
382 class Thread::Impl {
383 public:
Impl(Affinity && affinity,Thread::Func && f)384 Impl(Affinity&& affinity, Thread::Func&& f)
385 : affinity(std::move(affinity)), func(std::move(f)), thread([this] {
386 setAffinity();
387 func();
388 }) {}
389
390 Affinity affinity;
391 Func func;
392 std::thread thread;
393
setAffinity()394 void setAffinity() {
395 auto count = affinity.count();
396 if (count == 0) {
397 return;
398 }
399
400 #if defined(__linux__) && !defined(__ANDROID__) && !defined(__BIONIC__)
401 cpu_set_t cpuset;
402 CPU_ZERO(&cpuset);
403 for (size_t i = 0; i < count; i++) {
404 CPU_SET(affinity[i].pthread.index, &cpuset);
405 }
406 auto thread = pthread_self();
407 pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
408 #elif defined(__FreeBSD__)
409 cpuset_t cpuset;
410 CPU_ZERO(&cpuset);
411 for (size_t i = 0; i < count; i++) {
412 CPU_SET(affinity[i].pthread.index, &cpuset);
413 }
414 auto thread = pthread_self();
415 pthread_setaffinity_np(thread, sizeof(cpuset_t), &cpuset);
416 #else
417 MARL_ASSERT(!marl::Thread::Affinity::supported,
418 "Attempting to use thread affinity on a unsupported platform");
419 #endif
420 }
421 };
422
Thread(Affinity && affinity,Func && func)423 Thread::Thread(Affinity&& affinity, Func&& func)
424 : impl(new Thread::Impl(std::move(affinity), std::move(func))) {}
425
~Thread()426 Thread::~Thread() {
427 MARL_ASSERT(!impl, "Thread::join() was not called before destruction");
428 }
429
join()430 void Thread::join() {
431 impl->thread.join();
432 delete impl;
433 impl = nullptr;
434 }
435
setName(const char * fmt,...)436 void Thread::setName(const char* fmt, ...) {
437 char name[1024];
438 va_list vararg;
439 va_start(vararg, fmt);
440 vsnprintf(name, sizeof(name), fmt, vararg);
441 va_end(vararg);
442
443 #if defined(__APPLE__)
444 pthread_setname_np(name);
445 #elif defined(__FreeBSD__)
446 pthread_set_name_np(pthread_self(), name);
447 #elif !defined(__Fuchsia__) && !defined(__EMSCRIPTEN__)
448 pthread_setname_np(pthread_self(), name);
449 #endif
450
451 MARL_NAME_THREAD("%s", name);
452 }
453
numLogicalCPUs()454 unsigned int Thread::numLogicalCPUs() {
455 return static_cast<unsigned int>(sysconf(_SC_NPROCESSORS_ONLN));
456 }
457
458 #endif // OS
459
Thread(Thread && rhs)460 Thread::Thread(Thread&& rhs) : impl(rhs.impl) {
461 rhs.impl = nullptr;
462 }
463
operator =(Thread && rhs)464 Thread& Thread::operator=(Thread&& rhs) {
465 if (impl) {
466 delete impl;
467 impl = nullptr;
468 }
469 impl = rhs.impl;
470 rhs.impl = nullptr;
471 return *this;
472 }
473
474 } // namespace marl
475