1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <algorithm>
16 #include <atomic>
17 #include <chrono>
18 #include <cstdint>
19 #include <memory>
20 #include <random>
21 #include <thread>
22 #include <utility>
23 #include <vector>
24
25 #include "absl/base/thread_annotations.h"
26 #include "absl/functional/any_invocable.h"
27 #include "absl/functional/bind_front.h"
28 #include "absl/functional/function_ref.h"
29 #include "absl/time/clock.h"
30 #include "absl/time/time.h"
31 #include "gmock/gmock.h"
32 #include "gtest/gtest.h"
33
34 #include <grpc/event_engine/event_engine.h>
35 #include <grpc/support/log.h>
36
37 #include "src/core/lib/event_engine/time_util.h"
38 #include "src/core/lib/gprpp/sync.h"
39 #include "test/core/event_engine/test_suite/event_engine_test_framework.h"
40
41 using ::testing::ElementsAre;
42 using namespace std::chrono_literals;
43
44 namespace grpc_event_engine {
45 namespace experimental {
46
// No-op hook: calling it performs no work.
// NOTE(review): presumably exists so the test-suite driver can reference this
// translation unit and keep its tests linked in — confirm against the caller.
void InitTimerTests() {
  // Intentionally empty.
}
48
49 } // namespace experimental
50 } // namespace grpc_event_engine
51
52 class EventEngineTimerTest : public EventEngineTest {
53 public:
54 void ScheduleCheckCB(std::chrono::steady_clock::time_point when,
55 std::atomic<int>* call_count,
56 std::atomic<int>* fail_count, int total_expected);
57
58 protected:
WaitForSignalled(absl::Duration timeout)59 void WaitForSignalled(absl::Duration timeout)
60 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
61 absl::Time deadline = absl::Now() + timeout;
62 while (!signaled_) {
63 timeout = deadline - absl::Now();
64 ASSERT_GT(timeout, absl::ZeroDuration());
65 cv_.WaitWithTimeout(&mu_, timeout);
66 }
67 }
68
69 grpc_core::Mutex mu_;
70 grpc_core::CondVar cv_;
71 bool signaled_ ABSL_GUARDED_BY(mu_) = false;
72 };
73
// A callback scheduled with a zero delay must signal the test within five
// seconds.
TEST_F(EventEngineTimerTest, ImmediateCallbackIsExecutedQuickly) {
  auto engine = NewEventEngine();
  // Hold the mutex before scheduling so the callback cannot signal until this
  // thread is waiting on the condition variable.
  grpc_core::MutexLock test_lock(&mu_);
  auto notify = [this]() {
    grpc_core::MutexLock callback_lock(&mu_);
    signaled_ = true;
    cv_.Signal();
  };
  engine->RunAfter(std::chrono::milliseconds(0), std::move(notify));
  WaitForSignalled(absl::Seconds(5));
}
84
// A timer scheduled 24 hours out can be cancelled, and `Cancel` reports
// success.
TEST_F(EventEngineTimerTest, SupportsCancellation) {
  auto engine = NewEventEngine();
  const auto far_future_task =
      engine->RunAfter(std::chrono::hours(24), [] {});
  ASSERT_TRUE(engine->Cancel(far_future_task));
}
90
// A cancelled timer's callback must never run, even after the engine has been
// destroyed.
TEST_F(EventEngineTimerTest, CancelledCallbackIsNotExecuted) {
  {
    auto engine = NewEventEngine();
    auto mark_signaled = [this]() {
      grpc_core::MutexLock callback_lock(&mu_);
      signaled_ = true;
    };
    auto task =
        engine->RunAfter(std::chrono::hours(24), std::move(mark_signaled));
    ASSERT_TRUE(engine->Cancel(task));
  }
  // The engine has been destroyed here, and all closures should have been
  // flushed, so the flag must still be unset.
  grpc_core::MutexLock test_lock(&mu_);
  ASSERT_FALSE(signaled_);
}
104
// Two timers are scheduled in reverse order of their deadlines; the one with
// the shorter delay must be observed first.
//
// Note: this test is brittle if the first call to `RunAfter` takes longer
// than the second callback's wait time.
TEST_F(EventEngineTimerTest, TimersRespectScheduleOrdering) {
  constexpr uint8_t kExpectedCallbacks = 2;
  std::vector<uint8_t> ordered;
  uint8_t count = 0;
  // Shared callback body: record which timer fired and wake the test thread.
  auto record = [this, &ordered, &count](uint8_t id) {
    grpc_core::MutexLock callback_lock(&mu_);
    ordered.push_back(id);
    ++count;
    cv_.Signal();
  };
  grpc_core::MutexLock test_lock(&mu_);
  {
    auto engine = NewEventEngine();
    engine->RunAfter(std::chrono::milliseconds(3000),
                     [record]() { record(2); });
    engine->RunAfter(std::chrono::milliseconds(0), [record]() { record(1); });
    // Block (polling in short waits) until both callbacks have reported in.
    const absl::Duration poll_interval = absl::Milliseconds(8);
    while (count != kExpectedCallbacks) {
      cv_.WaitWithTimeout(&mu_, poll_interval);
    }
  }
  // The engine has been destroyed; all closures should have been flushed
  // before this point.
  ASSERT_THAT(ordered, ElementsAre(1, 2));
}
133
// Cancelling a timer whose callback has already run must return false.
TEST_F(EventEngineTimerTest, CancellingExecutedCallbackIsNoopAndReturnsFalse) {
  auto engine = NewEventEngine();
  grpc_core::MutexLock test_lock(&mu_);
  auto notify = [this]() {
    grpc_core::MutexLock callback_lock(&mu_);
    signaled_ = true;
    cv_.Signal();
  };
  auto task = engine->RunAfter(std::chrono::milliseconds(0), std::move(notify));
  WaitForSignalled(absl::Seconds(10));
  // The callback has executed by now, so cancellation has nothing left to do.
  ASSERT_FALSE(engine->Cancel(task));
}
146
ScheduleCheckCB(std::chrono::steady_clock::time_point when,std::atomic<int> * call_count,std::atomic<int> * fail_count,int total_expected)147 void EventEngineTimerTest::ScheduleCheckCB(
148 std::chrono::steady_clock::time_point when, std::atomic<int>* call_count,
149 std::atomic<int>* fail_count, int total_expected) {
150 auto now = std::chrono::steady_clock::now();
151 EXPECT_LE(when, now) << "Callback was run "
152 << grpc_event_engine::experimental::Milliseconds(when -
153 now)
154 << " ms too early: ";
155 if (when > now) ++(*fail_count);
156 if (++(*call_count) == total_expected) {
157 grpc_core::MutexLock lock(&mu_);
158 signaled_ = true;
159 cv_.Signal();
160 }
161 }
162
// Stress test: several threads concurrently schedule timers with random
// delays, and each callback checks that it did not run before its deadline.
// The test blocks until every callback has run, then asserts that none of
// them fired early.
TEST_F(EventEngineTimerTest, StressTestTimersNotCalledBeforeScheduled) {
  auto engine = this->NewEventEngine();
  constexpr int thread_count = 10;
  constexpr int call_count_per_thread = 100;
  // Total number of timers scheduled across all threads. Used both as the
  // completion threshold for the callbacks and as the denominator in the
  // failure log below.
  constexpr int total_call_count = thread_count * call_count_per_thread;
  constexpr float timeout_min_seconds = 1;
  constexpr float timeout_max_seconds = 10;
  std::atomic<int> call_count{0};
  std::atomic<int> failed_call_count{0};
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (int thread_n = 0; thread_n < thread_count; ++thread_n) {
    threads.emplace_back([&]() {
      std::random_device rd;
      std::mt19937 gen(rd());
      std::uniform_real_distribution<> dis(timeout_min_seconds,
                                           timeout_max_seconds);
      for (int call_n = 0; call_n < call_count_per_thread; ++call_n) {
        const auto dur = static_cast<int64_t>(1e9 * dis(gen));
        // The deadline is computed before scheduling, so it is never later
        // than the earliest time the engine may legitimately run the timer.
        auto deadline =
            std::chrono::steady_clock::now() + std::chrono::nanoseconds(dur);
        engine->RunAfter(
            std::chrono::nanoseconds(dur),
            absl::bind_front(&EventEngineTimerTest::ScheduleCheckCB, this,
                             deadline, &call_count, &failed_call_count,
                             total_call_count));
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  grpc_core::MutexLock lock(&mu_);
  // Loop to protect against spurious wakeups.
  while (!signaled_) {
    cv_.Wait(&mu_);
  }
  if (failed_call_count.load() != 0) {
    // Report failures against the number of scheduled timers. The previous
    // code multiplied by the atomic `call_count` running counter instead of
    // the per-thread constant, inflating the reported total.
    gpr_log(GPR_DEBUG, "failed timer count: %d of %d", failed_call_count.load(),
            total_call_count);
  }
  ASSERT_EQ(0, failed_call_count.load());
}
205
206 // Common implementation for the Run and RunAfter test variants below
207 // Calls run_fn multiple times, and will get stuck if the implementation does a
208 // blocking inline execution of the closure. This test will timeout on failure.
ImmediateRunTestInternal(absl::FunctionRef<void (absl::AnyInvocable<void ()>)> run_fn,grpc_core::Mutex & mu,grpc_core::CondVar & cv)209 void ImmediateRunTestInternal(
210 absl::FunctionRef<void(absl::AnyInvocable<void()>)> run_fn,
211 grpc_core::Mutex& mu, grpc_core::CondVar& cv) {
212 constexpr int num_concurrent_runs = 32;
213 constexpr int num_iterations = 100;
214 constexpr absl::Duration run_timeout = absl::Seconds(60);
215 std::atomic<int> waiters{0};
216 std::atomic<int> execution_count{0};
217 auto cb = [&mu, &cv, &run_timeout, &waiters, &execution_count]() {
218 waiters.fetch_add(1);
219 grpc_core::MutexLock lock(&mu);
220 EXPECT_FALSE(cv.WaitWithTimeout(&mu, run_timeout))
221 << "callback timed out waiting.";
222 execution_count.fetch_add(1);
223 };
224 for (int i = 0; i < num_iterations; i++) {
225 waiters.store(0);
226 execution_count.store(0);
227 for (int run = 0; run < num_concurrent_runs; run++) {
228 run_fn(cb);
229 }
230 while (waiters.load() != num_concurrent_runs) {
231 absl::SleepFor(absl::Milliseconds(33));
232 }
233 cv.SignalAll();
234 while (execution_count.load() != num_concurrent_runs) {
235 absl::SleepFor(absl::Milliseconds(33));
236 }
237 }
238 }
239
240 // TODO(hork): re-enabled after either I've implemented XFAIL, or fixed the
241 // ThreadPool's behavior under backlog.
TEST_F(EventEngineTimerTest,DISABLED_RunDoesNotImmediatelyExecuteInTheSameThread)242 TEST_F(EventEngineTimerTest,
243 DISABLED_RunDoesNotImmediatelyExecuteInTheSameThread) {
244 auto engine = this->NewEventEngine();
245 ImmediateRunTestInternal(
246 [&engine](absl::AnyInvocable<void()> cb) { engine->Run(std::move(cb)); },
247 mu_, cv_);
248 }
249
250 // TODO(hork): re-enabled after either I've implemented XFAIL, or fixed the
251 // ThreadPool's behavior under backlog.
TEST_F(EventEngineTimerTest,DISABLED_RunAfterDoesNotImmediatelyExecuteInTheSameThread)252 TEST_F(EventEngineTimerTest,
253 DISABLED_RunAfterDoesNotImmediatelyExecuteInTheSameThread) {
254 auto engine = this->NewEventEngine();
255 ImmediateRunTestInternal(
256 [&engine](absl::AnyInvocable<void()> cb) {
257 engine->RunAfter(std::chrono::seconds(0), std::move(cb));
258 },
259 mu_, cv_);
260 }
261