xref: /aosp_15_r20/external/swiftshader/third_party/marl/src/scheduler.cpp (revision 03ce13f70fcc45d86ee91b7ee4cab1936a95046e)
1*03ce13f7SAndroid Build Coastguard Worker // Copyright 2019 The Marl Authors.
2*03ce13f7SAndroid Build Coastguard Worker //
3*03ce13f7SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
4*03ce13f7SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
5*03ce13f7SAndroid Build Coastguard Worker // You may obtain a copy of the License at
6*03ce13f7SAndroid Build Coastguard Worker //
7*03ce13f7SAndroid Build Coastguard Worker //     https://www.apache.org/licenses/LICENSE-2.0
8*03ce13f7SAndroid Build Coastguard Worker //
9*03ce13f7SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*03ce13f7SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
11*03ce13f7SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*03ce13f7SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
13*03ce13f7SAndroid Build Coastguard Worker // limitations under the License.
14*03ce13f7SAndroid Build Coastguard Worker 
15*03ce13f7SAndroid Build Coastguard Worker #include "osfiber.h"  // Must come first. See osfiber_ucontext.h.
16*03ce13f7SAndroid Build Coastguard Worker 
17*03ce13f7SAndroid Build Coastguard Worker #include "marl/scheduler.h"
18*03ce13f7SAndroid Build Coastguard Worker 
19*03ce13f7SAndroid Build Coastguard Worker #include "marl/debug.h"
20*03ce13f7SAndroid Build Coastguard Worker #include "marl/thread.h"
21*03ce13f7SAndroid Build Coastguard Worker #include "marl/trace.h"
22*03ce13f7SAndroid Build Coastguard Worker 
23*03ce13f7SAndroid Build Coastguard Worker #if defined(_WIN32)
24*03ce13f7SAndroid Build Coastguard Worker #include <intrin.h>  // __nop()
25*03ce13f7SAndroid Build Coastguard Worker #endif
26*03ce13f7SAndroid Build Coastguard Worker 
27*03ce13f7SAndroid Build Coastguard Worker // Enable to trace scheduler events.
28*03ce13f7SAndroid Build Coastguard Worker #define ENABLE_TRACE_EVENTS 0
29*03ce13f7SAndroid Build Coastguard Worker 
30*03ce13f7SAndroid Build Coastguard Worker // Enable to print verbose debug logging.
31*03ce13f7SAndroid Build Coastguard Worker #define ENABLE_DEBUG_LOGGING 0
32*03ce13f7SAndroid Build Coastguard Worker 
33*03ce13f7SAndroid Build Coastguard Worker #if ENABLE_TRACE_EVENTS
34*03ce13f7SAndroid Build Coastguard Worker #define TRACE(...) MARL_SCOPED_EVENT(__VA_ARGS__)
35*03ce13f7SAndroid Build Coastguard Worker #else
36*03ce13f7SAndroid Build Coastguard Worker #define TRACE(...)
37*03ce13f7SAndroid Build Coastguard Worker #endif
38*03ce13f7SAndroid Build Coastguard Worker 
39*03ce13f7SAndroid Build Coastguard Worker #if ENABLE_DEBUG_LOGGING
40*03ce13f7SAndroid Build Coastguard Worker #define DBG_LOG(msg, ...) \
41*03ce13f7SAndroid Build Coastguard Worker   printf("%.3x " msg "\n", (int)threadID() & 0xfff, __VA_ARGS__)
42*03ce13f7SAndroid Build Coastguard Worker #else
43*03ce13f7SAndroid Build Coastguard Worker #define DBG_LOG(msg, ...)
44*03ce13f7SAndroid Build Coastguard Worker #endif
45*03ce13f7SAndroid Build Coastguard Worker 
46*03ce13f7SAndroid Build Coastguard Worker #define ASSERT_FIBER_STATE(FIBER, STATE)                                   \
47*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(FIBER->state == STATE,                                       \
48*03ce13f7SAndroid Build Coastguard Worker               "fiber %d was in state %s, but expected %s", (int)FIBER->id, \
49*03ce13f7SAndroid Build Coastguard Worker               Fiber::toString(FIBER->state), Fiber::toString(STATE))
50*03ce13f7SAndroid Build Coastguard Worker 
51*03ce13f7SAndroid Build Coastguard Worker namespace {
52*03ce13f7SAndroid Build Coastguard Worker 
53*03ce13f7SAndroid Build Coastguard Worker #if ENABLE_DEBUG_LOGGING
// threadID() hashes the std::thread::id of the calling thread into a uint64_t.
// It is a debugging aid only (DBG_LOG prints its low 12 bits).
inline uint64_t threadID() {
  return std::hash<std::thread::id>()(std::this_thread::get_id());
}
60*03ce13f7SAndroid Build Coastguard Worker #endif
61*03ce13f7SAndroid Build Coastguard Worker 
// nop() executes a single no-op instruction: the __nop() intrinsic when
// building for Windows, inline assembly otherwise.
inline void nop() {
#if !defined(_WIN32)
  __asm__ __volatile__("nop");
#else
  __nop();
#endif
}
69*03ce13f7SAndroid Build Coastguard Worker 
// setConfigDefaults() returns a copy of cfgIn with a missing worker-thread
// affinity policy filled in. The fallback (anyOf over Affinity::all) is only
// installed when at least one worker thread is requested; every other field
// is copied through untouched.
inline marl::Scheduler::Config setConfigDefaults(
    const marl::Scheduler::Config& cfgIn) {
  marl::Scheduler::Config result{cfgIn};
  auto& workerCfg = result.workerThread;
  const bool needsPolicy = workerCfg.count > 0 && !workerCfg.affinityPolicy;
  if (needsPolicy) {
    workerCfg.affinityPolicy = marl::Thread::Affinity::Policy::anyOf(
        marl::Thread::Affinity::all(result.allocator), result.allocator);
  }
  return result;
}
79*03ce13f7SAndroid Build Coastguard Worker 
80*03ce13f7SAndroid Build Coastguard Worker }  // anonymous namespace
81*03ce13f7SAndroid Build Coastguard Worker 
82*03ce13f7SAndroid Build Coastguard Worker namespace marl {
83*03ce13f7SAndroid Build Coastguard Worker 
84*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
85*03ce13f7SAndroid Build Coastguard Worker // Scheduler
86*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
87*03ce13f7SAndroid Build Coastguard Worker MARL_INSTANTIATE_THREAD_LOCAL(Scheduler*, Scheduler::bound, nullptr);
88*03ce13f7SAndroid Build Coastguard Worker 
get()89*03ce13f7SAndroid Build Coastguard Worker Scheduler* Scheduler::get() {
90*03ce13f7SAndroid Build Coastguard Worker   return bound;
91*03ce13f7SAndroid Build Coastguard Worker }
92*03ce13f7SAndroid Build Coastguard Worker 
setBound(Scheduler * scheduler)93*03ce13f7SAndroid Build Coastguard Worker void Scheduler::setBound(Scheduler* scheduler) {
94*03ce13f7SAndroid Build Coastguard Worker   bound = scheduler;
95*03ce13f7SAndroid Build Coastguard Worker }
96*03ce13f7SAndroid Build Coastguard Worker 
bind()97*03ce13f7SAndroid Build Coastguard Worker void Scheduler::bind() {
98*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(get() == nullptr, "Scheduler already bound");
99*03ce13f7SAndroid Build Coastguard Worker   setBound(this);
100*03ce13f7SAndroid Build Coastguard Worker   {
101*03ce13f7SAndroid Build Coastguard Worker     marl::lock lock(singleThreadedWorkers.mutex);
102*03ce13f7SAndroid Build Coastguard Worker     auto worker = cfg.allocator->make_unique<Worker>(
103*03ce13f7SAndroid Build Coastguard Worker         this, Worker::Mode::SingleThreaded, -1);
104*03ce13f7SAndroid Build Coastguard Worker     worker->start();
105*03ce13f7SAndroid Build Coastguard Worker     auto tid = std::this_thread::get_id();
106*03ce13f7SAndroid Build Coastguard Worker     singleThreadedWorkers.byTid.emplace(tid, std::move(worker));
107*03ce13f7SAndroid Build Coastguard Worker   }
108*03ce13f7SAndroid Build Coastguard Worker }
109*03ce13f7SAndroid Build Coastguard Worker 
unbind()110*03ce13f7SAndroid Build Coastguard Worker void Scheduler::unbind() {
111*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(get() != nullptr, "No scheduler bound");
112*03ce13f7SAndroid Build Coastguard Worker   auto worker = Worker::getCurrent();
113*03ce13f7SAndroid Build Coastguard Worker   worker->stop();
114*03ce13f7SAndroid Build Coastguard Worker   {
115*03ce13f7SAndroid Build Coastguard Worker     marl::lock lock(get()->singleThreadedWorkers.mutex);
116*03ce13f7SAndroid Build Coastguard Worker     auto tid = std::this_thread::get_id();
117*03ce13f7SAndroid Build Coastguard Worker     auto& workers = get()->singleThreadedWorkers.byTid;
118*03ce13f7SAndroid Build Coastguard Worker     auto it = workers.find(tid);
119*03ce13f7SAndroid Build Coastguard Worker     MARL_ASSERT(it != workers.end(), "singleThreadedWorker not found");
120*03ce13f7SAndroid Build Coastguard Worker     MARL_ASSERT(it->second.get() == worker, "worker is not bound?");
121*03ce13f7SAndroid Build Coastguard Worker     workers.erase(it);
122*03ce13f7SAndroid Build Coastguard Worker     if (workers.empty()) {
123*03ce13f7SAndroid Build Coastguard Worker       get()->singleThreadedWorkers.unbind.notify_one();
124*03ce13f7SAndroid Build Coastguard Worker     }
125*03ce13f7SAndroid Build Coastguard Worker   }
126*03ce13f7SAndroid Build Coastguard Worker   setBound(nullptr);
127*03ce13f7SAndroid Build Coastguard Worker }
128*03ce13f7SAndroid Build Coastguard Worker 
Scheduler(const Config & config)129*03ce13f7SAndroid Build Coastguard Worker Scheduler::Scheduler(const Config& config)
130*03ce13f7SAndroid Build Coastguard Worker     : cfg(setConfigDefaults(config)),
131*03ce13f7SAndroid Build Coastguard Worker       workerThreads{},
132*03ce13f7SAndroid Build Coastguard Worker       singleThreadedWorkers(config.allocator) {
133*03ce13f7SAndroid Build Coastguard Worker   for (int i = 0; i < cfg.workerThread.count; i++) {
134*03ce13f7SAndroid Build Coastguard Worker     spinningWorkers[i] = -1;
135*03ce13f7SAndroid Build Coastguard Worker     workerThreads[i] =
136*03ce13f7SAndroid Build Coastguard Worker         cfg.allocator->create<Worker>(this, Worker::Mode::MultiThreaded, i);
137*03ce13f7SAndroid Build Coastguard Worker   }
138*03ce13f7SAndroid Build Coastguard Worker   for (int i = 0; i < cfg.workerThread.count; i++) {
139*03ce13f7SAndroid Build Coastguard Worker     workerThreads[i]->start();
140*03ce13f7SAndroid Build Coastguard Worker   }
141*03ce13f7SAndroid Build Coastguard Worker }
142*03ce13f7SAndroid Build Coastguard Worker 
~Scheduler()143*03ce13f7SAndroid Build Coastguard Worker Scheduler::~Scheduler() {
144*03ce13f7SAndroid Build Coastguard Worker   {
145*03ce13f7SAndroid Build Coastguard Worker     // Wait until all the single threaded workers have been unbound.
146*03ce13f7SAndroid Build Coastguard Worker     marl::lock lock(singleThreadedWorkers.mutex);
147*03ce13f7SAndroid Build Coastguard Worker     lock.wait(singleThreadedWorkers.unbind,
148*03ce13f7SAndroid Build Coastguard Worker               [this]() REQUIRES(singleThreadedWorkers.mutex) {
149*03ce13f7SAndroid Build Coastguard Worker                 return singleThreadedWorkers.byTid.empty();
150*03ce13f7SAndroid Build Coastguard Worker               });
151*03ce13f7SAndroid Build Coastguard Worker   }
152*03ce13f7SAndroid Build Coastguard Worker 
153*03ce13f7SAndroid Build Coastguard Worker   // Release all worker threads.
154*03ce13f7SAndroid Build Coastguard Worker   // This will wait for all in-flight tasks to complete before returning.
155*03ce13f7SAndroid Build Coastguard Worker   for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
156*03ce13f7SAndroid Build Coastguard Worker     workerThreads[i]->stop();
157*03ce13f7SAndroid Build Coastguard Worker   }
158*03ce13f7SAndroid Build Coastguard Worker   for (int i = cfg.workerThread.count - 1; i >= 0; i--) {
159*03ce13f7SAndroid Build Coastguard Worker     cfg.allocator->destroy(workerThreads[i]);
160*03ce13f7SAndroid Build Coastguard Worker   }
161*03ce13f7SAndroid Build Coastguard Worker }
162*03ce13f7SAndroid Build Coastguard Worker 
enqueue(Task && task)163*03ce13f7SAndroid Build Coastguard Worker void Scheduler::enqueue(Task&& task) {
164*03ce13f7SAndroid Build Coastguard Worker   if (task.is(Task::Flags::SameThread)) {
165*03ce13f7SAndroid Build Coastguard Worker     Worker::getCurrent()->enqueue(std::move(task));
166*03ce13f7SAndroid Build Coastguard Worker     return;
167*03ce13f7SAndroid Build Coastguard Worker   }
168*03ce13f7SAndroid Build Coastguard Worker   if (cfg.workerThread.count > 0) {
169*03ce13f7SAndroid Build Coastguard Worker     while (true) {
170*03ce13f7SAndroid Build Coastguard Worker       // Prioritize workers that have recently started spinning.
171*03ce13f7SAndroid Build Coastguard Worker       auto i = --nextSpinningWorkerIdx % cfg.workerThread.count;
172*03ce13f7SAndroid Build Coastguard Worker       auto idx = spinningWorkers[i].exchange(-1);
173*03ce13f7SAndroid Build Coastguard Worker       if (idx < 0) {
174*03ce13f7SAndroid Build Coastguard Worker         // If a spinning worker couldn't be found, round-robin the
175*03ce13f7SAndroid Build Coastguard Worker         // workers.
176*03ce13f7SAndroid Build Coastguard Worker         idx = nextEnqueueIndex++ % cfg.workerThread.count;
177*03ce13f7SAndroid Build Coastguard Worker       }
178*03ce13f7SAndroid Build Coastguard Worker 
179*03ce13f7SAndroid Build Coastguard Worker       auto worker = workerThreads[idx];
180*03ce13f7SAndroid Build Coastguard Worker       if (worker->tryLock()) {
181*03ce13f7SAndroid Build Coastguard Worker         worker->enqueueAndUnlock(std::move(task));
182*03ce13f7SAndroid Build Coastguard Worker         return;
183*03ce13f7SAndroid Build Coastguard Worker       }
184*03ce13f7SAndroid Build Coastguard Worker     }
185*03ce13f7SAndroid Build Coastguard Worker   } else {
186*03ce13f7SAndroid Build Coastguard Worker     if (auto worker = Worker::getCurrent()) {
187*03ce13f7SAndroid Build Coastguard Worker       worker->enqueue(std::move(task));
188*03ce13f7SAndroid Build Coastguard Worker     } else {
189*03ce13f7SAndroid Build Coastguard Worker       MARL_FATAL(
190*03ce13f7SAndroid Build Coastguard Worker           "singleThreadedWorker not found. Did you forget to call "
191*03ce13f7SAndroid Build Coastguard Worker           "marl::Scheduler::bind()?");
192*03ce13f7SAndroid Build Coastguard Worker     }
193*03ce13f7SAndroid Build Coastguard Worker   }
194*03ce13f7SAndroid Build Coastguard Worker }
195*03ce13f7SAndroid Build Coastguard Worker 
config() const196*03ce13f7SAndroid Build Coastguard Worker const Scheduler::Config& Scheduler::config() const {
197*03ce13f7SAndroid Build Coastguard Worker   return cfg;
198*03ce13f7SAndroid Build Coastguard Worker }
199*03ce13f7SAndroid Build Coastguard Worker 
stealWork(Worker * thief,uint64_t from,Task & out)200*03ce13f7SAndroid Build Coastguard Worker bool Scheduler::stealWork(Worker* thief, uint64_t from, Task& out) {
201*03ce13f7SAndroid Build Coastguard Worker   if (cfg.workerThread.count > 0) {
202*03ce13f7SAndroid Build Coastguard Worker     auto thread = workerThreads[from % cfg.workerThread.count];
203*03ce13f7SAndroid Build Coastguard Worker     if (thread != thief) {
204*03ce13f7SAndroid Build Coastguard Worker       if (thread->steal(out)) {
205*03ce13f7SAndroid Build Coastguard Worker         return true;
206*03ce13f7SAndroid Build Coastguard Worker       }
207*03ce13f7SAndroid Build Coastguard Worker     }
208*03ce13f7SAndroid Build Coastguard Worker   }
209*03ce13f7SAndroid Build Coastguard Worker   return false;
210*03ce13f7SAndroid Build Coastguard Worker }
211*03ce13f7SAndroid Build Coastguard Worker 
onBeginSpinning(int workerId)212*03ce13f7SAndroid Build Coastguard Worker void Scheduler::onBeginSpinning(int workerId) {
213*03ce13f7SAndroid Build Coastguard Worker   auto idx = nextSpinningWorkerIdx++ % cfg.workerThread.count;
214*03ce13f7SAndroid Build Coastguard Worker   spinningWorkers[idx] = workerId;
215*03ce13f7SAndroid Build Coastguard Worker }
216*03ce13f7SAndroid Build Coastguard Worker 
217*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
218*03ce13f7SAndroid Build Coastguard Worker // Scheduler::Config
219*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
allCores()220*03ce13f7SAndroid Build Coastguard Worker Scheduler::Config Scheduler::Config::allCores() {
221*03ce13f7SAndroid Build Coastguard Worker   return Config().setWorkerThreadCount(Thread::numLogicalCPUs());
222*03ce13f7SAndroid Build Coastguard Worker }
223*03ce13f7SAndroid Build Coastguard Worker 
224*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
225*03ce13f7SAndroid Build Coastguard Worker // Scheduler::Fiber
226*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
Fiber(Allocator::unique_ptr<OSFiber> && impl,uint32_t id)227*03ce13f7SAndroid Build Coastguard Worker Scheduler::Fiber::Fiber(Allocator::unique_ptr<OSFiber>&& impl, uint32_t id)
228*03ce13f7SAndroid Build Coastguard Worker     : id(id), impl(std::move(impl)), worker(Worker::getCurrent()) {
229*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(worker != nullptr, "No Scheduler::Worker bound");
230*03ce13f7SAndroid Build Coastguard Worker }
231*03ce13f7SAndroid Build Coastguard Worker 
current()232*03ce13f7SAndroid Build Coastguard Worker Scheduler::Fiber* Scheduler::Fiber::current() {
233*03ce13f7SAndroid Build Coastguard Worker   auto worker = Worker::getCurrent();
234*03ce13f7SAndroid Build Coastguard Worker   return worker != nullptr ? worker->getCurrentFiber() : nullptr;
235*03ce13f7SAndroid Build Coastguard Worker }
236*03ce13f7SAndroid Build Coastguard Worker 
notify()237*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Fiber::notify() {
238*03ce13f7SAndroid Build Coastguard Worker   worker->enqueue(this);
239*03ce13f7SAndroid Build Coastguard Worker }
240*03ce13f7SAndroid Build Coastguard Worker 
wait(marl::lock & lock,const Predicate & pred)241*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Fiber::wait(marl::lock& lock, const Predicate& pred) {
242*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(worker == Worker::getCurrent(),
243*03ce13f7SAndroid Build Coastguard Worker               "Scheduler::Fiber::wait() must only be called on the currently "
244*03ce13f7SAndroid Build Coastguard Worker               "executing fiber");
245*03ce13f7SAndroid Build Coastguard Worker   worker->wait(lock, nullptr, pred);
246*03ce13f7SAndroid Build Coastguard Worker }
247*03ce13f7SAndroid Build Coastguard Worker 
switchTo(Fiber * to)248*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Fiber::switchTo(Fiber* to) {
249*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(worker == Worker::getCurrent(),
250*03ce13f7SAndroid Build Coastguard Worker               "Scheduler::Fiber::switchTo() must only be called on the "
251*03ce13f7SAndroid Build Coastguard Worker               "currently executing fiber");
252*03ce13f7SAndroid Build Coastguard Worker   if (to != this) {
253*03ce13f7SAndroid Build Coastguard Worker     impl->switchTo(to->impl.get());
254*03ce13f7SAndroid Build Coastguard Worker   }
255*03ce13f7SAndroid Build Coastguard Worker }
256*03ce13f7SAndroid Build Coastguard Worker 
create(Allocator * allocator,uint32_t id,size_t stackSize,const std::function<void ()> & func)257*03ce13f7SAndroid Build Coastguard Worker Allocator::unique_ptr<Scheduler::Fiber> Scheduler::Fiber::create(
258*03ce13f7SAndroid Build Coastguard Worker     Allocator* allocator,
259*03ce13f7SAndroid Build Coastguard Worker     uint32_t id,
260*03ce13f7SAndroid Build Coastguard Worker     size_t stackSize,
261*03ce13f7SAndroid Build Coastguard Worker     const std::function<void()>& func) {
262*03ce13f7SAndroid Build Coastguard Worker   return allocator->make_unique<Fiber>(
263*03ce13f7SAndroid Build Coastguard Worker       OSFiber::createFiber(allocator, stackSize, func), id);
264*03ce13f7SAndroid Build Coastguard Worker }
265*03ce13f7SAndroid Build Coastguard Worker 
266*03ce13f7SAndroid Build Coastguard Worker Allocator::unique_ptr<Scheduler::Fiber>
createFromCurrentThread(Allocator * allocator,uint32_t id)267*03ce13f7SAndroid Build Coastguard Worker Scheduler::Fiber::createFromCurrentThread(Allocator* allocator, uint32_t id) {
268*03ce13f7SAndroid Build Coastguard Worker   return allocator->make_unique<Fiber>(
269*03ce13f7SAndroid Build Coastguard Worker       OSFiber::createFiberFromCurrentThread(allocator), id);
270*03ce13f7SAndroid Build Coastguard Worker }
271*03ce13f7SAndroid Build Coastguard Worker 
toString(State state)272*03ce13f7SAndroid Build Coastguard Worker const char* Scheduler::Fiber::toString(State state) {
273*03ce13f7SAndroid Build Coastguard Worker   switch (state) {
274*03ce13f7SAndroid Build Coastguard Worker     case State::Idle:
275*03ce13f7SAndroid Build Coastguard Worker       return "Idle";
276*03ce13f7SAndroid Build Coastguard Worker     case State::Yielded:
277*03ce13f7SAndroid Build Coastguard Worker       return "Yielded";
278*03ce13f7SAndroid Build Coastguard Worker     case State::Queued:
279*03ce13f7SAndroid Build Coastguard Worker       return "Queued";
280*03ce13f7SAndroid Build Coastguard Worker     case State::Running:
281*03ce13f7SAndroid Build Coastguard Worker       return "Running";
282*03ce13f7SAndroid Build Coastguard Worker     case State::Waiting:
283*03ce13f7SAndroid Build Coastguard Worker       return "Waiting";
284*03ce13f7SAndroid Build Coastguard Worker   }
285*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(false, "bad fiber state");
286*03ce13f7SAndroid Build Coastguard Worker   return "<unknown>";
287*03ce13f7SAndroid Build Coastguard Worker }
288*03ce13f7SAndroid Build Coastguard Worker 
289*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
290*03ce13f7SAndroid Build Coastguard Worker // Scheduler::WaitingFibers
291*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
WaitingFibers(Allocator * allocator)292*03ce13f7SAndroid Build Coastguard Worker Scheduler::WaitingFibers::WaitingFibers(Allocator* allocator)
293*03ce13f7SAndroid Build Coastguard Worker     : timeouts(allocator), fibers(allocator) {}
294*03ce13f7SAndroid Build Coastguard Worker 
operator bool() const295*03ce13f7SAndroid Build Coastguard Worker Scheduler::WaitingFibers::operator bool() const {
296*03ce13f7SAndroid Build Coastguard Worker   return !fibers.empty();
297*03ce13f7SAndroid Build Coastguard Worker }
298*03ce13f7SAndroid Build Coastguard Worker 
take(const TimePoint & timeout)299*03ce13f7SAndroid Build Coastguard Worker Scheduler::Fiber* Scheduler::WaitingFibers::take(const TimePoint& timeout) {
300*03ce13f7SAndroid Build Coastguard Worker   if (!*this) {
301*03ce13f7SAndroid Build Coastguard Worker     return nullptr;
302*03ce13f7SAndroid Build Coastguard Worker   }
303*03ce13f7SAndroid Build Coastguard Worker   auto it = timeouts.begin();
304*03ce13f7SAndroid Build Coastguard Worker   if (timeout < it->timepoint) {
305*03ce13f7SAndroid Build Coastguard Worker     return nullptr;
306*03ce13f7SAndroid Build Coastguard Worker   }
307*03ce13f7SAndroid Build Coastguard Worker   auto fiber = it->fiber;
308*03ce13f7SAndroid Build Coastguard Worker   timeouts.erase(it);
309*03ce13f7SAndroid Build Coastguard Worker   auto deleted = fibers.erase(fiber) != 0;
310*03ce13f7SAndroid Build Coastguard Worker   (void)deleted;
311*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(deleted, "WaitingFibers::take() maps out of sync");
312*03ce13f7SAndroid Build Coastguard Worker   return fiber;
313*03ce13f7SAndroid Build Coastguard Worker }
314*03ce13f7SAndroid Build Coastguard Worker 
next() const315*03ce13f7SAndroid Build Coastguard Worker Scheduler::TimePoint Scheduler::WaitingFibers::next() const {
316*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(*this,
317*03ce13f7SAndroid Build Coastguard Worker               "WaitingFibers::next() called when there' no waiting fibers");
318*03ce13f7SAndroid Build Coastguard Worker   return timeouts.begin()->timepoint;
319*03ce13f7SAndroid Build Coastguard Worker }
320*03ce13f7SAndroid Build Coastguard Worker 
add(const TimePoint & timeout,Fiber * fiber)321*03ce13f7SAndroid Build Coastguard Worker void Scheduler::WaitingFibers::add(const TimePoint& timeout, Fiber* fiber) {
322*03ce13f7SAndroid Build Coastguard Worker   timeouts.emplace(Timeout{timeout, fiber});
323*03ce13f7SAndroid Build Coastguard Worker   bool added = fibers.emplace(fiber, timeout).second;
324*03ce13f7SAndroid Build Coastguard Worker   (void)added;
325*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(added, "WaitingFibers::add() fiber already waiting");
326*03ce13f7SAndroid Build Coastguard Worker }
327*03ce13f7SAndroid Build Coastguard Worker 
erase(Fiber * fiber)328*03ce13f7SAndroid Build Coastguard Worker void Scheduler::WaitingFibers::erase(Fiber* fiber) {
329*03ce13f7SAndroid Build Coastguard Worker   auto it = fibers.find(fiber);
330*03ce13f7SAndroid Build Coastguard Worker   if (it != fibers.end()) {
331*03ce13f7SAndroid Build Coastguard Worker     auto timeout = it->second;
332*03ce13f7SAndroid Build Coastguard Worker     auto erased = timeouts.erase(Timeout{timeout, fiber}) != 0;
333*03ce13f7SAndroid Build Coastguard Worker     (void)erased;
334*03ce13f7SAndroid Build Coastguard Worker     MARL_ASSERT(erased, "WaitingFibers::erase() maps out of sync");
335*03ce13f7SAndroid Build Coastguard Worker     fibers.erase(it);
336*03ce13f7SAndroid Build Coastguard Worker   }
337*03ce13f7SAndroid Build Coastguard Worker }
338*03ce13f7SAndroid Build Coastguard Worker 
contains(Fiber * fiber) const339*03ce13f7SAndroid Build Coastguard Worker bool Scheduler::WaitingFibers::contains(Fiber* fiber) const {
340*03ce13f7SAndroid Build Coastguard Worker   return fibers.count(fiber) != 0;
341*03ce13f7SAndroid Build Coastguard Worker }
342*03ce13f7SAndroid Build Coastguard Worker 
operator <(const Timeout & o) const343*03ce13f7SAndroid Build Coastguard Worker bool Scheduler::WaitingFibers::Timeout::operator<(const Timeout& o) const {
344*03ce13f7SAndroid Build Coastguard Worker   if (timepoint != o.timepoint) {
345*03ce13f7SAndroid Build Coastguard Worker     return timepoint < o.timepoint;
346*03ce13f7SAndroid Build Coastguard Worker   }
347*03ce13f7SAndroid Build Coastguard Worker   return fiber < o.fiber;
348*03ce13f7SAndroid Build Coastguard Worker }
349*03ce13f7SAndroid Build Coastguard Worker 
350*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
351*03ce13f7SAndroid Build Coastguard Worker // Scheduler::Worker
352*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
353*03ce13f7SAndroid Build Coastguard Worker MARL_INSTANTIATE_THREAD_LOCAL(Scheduler::Worker*,
354*03ce13f7SAndroid Build Coastguard Worker                               Scheduler::Worker::current,
355*03ce13f7SAndroid Build Coastguard Worker                               nullptr);
356*03ce13f7SAndroid Build Coastguard Worker 
Worker(Scheduler * scheduler,Mode mode,uint32_t id)357*03ce13f7SAndroid Build Coastguard Worker Scheduler::Worker::Worker(Scheduler* scheduler, Mode mode, uint32_t id)
358*03ce13f7SAndroid Build Coastguard Worker     : id(id),
359*03ce13f7SAndroid Build Coastguard Worker       mode(mode),
360*03ce13f7SAndroid Build Coastguard Worker       scheduler(scheduler),
361*03ce13f7SAndroid Build Coastguard Worker       work(scheduler->cfg.allocator),
362*03ce13f7SAndroid Build Coastguard Worker       idleFibers(scheduler->cfg.allocator) {}
363*03ce13f7SAndroid Build Coastguard Worker 
start()364*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::start() {
365*03ce13f7SAndroid Build Coastguard Worker   switch (mode) {
366*03ce13f7SAndroid Build Coastguard Worker     case Mode::MultiThreaded: {
367*03ce13f7SAndroid Build Coastguard Worker       auto allocator = scheduler->cfg.allocator;
368*03ce13f7SAndroid Build Coastguard Worker       auto& affinityPolicy = scheduler->cfg.workerThread.affinityPolicy;
369*03ce13f7SAndroid Build Coastguard Worker       auto affinity = affinityPolicy->get(id, allocator);
370*03ce13f7SAndroid Build Coastguard Worker       thread = Thread(std::move(affinity), [=, this] {
371*03ce13f7SAndroid Build Coastguard Worker         Thread::setName("Thread<%.2d>", int(id));
372*03ce13f7SAndroid Build Coastguard Worker 
373*03ce13f7SAndroid Build Coastguard Worker         if (auto const& initFunc = scheduler->cfg.workerThread.initializer) {
374*03ce13f7SAndroid Build Coastguard Worker           initFunc(id);
375*03ce13f7SAndroid Build Coastguard Worker         }
376*03ce13f7SAndroid Build Coastguard Worker 
377*03ce13f7SAndroid Build Coastguard Worker         Scheduler::setBound(scheduler);
378*03ce13f7SAndroid Build Coastguard Worker         Worker::current = this;
379*03ce13f7SAndroid Build Coastguard Worker         mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
380*03ce13f7SAndroid Build Coastguard Worker         currentFiber = mainFiber.get();
381*03ce13f7SAndroid Build Coastguard Worker         {
382*03ce13f7SAndroid Build Coastguard Worker           marl::lock lock(work.mutex);
383*03ce13f7SAndroid Build Coastguard Worker           run();
384*03ce13f7SAndroid Build Coastguard Worker         }
385*03ce13f7SAndroid Build Coastguard Worker         mainFiber.reset();
386*03ce13f7SAndroid Build Coastguard Worker         Worker::current = nullptr;
387*03ce13f7SAndroid Build Coastguard Worker       });
388*03ce13f7SAndroid Build Coastguard Worker       break;
389*03ce13f7SAndroid Build Coastguard Worker     }
390*03ce13f7SAndroid Build Coastguard Worker     case Mode::SingleThreaded: {
391*03ce13f7SAndroid Build Coastguard Worker       Worker::current = this;
392*03ce13f7SAndroid Build Coastguard Worker       mainFiber = Fiber::createFromCurrentThread(scheduler->cfg.allocator, 0);
393*03ce13f7SAndroid Build Coastguard Worker       currentFiber = mainFiber.get();
394*03ce13f7SAndroid Build Coastguard Worker       break;
395*03ce13f7SAndroid Build Coastguard Worker     }
396*03ce13f7SAndroid Build Coastguard Worker     default:
397*03ce13f7SAndroid Build Coastguard Worker       MARL_ASSERT(false, "Unknown mode: %d", int(mode));
398*03ce13f7SAndroid Build Coastguard Worker   }
399*03ce13f7SAndroid Build Coastguard Worker }
400*03ce13f7SAndroid Build Coastguard Worker 
stop()401*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::stop() {
402*03ce13f7SAndroid Build Coastguard Worker   switch (mode) {
403*03ce13f7SAndroid Build Coastguard Worker     case Mode::MultiThreaded: {
404*03ce13f7SAndroid Build Coastguard Worker       enqueue(Task([this] { shutdown = true; }, Task::Flags::SameThread));
405*03ce13f7SAndroid Build Coastguard Worker       thread.join();
406*03ce13f7SAndroid Build Coastguard Worker       break;
407*03ce13f7SAndroid Build Coastguard Worker     }
408*03ce13f7SAndroid Build Coastguard Worker     case Mode::SingleThreaded: {
409*03ce13f7SAndroid Build Coastguard Worker       marl::lock lock(work.mutex);
410*03ce13f7SAndroid Build Coastguard Worker       shutdown = true;
411*03ce13f7SAndroid Build Coastguard Worker       runUntilShutdown();
412*03ce13f7SAndroid Build Coastguard Worker       Worker::current = nullptr;
413*03ce13f7SAndroid Build Coastguard Worker       break;
414*03ce13f7SAndroid Build Coastguard Worker     }
415*03ce13f7SAndroid Build Coastguard Worker     default:
416*03ce13f7SAndroid Build Coastguard Worker       MARL_ASSERT(false, "Unknown mode: %d", int(mode));
417*03ce13f7SAndroid Build Coastguard Worker   }
418*03ce13f7SAndroid Build Coastguard Worker }
419*03ce13f7SAndroid Build Coastguard Worker 
wait(const TimePoint * timeout)420*03ce13f7SAndroid Build Coastguard Worker bool Scheduler::Worker::wait(const TimePoint* timeout) {
421*03ce13f7SAndroid Build Coastguard Worker   DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
422*03ce13f7SAndroid Build Coastguard Worker   {
423*03ce13f7SAndroid Build Coastguard Worker     marl::lock lock(work.mutex);
424*03ce13f7SAndroid Build Coastguard Worker     suspend(timeout);
425*03ce13f7SAndroid Build Coastguard Worker   }
426*03ce13f7SAndroid Build Coastguard Worker   return timeout == nullptr || std::chrono::system_clock::now() < *timeout;
427*03ce13f7SAndroid Build Coastguard Worker }
428*03ce13f7SAndroid Build Coastguard Worker 
wait(lock & waitLock,const TimePoint * timeout,const Predicate & pred)429*03ce13f7SAndroid Build Coastguard Worker bool Scheduler::Worker::wait(lock& waitLock,
430*03ce13f7SAndroid Build Coastguard Worker                              const TimePoint* timeout,
431*03ce13f7SAndroid Build Coastguard Worker                              const Predicate& pred) {
432*03ce13f7SAndroid Build Coastguard Worker   DBG_LOG("%d: WAIT(%d)", (int)id, (int)currentFiber->id);
433*03ce13f7SAndroid Build Coastguard Worker   while (!pred()) {
434*03ce13f7SAndroid Build Coastguard Worker     // Lock the work mutex to call suspend().
435*03ce13f7SAndroid Build Coastguard Worker     work.mutex.lock();
436*03ce13f7SAndroid Build Coastguard Worker 
437*03ce13f7SAndroid Build Coastguard Worker     // Unlock the wait mutex with the work mutex lock held.
438*03ce13f7SAndroid Build Coastguard Worker     // Order is important here as we need to ensure that the fiber is not
439*03ce13f7SAndroid Build Coastguard Worker     // enqueued (via Fiber::notify()) between the waitLock.unlock() and fiber
440*03ce13f7SAndroid Build Coastguard Worker     // switch, otherwise the Fiber::notify() call may be ignored and the fiber
441*03ce13f7SAndroid Build Coastguard Worker     // is never woken.
442*03ce13f7SAndroid Build Coastguard Worker     waitLock.unlock_no_tsa();
443*03ce13f7SAndroid Build Coastguard Worker 
444*03ce13f7SAndroid Build Coastguard Worker     // suspend the fiber.
445*03ce13f7SAndroid Build Coastguard Worker     suspend(timeout);
446*03ce13f7SAndroid Build Coastguard Worker 
447*03ce13f7SAndroid Build Coastguard Worker     // Fiber resumed. We don't need the work mutex locked any more.
448*03ce13f7SAndroid Build Coastguard Worker     work.mutex.unlock();
449*03ce13f7SAndroid Build Coastguard Worker 
450*03ce13f7SAndroid Build Coastguard Worker     // Re-lock to either return due to timeout, or call pred().
451*03ce13f7SAndroid Build Coastguard Worker     waitLock.lock_no_tsa();
452*03ce13f7SAndroid Build Coastguard Worker 
453*03ce13f7SAndroid Build Coastguard Worker     // Check timeout.
454*03ce13f7SAndroid Build Coastguard Worker     if (timeout != nullptr && std::chrono::system_clock::now() >= *timeout) {
455*03ce13f7SAndroid Build Coastguard Worker       return false;
456*03ce13f7SAndroid Build Coastguard Worker     }
457*03ce13f7SAndroid Build Coastguard Worker 
458*03ce13f7SAndroid Build Coastguard Worker     // Spurious wake up. Spin again.
459*03ce13f7SAndroid Build Coastguard Worker   }
460*03ce13f7SAndroid Build Coastguard Worker   return true;
461*03ce13f7SAndroid Build Coastguard Worker }
462*03ce13f7SAndroid Build Coastguard Worker 
suspend(const std::chrono::system_clock::time_point * timeout)463*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::suspend(
464*03ce13f7SAndroid Build Coastguard Worker     const std::chrono::system_clock::time_point* timeout) {
465*03ce13f7SAndroid Build Coastguard Worker   // Current fiber is yielding as it is blocked.
466*03ce13f7SAndroid Build Coastguard Worker   if (timeout != nullptr) {
467*03ce13f7SAndroid Build Coastguard Worker     changeFiberState(currentFiber, Fiber::State::Running,
468*03ce13f7SAndroid Build Coastguard Worker                      Fiber::State::Waiting);
469*03ce13f7SAndroid Build Coastguard Worker     work.waiting.add(*timeout, currentFiber);
470*03ce13f7SAndroid Build Coastguard Worker   } else {
471*03ce13f7SAndroid Build Coastguard Worker     changeFiberState(currentFiber, Fiber::State::Running,
472*03ce13f7SAndroid Build Coastguard Worker                      Fiber::State::Yielded);
473*03ce13f7SAndroid Build Coastguard Worker   }
474*03ce13f7SAndroid Build Coastguard Worker 
475*03ce13f7SAndroid Build Coastguard Worker   // First wait until there's something else this worker can do.
476*03ce13f7SAndroid Build Coastguard Worker   waitForWork();
477*03ce13f7SAndroid Build Coastguard Worker 
478*03ce13f7SAndroid Build Coastguard Worker   work.numBlockedFibers++;
479*03ce13f7SAndroid Build Coastguard Worker 
480*03ce13f7SAndroid Build Coastguard Worker   if (!work.fibers.empty()) {
481*03ce13f7SAndroid Build Coastguard Worker     // There's another fiber that has become unblocked, resume that.
482*03ce13f7SAndroid Build Coastguard Worker     work.num--;
483*03ce13f7SAndroid Build Coastguard Worker     auto to = containers::take(work.fibers);
484*03ce13f7SAndroid Build Coastguard Worker     ASSERT_FIBER_STATE(to, Fiber::State::Queued);
485*03ce13f7SAndroid Build Coastguard Worker     switchToFiber(to);
486*03ce13f7SAndroid Build Coastguard Worker   } else if (!idleFibers.empty()) {
487*03ce13f7SAndroid Build Coastguard Worker     // There's an old fiber we can reuse, resume that.
488*03ce13f7SAndroid Build Coastguard Worker     auto to = containers::take(idleFibers);
489*03ce13f7SAndroid Build Coastguard Worker     ASSERT_FIBER_STATE(to, Fiber::State::Idle);
490*03ce13f7SAndroid Build Coastguard Worker     switchToFiber(to);
491*03ce13f7SAndroid Build Coastguard Worker   } else {
492*03ce13f7SAndroid Build Coastguard Worker     // Tasks to process and no existing fibers to resume.
493*03ce13f7SAndroid Build Coastguard Worker     // Spawn a new fiber.
494*03ce13f7SAndroid Build Coastguard Worker     switchToFiber(createWorkerFiber());
495*03ce13f7SAndroid Build Coastguard Worker   }
496*03ce13f7SAndroid Build Coastguard Worker 
497*03ce13f7SAndroid Build Coastguard Worker   work.numBlockedFibers--;
498*03ce13f7SAndroid Build Coastguard Worker 
499*03ce13f7SAndroid Build Coastguard Worker   setFiberState(currentFiber, Fiber::State::Running);
500*03ce13f7SAndroid Build Coastguard Worker }
501*03ce13f7SAndroid Build Coastguard Worker 
tryLock()502*03ce13f7SAndroid Build Coastguard Worker bool Scheduler::Worker::tryLock() {
503*03ce13f7SAndroid Build Coastguard Worker   return work.mutex.try_lock();
504*03ce13f7SAndroid Build Coastguard Worker }
505*03ce13f7SAndroid Build Coastguard Worker 
enqueue(Fiber * fiber)506*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::enqueue(Fiber* fiber) {
507*03ce13f7SAndroid Build Coastguard Worker   bool notify = false;
508*03ce13f7SAndroid Build Coastguard Worker   {
509*03ce13f7SAndroid Build Coastguard Worker     marl::lock lock(work.mutex);
510*03ce13f7SAndroid Build Coastguard Worker     DBG_LOG("%d: ENQUEUE(%d %s)", (int)id, (int)fiber->id,
511*03ce13f7SAndroid Build Coastguard Worker             Fiber::toString(fiber->state));
512*03ce13f7SAndroid Build Coastguard Worker     switch (fiber->state) {
513*03ce13f7SAndroid Build Coastguard Worker       case Fiber::State::Running:
514*03ce13f7SAndroid Build Coastguard Worker       case Fiber::State::Queued:
515*03ce13f7SAndroid Build Coastguard Worker         return;  // Nothing to do here - task is already queued or running.
516*03ce13f7SAndroid Build Coastguard Worker       case Fiber::State::Waiting:
517*03ce13f7SAndroid Build Coastguard Worker         work.waiting.erase(fiber);
518*03ce13f7SAndroid Build Coastguard Worker         break;
519*03ce13f7SAndroid Build Coastguard Worker       case Fiber::State::Idle:
520*03ce13f7SAndroid Build Coastguard Worker       case Fiber::State::Yielded:
521*03ce13f7SAndroid Build Coastguard Worker         break;
522*03ce13f7SAndroid Build Coastguard Worker     }
523*03ce13f7SAndroid Build Coastguard Worker     notify = work.notifyAdded;
524*03ce13f7SAndroid Build Coastguard Worker     work.fibers.push_back(fiber);
525*03ce13f7SAndroid Build Coastguard Worker     MARL_ASSERT(!work.waiting.contains(fiber),
526*03ce13f7SAndroid Build Coastguard Worker                 "fiber is unexpectedly in the waiting list");
527*03ce13f7SAndroid Build Coastguard Worker     setFiberState(fiber, Fiber::State::Queued);
528*03ce13f7SAndroid Build Coastguard Worker     work.num++;
529*03ce13f7SAndroid Build Coastguard Worker   }
530*03ce13f7SAndroid Build Coastguard Worker 
531*03ce13f7SAndroid Build Coastguard Worker   if (notify) {
532*03ce13f7SAndroid Build Coastguard Worker     work.added.notify_one();
533*03ce13f7SAndroid Build Coastguard Worker   }
534*03ce13f7SAndroid Build Coastguard Worker }
535*03ce13f7SAndroid Build Coastguard Worker 
enqueue(Task && task)536*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::enqueue(Task&& task) {
537*03ce13f7SAndroid Build Coastguard Worker   work.mutex.lock();
538*03ce13f7SAndroid Build Coastguard Worker   enqueueAndUnlock(std::move(task));
539*03ce13f7SAndroid Build Coastguard Worker }
540*03ce13f7SAndroid Build Coastguard Worker 
enqueueAndUnlock(Task && task)541*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::enqueueAndUnlock(Task&& task) {
542*03ce13f7SAndroid Build Coastguard Worker   auto notify = work.notifyAdded;
543*03ce13f7SAndroid Build Coastguard Worker   work.tasks.push_back(std::move(task));
544*03ce13f7SAndroid Build Coastguard Worker   work.num++;
545*03ce13f7SAndroid Build Coastguard Worker   work.mutex.unlock();
546*03ce13f7SAndroid Build Coastguard Worker   if (notify) {
547*03ce13f7SAndroid Build Coastguard Worker     work.added.notify_one();
548*03ce13f7SAndroid Build Coastguard Worker   }
549*03ce13f7SAndroid Build Coastguard Worker }
550*03ce13f7SAndroid Build Coastguard Worker 
steal(Task & out)551*03ce13f7SAndroid Build Coastguard Worker bool Scheduler::Worker::steal(Task& out) {
552*03ce13f7SAndroid Build Coastguard Worker   if (work.num.load() == 0) {
553*03ce13f7SAndroid Build Coastguard Worker     return false;
554*03ce13f7SAndroid Build Coastguard Worker   }
555*03ce13f7SAndroid Build Coastguard Worker   if (!work.mutex.try_lock()) {
556*03ce13f7SAndroid Build Coastguard Worker     return false;
557*03ce13f7SAndroid Build Coastguard Worker   }
558*03ce13f7SAndroid Build Coastguard Worker   if (work.tasks.empty() || work.tasks.front().is(Task::Flags::SameThread)) {
559*03ce13f7SAndroid Build Coastguard Worker     work.mutex.unlock();
560*03ce13f7SAndroid Build Coastguard Worker     return false;
561*03ce13f7SAndroid Build Coastguard Worker   }
562*03ce13f7SAndroid Build Coastguard Worker   work.num--;
563*03ce13f7SAndroid Build Coastguard Worker   out = containers::take(work.tasks);
564*03ce13f7SAndroid Build Coastguard Worker   work.mutex.unlock();
565*03ce13f7SAndroid Build Coastguard Worker   return true;
566*03ce13f7SAndroid Build Coastguard Worker }
567*03ce13f7SAndroid Build Coastguard Worker 
run()568*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::run() {
569*03ce13f7SAndroid Build Coastguard Worker   if (mode == Mode::MultiThreaded) {
570*03ce13f7SAndroid Build Coastguard Worker     MARL_NAME_THREAD("Thread<%.2d> Fiber<%.2d>", int(id), Fiber::current()->id);
571*03ce13f7SAndroid Build Coastguard Worker     // This is the entry point for a multi-threaded worker.
572*03ce13f7SAndroid Build Coastguard Worker     // Start with a regular condition-variable wait for work. This avoids
573*03ce13f7SAndroid Build Coastguard Worker     // starting the thread with a spinForWorkAndLock().
574*03ce13f7SAndroid Build Coastguard Worker     work.wait([this]() REQUIRES(work.mutex) {
575*03ce13f7SAndroid Build Coastguard Worker       return work.num > 0 || work.waiting || shutdown;
576*03ce13f7SAndroid Build Coastguard Worker     });
577*03ce13f7SAndroid Build Coastguard Worker   }
578*03ce13f7SAndroid Build Coastguard Worker   ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
579*03ce13f7SAndroid Build Coastguard Worker   runUntilShutdown();
580*03ce13f7SAndroid Build Coastguard Worker   switchToFiber(mainFiber.get());
581*03ce13f7SAndroid Build Coastguard Worker }
582*03ce13f7SAndroid Build Coastguard Worker 
runUntilShutdown()583*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::runUntilShutdown() {
584*03ce13f7SAndroid Build Coastguard Worker   while (!shutdown || work.num > 0 || work.numBlockedFibers > 0U) {
585*03ce13f7SAndroid Build Coastguard Worker     waitForWork();
586*03ce13f7SAndroid Build Coastguard Worker     runUntilIdle();
587*03ce13f7SAndroid Build Coastguard Worker   }
588*03ce13f7SAndroid Build Coastguard Worker }
589*03ce13f7SAndroid Build Coastguard Worker 
waitForWork()590*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::waitForWork() {
591*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
592*03ce13f7SAndroid Build Coastguard Worker               "work.num out of sync");
593*03ce13f7SAndroid Build Coastguard Worker   if (work.num > 0) {
594*03ce13f7SAndroid Build Coastguard Worker     return;
595*03ce13f7SAndroid Build Coastguard Worker   }
596*03ce13f7SAndroid Build Coastguard Worker 
597*03ce13f7SAndroid Build Coastguard Worker   if (mode == Mode::MultiThreaded) {
598*03ce13f7SAndroid Build Coastguard Worker     scheduler->onBeginSpinning(id);
599*03ce13f7SAndroid Build Coastguard Worker     work.mutex.unlock();
600*03ce13f7SAndroid Build Coastguard Worker     spinForWorkAndLock();
601*03ce13f7SAndroid Build Coastguard Worker   }
602*03ce13f7SAndroid Build Coastguard Worker 
603*03ce13f7SAndroid Build Coastguard Worker   work.wait([this]() REQUIRES(work.mutex) {
604*03ce13f7SAndroid Build Coastguard Worker     return work.num > 0 || (shutdown && work.numBlockedFibers == 0U);
605*03ce13f7SAndroid Build Coastguard Worker   });
606*03ce13f7SAndroid Build Coastguard Worker   if (work.waiting) {
607*03ce13f7SAndroid Build Coastguard Worker     enqueueFiberTimeouts();
608*03ce13f7SAndroid Build Coastguard Worker   }
609*03ce13f7SAndroid Build Coastguard Worker }
610*03ce13f7SAndroid Build Coastguard Worker 
enqueueFiberTimeouts()611*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::enqueueFiberTimeouts() {
612*03ce13f7SAndroid Build Coastguard Worker   auto now = std::chrono::system_clock::now();
613*03ce13f7SAndroid Build Coastguard Worker   while (auto fiber = work.waiting.take(now)) {
614*03ce13f7SAndroid Build Coastguard Worker     changeFiberState(fiber, Fiber::State::Waiting, Fiber::State::Queued);
615*03ce13f7SAndroid Build Coastguard Worker     DBG_LOG("%d: TIMEOUT(%d)", (int)id, (int)fiber->id);
616*03ce13f7SAndroid Build Coastguard Worker     work.fibers.push_back(fiber);
617*03ce13f7SAndroid Build Coastguard Worker     work.num++;
618*03ce13f7SAndroid Build Coastguard Worker   }
619*03ce13f7SAndroid Build Coastguard Worker }
620*03ce13f7SAndroid Build Coastguard Worker 
changeFiberState(Fiber * fiber,Fiber::State from,Fiber::State to) const621*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::changeFiberState(Fiber* fiber,
622*03ce13f7SAndroid Build Coastguard Worker                                          Fiber::State from,
623*03ce13f7SAndroid Build Coastguard Worker                                          Fiber::State to) const {
624*03ce13f7SAndroid Build Coastguard Worker   (void)from;  // Unusued parameter when ENABLE_DEBUG_LOGGING is disabled.
625*03ce13f7SAndroid Build Coastguard Worker   DBG_LOG("%d: CHANGE_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id,
626*03ce13f7SAndroid Build Coastguard Worker           Fiber::toString(from), Fiber::toString(to));
627*03ce13f7SAndroid Build Coastguard Worker   ASSERT_FIBER_STATE(fiber, from);
628*03ce13f7SAndroid Build Coastguard Worker   fiber->state = to;
629*03ce13f7SAndroid Build Coastguard Worker }
630*03ce13f7SAndroid Build Coastguard Worker 
setFiberState(Fiber * fiber,Fiber::State to) const631*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::setFiberState(Fiber* fiber, Fiber::State to) const {
632*03ce13f7SAndroid Build Coastguard Worker   DBG_LOG("%d: SET_FIBER_STATE(%d %s -> %s)", (int)id, (int)fiber->id,
633*03ce13f7SAndroid Build Coastguard Worker           Fiber::toString(fiber->state), Fiber::toString(to));
634*03ce13f7SAndroid Build Coastguard Worker   fiber->state = to;
635*03ce13f7SAndroid Build Coastguard Worker }
636*03ce13f7SAndroid Build Coastguard Worker 
spinForWorkAndLock()637*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::spinForWorkAndLock() {
638*03ce13f7SAndroid Build Coastguard Worker   TRACE("SPIN");
639*03ce13f7SAndroid Build Coastguard Worker   Task stolen;
640*03ce13f7SAndroid Build Coastguard Worker 
641*03ce13f7SAndroid Build Coastguard Worker   constexpr auto duration = std::chrono::milliseconds(1);
642*03ce13f7SAndroid Build Coastguard Worker   auto start = std::chrono::high_resolution_clock::now();
643*03ce13f7SAndroid Build Coastguard Worker   while (std::chrono::high_resolution_clock::now() - start < duration) {
644*03ce13f7SAndroid Build Coastguard Worker     for (int i = 0; i < 256; i++)  // Empirically picked magic number!
645*03ce13f7SAndroid Build Coastguard Worker     {
646*03ce13f7SAndroid Build Coastguard Worker       // clang-format off
647*03ce13f7SAndroid Build Coastguard Worker       nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
648*03ce13f7SAndroid Build Coastguard Worker       nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
649*03ce13f7SAndroid Build Coastguard Worker       nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
650*03ce13f7SAndroid Build Coastguard Worker       nop(); nop(); nop(); nop(); nop(); nop(); nop(); nop();
651*03ce13f7SAndroid Build Coastguard Worker       // clang-format on
652*03ce13f7SAndroid Build Coastguard Worker 
653*03ce13f7SAndroid Build Coastguard Worker       if (work.num > 0) {
654*03ce13f7SAndroid Build Coastguard Worker         work.mutex.lock();
655*03ce13f7SAndroid Build Coastguard Worker         if (work.num > 0) {
656*03ce13f7SAndroid Build Coastguard Worker           return;
657*03ce13f7SAndroid Build Coastguard Worker         }
658*03ce13f7SAndroid Build Coastguard Worker         else {
659*03ce13f7SAndroid Build Coastguard Worker           // Our new task was stolen by another worker. Keep spinning.
660*03ce13f7SAndroid Build Coastguard Worker           work.mutex.unlock();
661*03ce13f7SAndroid Build Coastguard Worker         }
662*03ce13f7SAndroid Build Coastguard Worker       }
663*03ce13f7SAndroid Build Coastguard Worker     }
664*03ce13f7SAndroid Build Coastguard Worker 
665*03ce13f7SAndroid Build Coastguard Worker     if (scheduler->stealWork(this, rng(), stolen)) {
666*03ce13f7SAndroid Build Coastguard Worker       work.mutex.lock();
667*03ce13f7SAndroid Build Coastguard Worker       work.tasks.emplace_back(std::move(stolen));
668*03ce13f7SAndroid Build Coastguard Worker       work.num++;
669*03ce13f7SAndroid Build Coastguard Worker       return;
670*03ce13f7SAndroid Build Coastguard Worker     }
671*03ce13f7SAndroid Build Coastguard Worker 
672*03ce13f7SAndroid Build Coastguard Worker     std::this_thread::yield();
673*03ce13f7SAndroid Build Coastguard Worker   }
674*03ce13f7SAndroid Build Coastguard Worker   work.mutex.lock();
675*03ce13f7SAndroid Build Coastguard Worker }
676*03ce13f7SAndroid Build Coastguard Worker 
runUntilIdle()677*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::runUntilIdle() {
678*03ce13f7SAndroid Build Coastguard Worker   ASSERT_FIBER_STATE(currentFiber, Fiber::State::Running);
679*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(work.num == work.fibers.size() + work.tasks.size(),
680*03ce13f7SAndroid Build Coastguard Worker               "work.num out of sync");
681*03ce13f7SAndroid Build Coastguard Worker   while (!work.fibers.empty() || !work.tasks.empty()) {
682*03ce13f7SAndroid Build Coastguard Worker     // Note: we cannot take and store on the stack more than a single fiber
683*03ce13f7SAndroid Build Coastguard Worker     // or task at a time, as the Fiber may yield and these items may get
684*03ce13f7SAndroid Build Coastguard Worker     // held on suspended fiber stack.
685*03ce13f7SAndroid Build Coastguard Worker 
686*03ce13f7SAndroid Build Coastguard Worker     while (!work.fibers.empty()) {
687*03ce13f7SAndroid Build Coastguard Worker       work.num--;
688*03ce13f7SAndroid Build Coastguard Worker       auto fiber = containers::take(work.fibers);
689*03ce13f7SAndroid Build Coastguard Worker       // Sanity checks,
690*03ce13f7SAndroid Build Coastguard Worker       MARL_ASSERT(idleFibers.count(fiber) == 0, "dequeued fiber is idle");
691*03ce13f7SAndroid Build Coastguard Worker       MARL_ASSERT(fiber != currentFiber, "dequeued fiber is currently running");
692*03ce13f7SAndroid Build Coastguard Worker       ASSERT_FIBER_STATE(fiber, Fiber::State::Queued);
693*03ce13f7SAndroid Build Coastguard Worker 
694*03ce13f7SAndroid Build Coastguard Worker       changeFiberState(currentFiber, Fiber::State::Running, Fiber::State::Idle);
695*03ce13f7SAndroid Build Coastguard Worker       auto added = idleFibers.emplace(currentFiber).second;
696*03ce13f7SAndroid Build Coastguard Worker       (void)added;
697*03ce13f7SAndroid Build Coastguard Worker       MARL_ASSERT(added, "fiber already idle");
698*03ce13f7SAndroid Build Coastguard Worker 
699*03ce13f7SAndroid Build Coastguard Worker       switchToFiber(fiber);
700*03ce13f7SAndroid Build Coastguard Worker       changeFiberState(currentFiber, Fiber::State::Idle, Fiber::State::Running);
701*03ce13f7SAndroid Build Coastguard Worker     }
702*03ce13f7SAndroid Build Coastguard Worker 
703*03ce13f7SAndroid Build Coastguard Worker     if (!work.tasks.empty()) {
704*03ce13f7SAndroid Build Coastguard Worker       work.num--;
705*03ce13f7SAndroid Build Coastguard Worker       auto task = containers::take(work.tasks);
706*03ce13f7SAndroid Build Coastguard Worker       work.mutex.unlock();
707*03ce13f7SAndroid Build Coastguard Worker 
708*03ce13f7SAndroid Build Coastguard Worker       // Run the task.
709*03ce13f7SAndroid Build Coastguard Worker       task();
710*03ce13f7SAndroid Build Coastguard Worker 
711*03ce13f7SAndroid Build Coastguard Worker       // std::function<> can carry arguments with complex destructors.
712*03ce13f7SAndroid Build Coastguard Worker       // Ensure these are destructed outside of the lock.
713*03ce13f7SAndroid Build Coastguard Worker       task = Task();
714*03ce13f7SAndroid Build Coastguard Worker 
715*03ce13f7SAndroid Build Coastguard Worker       work.mutex.lock();
716*03ce13f7SAndroid Build Coastguard Worker     }
717*03ce13f7SAndroid Build Coastguard Worker   }
718*03ce13f7SAndroid Build Coastguard Worker }
719*03ce13f7SAndroid Build Coastguard Worker 
createWorkerFiber()720*03ce13f7SAndroid Build Coastguard Worker Scheduler::Fiber* Scheduler::Worker::createWorkerFiber() {
721*03ce13f7SAndroid Build Coastguard Worker   auto fiberId = static_cast<uint32_t>(workerFibers.size() + 1);
722*03ce13f7SAndroid Build Coastguard Worker   DBG_LOG("%d: CREATE(%d)", (int)id, (int)fiberId);
723*03ce13f7SAndroid Build Coastguard Worker   auto fiber = Fiber::create(scheduler->cfg.allocator, fiberId,
724*03ce13f7SAndroid Build Coastguard Worker                              scheduler->cfg.fiberStackSize,
725*03ce13f7SAndroid Build Coastguard Worker                              [&]() REQUIRES(work.mutex) { run(); });
726*03ce13f7SAndroid Build Coastguard Worker   auto ptr = fiber.get();
727*03ce13f7SAndroid Build Coastguard Worker   workerFibers.emplace_back(std::move(fiber));
728*03ce13f7SAndroid Build Coastguard Worker   return ptr;
729*03ce13f7SAndroid Build Coastguard Worker }
730*03ce13f7SAndroid Build Coastguard Worker 
switchToFiber(Fiber * to)731*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::switchToFiber(Fiber* to) {
732*03ce13f7SAndroid Build Coastguard Worker   DBG_LOG("%d: SWITCH(%d -> %d)", (int)id, (int)currentFiber->id, (int)to->id);
733*03ce13f7SAndroid Build Coastguard Worker   MARL_ASSERT(to == mainFiber.get() || idleFibers.count(to) == 0,
734*03ce13f7SAndroid Build Coastguard Worker               "switching to idle fiber");
735*03ce13f7SAndroid Build Coastguard Worker   auto from = currentFiber;
736*03ce13f7SAndroid Build Coastguard Worker   currentFiber = to;
737*03ce13f7SAndroid Build Coastguard Worker   from->switchTo(to);
738*03ce13f7SAndroid Build Coastguard Worker }
739*03ce13f7SAndroid Build Coastguard Worker 
740*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
741*03ce13f7SAndroid Build Coastguard Worker // Scheduler::Worker::Work
742*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
Work(Allocator * allocator)743*03ce13f7SAndroid Build Coastguard Worker Scheduler::Worker::Work::Work(Allocator* allocator)
744*03ce13f7SAndroid Build Coastguard Worker     : tasks(allocator), fibers(allocator), waiting(allocator) {}
745*03ce13f7SAndroid Build Coastguard Worker 
746*03ce13f7SAndroid Build Coastguard Worker template <typename F>
wait(F && f)747*03ce13f7SAndroid Build Coastguard Worker void Scheduler::Worker::Work::wait(F&& f) {
748*03ce13f7SAndroid Build Coastguard Worker   notifyAdded = true;
749*03ce13f7SAndroid Build Coastguard Worker   if (waiting) {
750*03ce13f7SAndroid Build Coastguard Worker     mutex.wait_until_locked(added, waiting.next(), f);
751*03ce13f7SAndroid Build Coastguard Worker   } else {
752*03ce13f7SAndroid Build Coastguard Worker     mutex.wait_locked(added, f);
753*03ce13f7SAndroid Build Coastguard Worker   }
754*03ce13f7SAndroid Build Coastguard Worker   notifyAdded = false;
755*03ce13f7SAndroid Build Coastguard Worker }
756*03ce13f7SAndroid Build Coastguard Worker 
757*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
// Scheduler::SingleThreadedWorkers
759*03ce13f7SAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
SingleThreadedWorkers(Allocator * allocator)760*03ce13f7SAndroid Build Coastguard Worker Scheduler::SingleThreadedWorkers::SingleThreadedWorkers(Allocator* allocator)
761*03ce13f7SAndroid Build Coastguard Worker     : byTid(allocator) {}
762*03ce13f7SAndroid Build Coastguard Worker 
763*03ce13f7SAndroid Build Coastguard Worker }  // namespace marl
764