1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <stddef.h>
16
17 #include <algorithm>
18 #include <atomic>
19 #include <chrono>
20 #include <initializer_list>
21 #include <memory>
22 #include <random>
23 #include <thread>
24 #include <utility>
25 #include <vector>
26
27 #include "absl/base/thread_annotations.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/types/optional.h"
30 #include "gtest/gtest.h"
31
32 #include <grpc/event_engine/memory_allocator.h>
33 #include <grpc/event_engine/memory_request.h>
34 #include <grpc/support/log.h>
35
36 #include "src/core/lib/gprpp/sync.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/resource_quota/memory_quota.h"
39 #include "test/core/util/test_config.h"
40
41 namespace grpc_core {
42
43 namespace {
44 class StressTest {
45 public:
46 // Create a stress test with some size.
StressTest(size_t num_quotas,size_t num_allocators)47 StressTest(size_t num_quotas, size_t num_allocators) {
48 for (size_t i = 0; i < num_quotas; ++i) {
49 quotas_.emplace_back(absl::StrCat("quota[", i, "]"));
50 }
51 std::random_device g;
52 std::uniform_int_distribution<size_t> dist(0, num_quotas - 1);
53 for (size_t i = 0; i < num_allocators; ++i) {
54 allocators_.emplace_back(quotas_[dist(g)].CreateMemoryOwner());
55 }
56 }
57
~StressTest()58 ~StressTest() {
59 ExecCtx exec_ctx;
60 allocators_.clear();
61 quotas_.clear();
62 }
63
64 // Run the thread for some period of time.
Run(int seconds)65 void Run(int seconds) {
66 std::vector<std::thread> threads;
67
68 // And another few threads constantly resizing quotas.
69 threads.reserve(2 + allocators_.size());
70 for (int i = 0; i < 2; i++) threads.push_back(Run(Resizer));
71
72 // For each (allocator, pass), start a thread continuously allocating from
73 // that allocator. Whenever the first allocation is made, schedule a
74 // reclaimer for that pass.
75 for (size_t i = 0; i < allocators_.size(); i++) {
76 auto* allocator = &allocators_[i];
77 for (ReclamationPass pass :
78 {ReclamationPass::kBenign, ReclamationPass::kIdle,
79 ReclamationPass::kDestructive}) {
80 threads.push_back(Run([allocator, pass](StatePtr st) mutable {
81 if (st->RememberReservation(
82 allocator->MakeReservation(st->RandomRequest()))) {
83 allocator->PostReclaimer(
84 pass, [st](absl::optional<ReclamationSweep> sweep) {
85 if (!sweep.has_value()) return;
86 st->ForgetReservations();
87 });
88 }
89 }));
90 }
91 }
92
93 // All threads started, wait for the alloted time.
94 std::this_thread::sleep_for(std::chrono::seconds(seconds));
95
96 // Toggle the completion bit, and then wait for the threads.
97 done_.store(true, std::memory_order_relaxed);
98 while (!threads.empty()) {
99 threads.back().join();
100 threads.pop_back();
101 }
102 }
103
104 private:
105 // Per-thread state.
106 // Not everything is used on every thread, but it's not terrible having the
107 // extra state around and it does simplify things somewhat.
108 class State {
109 public:
State(StressTest * test)110 explicit State(StressTest* test)
111 : test_(test),
112 quotas_distribution_(0, test_->quotas_.size() - 1),
113 allocators_distribution_(0, test_->allocators_.size() - 1),
114 size_distribution_(1, 4 * 1024 * 1024),
115 quota_size_distribution_(1024 * 1024, size_t{8} * 1024 * 1024 * 1024),
116 choose_variable_size_(1, 100) {}
117
118 // Choose a random quota, and return an owned pointer to it.
119 // Not thread-safe, only callable from the owning thread.
RandomQuota()120 MemoryQuota* RandomQuota() {
121 return &test_->quotas_[quotas_distribution_(g_)];
122 }
123
124 // Choose a random allocator, and return a borrowed pointer to it.
125 // Not thread-safe, only callable from the owning thread.
RandomAllocator()126 MemoryOwner* RandomAllocator() {
127 return &test_->allocators_[allocators_distribution_(g_)];
128 }
129
130 // Random memory request size - 1% of allocations are chosen to be variable
131 // sized - the rest are fixed (since variable sized create some contention
132 // problems between allocator threads of different passes on the same
133 // allocator).
134 // Not thread-safe, only callable from the owning thread.
RandomRequest()135 MemoryRequest RandomRequest() {
136 size_t a = size_distribution_(g_);
137 if (choose_variable_size_(g_) == 1) {
138 size_t b = size_distribution_(g_);
139 return MemoryRequest(std::min(a, b), std::max(a, b));
140 }
141 return MemoryRequest(a);
142 }
143
144 // Choose a new size for a backing quota.
145 // Not thread-safe, only callable from the owning thread.
RandomQuotaSize()146 size_t RandomQuotaSize() { return quota_size_distribution_(g_); }
147
148 // Remember a reservation, return true if it's the first remembered since
149 // the last reclamation.
150 // Thread-safe.
RememberReservation(MemoryAllocator::Reservation reservation)151 bool RememberReservation(MemoryAllocator::Reservation reservation)
152 ABSL_LOCKS_EXCLUDED(mu_) {
153 MutexLock lock(&mu_);
154 bool was_empty = reservations_.empty();
155 reservations_.emplace_back(std::move(reservation));
156 return was_empty;
157 }
158
159 // Return all reservations made until this moment, so that they can be
160 // dropped.
ForgetReservations()161 std::vector<MemoryAllocator::Reservation> ForgetReservations()
162 ABSL_LOCKS_EXCLUDED(mu_) {
163 MutexLock lock(&mu_);
164 return std::move(reservations_);
165 }
166
167 private:
168 // Owning test.
169 StressTest* const test_;
170 // Random number generator.
171 std::mt19937 g_{std::random_device()()};
172 // Distribution to choose a quota.
173 std::uniform_int_distribution<size_t> quotas_distribution_;
174 // Distribution to choose an allocator.
175 std::uniform_int_distribution<size_t> allocators_distribution_;
176 // Distribution to choose an allocation size.
177 std::uniform_int_distribution<size_t> size_distribution_;
178 // Distribution to choose a quota size.
179 std::uniform_int_distribution<size_t> quota_size_distribution_;
180 // Distribution to choose whether to make a variable-sized allocation.
181 std::uniform_int_distribution<size_t> choose_variable_size_;
182
183 // Mutex to protect the reservation list.
184 Mutex mu_;
185 // Reservations remembered by this thread.
186 std::vector<MemoryAllocator::Reservation> reservations_
187 ABSL_GUARDED_BY(mu_);
188 };
189 // Type alias since we always pass around these shared pointers.
190 using StatePtr = std::shared_ptr<State>;
191
192 // Choose one allocator, resize it to a randomly chosen size.
Resizer(StatePtr st)193 static void Resizer(StatePtr st) {
194 auto* quota = st->RandomQuota();
195 size_t size = st->RandomQuotaSize();
196 quota->SetSize(size);
197 }
198
199 // Create a thread that repeatedly runs a function until the test is done.
200 // We create one instance of State that we pass as a StatePtr to said
201 // function as the current overall state for this thread.
202 // Monitors done_ to see when we should stop.
203 // Ensures there's an ExecCtx for each iteration of the loop.
204 template <typename Fn>
Run(Fn fn)205 std::thread Run(Fn fn) {
206 return std::thread([this, fn]() mutable {
207 auto state = std::make_shared<State>(this);
208 while (!done_.load(std::memory_order_relaxed)) {
209 ExecCtx exec_ctx;
210 fn(state);
211 }
212 });
213 }
214
215 // Flag for when the test is completed.
216 std::atomic<bool> done_{false};
217
218 // Memory quotas to test against. We build this up at construction time, but
219 // then don't resize, so we can load from it continuously from all of the
220 // threads.
221 std::vector<MemoryQuota> quotas_;
222 // Memory allocators to test against. Similarly, built at construction time,
223 // and then the shape of this vector is not changed.
224 std::vector<MemoryOwner> allocators_;
225 };
226 } // namespace
227
228 } // namespace grpc_core
229
// Runs the stress test with 16 quotas and 20 allocators for 8 seconds.
// The test is skipped on builds where pointers are not 8 bytes wide.
TEST(MemoryQuotaStressTest, MainTest) {
  constexpr bool kIs64BitBuild = (sizeof(void*) == 8);
  if (!kIs64BitBuild) {
    gpr_log(
        GPR_ERROR,
        "This test assumes 64-bit processors in the values it uses for sizes. "
        "Since this test is mostly aimed at TSAN coverage, and that's mostly "
        "platform independent, we simply skip this test in 32-bit builds.");
    GTEST_SKIP();
  }
  grpc_core::StressTest stress_test(16, 20);
  stress_test.Run(8);
}
241
main(int argc,char ** argv)242 int main(int argc, char** argv) {
243 grpc::testing::TestEnvironment give_me_a_name(&argc, argv);
244 ::testing::InitGoogleTest(&argc, argv);
245 return RUN_ALL_TESTS();
246 }
247