/*
 * Copyright 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fcp/base/scheduler.h"

#include <atomic>
#include <cstdlib>  // for std::rand
#include <vector>

#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "absl/synchronization/blocking_counter.h"
#include "fcp/base/monitoring.h"
#include "fcp/testing/testing.h"

namespace fcp {
namespace base {

namespace {

// NOTE: many of the tests below use reference captures in lambdas for locals.
// This is sound because the test methods do not return before the thread
// pool has become idle (pool->WaitUntilIdle()).

// Tests whether scheduled tasks are successfully executed.
TEST(ThreadPool, TasksAreExecuted) {
  auto pool = CreateThreadPoolScheduler(2);

  // Each task flips its own flag, so the two tasks never touch the same
  // variable.
  bool b1 = false;
  bool b2 = false;
  pool->Schedule([&b1]() { b1 = true; });
  pool->Schedule([&b2]() { b2 = true; });

  // The flags are only read after the pool has gone idle.
  pool->WaitUntilIdle();

  EXPECT_TRUE(b1);
  EXPECT_TRUE(b2);
}

// Tests whether the pool actually uses multiple threads to execute tasks.
// The test goal is achieved by blocking in one task until another task
// unblocks, which can only work if multiple threads are used.
TEST(ThreadPool, ThreadsAreUtilized) { auto pool = CreateThreadPoolScheduler(2); absl::BlockingCounter counter(1); bool b1 = false; bool b2 = false; pool->Schedule([&b1, &counter] { counter.Wait(); b1 = true; }); pool->Schedule([&b2, &counter] { counter.DecrementCount(); b2 = true; }); pool->WaitUntilIdle(); EXPECT_TRUE(b1); EXPECT_TRUE(b2); } TEST(ThreadPool, StressTest) { // A simple stress test where we spawn many threads and let them after // a random wait time increment a counter. static constexpr int kThreads = 32; static constexpr int kIterations = 16; auto pool = CreateThreadPoolScheduler(kThreads); std::atomic atomic_counter{0}; for (auto i = 0; i < kThreads; ++i) { auto task = [&atomic_counter] { for (auto j = 0; j < kIterations; ++j) { absl::SleepFor(absl::Microseconds(std::rand() % 500)); atomic_counter.fetch_add(1); } }; pool->Schedule(task); } pool->WaitUntilIdle(); ASSERT_EQ(atomic_counter, kThreads * kIterations); } TEST(Worker, TasksAreExecutedSequentially) { auto pool = CreateThreadPoolScheduler(3); auto worker = pool->CreateWorker(); absl::Mutex mutex{}; std::vector recorded{}; for (int i = 0; i < 128; i++) { worker->Schedule([&mutex, &recorded, i] { // Expect that no one is holding the mutex (tests for non-overlap). if (mutex.TryLock()) { // Add i to the recorded values (tests for execution in order). recorded.push_back(i); // Idle wait to be sure we don't execute faster than we schedule absl::SleepFor(absl::Milliseconds(50)); mutex.Unlock(); } else { FAIL() << "mutex was unexpectedly hold"; } }); } pool->WaitUntilIdle(); // Verify recorded values. for (int i = 0; i < 128; i++) { ASSERT_EQ(recorded[i], i); } } } // namespace } // namespace base } // namespace fcp