xref: /aosp_15_r20/external/cronet/base/task/job_perftest.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2020 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <stddef.h>
6 
7 #include <atomic>
8 #include <optional>
9 #include <utility>
10 #include <vector>
11 
12 #include "base/containers/queue.h"
13 #include "base/containers/stack.h"
14 #include "base/functional/callback_helpers.h"
15 #include "base/synchronization/lock.h"
16 #include "base/task/post_job.h"
17 #include "base/task/thread_pool.h"
18 #include "base/test/bind.h"
19 #include "base/test/task_environment.h"
20 #include "testing/gtest/include/gtest/gtest.h"
21 #include "testing/perf/perf_result_reporter.h"
22 
23 namespace base {
24 
25 namespace {
26 
// The perftest implements the following assignment strategies:
// - Naive: See RunJobWithNaiveAssignment().
// - Atomic: See RunJobWithAtomicAssignment().
// - Dynamic: See RunJobWithDynamicAssignment().
// - Loop around: See RunJobWithLoopAround().
// The following test setups exist for different strategies, although
// not every combination is performed:
// - No-op: Work items are no-op tasks.
// - No-op + disrupted: 10 disruptive tasks are posted every 1ms.
// - Busy wait: Work items busy-wait for 5us.
// - Busy wait + disrupted
37 
38 constexpr char kMetricPrefixJob[] = "Job.";
39 constexpr char kMetricWorkThroughput[] = "work_throughput";
40 constexpr char kStoryNoOpNaive[] = "noop_naive";
41 constexpr char kStoryBusyWaitNaive[] = "busy_wait_naive";
42 constexpr char kStoryNoOpAtomic[] = "noop_atomic";
43 constexpr char kStoryNoOpAtomicDisrupted[] = "noop_atomic_disrupted";
44 constexpr char kStoryBusyWaitAtomic[] = "busy_wait_atomic";
45 constexpr char kStoryBusyWaitAtomicDisrupted[] = "busy_wait_atomic_disrupted";
46 constexpr char kStoryNoOpDynamic[] = "noop_dynamic";
47 constexpr char kStoryNoOpDynamicDisrupted[] = "noop_dynamic_disrupted";
48 constexpr char kStoryBusyWaitDynamic[] = "busy_wait_dynamic";
49 constexpr char kStoryBusyWaitDynamicDisrupted[] = "busy_wait_dynamic_disrupted";
50 constexpr char kStoryNoOpLoopAround[] = "noop_loop_around";
51 constexpr char kStoryNoOpLoopAroundDisrupted[] = "noop_loop_around_disrupted";
52 constexpr char kStoryBusyWaitLoopAround[] = "busy_wait_loop_around";
53 constexpr char kStoryBusyWaitLoopAroundDisrupted[] =
54     "busy_wait_loop_around_disrupted";
55 
SetUpReporter(const std::string & story_name)56 perf_test::PerfResultReporter SetUpReporter(const std::string& story_name) {
57   perf_test::PerfResultReporter reporter(kMetricPrefixJob, story_name);
58   reporter.RegisterImportantMetric(kMetricWorkThroughput, "tasks/ms");
59   return reporter;
60 }
61 
62 // A thread-safe data structure that generates heuristic starting points in a
63 // range to process items in parallel.
64 // Note: we could expose this atomic-binary-search-index-generator in
65 // //base/util if it's useful for real-world use cases.
66 class IndexGenerator {
67  public:
IndexGenerator(size_t size)68   explicit IndexGenerator(size_t size) : size_(size) {
69     AutoLock auto_lock(lock_);
70     pending_indices_.push(0);
71     ranges_to_split_.push({0, size_});
72   }
73 
74   IndexGenerator(const IndexGenerator&) = delete;
75   IndexGenerator& operator=(const IndexGenerator&) = delete;
76 
GetNext()77   std::optional<size_t> GetNext() {
78     AutoLock auto_lock(lock_);
79     if (!pending_indices_.empty()) {
80       // Return any pending index first.
81       auto index = pending_indices_.top();
82       pending_indices_.pop();
83       return index;
84     }
85     if (ranges_to_split_.empty())
86       return std::nullopt;
87 
88     // Split the oldest running range in 2 and return the middle index as
89     // starting point.
90     auto range = ranges_to_split_.front();
91     ranges_to_split_.pop();
92     size_t size = range.second - range.first;
93     size_t mid = range.first + size / 2;
94     // Both sides of the range are added to |ranges_to_split_| so they may be
95     // further split if possible.
96     if (mid - range.first > 1)
97       ranges_to_split_.push({range.first, mid});
98     if (range.second - mid > 1)
99       ranges_to_split_.push({mid, range.second});
100     return mid;
101   }
102 
GiveBack(size_t index)103   void GiveBack(size_t index) {
104     AutoLock auto_lock(lock_);
105     // Add |index| to pending indices so GetNext() may return it before anything
106     // else.
107     pending_indices_.push(index);
108   }
109 
110  private:
111   base::Lock lock_;
112   // Pending indices that are ready to be handed out, prioritized over
113   // |pending_ranges_| when non-empty.
114   base::stack<size_t> pending_indices_ GUARDED_BY(lock_);
115   // Pending [start, end] (exclusive) ranges to split and hand out indices from.
116   base::queue<std::pair<size_t, size_t>> ranges_to_split_ GUARDED_BY(lock_);
117   const size_t size_;
118 };
119 
120 struct WorkItem {
121   std::atomic_bool acquire{false};
122 
TryAcquirebase::__anon8b4b63c80111::WorkItem123   bool TryAcquire() {
124     // memory_order_relaxed is sufficient as the WorkItem's state itself hasn't
125     // been modified since the beginning of its associated job. This is only
126     // atomically acquiring the right to work on it.
127     return acquire.exchange(true, std::memory_order_relaxed) == false;
128   }
129 };
130 
131 class WorkList {
132  public:
WorkList(size_t num_work_items,RepeatingCallback<void (size_t)> process_item)133   WorkList(size_t num_work_items, RepeatingCallback<void(size_t)> process_item)
134       : num_incomplete_items_(num_work_items),
135         items_(num_work_items),
136         process_item_(std::move(process_item)) {}
137 
138   WorkList(const WorkList&) = delete;
139   WorkList& operator=(const WorkList&) = delete;
140 
141   // Acquires work item at |index|. Returns true if successful, or false if the
142   // item was already acquired.
TryAcquire(size_t index)143   bool TryAcquire(size_t index) { return items_[index].TryAcquire(); }
144 
145   // Processes work item at |index|. Returns true if there are more work items
146   // to process, or false if all items were processed.
ProcessWorkItem(size_t index)147   bool ProcessWorkItem(size_t index) {
148     process_item_.Run(index);
149     return num_incomplete_items_.fetch_sub(1, std::memory_order_relaxed) > 1;
150   }
151 
NumIncompleteWorkItems(size_t) const152   size_t NumIncompleteWorkItems(size_t /*worker_count*/) const {
153     // memory_order_relaxed is sufficient since this is not synchronized with
154     // other state.
155     return num_incomplete_items_.load(std::memory_order_relaxed);
156   }
157 
NumWorkItems() const158   size_t NumWorkItems() const { return items_.size(); }
159 
160  private:
161   std::atomic_size_t num_incomplete_items_;
162   std::vector<WorkItem> items_;
163   RepeatingCallback<void(size_t)> process_item_;
164 };
165 
BusyWaitCallback(TimeDelta delta)166 RepeatingCallback<void(size_t)> BusyWaitCallback(TimeDelta delta) {
167   return base::BindRepeating(
168       [](base::TimeDelta duration, size_t index) {
169         const base::TimeTicks end_time = base::TimeTicks::Now() + duration;
170         while (base::TimeTicks::Now() < end_time)
171           ;
172       },
173       delta);
174 }
175 
176 // Posts |task_count| no-op tasks every |delay|.
DisruptivePostTasks(size_t task_count,TimeDelta delay)177 void DisruptivePostTasks(size_t task_count, TimeDelta delay) {
178   for (size_t i = 0; i < task_count; ++i) {
179     ThreadPool::PostTask(FROM_HERE, {TaskPriority::USER_BLOCKING}, DoNothing());
180   }
181   ThreadPool::PostDelayedTask(FROM_HERE, {TaskPriority::USER_BLOCKING},
182                               BindOnce(&DisruptivePostTasks, task_count, delay),
183                               delay);
184 }
185 
186 class JobPerfTest : public testing::Test {
187  public:
188   JobPerfTest() = default;
189 
190   JobPerfTest(const JobPerfTest&) = delete;
191   JobPerfTest& operator=(const JobPerfTest&) = delete;
192 
193   // Process |num_work_items| items with |process_item| in parallel. Work is
194   // assigned by having each worker sequentially traversing all items and
195   // acquiring unvisited ones.
RunJobWithNaiveAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item)196   void RunJobWithNaiveAssignment(const std::string& story_name,
197                                  size_t num_work_items,
198                                  RepeatingCallback<void(size_t)> process_item) {
199     WorkList work_list(num_work_items, std::move(process_item));
200 
201     const TimeTicks job_run_start = TimeTicks::Now();
202 
203     WaitableEvent complete;
204     auto handle = PostJob(
205         FROM_HERE, {TaskPriority::USER_VISIBLE},
206         BindRepeating(
207             [](WorkList* work_list, WaitableEvent* complete,
208                JobDelegate* delegate) {
209               for (size_t i = 0; i < work_list->NumWorkItems() &&
210                                  work_list->NumIncompleteWorkItems(0) != 0 &&
211                                  !delegate->ShouldYield();
212                    ++i) {
213                 if (!work_list->TryAcquire(i))
214                   continue;
215                 if (!work_list->ProcessWorkItem(i)) {
216                   complete->Signal();
217                   return;
218                 }
219               }
220             },
221             Unretained(&work_list), Unretained(&complete)),
222         BindRepeating(&WorkList::NumIncompleteWorkItems,
223                       Unretained(&work_list)));
224 
225     complete.Wait();
226     handle.Join();
227     const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
228     EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
229 
230     auto reporter = SetUpReporter(story_name);
231     reporter.AddResult(kMetricWorkThroughput,
232                        size_t(num_work_items / job_duration.InMilliseconds()));
233   }
234 
235   // Process |num_work_items| items with |process_item| in parallel. Work is
236   // assigned by having each worker sequentially traversing all items
237   // synchronized with an atomic variable.
RunJobWithAtomicAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)238   void RunJobWithAtomicAssignment(const std::string& story_name,
239                                   size_t num_work_items,
240                                   RepeatingCallback<void(size_t)> process_item,
241                                   bool disruptive_post_tasks = false) {
242     WorkList work_list(num_work_items, std::move(process_item));
243     std::atomic_size_t index{0};
244 
245     // Post extra tasks to disrupt Job execution and cause workers to yield.
246     if (disruptive_post_tasks)
247       DisruptivePostTasks(10, Milliseconds(1));
248 
249     const TimeTicks job_run_start = TimeTicks::Now();
250 
251     WaitableEvent complete;
252     auto handle = PostJob(
253         FROM_HERE, {TaskPriority::USER_VISIBLE},
254         BindRepeating(
255             [](WorkList* work_list, WaitableEvent* complete,
256                std::atomic_size_t* index, JobDelegate* delegate) {
257               while (!delegate->ShouldYield()) {
258                 const size_t i = index->fetch_add(1, std::memory_order_relaxed);
259                 if (i >= work_list->NumWorkItems() ||
260                     !work_list->ProcessWorkItem(i)) {
261                   complete->Signal();
262                   return;
263                 }
264               }
265             },
266             Unretained(&work_list), Unretained(&complete), Unretained(&index)),
267         BindRepeating(&WorkList::NumIncompleteWorkItems,
268                       Unretained(&work_list)));
269 
270     complete.Wait();
271     handle.Join();
272     const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
273     EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
274 
275     auto reporter = SetUpReporter(story_name);
276     reporter.AddResult(kMetricWorkThroughput,
277                        size_t(num_work_items / job_duration.InMilliseconds()));
278   }
279 
280   // Process |num_work_items| items with |process_item| in parallel. Work is
281   // assigned dynamically having each new worker given a different point far
282   // from other workers until all work is done. This is achieved by recursively
283   // splitting each range that was previously given in half.
RunJobWithDynamicAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)284   void RunJobWithDynamicAssignment(const std::string& story_name,
285                                    size_t num_work_items,
286                                    RepeatingCallback<void(size_t)> process_item,
287                                    bool disruptive_post_tasks = false) {
288     WorkList work_list(num_work_items, std::move(process_item));
289     IndexGenerator generator(num_work_items);
290 
291     // Post extra tasks to disrupt Job execution and cause workers to yield.
292     if (disruptive_post_tasks)
293       DisruptivePostTasks(10, Milliseconds(1));
294 
295     const TimeTicks job_run_start = TimeTicks::Now();
296 
297     WaitableEvent complete;
298     auto handle = PostJob(
299         FROM_HERE, {TaskPriority::USER_VISIBLE},
300         BindRepeating(
301             [](IndexGenerator* generator, WorkList* work_list,
302                WaitableEvent* complete, JobDelegate* delegate) {
303               while (work_list->NumIncompleteWorkItems(0) != 0 &&
304                      !delegate->ShouldYield()) {
305                 std::optional<size_t> index = generator->GetNext();
306                 if (!index)
307                   return;
308                 for (size_t i = *index; i < work_list->NumWorkItems(); ++i) {
309                   if (delegate->ShouldYield()) {
310                     generator->GiveBack(i);
311                     return;
312                   }
313                   if (!work_list->TryAcquire(i)) {
314                     // If this was touched already, get a new starting point.
315                     break;
316                   }
317                   if (!work_list->ProcessWorkItem(i)) {
318                     complete->Signal();
319                     return;
320                   }
321                 }
322               }
323             },
324             Unretained(&generator), Unretained(&work_list),
325             Unretained(&complete)),
326         BindRepeating(&WorkList::NumIncompleteWorkItems,
327                       Unretained(&work_list)));
328 
329     complete.Wait();
330     handle.Join();
331     const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
332     EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
333 
334     auto reporter = SetUpReporter(story_name);
335     reporter.AddResult(kMetricWorkThroughput,
336                        size_t(num_work_items / job_duration.InMilliseconds()));
337   }
338 
339   // Process |num_work_items| items with |process_item| in parallel. Work is
340   // assigned having each new worker given a different starting point far from
341   // other workers and loop over all work items from there. This is achieved by
342   // recursively splitting each range that was previously given in half.
RunJobWithLoopAround(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)343   void RunJobWithLoopAround(const std::string& story_name,
344                             size_t num_work_items,
345                             RepeatingCallback<void(size_t)> process_item,
346                             bool disruptive_post_tasks = false) {
347     WorkList work_list(num_work_items, std::move(process_item));
348     IndexGenerator generator(num_work_items);
349 
350     // Post extra tasks to disrupt Job execution and cause workers to yield.
351     if (disruptive_post_tasks)
352       DisruptivePostTasks(10, Milliseconds(1));
353 
354     const TimeTicks job_run_start = TimeTicks::Now();
355 
356     WaitableEvent complete;
357     auto handle =
358         PostJob(FROM_HERE, {TaskPriority::USER_VISIBLE},
359                 BindRepeating(
360                     [](IndexGenerator* generator, WorkList* work_list,
361                        WaitableEvent* complete, JobDelegate* delegate) {
362                       std::optional<size_t> index = generator->GetNext();
363                       if (!index)
364                         return;
365                       size_t i = *index;
366                       while (true) {
367                         if (delegate->ShouldYield()) {
368                           generator->GiveBack(i);
369                           return;
370                         }
371                         if (!work_list->TryAcquire(i)) {
372                           // If this was touched already, skip.
373                           continue;
374                         }
375                         if (!work_list->ProcessWorkItem(i)) {
376                           // This will cause the loop to exit if there's no work
377                           // left.
378                           complete->Signal();
379                           return;
380                         }
381                         ++i;
382                         if (i == work_list->NumWorkItems())
383                           i = 0;
384                       }
385                     },
386                     Unretained(&generator), Unretained(&work_list),
387                     Unretained(&complete)),
388                 BindRepeating(&WorkList::NumIncompleteWorkItems,
389                               Unretained(&work_list)));
390 
391     complete.Wait();
392     handle.Join();
393     const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
394     EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
395 
396     auto reporter = SetUpReporter(story_name);
397     reporter.AddResult(kMetricWorkThroughput,
398                        size_t(num_work_items / job_duration.InMilliseconds()));
399   }
400 
401  private:
402   test::TaskEnvironment task_environment;
403 };
404 
405 }  // namespace
406 
TEST_F(JobPerfTest,NoOpWorkNaiveAssignment)407 TEST_F(JobPerfTest, NoOpWorkNaiveAssignment) {
408   RunJobWithNaiveAssignment(kStoryNoOpNaive, 10000000, DoNothing());
409 }
410 
TEST_F(JobPerfTest,BusyWaitNaiveAssignment)411 TEST_F(JobPerfTest, BusyWaitNaiveAssignment) {
412   RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
413   RunJobWithNaiveAssignment(kStoryBusyWaitNaive, 500000, std::move(callback));
414 }
415 
TEST_F(JobPerfTest,NoOpWorkAtomicAssignment)416 TEST_F(JobPerfTest, NoOpWorkAtomicAssignment) {
417   RunJobWithAtomicAssignment(kStoryNoOpAtomic, 10000000, DoNothing());
418 }
419 
TEST_F(JobPerfTest,NoOpDisruptedWorkAtomicAssignment)420 TEST_F(JobPerfTest, NoOpDisruptedWorkAtomicAssignment) {
421   RunJobWithAtomicAssignment(kStoryNoOpAtomicDisrupted, 10000000, DoNothing(),
422                              true);
423 }
424 
TEST_F(JobPerfTest,BusyWaitAtomicAssignment)425 TEST_F(JobPerfTest, BusyWaitAtomicAssignment) {
426   RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
427   RunJobWithAtomicAssignment(kStoryBusyWaitAtomic, 500000, std::move(callback));
428 }
429 
TEST_F(JobPerfTest,BusyWaitDisruptedWorkAtomicAssignment)430 TEST_F(JobPerfTest, BusyWaitDisruptedWorkAtomicAssignment) {
431   RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
432   RunJobWithAtomicAssignment(kStoryBusyWaitAtomicDisrupted, 500000,
433                              std::move(callback), true);
434 }
435 
TEST_F(JobPerfTest,NoOpWorkDynamicAssignment)436 TEST_F(JobPerfTest, NoOpWorkDynamicAssignment) {
437   RunJobWithDynamicAssignment(kStoryNoOpDynamic, 10000000, DoNothing());
438 }
439 
TEST_F(JobPerfTest,NoOpDisruptedWorkDynamicAssignment)440 TEST_F(JobPerfTest, NoOpDisruptedWorkDynamicAssignment) {
441   RunJobWithDynamicAssignment(kStoryNoOpDynamicDisrupted, 10000000, DoNothing(),
442                               true);
443 }
444 
TEST_F(JobPerfTest,BusyWaitWorkDynamicAssignment)445 TEST_F(JobPerfTest, BusyWaitWorkDynamicAssignment) {
446   RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
447   RunJobWithDynamicAssignment(kStoryBusyWaitDynamic, 500000,
448                               std::move(callback));
449 }
450 
TEST_F(JobPerfTest,BusyWaitDisruptedWorkDynamicAssignment)451 TEST_F(JobPerfTest, BusyWaitDisruptedWorkDynamicAssignment) {
452   RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
453   RunJobWithDynamicAssignment(kStoryBusyWaitDynamicDisrupted, 500000,
454                               std::move(callback), true);
455 }
456 
TEST_F(JobPerfTest,NoOpWorkLoopAround)457 TEST_F(JobPerfTest, NoOpWorkLoopAround) {
458   RunJobWithLoopAround(kStoryNoOpLoopAround, 10000000, DoNothing());
459 }
460 
TEST_F(JobPerfTest,NoOpDisruptedWorkLoopAround)461 TEST_F(JobPerfTest, NoOpDisruptedWorkLoopAround) {
462   RunJobWithLoopAround(kStoryNoOpLoopAroundDisrupted, 10000000, DoNothing(),
463                        true);
464 }
465 
TEST_F(JobPerfTest,BusyWaitWorkLoopAround)466 TEST_F(JobPerfTest, BusyWaitWorkLoopAround) {
467   RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
468   RunJobWithLoopAround(kStoryBusyWaitLoopAround, 500000, std::move(callback));
469 }
470 
TEST_F(JobPerfTest,BusyWaitDisruptedWorkLoopAround)471 TEST_F(JobPerfTest, BusyWaitDisruptedWorkLoopAround) {
472   RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
473   RunJobWithLoopAround(kStoryBusyWaitLoopAroundDisrupted, 500000,
474                        std::move(callback), true);
475 }
476 
477 }  // namespace base
478