1 #include "caffe2/utils/threadpool/ThreadPool.h"
2 #include "WorkersPool.h"
3
4 #if !defined(__s390x__) && !defined(__powerpc__)
5 #include <cpuinfo.h>
6 #else
7 #include <thread>
8 #endif
9
// When set, ThreadPoolImpl::run executes every job on the calling thread
// instead of handing the work to the workers pool.
C10_DEFINE_bool(
    caffe2_threadpool_force_inline,
    false,
    "Force to always run jobs on the calling thread");

// Whether or not threadpool caps apply to Android
// (declared as an int flag but used as a boolean; only consulted by
// getDefaultNumThreads() when C10_ANDROID is defined)
C10_DEFINE_int(caffe2_threadpool_android_cap, true, "");

// Whether or not threadpool caps apply to iOS and MacOS
// (declared as int flags but used as booleans; only consulted by
// getDefaultNumThreads() when C10_IOS / TARGET_OS_MAC is defined)
C10_DEFINE_int(caffe2_threadpool_ios_cap, true, "");
C10_DEFINE_int(caffe2_threadpool_macos_cap, true, "");

// A positive value replaces the thread count that getDefaultNumThreads()
// would otherwise compute; the result is still clamped to that function's
// tsan-motivated upper limit. The default of 0 means "no override".
C10_DEFINE_int(pthreadpool_size, 0, "Override the default thread pool size.");
23
24 namespace caffe2 {
25
26 namespace {
27 class ThreadPoolImpl : public ThreadPool {
28 public:
29 explicit ThreadPoolImpl(int numThreads);
30 ~ThreadPoolImpl() override;
31
32 // Returns the number of threads currently in use
33 int getNumThreads() const override;
34 void setNumThreads(size_t numThreads) override;
35
36 void run(const std::function<void(int, size_t)>& fn, size_t range) override;
37 void withPool(const std::function<void(WorkersPool*)>& f) override;
38
39 private:
40 std::atomic_size_t numThreads_;
41 std::shared_ptr<WorkersPool> workersPool_;
42 std::vector<std::shared_ptr<Task>> tasks_;
43 };
44 }
45
getDefaultNumThreads()46 size_t getDefaultNumThreads() {
47 #if !defined(__s390x__) && !defined(__powerpc__)
48 auto numThreads = 1U;
49 if (cpuinfo_initialize()) {
50 numThreads = std::max(cpuinfo_get_processors_count(), 1U);
51 } else {
52 LOG(WARNING) << "cpuinfo initialization failed";
53 numThreads = std::max(std::thread::hardware_concurrency(), 1U);
54 }
55
56 bool applyCap = false;
57 #if defined(C10_ANDROID)
58 applyCap = FLAGS_caffe2_threadpool_android_cap;
59 #elif defined(C10_IOS)
60 applyCap = FLAGS_caffe2_threadpool_ios_cap;
61 #elif defined(TARGET_OS_MAC)
62 applyCap = FLAGS_caffe2_threadpool_macos_cap;
63 #endif
64
65 if (applyCap) {
66 switch (numThreads) {
67 #if defined(C10_ANDROID) && (CPUINFO_ARCH_ARM || CPUINFO_ARCH_ARM64)
68 case 4:
69 switch (cpuinfo_get_core(0)->midr & UINT32_C(0xFF00FFF0)) {
70 case UINT32_C(0x51002110): /* Snapdragon 820 Kryo Silver */
71 case UINT32_C(0x51002010): /* Snapdragon 821 Kryo Silver */
72 case UINT32_C(0x51002050): /* Snapdragon 820/821 Kryo Gold */
73 /* Kryo: 2+2 big.LITTLE */
74 numThreads = 2;
75 break;
76 default:
77 /* Anything else: assume homogeneous architecture */
78 numThreads = 4;
79 break;
80 }
81 break;
82 #endif
83 case 5:
84 /* 4+1 big.LITTLE */
85 numThreads = 4;
86 break;
87 case 6:
88 /* 2+4 big.LITTLE */
89 numThreads = 2;
90 break;
91 // NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers,bugprone-branch-clone)
92 case 8:
93 /* 4+4 big.LITTLE */
94 numThreads = 4;
95 break;
96 case 10:
97 /* 4+4+2 Min.Med.Max, running on Med cores */
98 numThreads = 4;
99 break;
100 default:
101 if (numThreads > 4) {
102 numThreads = numThreads / 2;
103 }
104 break;
105 }
106 }
107 #else
108 auto numThreads = std::max(std::thread::hardware_concurrency(), 1U);
109 #endif
110
111 if (FLAGS_pthreadpool_size) {
112 // Always give precedence to explicit setting.
113 numThreads = FLAGS_pthreadpool_size;
114 }
115
116 /*
117 * For llvm-tsan, holding limit for the number of locks for a single thread
118 * is 63 (because of comparison < 64 instead of <=). pthreadpool's worst
119 * case is the number of threads in a pool. So we want to limit the threadpool
120 * size to 64 when running with tsan. However, sometimes it is tricky to
121 * detect if we are running under tsan, for now capping the default
122 * threadcount to the tsan limit unconditionally.
123 */
124 auto tsanThreadLimit = 63U;
125 numThreads = std::min(numThreads, tsanThreadLimit);
126
127 return numThreads;
128 }
129
// Default smallest amount of work that will be partitioned between
// multiple threads; the runtime value is configurable
constexpr size_t kDefaultMinWorkSize = 1;

// Cached result of getDefaultNumThreads(). The initial value 0 means "not
// computed yet": defaultThreadPool() always refreshes it, and
// ThreadPoolImpl::setNumThreads() fills it in lazily and then uses it as the
// upper bound for the requested thread count.
size_t ThreadPool::defaultNumThreads_ = 0;
135
createThreadPool(int numThreads)136 ThreadPool* ThreadPool::createThreadPool(int numThreads) {
137 return new ThreadPoolImpl(numThreads);
138 }
139
defaultThreadPool()140 std::unique_ptr<ThreadPool> ThreadPool::defaultThreadPool() {
141 defaultNumThreads_ = getDefaultNumThreads();
142 LOG(INFO) << "Constructing thread pool with " << defaultNumThreads_
143 << " threads";
144 return std::make_unique<ThreadPoolImpl>(defaultNumThreads_);
145 }
146
ThreadPoolImpl(int numThreads)147 ThreadPoolImpl::ThreadPoolImpl(int numThreads)
148 : numThreads_(numThreads),
149 workersPool_(std::make_shared<WorkersPool>()) {
150 minWorkSize_ = kDefaultMinWorkSize;
151 }
152
153 // NOLINTNEXTLINE(modernize-use-equals-default)
~ThreadPoolImpl()154 ThreadPoolImpl::~ThreadPoolImpl() {}
155
getNumThreads() const156 int ThreadPoolImpl::getNumThreads() const {
157 return numThreads_;
158 }
159
160 // Sets the number of threads
161 // # of threads should not be bigger than the number of big cores
setNumThreads(size_t numThreads)162 void ThreadPoolImpl::setNumThreads(size_t numThreads) {
163 if (defaultNumThreads_ == 0) {
164 defaultNumThreads_ = getDefaultNumThreads();
165 }
166 numThreads_ = std::min(numThreads, defaultNumThreads_);
167 }
168
run(const std::function<void (int,size_t)> & fn,size_t range)169 void ThreadPoolImpl::run(const std::function<void(int, size_t)>& fn, size_t range) {
170 const auto numThreads = numThreads_.load(std::memory_order_relaxed);
171
172 std::lock_guard<std::mutex> guard(executionMutex_);
173 // If there are no worker threads, or if the range is too small (too
174 // little work), just run locally
175 const bool runLocally = range < minWorkSize_ ||
176 FLAGS_caffe2_threadpool_force_inline || (numThreads == 0);
177 if (runLocally) {
178 // Work is small enough to just run locally; multithread overhead
179 // is too high
180 for (size_t i = 0; i < range; ++i) {
181 fn(0, i);
182 }
183 return;
184 }
185
186 struct FnTask : public Task {
187 // NOLINTNEXTLINE(modernize-use-equals-default,cppcoreguidelines-pro-type-member-init)
188 FnTask(){};
189 // NOLINTNEXTLINE(modernize-use-equals-default)
190 ~FnTask() override{};
191 const std::function<void(int, size_t)>* fn_;
192 int idx_;
193 size_t start_;
194 size_t end_;
195 void Run() override {
196 for (auto i = start_; i < end_; ++i) {
197 (*fn_)(idx_, i);
198 }
199 }
200 };
201
202 CAFFE_ENFORCE_GE(numThreads_, 1);
203 const size_t unitsPerTask = (range + numThreads - 1) / numThreads;
204 tasks_.resize(numThreads);
205 for (size_t i = 0; i < numThreads; ++i) {
206 if (!tasks_[i]) {
207 // NOLINTNEXTLINE(modernize-make-shared)
208 tasks_[i].reset(new FnTask());
209 }
210 auto* task = (FnTask*)tasks_[i].get();
211 task->fn_ = &fn;
212 task->idx_ = i;
213 task->start_ = std::min<size_t>(range, i * unitsPerTask);
214 task->end_ = std::min<size_t>(range, (i + 1) * unitsPerTask);
215 if (task->start_ >= task->end_) {
216 tasks_.resize(i);
217 break;
218 }
219 CAFFE_ENFORCE_LE(task->start_, range);
220 CAFFE_ENFORCE_LE(task->end_, range);
221 }
222 CAFFE_ENFORCE_LE(tasks_.size(), numThreads);
223 CAFFE_ENFORCE_GE(tasks_.size(), 1);
224 workersPool_->Execute(tasks_);
225 }
226
withPool(const std::function<void (WorkersPool *)> & f)227 void ThreadPoolImpl::withPool(const std::function<void(WorkersPool*)>& f) {
228 std::lock_guard<std::mutex> guard(executionMutex_);
229 f(workersPool_.get());
230 }
231
232 } // namespace caffe2
233