xref: /aosp_15_r20/external/grpc-grpc/test/core/resource_quota/memory_quota_stress_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <stddef.h>
16 
17 #include <algorithm>
18 #include <atomic>
19 #include <chrono>
20 #include <initializer_list>
21 #include <memory>
22 #include <random>
23 #include <thread>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/base/thread_annotations.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/types/optional.h"
30 #include "gtest/gtest.h"
31 
32 #include <grpc/event_engine/memory_allocator.h>
33 #include <grpc/event_engine/memory_request.h>
34 #include <grpc/support/log.h>
35 
36 #include "src/core/lib/gprpp/sync.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/resource_quota/memory_quota.h"
39 #include "test/core/util/test_config.h"
40 
41 namespace grpc_core {
42 
43 namespace {
44 class StressTest {
45  public:
46   // Create a stress test with some size.
StressTest(size_t num_quotas,size_t num_allocators)47   StressTest(size_t num_quotas, size_t num_allocators) {
48     for (size_t i = 0; i < num_quotas; ++i) {
49       quotas_.emplace_back(absl::StrCat("quota[", i, "]"));
50     }
51     std::random_device g;
52     std::uniform_int_distribution<size_t> dist(0, num_quotas - 1);
53     for (size_t i = 0; i < num_allocators; ++i) {
54       allocators_.emplace_back(quotas_[dist(g)].CreateMemoryOwner());
55     }
56   }
57 
~StressTest()58   ~StressTest() {
59     ExecCtx exec_ctx;
60     allocators_.clear();
61     quotas_.clear();
62   }
63 
64   // Run the thread for some period of time.
Run(int seconds)65   void Run(int seconds) {
66     std::vector<std::thread> threads;
67 
68     // And another few threads constantly resizing quotas.
69     threads.reserve(2 + allocators_.size());
70     for (int i = 0; i < 2; i++) threads.push_back(Run(Resizer));
71 
72     // For each (allocator, pass), start a thread continuously allocating from
73     // that allocator. Whenever the first allocation is made, schedule a
74     // reclaimer for that pass.
75     for (size_t i = 0; i < allocators_.size(); i++) {
76       auto* allocator = &allocators_[i];
77       for (ReclamationPass pass :
78            {ReclamationPass::kBenign, ReclamationPass::kIdle,
79             ReclamationPass::kDestructive}) {
80         threads.push_back(Run([allocator, pass](StatePtr st) mutable {
81           if (st->RememberReservation(
82                   allocator->MakeReservation(st->RandomRequest()))) {
83             allocator->PostReclaimer(
84                 pass, [st](absl::optional<ReclamationSweep> sweep) {
85                   if (!sweep.has_value()) return;
86                   st->ForgetReservations();
87                 });
88           }
89         }));
90       }
91     }
92 
93     // All threads started, wait for the alloted time.
94     std::this_thread::sleep_for(std::chrono::seconds(seconds));
95 
96     // Toggle the completion bit, and then wait for the threads.
97     done_.store(true, std::memory_order_relaxed);
98     while (!threads.empty()) {
99       threads.back().join();
100       threads.pop_back();
101     }
102   }
103 
104  private:
105   // Per-thread state.
106   // Not everything is used on every thread, but it's not terrible having the
107   // extra state around and it does simplify things somewhat.
108   class State {
109    public:
State(StressTest * test)110     explicit State(StressTest* test)
111         : test_(test),
112           quotas_distribution_(0, test_->quotas_.size() - 1),
113           allocators_distribution_(0, test_->allocators_.size() - 1),
114           size_distribution_(1, 4 * 1024 * 1024),
115           quota_size_distribution_(1024 * 1024, size_t{8} * 1024 * 1024 * 1024),
116           choose_variable_size_(1, 100) {}
117 
118     // Choose a random quota, and return an owned pointer to it.
119     // Not thread-safe, only callable from the owning thread.
RandomQuota()120     MemoryQuota* RandomQuota() {
121       return &test_->quotas_[quotas_distribution_(g_)];
122     }
123 
124     // Choose a random allocator, and return a borrowed pointer to it.
125     // Not thread-safe, only callable from the owning thread.
RandomAllocator()126     MemoryOwner* RandomAllocator() {
127       return &test_->allocators_[allocators_distribution_(g_)];
128     }
129 
130     // Random memory request size - 1% of allocations are chosen to be variable
131     // sized - the rest are fixed (since variable sized create some contention
132     // problems between allocator threads of different passes on the same
133     // allocator).
134     // Not thread-safe, only callable from the owning thread.
RandomRequest()135     MemoryRequest RandomRequest() {
136       size_t a = size_distribution_(g_);
137       if (choose_variable_size_(g_) == 1) {
138         size_t b = size_distribution_(g_);
139         return MemoryRequest(std::min(a, b), std::max(a, b));
140       }
141       return MemoryRequest(a);
142     }
143 
144     // Choose a new size for a backing quota.
145     // Not thread-safe, only callable from the owning thread.
RandomQuotaSize()146     size_t RandomQuotaSize() { return quota_size_distribution_(g_); }
147 
148     // Remember a reservation, return true if it's the first remembered since
149     // the last reclamation.
150     // Thread-safe.
RememberReservation(MemoryAllocator::Reservation reservation)151     bool RememberReservation(MemoryAllocator::Reservation reservation)
152         ABSL_LOCKS_EXCLUDED(mu_) {
153       MutexLock lock(&mu_);
154       bool was_empty = reservations_.empty();
155       reservations_.emplace_back(std::move(reservation));
156       return was_empty;
157     }
158 
159     // Return all reservations made until this moment, so that they can be
160     // dropped.
ForgetReservations()161     std::vector<MemoryAllocator::Reservation> ForgetReservations()
162         ABSL_LOCKS_EXCLUDED(mu_) {
163       MutexLock lock(&mu_);
164       return std::move(reservations_);
165     }
166 
167    private:
168     // Owning test.
169     StressTest* const test_;
170     // Random number generator.
171     std::mt19937 g_{std::random_device()()};
172     // Distribution to choose a quota.
173     std::uniform_int_distribution<size_t> quotas_distribution_;
174     // Distribution to choose an allocator.
175     std::uniform_int_distribution<size_t> allocators_distribution_;
176     // Distribution to choose an allocation size.
177     std::uniform_int_distribution<size_t> size_distribution_;
178     // Distribution to choose a quota size.
179     std::uniform_int_distribution<size_t> quota_size_distribution_;
180     // Distribution to choose whether to make a variable-sized allocation.
181     std::uniform_int_distribution<size_t> choose_variable_size_;
182 
183     // Mutex to protect the reservation list.
184     Mutex mu_;
185     // Reservations remembered by this thread.
186     std::vector<MemoryAllocator::Reservation> reservations_
187         ABSL_GUARDED_BY(mu_);
188   };
189   // Type alias since we always pass around these shared pointers.
190   using StatePtr = std::shared_ptr<State>;
191 
192   // Choose one allocator, resize it to a randomly chosen size.
Resizer(StatePtr st)193   static void Resizer(StatePtr st) {
194     auto* quota = st->RandomQuota();
195     size_t size = st->RandomQuotaSize();
196     quota->SetSize(size);
197   }
198 
199   // Create a thread that repeatedly runs a function until the test is done.
200   // We create one instance of State that we pass as a StatePtr to said
201   // function as the current overall state for this thread.
202   // Monitors done_ to see when we should stop.
203   // Ensures there's an ExecCtx for each iteration of the loop.
204   template <typename Fn>
Run(Fn fn)205   std::thread Run(Fn fn) {
206     return std::thread([this, fn]() mutable {
207       auto state = std::make_shared<State>(this);
208       while (!done_.load(std::memory_order_relaxed)) {
209         ExecCtx exec_ctx;
210         fn(state);
211       }
212     });
213   }
214 
215   // Flag for when the test is completed.
216   std::atomic<bool> done_{false};
217 
218   // Memory quotas to test against. We build this up at construction time, but
219   // then don't resize, so we can load from it continuously from all of the
220   // threads.
221   std::vector<MemoryQuota> quotas_;
222   // Memory allocators to test against. Similarly, built at construction time,
223   // and then the shape of this vector is not changed.
224   std::vector<MemoryOwner> allocators_;
225 };
226 }  // namespace
227 
228 }  // namespace grpc_core
229 
TEST(MemoryQuotaStressTest,MainTest)230 TEST(MemoryQuotaStressTest, MainTest) {
231   if (sizeof(void*) != 8) {
232     gpr_log(
233         GPR_ERROR,
234         "This test assumes 64-bit processors in the values it uses for sizes. "
235         "Since this test is mostly aimed at TSAN coverage, and that's mostly "
236         "platform independent, we simply skip this test in 32-bit builds.");
237     GTEST_SKIP();
238   }
239   grpc_core::StressTest(16, 20).Run(8);
240 }
241 
main(int argc,char ** argv)242 int main(int argc, char** argv) {
243   grpc::testing::TestEnvironment give_me_a_name(&argc, argv);
244   ::testing::InitGoogleTest(&argc, argv);
245   return RUN_ALL_TESTS();
246 }
247