// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <grpc/support/port_platform.h>

#include "src/core/lib/resource_quota/memory_quota.h"

#include <inttypes.h>

#include <algorithm>
#include <atomic>
#include <tuple>

#include "absl/status/status.h"
#include "absl/strings/str_cat.h"

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/trace.h"

namespace grpc_core {

// Maximum number of bytes an allocator will request from a quota in one step.
// Larger allocations than this will require multiple allocation requests.
static constexpr size_t kMaxReplenishBytes = 1024 * 1024;

// Minimum number of bytes an allocator will request from a quota in one step.
static constexpr size_t kMinReplenishBytes = 4096;

//
// Reclaimer
//

ReclamationSweep::~ReclamationSweep() {
  if (memory_quota_ != nullptr) {
    memory_quota_->FinishReclamation(sweep_token_, std::move(waker_));
  }
}

//
// ReclaimerQueue
//

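// A queue node that owns a reference to a reclaimer handle, so that handles
// can be passed through the lock-free MPSC queue below.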
struct ReclaimerQueue::QueuedNode
    : public MultiProducerSingleConsumerQueue::Node {
  explicit QueuedNode(RefCountedPtr<Handle> reclaimer_handle)
      : reclaimer_handle(std::move(reclaimer_handle)) {}
  RefCountedPtr<Handle> reclaimer_handle;
};

struct ReclaimerQueue::State {
  Mutex reader_mu;
  MultiProducerSingleConsumerQueue queue;  // reader_mu must be held to pop
  Waker waker ABSL_GUARDED_BY(reader_mu);

  ~State() {
    bool empty = false;
    do {
      delete static_cast<QueuedNode*>(queue.PopAndCheckEnd(&empty));
    } while (!empty);
  }
};

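// Orphaning a handle cancels the reclaimer: if it has not run yet, invoke it
// once with no ReclamationSweep so it can release any captured state.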
void ReclaimerQueue::Handle::Orphan() {
  if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
    sweep->RunAndDelete(absl::nullopt);
  }
  Unref();
}

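// Run the reclaimer exactly once: the exchange guarantees that a handle that
// has already run or been orphaned is a no-op here.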
void ReclaimerQueue::Handle::Run(ReclamationSweep reclamation_sweep) {
  if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
    sweep->RunAndDelete(std::move(reclamation_sweep));
  }
}

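// Move a still-pending reclaimer onto a different queue. Returns false if the
// reclaimer has already run or been cancelled, in which case there is nothing
// to requeue.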
bool ReclaimerQueue::Handle::Requeue(ReclaimerQueue* new_queue) {
  if (sweep_.load(std::memory_order_relaxed)) {
    new_queue->Enqueue(Ref());
    return true;
  } else {
    return false;
  }
}

void ReclaimerQueue::Handle::Sweep::MarkCancelled() {
  // When we cancel a reclaimer we rotate the elements of the queue once -
  // taking one non-cancelled node from the start, and placing it on the end.
  // This ensures that we don't suffer from head of line blocking whereby a
  // non-cancelled reclaimer at the head of the queue, in the absence of memory
  // pressure, prevents the remainder of the queue from being cleaned up.
  MutexLock lock(&state_->reader_mu);
  while (true) {
    bool empty = false;
    std::unique_ptr<QueuedNode> node(
        static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
    if (node == nullptr) break;
    if (node->reclaimer_handle->sweep_.load(std::memory_order_relaxed) !=
        nullptr) {
      state_->queue.Push(node.release());
      break;
    }
  }
}

ReclaimerQueue::ReclaimerQueue() : state_(std::make_shared<State>()) {}

ReclaimerQueue::~ReclaimerQueue() = default;

void ReclaimerQueue::Enqueue(RefCountedPtr<Handle> handle) {
  if (state_->queue.Push(new QueuedNode(std::move(handle)))) {
    MutexLock lock(&state_->reader_mu);
    state_->waker.Wakeup();
  }
}

Poll<RefCountedPtr<ReclaimerQueue::Handle>> ReclaimerQueue::PollNext() {
  MutexLock lock(&state_->reader_mu);
  bool empty = false;
  // Try to pull from the queue.
  std::unique_ptr<QueuedNode> node(
      static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
  // If we get something, great.
  if (node != nullptr) return std::move(node->reclaimer_handle);
  if (!empty) {
    // If we don't, but the queue is probably not empty, schedule an immediate
    // repoll.
    Activity::current()->ForceImmediateRepoll();
  } else {
    // Otherwise, schedule a wakeup for whenever something is pushed.
    state_->waker = Activity::current()->MakeNonOwningWaker();
  }
  return Pending{};
}

//
// GrpcMemoryAllocatorImpl
//

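// On construction an allocator immediately takes its initial taken_bytes_
// from the quota and registers itself, so the quota can later ask it to
// return free memory.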
GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
    std::shared_ptr<BasicMemoryQuota> memory_quota, std::string name)
    : memory_quota_(memory_quota), name_(std::move(name)) {
  memory_quota_->Take(
      /*allocator=*/this, taken_bytes_);
  memory_quota_->AddNewAllocator(this);
}

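// At destruction every byte reserved by users must have been released back to
// the allocator: the only difference between what was taken from the quota and
// what is free should be the allocator's own footprint.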
GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
  GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) +
                 sizeof(GrpcMemoryAllocatorImpl) ==
             taken_bytes_.load(std::memory_order_relaxed));
  memory_quota_->Return(taken_bytes_);
}

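// Shutdown unregisters the allocator and drops its reclamation handles. The
// handles (and the quota reference) are moved out under the lock but released
// after it, so orphaning reclaimers never runs user code while reclaimer_mu_
// is held.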
void GrpcMemoryAllocatorImpl::Shutdown() {
  memory_quota_->RemoveAllocator(this);
  std::shared_ptr<BasicMemoryQuota> memory_quota;
  OrphanablePtr<ReclaimerQueue::Handle>
      reclamation_handles[kNumReclamationPasses];
  {
    MutexLock lock(&reclaimer_mu_);
    GPR_ASSERT(!shutdown_);
    shutdown_ = true;
    memory_quota = memory_quota_;
    for (size_t i = 0; i < kNumReclamationPasses; i++) {
      reclamation_handles[i] = std::exchange(reclamation_handles_[i], nullptr);
    }
  }
}

size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) {
  // Validate request - performed here so we don't bloat the generated code
  // with inlined asserts.
  GPR_ASSERT(request.min() <= request.max());
  GPR_ASSERT(request.max() <= MemoryRequest::max_allowed_size());
  size_t old_free = free_bytes_.load(std::memory_order_relaxed);

  while (true) {
    // Attempt to reserve memory from our pool.
    auto reservation = TryReserve(request);
    if (reservation.has_value()) {
      size_t new_free = free_bytes_.load(std::memory_order_relaxed);
      memory_quota_->MaybeMoveAllocator(this, old_free, new_free);
      return *reservation;
    }

    // If that failed, grab more from the quota and retry.
    Replenish();
  }
}

absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve(
    MemoryRequest request) {
  // How much memory should we request? (see the scaling below)
  size_t scaled_size_over_min = request.max() - request.min();
  // Scale the request down according to memory pressure if we have that
  // flexibility.
  if (scaled_size_over_min != 0) {
    const auto pressure_info = memory_quota_->GetPressureInfo();
    double pressure = pressure_info.pressure_control_value;
    size_t max_recommended_allocation_size =
        pressure_info.max_recommended_allocation_size;
    // Above 80% usage, reduce the allocation size in proportion to the
    // pressure.
    if (pressure > 0.8) {
      scaled_size_over_min =
          std::min(scaled_size_over_min,
                   static_cast<size_t>((request.max() - request.min()) *
                                       (1.0 - pressure) / 0.2));
    }
    if (max_recommended_allocation_size < request.min()) {
      scaled_size_over_min = 0;
    } else if (request.min() + scaled_size_over_min >
               max_recommended_allocation_size) {
      scaled_size_over_min = max_recommended_allocation_size - request.min();
    }
  }

  // How much do we want to reserve?
  const size_t reserve = request.min() + scaled_size_over_min;
  // See how many bytes are available.
  size_t available = free_bytes_.load(std::memory_order_acquire);
  while (true) {
    // Does the current free pool satisfy the request?
    if (available < reserve) {
      return {};
    }
    // Try to reserve the requested amount.
    // If the amount of free memory changed through this loop, then available
    // will be set to the new value and we'll repeat.
    if (free_bytes_.compare_exchange_weak(available, available - reserve,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      return reserve;
    }
  }
}

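// Opportunistically return part of the local free pool to the quota so that an
// idle allocator does not hoard memory: give back half of a free pool larger
// than 8KiB (all of a smaller one), and at least enough to bring the pool back
// under kMaxQuotaBufferSize / 2 when that cap is enabled.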
void GrpcMemoryAllocatorImpl::MaybeDonateBack() {
  size_t free = free_bytes_.load(std::memory_order_relaxed);
  while (free > 0) {
    size_t ret = 0;
    if (!IsUnconstrainedMaxQuotaBufferSizeEnabled() &&
        free > kMaxQuotaBufferSize / 2) {
      ret = std::max(ret, free - kMaxQuotaBufferSize / 2);
    }
    ret = std::max(ret, free > 8192 ? free / 2 : free);
    const size_t new_free = free - ret;
    if (free_bytes_.compare_exchange_weak(free, new_free,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO, "[%p|%s] Early return %" PRIdPTR " bytes", this,
                name_.c_str(), ret);
      }
      GPR_ASSERT(taken_bytes_.fetch_sub(ret, std::memory_order_relaxed) >= ret);
      memory_quota_->Return(ret);
      return;
    }
  }
}

void GrpcMemoryAllocatorImpl::Replenish() {
  // Grow the request size exponentially at a fairly low rate, bounded between
  // the reasonable limits declared at the top of this file.
  auto amount = Clamp(taken_bytes_.load(std::memory_order_relaxed) / 3,
                      kMinReplenishBytes, kMaxReplenishBytes);
  // Take the requested amount from the quota.
  memory_quota_->Take(
      /*allocator=*/this, amount);
  // Record that we've taken it.
  taken_bytes_.fetch_add(amount, std::memory_order_relaxed);
  // Add the taken amount to the free pool.
  free_bytes_.fetch_add(amount, std::memory_order_acq_rel);
}

//
// BasicMemoryQuota
//

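// A promise that resolves once the reclamation sweep identified by `token` has
// finished, i.e. once the quota's reclamation counter has moved past it.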
class BasicMemoryQuota::WaitForSweepPromise {
 public:
  WaitForSweepPromise(std::shared_ptr<BasicMemoryQuota> memory_quota,
                      uint64_t token)
      : memory_quota_(std::move(memory_quota)), token_(token) {}

  Poll<Empty> operator()() {
    if (memory_quota_->reclamation_counter_.load(std::memory_order_relaxed) !=
        token_) {
      return Empty{};
    } else {
      return Pending{};
    }
  }

 private:
  std::shared_ptr<BasicMemoryQuota> memory_quota_;
  uint64_t token_;
};

void BasicMemoryQuota::Start() {
  auto self = shared_from_this();

  // Reclamation loop:
  // basically, wait until we are in overcommit (free_bytes_ < 0), and then:
  // while (free_bytes_ < 0) reclaim_memory()
  // ... and repeat
  auto reclamation_loop = Loop(Seq(
      [self]() -> Poll<int> {
        // If there's free memory we no longer need to reclaim memory!
        if (self->free_bytes_.load(std::memory_order_acquire) > 0) {
          return Pending{};
        }
        return 0;
      },
      [self]() {
        // Race biases to the first thing that completes... so this will
        // choose the highest priority/least destructive thing to do that's
        // available.
        auto annotate = [](const char* name) {
          return [name](RefCountedPtr<ReclaimerQueue::Handle> f) {
            return std::make_tuple(name, std::move(f));
          };
        };
        return Race(Map(self->reclaimers_[0].Next(), annotate("benign")),
                    Map(self->reclaimers_[1].Next(), annotate("idle")),
                    Map(self->reclaimers_[2].Next(), annotate("destructive")));
      },
      [self](
          std::tuple<const char*, RefCountedPtr<ReclaimerQueue::Handle>> arg) {
        auto reclaimer = std::move(std::get<1>(arg));
        if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
          double free = std::max(intptr_t{0}, self->free_bytes_.load());
          size_t quota_size = self->quota_size_.load();
          gpr_log(GPR_INFO,
                  "RQ: %s perform %s reclamation. Available free bytes: %f, "
                  "total quota_size: %zu",
                  self->name_.c_str(), std::get<0>(arg), free, quota_size);
        }
        // One of the reclaimer queues gave us a way to get back memory.
        // Call the reclaimer with a token that contains enough to wake us
        // up again.
        const uint64_t token =
            self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) +
            1;
        reclaimer->Run(ReclamationSweep(
            self, token, Activity::current()->MakeNonOwningWaker()));
        // Return a promise that will wait for our barrier. This will be
        // awoken by the token above being destroyed. So, once that token is
        // destroyed, we'll be able to proceed.
        return WaitForSweepPromise(self, token);
      },
      []() -> LoopCtl<absl::Status> {
        // Continue the loop!
        return Continue{};
      }));

  reclaimer_activity_ =
      MakeActivity(std::move(reclamation_loop), ExecCtxWakeupScheduler(),
                   [](absl::Status status) {
                     GPR_ASSERT(status.code() == absl::StatusCode::kCancelled);
                   });
}

void BasicMemoryQuota::Stop() { reclaimer_activity_.reset(); }

void BasicMemoryQuota::SetSize(size_t new_size) {
  size_t old_size = quota_size_.exchange(new_size, std::memory_order_relaxed);
  if (old_size < new_size) {
    // We're growing the quota.
    Return(new_size - old_size);
  } else {
    // We're shrinking the quota.
    Take(/*allocator=*/nullptr, old_size - new_size);
  }
}

void BasicMemoryQuota::Take(GrpcMemoryAllocatorImpl* allocator, size_t amount) {
  // If there's a request for nothing, then do nothing!
  if (amount == 0) return;
  GPR_DEBUG_ASSERT(amount <= std::numeric_limits<intptr_t>::max());
  // Grab memory from the quota.
  auto prior = free_bytes_.fetch_sub(amount, std::memory_order_acq_rel);
  // If we push into overcommit, wake up the reclaimer.
  if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
    if (reclaimer_activity_ != nullptr) reclaimer_activity_->ForceWakeup();
  }

  if (IsFreeLargeAllocatorEnabled()) {
    if (allocator == nullptr) return;
    GrpcMemoryAllocatorImpl* chosen_allocator = nullptr;
    // Use calling allocator's shard index to choose shard.
    auto& shard = big_allocators_.shards[allocator->IncrementShardIndex() %
                                         big_allocators_.shards.size()];

    if (shard.shard_mu.TryLock()) {
      if (!shard.allocators.empty()) {
        chosen_allocator = *shard.allocators.begin();
      }
      shard.shard_mu.Unlock();
    }

    if (chosen_allocator != nullptr) {
      chosen_allocator->ReturnFree();
    }
  }
}

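// Called when a ReclamationSweep is dropped. Only the sweep holding the
// current token advances the reclamation counter and wakes the reclamation
// loop; stale tokens from superseded sweeps are ignored.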
void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
  uint64_t current = reclamation_counter_.load(std::memory_order_relaxed);
  if (current != token) return;
  if (reclamation_counter_.compare_exchange_strong(current, current + 1,
                                                   std::memory_order_relaxed,
                                                   std::memory_order_relaxed)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
      double free = std::max(intptr_t{0}, free_bytes_.load());
      size_t quota_size = quota_size_.load();
      gpr_log(GPR_INFO,
              "RQ: %s reclamation complete. Available free bytes: %f, "
              "total quota_size: %zu",
              name_.c_str(), free, quota_size);
    }
    waker.Wakeup();
  }
}

void BasicMemoryQuota::Return(size_t amount) {
  free_bytes_.fetch_add(amount, std::memory_order_relaxed);
}

void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Adding allocator %p", allocator);
  }

  AllocatorBucket::Shard& shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&shard.shard_mu);
    shard.allocators.emplace(allocator);
  }
}

void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Removing allocator %p", allocator);
  }

  AllocatorBucket::Shard& small_shard =
      small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&small_shard.shard_mu);
    if (small_shard.allocators.erase(allocator) == 1) {
      return;
    }
  }

  AllocatorBucket::Shard& big_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&big_shard.shard_mu);
    big_shard.allocators.erase(allocator);
  }
}

void BasicMemoryQuota::MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
                                          size_t old_free_bytes,
                                          size_t new_free_bytes) {
  while (true) {
    if (new_free_bytes < kSmallAllocatorThreshold) {
      // Still in small bucket. No move.
      if (old_free_bytes < kSmallAllocatorThreshold) return;
      MaybeMoveAllocatorBigToSmall(allocator);
    } else if (new_free_bytes > kBigAllocatorThreshold) {
      // Still in big bucket. No move.
      if (old_free_bytes > kBigAllocatorThreshold) return;
      MaybeMoveAllocatorSmallToBig(allocator);
    } else {
      // Somewhere between thresholds. No move.
      return;
    }

    // Loop to make sure move is eventually stable.
    old_free_bytes = new_free_bytes;
    new_free_bytes = allocator->GetFreeBytes();
  }
}

void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
    GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Moving allocator %p to small", allocator);
  }

  AllocatorBucket::Shard& old_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&old_shard.shard_mu);
    if (old_shard.allocators.erase(allocator) == 0) return;
  }

  AllocatorBucket::Shard& new_shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}

void BasicMemoryQuota::MaybeMoveAllocatorSmallToBig(
    GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Moving allocator %p to big", allocator);
  }

  AllocatorBucket::Shard& old_shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&old_shard.shard_mu);
    if (old_shard.allocators.erase(allocator) == 0) return;
  }

  AllocatorBucket::Shard& new_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}

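// Compute current memory pressure. Instantaneous pressure is the fraction of
// the quota that is in use; the control value either feeds that through the
// pressure controller (when enabled) or simply clamps it to [0, 1].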
BasicMemoryQuota::PressureInfo BasicMemoryQuota::GetPressureInfo() {
  double free = free_bytes_.load();
  if (free < 0) free = 0;
  size_t quota_size = quota_size_.load();
  double size = quota_size;
  if (size < 1) return PressureInfo{1, 1, 1};
  PressureInfo pressure_info;
  pressure_info.instantaneous_pressure = std::max(0.0, (size - free) / size);
  if (IsMemoryPressureControllerEnabled()) {
    pressure_info.pressure_control_value =
        pressure_tracker_.AddSampleAndGetControlValue(
            pressure_info.instantaneous_pressure);
  } else {
    pressure_info.pressure_control_value =
        std::min(pressure_info.instantaneous_pressure, 1.0);
  }
  pressure_info.max_recommended_allocation_size = quota_size / 16;
  return pressure_info;
}

//
// PressureTracker
//

namespace memory_quota_detail {

double PressureController::Update(double error) {
  bool is_low = error < 0;
  bool was_low = std::exchange(last_was_low_, is_low);
  double new_control;  // left unset so the compiler can flag branches that
                       // fail to assign it
  if (is_low && was_low) {
    // Memory pressure is too low this round, and was last round too.
    // If we have reached the min reporting value last time, then we will
    // report the same value again this time and can start to increase the
    // ticks_same_ counter.
    if (last_control_ == min_) {
      ticks_same_++;
      if (ticks_same_ >= max_ticks_same_) {
        // If it's been the same for too long, reduce the min reported value
        // down towards zero.
        min_ /= 2.0;
        ticks_same_ = 0;
      }
    }
    // Target the min reporting value.
    new_control = min_;
  } else if (!is_low && !was_low) {
    // Memory pressure is high, and was high previously.
    ticks_same_++;
    if (ticks_same_ >= max_ticks_same_) {
      // It's been high for too long, increase the max reporting value up
      // towards 1.0.
      max_ = (1.0 + max_) / 2.0;
      ticks_same_ = 0;
    }
    // Target the max reporting value.
    new_control = max_;
  } else if (is_low) {
    // Memory pressure is low, but was high last round.
    // Target the min reporting value, but first update it to be closer to the
    // current max (that we've been reporting lately).
    // In this way the min will gradually climb towards the max as we find a
    // stable point.
    // If this is too high, then we'll eventually move it back towards zero.
    ticks_same_ = 0;
    min_ = (min_ + max_) / 2.0;
    new_control = min_;
  } else {
    // Memory pressure is high, but was low last round.
    // Target the max reporting value, but first update it to be closer to the
    // last reported value.
    // The first switchover will have last_control_ being 0, and max_ being 2,
    // so we'll immediately choose 1.0 which will tend to really slow down
    // progress.
    // If we end up targeting too low, we'll eventually move it back towards
    // 1.0 after max_ticks_same_ ticks.
    ticks_same_ = 0;
    max_ = (last_control_ + max_) / 2.0;
    new_control = max_;
  }
  // If the control value is decreasing we do it slowly. This avoids rapid
  // oscillations.
  // (If we want a control value that's higher than the last one we snap
  // immediately because it's likely that memory pressure is growing unchecked).
  if (new_control < last_control_) {
    new_control = std::max(new_control,
                           last_control_ - max_reduction_per_tick_ / 1000.0);
  }
  last_control_ = new_control;
  return new_control;
}

std::string PressureController::DebugString() const {
  return absl::StrCat(last_was_low_ ? "low" : "high", " min=", min_,
                      " max=", max_, " ticks=", ticks_same_,
                      " last_control=", last_control_);
}

double PressureTracker::AddSampleAndGetControlValue(double sample) {
  static const double kSetPoint = 0.95;

  double max_so_far = max_this_round_.load(std::memory_order_relaxed);
  if (sample > max_so_far) {
    max_this_round_.compare_exchange_weak(max_so_far, sample,
                                          std::memory_order_relaxed,
                                          std::memory_order_relaxed);
  }
  // If memory is almost exhausted, immediately hit the brakes and report full
  // memory usage.
  if (sample >= 0.99) {
    report_.store(1.0, std::memory_order_relaxed);
  }
  update_.Tick([&](Duration) {
    // Reset the round tracker with the new sample.
    const double current_estimate =
        max_this_round_.exchange(sample, std::memory_order_relaxed);
    double report;
    if (current_estimate > 0.99) {
      // Under very high memory pressure we... just max things out.
      report = controller_.Update(1e99);
    } else {
      report = controller_.Update(current_estimate - kSetPoint);
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
      gpr_log(GPR_INFO, "RQ: pressure:%lf report:%lf controller:%s",
              current_estimate, report, controller_.DebugString().c_str());
    }
    report_.store(report, std::memory_order_relaxed);
  });
  return report_.load(std::memory_order_relaxed);
}

}  // namespace memory_quota_detail

//
// MemoryQuota
//

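// Usage sketch (illustrative only, not compiled here; assumes a MemoryQuota
// `quota` constructed elsewhere, and the resource names are hypothetical):
// allocators and owners are handed out per resource, with names scoped under
// the quota's own name:
//
//   MemoryAllocator alloc = quota.CreateMemoryAllocator("tcp_endpoint");
//   MemoryOwner owner = quota.CreateMemoryOwner("channel");
//
// Both wrap a shared GrpcMemoryAllocatorImpl that reserves from, and returns
// to, the underlying BasicMemoryQuota defined above.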
MemoryAllocator MemoryQuota::CreateMemoryAllocator(absl::string_view name) {
  auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
      memory_quota_, absl::StrCat(memory_quota_->name(), "/allocator/", name));
  return MemoryAllocator(std::move(impl));
}

MemoryOwner MemoryQuota::CreateMemoryOwner(absl::string_view name) {
  auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
      memory_quota_, absl::StrCat(memory_quota_->name(), "/owner/", name));
  return MemoryOwner(std::move(impl));
}

}  // namespace grpc_core