1 // Copyright 2020 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stddef.h>
6
7 #include <atomic>
8 #include <optional>
9 #include <utility>
10 #include <vector>
11
12 #include "base/containers/queue.h"
13 #include "base/containers/stack.h"
14 #include "base/functional/callback_helpers.h"
15 #include "base/synchronization/lock.h"
16 #include "base/task/post_job.h"
17 #include "base/task/thread_pool.h"
18 #include "base/test/bind.h"
19 #include "base/test/task_environment.h"
20 #include "testing/gtest/include/gtest/gtest.h"
21 #include "testing/perf/perf_result_reporter.h"
22
23 namespace base {
24
25 namespace {
26
27 // The perftest implements the following assignment strategy:
28 // - Naive: See RunJobWithNaiveAssignment().
29 // - Dynamic: See RunJobWithDynamicAssignment().
30 // - Loop around: See RunJobWithLoopAround().
31 // The following test setups exists for different strategies, although
32 // not every combination is performed:
33 // - No-op: Work items are no-op tasks.
34 // - No-op + disrupted: 10 disruptive tasks are posted every 1ms.
35 // - Busy wait: Work items are busy wait for 5us.
36 // - Busy wait + disrupted
37
// Metric prefix/name under which every story reports its throughput.
constexpr char kMetricPrefixJob[] = "Job.";
constexpr char kMetricWorkThroughput[] = "work_throughput";
// Story names, one per exercised combination of work item type
// (no-op / busy-wait), assignment strategy, and optional disruption.
constexpr char kStoryNoOpNaive[] = "noop_naive";
constexpr char kStoryBusyWaitNaive[] = "busy_wait_naive";
constexpr char kStoryNoOpAtomic[] = "noop_atomic";
constexpr char kStoryNoOpAtomicDisrupted[] = "noop_atomic_disrupted";
constexpr char kStoryBusyWaitAtomic[] = "busy_wait_atomic";
constexpr char kStoryBusyWaitAtomicDisrupted[] = "busy_wait_atomic_disrupted";
constexpr char kStoryNoOpDynamic[] = "noop_dynamic";
constexpr char kStoryNoOpDynamicDisrupted[] = "noop_dynamic_disrupted";
constexpr char kStoryBusyWaitDynamic[] = "busy_wait_dynamic";
constexpr char kStoryBusyWaitDynamicDisrupted[] = "busy_wait_dynamic_disrupted";
constexpr char kStoryNoOpLoopAround[] = "noop_loop_around";
constexpr char kStoryNoOpLoopAroundDisrupted[] = "noop_loop_around_disrupted";
constexpr char kStoryBusyWaitLoopAround[] = "busy_wait_loop_around";
constexpr char kStoryBusyWaitLoopAroundDisrupted[] =
    "busy_wait_loop_around_disrupted";
55
SetUpReporter(const std::string & story_name)56 perf_test::PerfResultReporter SetUpReporter(const std::string& story_name) {
57 perf_test::PerfResultReporter reporter(kMetricPrefixJob, story_name);
58 reporter.RegisterImportantMetric(kMetricWorkThroughput, "tasks/ms");
59 return reporter;
60 }
61
62 // A thread-safe data structure that generates heuristic starting points in a
63 // range to process items in parallel.
64 // Note: we could expose this atomic-binary-search-index-generator in
65 // //base/util if it's useful for real-world use cases.
66 class IndexGenerator {
67 public:
IndexGenerator(size_t size)68 explicit IndexGenerator(size_t size) : size_(size) {
69 AutoLock auto_lock(lock_);
70 pending_indices_.push(0);
71 ranges_to_split_.push({0, size_});
72 }
73
74 IndexGenerator(const IndexGenerator&) = delete;
75 IndexGenerator& operator=(const IndexGenerator&) = delete;
76
GetNext()77 std::optional<size_t> GetNext() {
78 AutoLock auto_lock(lock_);
79 if (!pending_indices_.empty()) {
80 // Return any pending index first.
81 auto index = pending_indices_.top();
82 pending_indices_.pop();
83 return index;
84 }
85 if (ranges_to_split_.empty())
86 return std::nullopt;
87
88 // Split the oldest running range in 2 and return the middle index as
89 // starting point.
90 auto range = ranges_to_split_.front();
91 ranges_to_split_.pop();
92 size_t size = range.second - range.first;
93 size_t mid = range.first + size / 2;
94 // Both sides of the range are added to |ranges_to_split_| so they may be
95 // further split if possible.
96 if (mid - range.first > 1)
97 ranges_to_split_.push({range.first, mid});
98 if (range.second - mid > 1)
99 ranges_to_split_.push({mid, range.second});
100 return mid;
101 }
102
GiveBack(size_t index)103 void GiveBack(size_t index) {
104 AutoLock auto_lock(lock_);
105 // Add |index| to pending indices so GetNext() may return it before anything
106 // else.
107 pending_indices_.push(index);
108 }
109
110 private:
111 base::Lock lock_;
112 // Pending indices that are ready to be handed out, prioritized over
113 // |pending_ranges_| when non-empty.
114 base::stack<size_t> pending_indices_ GUARDED_BY(lock_);
115 // Pending [start, end] (exclusive) ranges to split and hand out indices from.
116 base::queue<std::pair<size_t, size_t>> ranges_to_split_ GUARDED_BY(lock_);
117 const size_t size_;
118 };
119
// A single unit of work guarded by an atomic flag that grants exclusive
// ownership to exactly one worker.
struct WorkItem {
  std::atomic_bool acquire{false};

  // Atomically claims this item; returns true for exactly one caller.
  bool TryAcquire() {
    // memory_order_relaxed is sufficient as the WorkItem's state itself hasn't
    // been modified since the beginning of its associated job. This is only
    // atomically acquiring the right to work on it.
    return !acquire.exchange(true, std::memory_order_relaxed);
  }
};
130
131 class WorkList {
132 public:
WorkList(size_t num_work_items,RepeatingCallback<void (size_t)> process_item)133 WorkList(size_t num_work_items, RepeatingCallback<void(size_t)> process_item)
134 : num_incomplete_items_(num_work_items),
135 items_(num_work_items),
136 process_item_(std::move(process_item)) {}
137
138 WorkList(const WorkList&) = delete;
139 WorkList& operator=(const WorkList&) = delete;
140
141 // Acquires work item at |index|. Returns true if successful, or false if the
142 // item was already acquired.
TryAcquire(size_t index)143 bool TryAcquire(size_t index) { return items_[index].TryAcquire(); }
144
145 // Processes work item at |index|. Returns true if there are more work items
146 // to process, or false if all items were processed.
ProcessWorkItem(size_t index)147 bool ProcessWorkItem(size_t index) {
148 process_item_.Run(index);
149 return num_incomplete_items_.fetch_sub(1, std::memory_order_relaxed) > 1;
150 }
151
NumIncompleteWorkItems(size_t) const152 size_t NumIncompleteWorkItems(size_t /*worker_count*/) const {
153 // memory_order_relaxed is sufficient since this is not synchronized with
154 // other state.
155 return num_incomplete_items_.load(std::memory_order_relaxed);
156 }
157
NumWorkItems() const158 size_t NumWorkItems() const { return items_.size(); }
159
160 private:
161 std::atomic_size_t num_incomplete_items_;
162 std::vector<WorkItem> items_;
163 RepeatingCallback<void(size_t)> process_item_;
164 };
165
BusyWaitCallback(TimeDelta delta)166 RepeatingCallback<void(size_t)> BusyWaitCallback(TimeDelta delta) {
167 return base::BindRepeating(
168 [](base::TimeDelta duration, size_t index) {
169 const base::TimeTicks end_time = base::TimeTicks::Now() + duration;
170 while (base::TimeTicks::Now() < end_time)
171 ;
172 },
173 delta);
174 }
175
176 // Posts |task_count| no-op tasks every |delay|.
DisruptivePostTasks(size_t task_count,TimeDelta delay)177 void DisruptivePostTasks(size_t task_count, TimeDelta delay) {
178 for (size_t i = 0; i < task_count; ++i) {
179 ThreadPool::PostTask(FROM_HERE, {TaskPriority::USER_BLOCKING}, DoNothing());
180 }
181 ThreadPool::PostDelayedTask(FROM_HERE, {TaskPriority::USER_BLOCKING},
182 BindOnce(&DisruptivePostTasks, task_count, delay),
183 delay);
184 }
185
186 class JobPerfTest : public testing::Test {
187 public:
188 JobPerfTest() = default;
189
190 JobPerfTest(const JobPerfTest&) = delete;
191 JobPerfTest& operator=(const JobPerfTest&) = delete;
192
193 // Process |num_work_items| items with |process_item| in parallel. Work is
194 // assigned by having each worker sequentially traversing all items and
195 // acquiring unvisited ones.
RunJobWithNaiveAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item)196 void RunJobWithNaiveAssignment(const std::string& story_name,
197 size_t num_work_items,
198 RepeatingCallback<void(size_t)> process_item) {
199 WorkList work_list(num_work_items, std::move(process_item));
200
201 const TimeTicks job_run_start = TimeTicks::Now();
202
203 WaitableEvent complete;
204 auto handle = PostJob(
205 FROM_HERE, {TaskPriority::USER_VISIBLE},
206 BindRepeating(
207 [](WorkList* work_list, WaitableEvent* complete,
208 JobDelegate* delegate) {
209 for (size_t i = 0; i < work_list->NumWorkItems() &&
210 work_list->NumIncompleteWorkItems(0) != 0 &&
211 !delegate->ShouldYield();
212 ++i) {
213 if (!work_list->TryAcquire(i))
214 continue;
215 if (!work_list->ProcessWorkItem(i)) {
216 complete->Signal();
217 return;
218 }
219 }
220 },
221 Unretained(&work_list), Unretained(&complete)),
222 BindRepeating(&WorkList::NumIncompleteWorkItems,
223 Unretained(&work_list)));
224
225 complete.Wait();
226 handle.Join();
227 const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
228 EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
229
230 auto reporter = SetUpReporter(story_name);
231 reporter.AddResult(kMetricWorkThroughput,
232 size_t(num_work_items / job_duration.InMilliseconds()));
233 }
234
235 // Process |num_work_items| items with |process_item| in parallel. Work is
236 // assigned by having each worker sequentially traversing all items
237 // synchronized with an atomic variable.
RunJobWithAtomicAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)238 void RunJobWithAtomicAssignment(const std::string& story_name,
239 size_t num_work_items,
240 RepeatingCallback<void(size_t)> process_item,
241 bool disruptive_post_tasks = false) {
242 WorkList work_list(num_work_items, std::move(process_item));
243 std::atomic_size_t index{0};
244
245 // Post extra tasks to disrupt Job execution and cause workers to yield.
246 if (disruptive_post_tasks)
247 DisruptivePostTasks(10, Milliseconds(1));
248
249 const TimeTicks job_run_start = TimeTicks::Now();
250
251 WaitableEvent complete;
252 auto handle = PostJob(
253 FROM_HERE, {TaskPriority::USER_VISIBLE},
254 BindRepeating(
255 [](WorkList* work_list, WaitableEvent* complete,
256 std::atomic_size_t* index, JobDelegate* delegate) {
257 while (!delegate->ShouldYield()) {
258 const size_t i = index->fetch_add(1, std::memory_order_relaxed);
259 if (i >= work_list->NumWorkItems() ||
260 !work_list->ProcessWorkItem(i)) {
261 complete->Signal();
262 return;
263 }
264 }
265 },
266 Unretained(&work_list), Unretained(&complete), Unretained(&index)),
267 BindRepeating(&WorkList::NumIncompleteWorkItems,
268 Unretained(&work_list)));
269
270 complete.Wait();
271 handle.Join();
272 const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
273 EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
274
275 auto reporter = SetUpReporter(story_name);
276 reporter.AddResult(kMetricWorkThroughput,
277 size_t(num_work_items / job_duration.InMilliseconds()));
278 }
279
280 // Process |num_work_items| items with |process_item| in parallel. Work is
281 // assigned dynamically having each new worker given a different point far
282 // from other workers until all work is done. This is achieved by recursively
283 // splitting each range that was previously given in half.
RunJobWithDynamicAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)284 void RunJobWithDynamicAssignment(const std::string& story_name,
285 size_t num_work_items,
286 RepeatingCallback<void(size_t)> process_item,
287 bool disruptive_post_tasks = false) {
288 WorkList work_list(num_work_items, std::move(process_item));
289 IndexGenerator generator(num_work_items);
290
291 // Post extra tasks to disrupt Job execution and cause workers to yield.
292 if (disruptive_post_tasks)
293 DisruptivePostTasks(10, Milliseconds(1));
294
295 const TimeTicks job_run_start = TimeTicks::Now();
296
297 WaitableEvent complete;
298 auto handle = PostJob(
299 FROM_HERE, {TaskPriority::USER_VISIBLE},
300 BindRepeating(
301 [](IndexGenerator* generator, WorkList* work_list,
302 WaitableEvent* complete, JobDelegate* delegate) {
303 while (work_list->NumIncompleteWorkItems(0) != 0 &&
304 !delegate->ShouldYield()) {
305 std::optional<size_t> index = generator->GetNext();
306 if (!index)
307 return;
308 for (size_t i = *index; i < work_list->NumWorkItems(); ++i) {
309 if (delegate->ShouldYield()) {
310 generator->GiveBack(i);
311 return;
312 }
313 if (!work_list->TryAcquire(i)) {
314 // If this was touched already, get a new starting point.
315 break;
316 }
317 if (!work_list->ProcessWorkItem(i)) {
318 complete->Signal();
319 return;
320 }
321 }
322 }
323 },
324 Unretained(&generator), Unretained(&work_list),
325 Unretained(&complete)),
326 BindRepeating(&WorkList::NumIncompleteWorkItems,
327 Unretained(&work_list)));
328
329 complete.Wait();
330 handle.Join();
331 const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
332 EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
333
334 auto reporter = SetUpReporter(story_name);
335 reporter.AddResult(kMetricWorkThroughput,
336 size_t(num_work_items / job_duration.InMilliseconds()));
337 }
338
339 // Process |num_work_items| items with |process_item| in parallel. Work is
340 // assigned having each new worker given a different starting point far from
341 // other workers and loop over all work items from there. This is achieved by
342 // recursively splitting each range that was previously given in half.
RunJobWithLoopAround(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)343 void RunJobWithLoopAround(const std::string& story_name,
344 size_t num_work_items,
345 RepeatingCallback<void(size_t)> process_item,
346 bool disruptive_post_tasks = false) {
347 WorkList work_list(num_work_items, std::move(process_item));
348 IndexGenerator generator(num_work_items);
349
350 // Post extra tasks to disrupt Job execution and cause workers to yield.
351 if (disruptive_post_tasks)
352 DisruptivePostTasks(10, Milliseconds(1));
353
354 const TimeTicks job_run_start = TimeTicks::Now();
355
356 WaitableEvent complete;
357 auto handle =
358 PostJob(FROM_HERE, {TaskPriority::USER_VISIBLE},
359 BindRepeating(
360 [](IndexGenerator* generator, WorkList* work_list,
361 WaitableEvent* complete, JobDelegate* delegate) {
362 std::optional<size_t> index = generator->GetNext();
363 if (!index)
364 return;
365 size_t i = *index;
366 while (true) {
367 if (delegate->ShouldYield()) {
368 generator->GiveBack(i);
369 return;
370 }
371 if (!work_list->TryAcquire(i)) {
372 // If this was touched already, skip.
373 continue;
374 }
375 if (!work_list->ProcessWorkItem(i)) {
376 // This will cause the loop to exit if there's no work
377 // left.
378 complete->Signal();
379 return;
380 }
381 ++i;
382 if (i == work_list->NumWorkItems())
383 i = 0;
384 }
385 },
386 Unretained(&generator), Unretained(&work_list),
387 Unretained(&complete)),
388 BindRepeating(&WorkList::NumIncompleteWorkItems,
389 Unretained(&work_list)));
390
391 complete.Wait();
392 handle.Join();
393 const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
394 EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
395
396 auto reporter = SetUpReporter(story_name);
397 reporter.AddResult(kMetricWorkThroughput,
398 size_t(num_work_items / job_duration.InMilliseconds()));
399 }
400
401 private:
402 test::TaskEnvironment task_environment;
403 };
404
405 } // namespace
406
// Naive assignment: every worker scans the item array from index 0.
TEST_F(JobPerfTest, NoOpWorkNaiveAssignment) {
  RunJobWithNaiveAssignment(kStoryNoOpNaive, 10000000, DoNothing());
}

TEST_F(JobPerfTest, BusyWaitNaiveAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithNaiveAssignment(kStoryBusyWaitNaive, 500000, std::move(callback));
}
415
// Atomic assignment: workers claim indices from a shared atomic counter,
// with and without disruptive competing tasks.
TEST_F(JobPerfTest, NoOpWorkAtomicAssignment) {
  RunJobWithAtomicAssignment(kStoryNoOpAtomic, 10000000, DoNothing());
}

TEST_F(JobPerfTest, NoOpDisruptedWorkAtomicAssignment) {
  RunJobWithAtomicAssignment(kStoryNoOpAtomicDisrupted, 10000000, DoNothing(),
                             true);
}

TEST_F(JobPerfTest, BusyWaitAtomicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithAtomicAssignment(kStoryBusyWaitAtomic, 500000, std::move(callback));
}

TEST_F(JobPerfTest, BusyWaitDisruptedWorkAtomicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithAtomicAssignment(kStoryBusyWaitAtomicDisrupted, 500000,
                             std::move(callback), true);
}
435
// Dynamic assignment: each worker starts from a bisection-generated index far
// from other workers, with and without disruptive competing tasks.
TEST_F(JobPerfTest, NoOpWorkDynamicAssignment) {
  RunJobWithDynamicAssignment(kStoryNoOpDynamic, 10000000, DoNothing());
}

TEST_F(JobPerfTest, NoOpDisruptedWorkDynamicAssignment) {
  RunJobWithDynamicAssignment(kStoryNoOpDynamicDisrupted, 10000000, DoNothing(),
                              true);
}

TEST_F(JobPerfTest, BusyWaitWorkDynamicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithDynamicAssignment(kStoryBusyWaitDynamic, 500000,
                              std::move(callback));
}

TEST_F(JobPerfTest, BusyWaitDisruptedWorkDynamicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithDynamicAssignment(kStoryBusyWaitDynamicDisrupted, 500000,
                              std::move(callback), true);
}
456
// Loop-around assignment: each worker starts from a bisection-generated index
// and iterates over all items with wraparound, with and without disruption.
TEST_F(JobPerfTest, NoOpWorkLoopAround) {
  RunJobWithLoopAround(kStoryNoOpLoopAround, 10000000, DoNothing());
}

TEST_F(JobPerfTest, NoOpDisruptedWorkLoopAround) {
  RunJobWithLoopAround(kStoryNoOpLoopAroundDisrupted, 10000000, DoNothing(),
                       true);
}

TEST_F(JobPerfTest, BusyWaitWorkLoopAround) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithLoopAround(kStoryBusyWaitLoopAround, 500000, std::move(callback));
}

TEST_F(JobPerfTest, BusyWaitDisruptedWorkLoopAround) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithLoopAround(kStoryBusyWaitLoopAroundDisrupted, 500000,
                       std::move(callback), true);
}
476
477 } // namespace base
478