xref: /aosp_15_r20/external/grpc-grpc/test/core/event_engine/test_suite/tests/timer_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <atomic>
17 #include <chrono>
18 #include <cstdint>
19 #include <memory>
20 #include <random>
21 #include <thread>
22 #include <utility>
23 #include <vector>
24 
25 #include "absl/base/thread_annotations.h"
26 #include "absl/functional/any_invocable.h"
27 #include "absl/functional/bind_front.h"
28 #include "absl/functional/function_ref.h"
29 #include "absl/time/clock.h"
30 #include "absl/time/time.h"
31 #include "gmock/gmock.h"
32 #include "gtest/gtest.h"
33 
34 #include <grpc/event_engine/event_engine.h>
35 #include <grpc/support/log.h>
36 
37 #include "src/core/lib/event_engine/time_util.h"
38 #include "src/core/lib/gprpp/sync.h"
39 #include "test/core/event_engine/test_suite/event_engine_test_framework.h"
40 
41 using ::testing::ElementsAre;
42 using namespace std::chrono_literals;
43 
44 namespace grpc_event_engine {
45 namespace experimental {
46 
InitTimerTests()47 void InitTimerTests() {}
48 
49 }  // namespace experimental
50 }  // namespace grpc_event_engine
51 
52 class EventEngineTimerTest : public EventEngineTest {
53  public:
54   void ScheduleCheckCB(std::chrono::steady_clock::time_point when,
55                        std::atomic<int>* call_count,
56                        std::atomic<int>* fail_count, int total_expected);
57 
58  protected:
WaitForSignalled(absl::Duration timeout)59   void WaitForSignalled(absl::Duration timeout)
60       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
61     absl::Time deadline = absl::Now() + timeout;
62     while (!signaled_) {
63       timeout = deadline - absl::Now();
64       ASSERT_GT(timeout, absl::ZeroDuration());
65       cv_.WaitWithTimeout(&mu_, timeout);
66     }
67   }
68 
69   grpc_core::Mutex mu_;
70   grpc_core::CondVar cv_;
71   bool signaled_ ABSL_GUARDED_BY(mu_) = false;
72 };
73 
TEST_F(EventEngineTimerTest,ImmediateCallbackIsExecutedQuickly)74 TEST_F(EventEngineTimerTest, ImmediateCallbackIsExecutedQuickly) {
75   auto engine = this->NewEventEngine();
76   grpc_core::MutexLock lock(&mu_);
77   engine->RunAfter(0ms, [this]() {
78     grpc_core::MutexLock lock(&mu_);
79     signaled_ = true;
80     cv_.Signal();
81   });
82   WaitForSignalled(absl::Seconds(5));
83 }
84 
TEST_F(EventEngineTimerTest,SupportsCancellation)85 TEST_F(EventEngineTimerTest, SupportsCancellation) {
86   auto engine = this->NewEventEngine();
87   auto handle = engine->RunAfter(24h, []() {});
88   ASSERT_TRUE(engine->Cancel(handle));
89 }
90 
TEST_F(EventEngineTimerTest,CancelledCallbackIsNotExecuted)91 TEST_F(EventEngineTimerTest, CancelledCallbackIsNotExecuted) {
92   {
93     auto engine = this->NewEventEngine();
94     auto handle = engine->RunAfter(24h, [this]() {
95       grpc_core::MutexLock lock(&mu_);
96       signaled_ = true;
97     });
98     ASSERT_TRUE(engine->Cancel(handle));
99   }
100   // The engine is deleted, and all closures should have been flushed
101   grpc_core::MutexLock lock(&mu_);
102   ASSERT_FALSE(signaled_);
103 }
104 
TEST_F(EventEngineTimerTest,TimersRespectScheduleOrdering)105 TEST_F(EventEngineTimerTest, TimersRespectScheduleOrdering) {
106   // Note: this is a brittle test if the first call to `RunAfter` takes longer
107   // than the second callback's wait time.
108   std::vector<uint8_t> ordered;
109   uint8_t count = 0;
110   grpc_core::MutexLock lock(&mu_);
111   {
112     auto engine = this->NewEventEngine();
113     engine->RunAfter(3000ms, [&]() {
114       grpc_core::MutexLock lock(&mu_);
115       ordered.push_back(2);
116       ++count;
117       cv_.Signal();
118     });
119     engine->RunAfter(0ms, [&]() {
120       grpc_core::MutexLock lock(&mu_);
121       ordered.push_back(1);
122       ++count;
123       cv_.Signal();
124     });
125     // Ensure both callbacks have run.
126     while (count != 2) {
127       cv_.WaitWithTimeout(&mu_, absl::Milliseconds(8));
128     }
129   }
130   // The engine is deleted, and all closures should have been flushed beforehand
131   ASSERT_THAT(ordered, ElementsAre(1, 2));
132 }
133 
TEST_F(EventEngineTimerTest,CancellingExecutedCallbackIsNoopAndReturnsFalse)134 TEST_F(EventEngineTimerTest, CancellingExecutedCallbackIsNoopAndReturnsFalse) {
135   auto engine = this->NewEventEngine();
136   grpc_core::MutexLock lock(&mu_);
137   auto handle = engine->RunAfter(0ms, [this]() {
138     grpc_core::MutexLock lock(&mu_);
139     signaled_ = true;
140     cv_.Signal();
141   });
142   WaitForSignalled(absl::Seconds(10));
143   // The callback has run, and now we'll try to cancel it.
144   ASSERT_FALSE(engine->Cancel(handle));
145 }
146 
ScheduleCheckCB(std::chrono::steady_clock::time_point when,std::atomic<int> * call_count,std::atomic<int> * fail_count,int total_expected)147 void EventEngineTimerTest::ScheduleCheckCB(
148     std::chrono::steady_clock::time_point when, std::atomic<int>* call_count,
149     std::atomic<int>* fail_count, int total_expected) {
150   auto now = std::chrono::steady_clock::now();
151   EXPECT_LE(when, now) << "Callback was run "
152                        << grpc_event_engine::experimental::Milliseconds(when -
153                                                                         now)
154                        << " ms too early: ";
155   if (when > now) ++(*fail_count);
156   if (++(*call_count) == total_expected) {
157     grpc_core::MutexLock lock(&mu_);
158     signaled_ = true;
159     cv_.Signal();
160   }
161 }
162 
TEST_F(EventEngineTimerTest,StressTestTimersNotCalledBeforeScheduled)163 TEST_F(EventEngineTimerTest, StressTestTimersNotCalledBeforeScheduled) {
164   auto engine = this->NewEventEngine();
165   constexpr int thread_count = 10;
166   constexpr int call_count_per_thread = 100;
167   constexpr float timeout_min_seconds = 1;
168   constexpr float timeout_max_seconds = 10;
169   std::atomic<int> call_count{0};
170   std::atomic<int> failed_call_count{0};
171   std::vector<std::thread> threads;
172   threads.reserve(thread_count);
173   for (int thread_n = 0; thread_n < thread_count; ++thread_n) {
174     threads.emplace_back([&]() {
175       std::random_device rd;
176       std::mt19937 gen(rd());
177       std::uniform_real_distribution<> dis(timeout_min_seconds,
178                                            timeout_max_seconds);
179       for (int call_n = 0; call_n < call_count_per_thread; ++call_n) {
180         const auto dur = static_cast<int64_t>(1e9 * dis(gen));
181         auto deadline =
182             std::chrono::steady_clock::now() + std::chrono::nanoseconds(dur);
183         engine->RunAfter(
184             std::chrono::nanoseconds(dur),
185             absl::bind_front(&EventEngineTimerTest::ScheduleCheckCB, this,
186                              deadline, &call_count, &failed_call_count,
187                              thread_count * call_count_per_thread));
188       }
189     });
190   }
191   for (auto& t : threads) {
192     t.join();
193   }
194   grpc_core::MutexLock lock(&mu_);
195   // to protect against spurious wakeups.
196   while (!signaled_) {
197     cv_.Wait(&mu_);
198   }
199   if (failed_call_count.load() != 0) {
200     gpr_log(GPR_DEBUG, "failed timer count: %d of %d", failed_call_count.load(),
201             thread_count * call_count);
202   }
203   ASSERT_EQ(0, failed_call_count.load());
204 }
205 
206 // Common implementation for the Run and RunAfter test variants below
207 // Calls run_fn multiple times, and will get stuck if the implementation does a
208 // blocking inline execution of the closure. This test will timeout on failure.
ImmediateRunTestInternal(absl::FunctionRef<void (absl::AnyInvocable<void ()>)> run_fn,grpc_core::Mutex & mu,grpc_core::CondVar & cv)209 void ImmediateRunTestInternal(
210     absl::FunctionRef<void(absl::AnyInvocable<void()>)> run_fn,
211     grpc_core::Mutex& mu, grpc_core::CondVar& cv) {
212   constexpr int num_concurrent_runs = 32;
213   constexpr int num_iterations = 100;
214   constexpr absl::Duration run_timeout = absl::Seconds(60);
215   std::atomic<int> waiters{0};
216   std::atomic<int> execution_count{0};
217   auto cb = [&mu, &cv, &run_timeout, &waiters, &execution_count]() {
218     waiters.fetch_add(1);
219     grpc_core::MutexLock lock(&mu);
220     EXPECT_FALSE(cv.WaitWithTimeout(&mu, run_timeout))
221         << "callback timed out waiting.";
222     execution_count.fetch_add(1);
223   };
224   for (int i = 0; i < num_iterations; i++) {
225     waiters.store(0);
226     execution_count.store(0);
227     for (int run = 0; run < num_concurrent_runs; run++) {
228       run_fn(cb);
229     }
230     while (waiters.load() != num_concurrent_runs) {
231       absl::SleepFor(absl::Milliseconds(33));
232     }
233     cv.SignalAll();
234     while (execution_count.load() != num_concurrent_runs) {
235       absl::SleepFor(absl::Milliseconds(33));
236     }
237   }
238 }
239 
240 // TODO(hork): re-enabled after either I've implemented XFAIL, or fixed the
241 // ThreadPool's behavior under backlog.
TEST_F(EventEngineTimerTest,DISABLED_RunDoesNotImmediatelyExecuteInTheSameThread)242 TEST_F(EventEngineTimerTest,
243        DISABLED_RunDoesNotImmediatelyExecuteInTheSameThread) {
244   auto engine = this->NewEventEngine();
245   ImmediateRunTestInternal(
246       [&engine](absl::AnyInvocable<void()> cb) { engine->Run(std::move(cb)); },
247       mu_, cv_);
248 }
249 
250 // TODO(hork): re-enabled after either I've implemented XFAIL, or fixed the
251 // ThreadPool's behavior under backlog.
TEST_F(EventEngineTimerTest,DISABLED_RunAfterDoesNotImmediatelyExecuteInTheSameThread)252 TEST_F(EventEngineTimerTest,
253        DISABLED_RunAfterDoesNotImmediatelyExecuteInTheSameThread) {
254   auto engine = this->NewEventEngine();
255   ImmediateRunTestInternal(
256       [&engine](absl::AnyInvocable<void()> cb) {
257         engine->RunAfter(std::chrono::seconds(0), std::move(cb));
258       },
259       mu_, cv_);
260 }
261