xref: /aosp_15_r20/external/pytorch/caffe2/utils/threadpool/ThreadPool.cc (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
1 #include "caffe2/utils/threadpool/ThreadPool.h"
2 #include "WorkersPool.h"
3 
4 #if !defined(__s390x__) && !defined(__powerpc__)
5 #include <cpuinfo.h>
6 #else
7 #include <thread>
8 #endif
9 
// When true, parallel jobs always run inline on the calling thread instead
// of being dispatched to the worker pool (see ThreadPoolImpl::run).
C10_DEFINE_bool(
    caffe2_threadpool_force_inline,
    false,
    "Force to always run jobs on the calling thread");

// Whether or not threadpool caps apply to Android
// NOTE(review): declared via C10_DEFINE_int but used as a boolean; kept as-is
// because the registered flag type is part of the command-line interface.
C10_DEFINE_int(caffe2_threadpool_android_cap, true, "");

// Whether or not threadpool caps apply to iOS and MacOS
C10_DEFINE_int(caffe2_threadpool_ios_cap, true, "");
C10_DEFINE_int(caffe2_threadpool_macos_cap, true, "");

// Explicit override for the default pool size; 0 means "no override"
// (see getDefaultNumThreads()).
C10_DEFINE_int(pthreadpool_size, 0, "Override the default thread pool size.");
23 
24 namespace caffe2 {
25 
namespace {
  // Concrete ThreadPool implementation backed by a WorkersPool. File-local:
  // instances are created only through ThreadPool::createThreadPool() and
  // ThreadPool::defaultThreadPool() below.
  class ThreadPoolImpl : public ThreadPool {
  public:
    explicit ThreadPoolImpl(int numThreads);
    ~ThreadPoolImpl() override;

    // Returns the number of threads currently in use
    int getNumThreads() const override;
    // Sets the thread count, clamped to the platform default (see definition).
    void setNumThreads(size_t numThreads) override;

    // Invokes fn(threadId, index) for every index in [0, range).
    void run(const std::function<void(int, size_t)>& fn, size_t range) override;
    // Runs f with exclusive access to the underlying WorkersPool.
    void withPool(const std::function<void(WorkersPool*)>& f) override;

  private:
    std::atomic_size_t numThreads_;            // current logical thread count
    std::shared_ptr<WorkersPool> workersPool_; // executes the partitioned tasks
    std::vector<std::shared_ptr<Task>> tasks_; // cached/reused across run() calls
  };
}
45 
// Computes the default thread count for this machine: the physical processor
// count, reduced on known heterogeneous (big.LITTLE) core layouts, then
// overridden by --pthreadpool_size if set, and finally capped for TSAN.
size_t getDefaultNumThreads() {
#if !defined(__s390x__) && !defined(__powerpc__)
  auto numThreads = 1U;
  if (cpuinfo_initialize()) {
    numThreads = std::max(cpuinfo_get_processors_count(), 1U);
  } else {
    // cpuinfo failed; fall back to the C++ runtime's best guess.
    LOG(WARNING) << "cpuinfo initialization failed";
    numThreads = std::max(std::thread::hardware_concurrency(), 1U);
  }

  // On mobile/mac, cap the thread count to (presumably) the big-core cluster.
  // The cases below encode known core layouts by total core count.
  bool applyCap = false;
#if defined(C10_ANDROID)
  applyCap = FLAGS_caffe2_threadpool_android_cap;
#elif defined(C10_IOS)
  applyCap = FLAGS_caffe2_threadpool_ios_cap;
#elif defined(TARGET_OS_MAC)
  applyCap = FLAGS_caffe2_threadpool_macos_cap;
#endif

  if (applyCap) {
    switch (numThreads) {
#if defined(C10_ANDROID) && (CPUINFO_ARCH_ARM || CPUINFO_ARCH_ARM64)
      case 4:
        // Distinguish 2+2 Kryo parts from true quad-core chips by masking
        // core 0's MIDR (implementer/part identification register).
        switch (cpuinfo_get_core(0)->midr & UINT32_C(0xFF00FFF0)) {
          case UINT32_C(0x51002110): /* Snapdragon 820 Kryo Silver */
          case UINT32_C(0x51002010): /* Snapdragon 821 Kryo Silver */
          case UINT32_C(0x51002050): /* Snapdragon 820/821 Kryo Gold */
            /* Kryo: 2+2 big.LITTLE */
            numThreads = 2;
            break;
          default:
            /* Anything else: assume homogeneous architecture */
            numThreads = 4;
            break;
        }
        break;
#endif
      case 5:
        /* 4+1 big.LITTLE */
        numThreads = 4;
        break;
      case 6:
        /* 2+4 big.LITTLE */
        numThreads = 2;
        break;
      // NOLINTNEXTLINE(cppcoreguidelines-avoid-magic-numbers,bugprone-branch-clone)
      case 8:
        /* 4+4 big.LITTLE */
        numThreads = 4;
        break;
      case 10:
        /* 4+4+2 Min.Med.Max, running on Med cores */
        numThreads = 4;
        break;
      default:
        // Unknown layout: conservatively use half the cores when there are
        // more than four.
        if (numThreads > 4) {
          numThreads = numThreads / 2;
        }
        break;
    }
  }
#else
  // s390x / powerpc: cpuinfo is unavailable; use the C++ runtime directly.
  auto numThreads = std::max(std::thread::hardware_concurrency(), 1U);
#endif

  if (FLAGS_pthreadpool_size) {
    // Always give precedence to explicit setting.
    numThreads = FLAGS_pthreadpool_size;
  }

  /*
   * For llvm-tsan, holding limit for the number of locks for a single thread
   * is 63 (because of comparison < 64 instead of <=). pthreadpool's worst
   * case is the number of threads in a pool. So we want to limit the threadpool
   * size to 64 when running with tsan. However, sometimes it is tricky to
   * detect if we are running under tsan, for now capping the default
   * threadcount to the tsan limit unconditionally.
   */
  auto tsanThreadLimit = 63U;
  numThreads = std::min(numThreads, tsanThreadLimit);

  return numThreads;
}
129 
// Default smallest amount of work that will be partitioned between
// multiple threads; the runtime value is configurable
constexpr size_t kDefaultMinWorkSize = 1;

// Cached machine-default thread count; 0 until first computed by
// defaultThreadPool() or lazily by ThreadPoolImpl::setNumThreads().
size_t ThreadPool::defaultNumThreads_ = 0;
135 
createThreadPool(int numThreads)136 ThreadPool* ThreadPool::createThreadPool(int numThreads) {
137   return new ThreadPoolImpl(numThreads);
138 }
139 
defaultThreadPool()140 std::unique_ptr<ThreadPool> ThreadPool::defaultThreadPool() {
141   defaultNumThreads_ = getDefaultNumThreads();
142   LOG(INFO) << "Constructing thread pool with " << defaultNumThreads_
143             << " threads";
144   return std::make_unique<ThreadPoolImpl>(defaultNumThreads_);
145 }
146 
// Constructs a pool that will partition work across `numThreads` tasks.
// The underlying WorkersPool is created immediately; minWorkSize_ starts at
// the compile-time default and may be adjusted by callers at runtime.
ThreadPoolImpl::ThreadPoolImpl(int numThreads)
    : numThreads_(numThreads),
      workersPool_(std::make_shared<WorkersPool>()) {
  minWorkSize_ = kDefaultMinWorkSize;
}
152 
153 // NOLINTNEXTLINE(modernize-use-equals-default)
~ThreadPoolImpl()154 ThreadPoolImpl::~ThreadPoolImpl() {}
155 
getNumThreads() const156 int ThreadPoolImpl::getNumThreads() const {
157   return numThreads_;
158 }
159 
// Sets the number of threads
// # of threads should not be bigger than the number of big cores
void ThreadPoolImpl::setNumThreads(size_t numThreads) {
  // Lazily compute the machine default so we have a cap to clamp against.
  // NOTE(review): defaultNumThreads_ is a plain static written here without
  // synchronization — concurrent first calls could race; confirm callers
  // serialize initialization.
  if (defaultNumThreads_ == 0) {
    defaultNumThreads_ = getDefaultNumThreads();
  }
  // Requested count is capped at the machine default.
  numThreads_ = std::min(numThreads, defaultNumThreads_);
}
168 
// Invokes fn(threadId, index) for every index in [0, range), splitting the
// range into contiguous chunks of at most ceil(range/numThreads) indices,
// one chunk per task. Falls back to a plain serial loop when the range is
// below minWorkSize_, when inlining is forced by flag, or when the pool has
// zero threads. Calls are serialized by executionMutex_; Execute() blocks
// until all chunks are done, so `fn` need only outlive this call.
void ThreadPoolImpl::run(const std::function<void(int, size_t)>& fn, size_t range) {
  const auto numThreads = numThreads_.load(std::memory_order_relaxed);

  std::lock_guard<std::mutex> guard(executionMutex_);
  // If there are no worker threads, or if the range is too small (too
  // little work), just run locally
  const bool runLocally = range < minWorkSize_ ||
      FLAGS_caffe2_threadpool_force_inline || (numThreads == 0);
  if (runLocally) {
    // Work is small enough to just run locally; multithread overhead
    // is too high
    for (size_t i = 0; i < range; ++i) {
      fn(0, i);
    }
    return;
  }

  // Adapter task: runs fn over its half-open slice [start_, end_).
  struct FnTask : public Task {
    // NOLINTNEXTLINE(modernize-use-equals-default,cppcoreguidelines-pro-type-member-init)
    FnTask(){};
    // NOLINTNEXTLINE(modernize-use-equals-default)
    ~FnTask() override{};
    const std::function<void(int, size_t)>* fn_; // non-owning; set per run() call
    int idx_;       // thread id passed as fn's first argument
    size_t start_;  // slice start, inclusive
    size_t end_;    // slice end, exclusive
    void Run() override {
      for (auto i = start_; i < end_; ++i) {
        (*fn_)(idx_, i);
      }
    }
  };

  CAFFE_ENFORCE_GE(numThreads_, 1);
  // Ceiling division: each task receives at most unitsPerTask indices.
  const size_t unitsPerTask = (range + numThreads - 1) / numThreads;
  tasks_.resize(numThreads);
  for (size_t i = 0; i < numThreads; ++i) {
    if (!tasks_[i]) {
      // Task objects are cached in tasks_ and reused across run() calls.
      // NOLINTNEXTLINE(modernize-make-shared)
      tasks_[i].reset(new FnTask());
    }
    auto* task = (FnTask*)tasks_[i].get();
    task->fn_ = &fn;
    task->idx_ = i;
    task->start_ = std::min<size_t>(range, i * unitsPerTask);
    task->end_ = std::min<size_t>(range, (i + 1) * unitsPerTask);
    if (task->start_ >= task->end_) {
      // Range exhausted before all threads got work; drop trailing empties.
      tasks_.resize(i);
      break;
    }
    CAFFE_ENFORCE_LE(task->start_, range);
    CAFFE_ENFORCE_LE(task->end_, range);
  }
  CAFFE_ENFORCE_LE(tasks_.size(), numThreads);
  CAFFE_ENFORCE_GE(tasks_.size(), 1);
  // Blocks until every task's Run() has completed.
  workersPool_->Execute(tasks_);
}
226 
withPool(const std::function<void (WorkersPool *)> & f)227 void ThreadPoolImpl::withPool(const std::function<void(WorkersPool*)>& f) {
228   std::lock_guard<std::mutex> guard(executionMutex_);
229   f(workersPool_.get());
230 }
231 
232 } // namespace caffe2
233