// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <grpc/support/port_platform.h>

#include "src/core/lib/resource_quota/memory_quota.h"

#include <inttypes.h>

#include <algorithm>
#include <atomic>
#include <tuple>

#include "absl/status/status.h"
#include "absl/strings/str_cat.h"

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/trace.h"

namespace grpc_core {

// Maximum number of bytes an allocator will request from a quota in one step.
// Larger allocations than this will require multiple allocation requests.
static constexpr size_t kMaxReplenishBytes = 1024 * 1024;

// Minimum number of bytes an allocator will request from a quota in one step.
static constexpr size_t kMinReplenishBytes = 4096;
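
// Illustrative consequence of these bounds (added comment, not in the original
// source): when the free pool is empty, a reservation of roughly 3 MiB is
// satisfied over several Reserve()/Replenish() rounds of at most
// kMaxReplenishBytes (1 MiB) each, while each Replenish() request asks the
// quota for at least kMinReplenishBytes (4 KiB).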

//
// Reclaimer
//

ReclamationSweep::~ReclamationSweep() {
  if (memory_quota_ != nullptr) {
    memory_quota_->FinishReclamation(sweep_token_, std::move(waker_));
  }
}

//
// ReclaimerQueue
//

struct ReclaimerQueue::QueuedNode
    : public MultiProducerSingleConsumerQueue::Node {
  explicit QueuedNode(RefCountedPtr<Handle> reclaimer_handle)
      : reclaimer_handle(std::move(reclaimer_handle)) {}
  RefCountedPtr<Handle> reclaimer_handle;
};

struct ReclaimerQueue::State {
  Mutex reader_mu;
  MultiProducerSingleConsumerQueue queue;  // reader_mu must be held to pop
  Waker waker ABSL_GUARDED_BY(reader_mu);

  ~State() {
    bool empty = false;
    do {
      delete static_cast<QueuedNode*>(queue.PopAndCheckEnd(&empty));
    } while (!empty);
  }
};

void ReclaimerQueue::Handle::Orphan() {
  if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
    sweep->RunAndDelete(absl::nullopt);
  }
  Unref();
}

void ReclaimerQueue::Handle::Run(ReclamationSweep reclamation_sweep) {
  if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
    sweep->RunAndDelete(std::move(reclamation_sweep));
  }
}

bool ReclaimerQueue::Handle::Requeue(ReclaimerQueue* new_queue) {
  if (sweep_.load(std::memory_order_relaxed)) {
    new_queue->Enqueue(Ref());
    return true;
  } else {
    return false;
  }
}

void ReclaimerQueue::Handle::Sweep::MarkCancelled() {
  // When we cancel a reclaimer we rotate the elements of the queue once -
  // taking one non-cancelled node from the start, and placing it on the end.
  // This ensures that we don't suffer from head of line blocking whereby a
  // non-cancelled reclaimer at the head of the queue, in the absence of memory
  // pressure, prevents the remainder of the queue from being cleaned up.
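  // Illustrative example (added comment, not in the original source): with a
  // queue of [cancelled, cancelled, live, ...], the loop below pops and
  // deletes the two cancelled nodes, then pushes the live node to the back and
  // stops, leaving [..., live].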
  MutexLock lock(&state_->reader_mu);
  while (true) {
    bool empty = false;
    std::unique_ptr<QueuedNode> node(
        static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
    if (node == nullptr) break;
    if (node->reclaimer_handle->sweep_.load(std::memory_order_relaxed) !=
        nullptr) {
      state_->queue.Push(node.release());
      break;
    }
  }
}

ReclaimerQueue::ReclaimerQueue() : state_(std::make_shared<State>()) {}

ReclaimerQueue::~ReclaimerQueue() = default;

void ReclaimerQueue::Enqueue(RefCountedPtr<Handle> handle) {
  if (state_->queue.Push(new QueuedNode(std::move(handle)))) {
    MutexLock lock(&state_->reader_mu);
    state_->waker.Wakeup();
  }
}

Poll<RefCountedPtr<ReclaimerQueue::Handle>> ReclaimerQueue::PollNext() {
  MutexLock lock(&state_->reader_mu);
  bool empty = false;
  // Try to pull from the queue.
  std::unique_ptr<QueuedNode> node(
      static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
  // If we get something, great.
  if (node != nullptr) return std::move(node->reclaimer_handle);
  if (!empty) {
    // If we don't, but the queue is probably not empty, schedule an immediate
    // repoll.
    Activity::current()->ForceImmediateRepoll();
  } else {
    // Otherwise, schedule a wakeup for whenever something is pushed.
    state_->waker = Activity::current()->MakeNonOwningWaker();
  }
  return Pending{};
}

//
// GrpcMemoryAllocatorImpl
//

GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
    std::shared_ptr<BasicMemoryQuota> memory_quota, std::string name)
    : memory_quota_(memory_quota), name_(std::move(name)) {
  memory_quota_->Take(
      /*allocator=*/this, taken_bytes_);
  memory_quota_->AddNewAllocator(this);
}

GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
  GPR_ASSERT(free_bytes_.load(std::memory_order_acquire) +
                 sizeof(GrpcMemoryAllocatorImpl) ==
             taken_bytes_.load(std::memory_order_relaxed));
  memory_quota_->Return(taken_bytes_);
}

void GrpcMemoryAllocatorImpl::Shutdown() {
  memory_quota_->RemoveAllocator(this);
  std::shared_ptr<BasicMemoryQuota> memory_quota;
  OrphanablePtr<ReclaimerQueue::Handle>
      reclamation_handles[kNumReclamationPasses];
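  // Added note (not in the original source): these handles are grabbed under
  // the lock below but destroyed only when this local array goes out of scope,
  // after reclaimer_mu_ has been released, so handle teardown runs outside the
  // lock.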
  {
    MutexLock lock(&reclaimer_mu_);
    GPR_ASSERT(!shutdown_);
    shutdown_ = true;
    memory_quota = memory_quota_;
    for (size_t i = 0; i < kNumReclamationPasses; i++) {
      reclamation_handles[i] = std::exchange(reclamation_handles_[i], nullptr);
    }
  }
}

size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) {
  // Validate request - performed here so we don't bloat the generated code with
  // inlined asserts.
  GPR_ASSERT(request.min() <= request.max());
  GPR_ASSERT(request.max() <= MemoryRequest::max_allowed_size());
  size_t old_free = free_bytes_.load(std::memory_order_relaxed);

  while (true) {
    // Attempt to reserve memory from our pool.
    auto reservation = TryReserve(request);
    if (reservation.has_value()) {
      size_t new_free = free_bytes_.load(std::memory_order_relaxed);
      memory_quota_->MaybeMoveAllocator(this, old_free, new_free);
      return *reservation;
    }

    // If that failed, grab more from the quota and retry.
    Replenish();
  }
}

absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve(
    MemoryRequest request) {
  // How much memory should we request? (see the scaling below)
  size_t scaled_size_over_min = request.max() - request.min();
  // Scale the request down according to memory pressure if we have that
  // flexibility.
  if (scaled_size_over_min != 0) {
    const auto pressure_info = memory_quota_->GetPressureInfo();
    double pressure = pressure_info.pressure_control_value;
    size_t max_recommended_allocation_size =
        pressure_info.max_recommended_allocation_size;
    // Reduce the allocation size proportionally once pressure exceeds 80%
    // usage.
    if (pressure > 0.8) {
      scaled_size_over_min =
          std::min(scaled_size_over_min,
                   static_cast<size_t>((request.max() - request.min()) *
                                       (1.0 - pressure) / 0.2));
    }
    if (max_recommended_allocation_size < request.min()) {
      scaled_size_over_min = 0;
    } else if (request.min() + scaled_size_over_min >
               max_recommended_allocation_size) {
      scaled_size_over_min = max_recommended_allocation_size - request.min();
    }
  }
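  // Illustrative arithmetic (added comment, not in the original source): for a
  // request with min = 1 KiB and max = 9 KiB at pressure 0.9, and assuming the
  // max_recommended_allocation_size clamp does not bind, the flexible 8 KiB is
  // scaled by (1.0 - 0.9) / 0.2 = 0.5 down to 4 KiB, so we try to reserve
  // 5 KiB; at pressure >= 1.0 only the 1 KiB minimum is requested.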

  // How much do we want to reserve?
  const size_t reserve = request.min() + scaled_size_over_min;
  // See how many bytes are available.
  size_t available = free_bytes_.load(std::memory_order_acquire);
  while (true) {
    // Does the current free pool satisfy the request?
    if (available < reserve) {
      return {};
    }
    // Try to reserve the requested amount.
    // If the amount of free memory changed through this loop, then available
    // will be set to the new value and we'll repeat.
    if (free_bytes_.compare_exchange_weak(available, available - reserve,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      return reserve;
    }
  }
}

void GrpcMemoryAllocatorImpl::MaybeDonateBack() {
  size_t free = free_bytes_.load(std::memory_order_relaxed);
  while (free > 0) {
    size_t ret = 0;
    if (!IsUnconstrainedMaxQuotaBufferSizeEnabled() &&
        free > kMaxQuotaBufferSize / 2) {
      ret = std::max(ret, free - kMaxQuotaBufferSize / 2);
    }
    ret = std::max(ret, free > 8192 ? free / 2 : free);
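    // Illustrative numbers (added comment, not in the original source): with
    // 16384 free bytes this donates at least 8192 back to the quota; with 4096
    // free bytes it donates all of them.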
    const size_t new_free = free - ret;
    if (free_bytes_.compare_exchange_weak(free, new_free,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO, "[%p|%s] Early return %" PRIdPTR " bytes", this,
                name_.c_str(), ret);
      }
      GPR_ASSERT(taken_bytes_.fetch_sub(ret, std::memory_order_relaxed) >= ret);
      memory_quota_->Return(ret);
      return;
    }
  }
}

void GrpcMemoryAllocatorImpl::Replenish() {
  // Grow the request size at a fairly low exponential rate, bounded between
  // the reasonable limits declared at the top of this file.
  auto amount = Clamp(taken_bytes_.load(std::memory_order_relaxed) / 3,
                      kMinReplenishBytes, kMaxReplenishBytes);
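  // Illustrative arithmetic (added comment, not in the original source): with
  // 6 MiB already taken, taken / 3 = 2 MiB is clamped down to
  // kMaxReplenishBytes (1 MiB); a nearly empty allocator still asks for at
  // least kMinReplenishBytes (4 KiB).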
  // Take the requested amount from the quota.
  memory_quota_->Take(
      /*allocator=*/this, amount);
  // Record that we've taken it.
  taken_bytes_.fetch_add(amount, std::memory_order_relaxed);
  // Add the taken amount to the free pool.
  free_bytes_.fetch_add(amount, std::memory_order_acq_rel);
}

//
// BasicMemoryQuota
//

class BasicMemoryQuota::WaitForSweepPromise {
 public:
  WaitForSweepPromise(std::shared_ptr<BasicMemoryQuota> memory_quota,
                      uint64_t token)
      : memory_quota_(std::move(memory_quota)), token_(token) {}

  Poll<Empty> operator()() {
    if (memory_quota_->reclamation_counter_.load(std::memory_order_relaxed) !=
        token_) {
      return Empty{};
    } else {
      return Pending{};
    }
  }

 private:
  std::shared_ptr<BasicMemoryQuota> memory_quota_;
  uint64_t token_;
};

void BasicMemoryQuota::Start() {
  auto self = shared_from_this();

  // Reclamation loop:
  // basically, wait until we are in overcommit (free_bytes_ < 0), and then:
  // while (free_bytes_ < 0) reclaim_memory()
  // ... and repeat
  auto reclamation_loop = Loop(Seq(
      [self]() -> Poll<int> {
        // If there's free memory we no longer need to reclaim memory!
        if (self->free_bytes_.load(std::memory_order_acquire) > 0) {
          return Pending{};
        }
        return 0;
      },
      [self]() {
        // Race biases to the first thing that completes... so this will
        // choose the highest priority/least destructive thing to do that's
        // available.
        auto annotate = [](const char* name) {
          return [name](RefCountedPtr<ReclaimerQueue::Handle> f) {
            return std::make_tuple(name, std::move(f));
          };
        };
        return Race(Map(self->reclaimers_[0].Next(), annotate("benign")),
                    Map(self->reclaimers_[1].Next(), annotate("idle")),
                    Map(self->reclaimers_[2].Next(), annotate("destructive")));
      },
      [self](
          std::tuple<const char*, RefCountedPtr<ReclaimerQueue::Handle>> arg) {
        auto reclaimer = std::move(std::get<1>(arg));
        if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
          double free = std::max(intptr_t{0}, self->free_bytes_.load());
          size_t quota_size = self->quota_size_.load();
          gpr_log(GPR_INFO,
                  "RQ: %s perform %s reclamation. Available free bytes: %f, "
                  "total quota_size: %zu",
                  self->name_.c_str(), std::get<0>(arg), free, quota_size);
        }
        // One of the reclaimer queues gave us a way to get back memory.
        // Call the reclaimer with a token that contains enough to wake us
        // up again.
        const uint64_t token =
            self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) +
            1;
        reclaimer->Run(ReclamationSweep(
            self, token, Activity::current()->MakeNonOwningWaker()));
        // Return a promise that will wait for our barrier. This will be
        // awoken by the token above being destroyed. So, once that token is
        // destroyed, we'll be able to proceed.
        return WaitForSweepPromise(self, token);
      },
      []() -> LoopCtl<absl::Status> {
        // Continue the loop!
        return Continue{};
      }));

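  // Added note (not in the original source): every pass through the loop above
  // ends in Continue{}, so the only way the activity finishes is by being
  // cancelled (e.g. when Stop() resets it), hence the kCancelled assertion in
  // the completion callback below.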
  reclaimer_activity_ =
      MakeActivity(std::move(reclamation_loop), ExecCtxWakeupScheduler(),
                   [](absl::Status status) {
                     GPR_ASSERT(status.code() == absl::StatusCode::kCancelled);
                   });
}

void BasicMemoryQuota::Stop() { reclaimer_activity_.reset(); }

void BasicMemoryQuota::SetSize(size_t new_size) {
  size_t old_size = quota_size_.exchange(new_size, std::memory_order_relaxed);
  if (old_size < new_size) {
    // We're growing the quota.
    Return(new_size - old_size);
  } else {
    // We're shrinking the quota.
    Take(/*allocator=*/nullptr, old_size - new_size);
  }
}

void BasicMemoryQuota::Take(GrpcMemoryAllocatorImpl* allocator, size_t amount) {
  // If there's a request for nothing, then do nothing!
  if (amount == 0) return;
  GPR_DEBUG_ASSERT(amount <= std::numeric_limits<intptr_t>::max());
  // Grab memory from the quota.
  auto prior = free_bytes_.fetch_sub(amount, std::memory_order_acq_rel);
  // If we push into overcommit, awake the reclaimer.
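  // Illustrative numbers (added comment, not in the original source): with
  // 10 KiB free and a 64 KiB Take(), prior is 10240 and free_bytes_ drops to
  // -55296, so this is the call that crosses into overcommit and wakes the
  // reclamation loop.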
  if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
    if (reclaimer_activity_ != nullptr) reclaimer_activity_->ForceWakeup();
  }

  if (IsFreeLargeAllocatorEnabled()) {
    if (allocator == nullptr) return;
    GrpcMemoryAllocatorImpl* chosen_allocator = nullptr;
    // Use calling allocator's shard index to choose shard.
    auto& shard = big_allocators_.shards[allocator->IncrementShardIndex() %
                                         big_allocators_.shards.size()];

    if (shard.shard_mu.TryLock()) {
      if (!shard.allocators.empty()) {
        chosen_allocator = *shard.allocators.begin();
      }
      shard.shard_mu.Unlock();
    }

    if (chosen_allocator != nullptr) {
      chosen_allocator->ReturnFree();
    }
  }
}

void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
  uint64_t current = reclamation_counter_.load(std::memory_order_relaxed);
  if (current != token) return;
  if (reclamation_counter_.compare_exchange_strong(current, current + 1,
                                                   std::memory_order_relaxed,
                                                   std::memory_order_relaxed)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
      double free = std::max(intptr_t{0}, free_bytes_.load());
      size_t quota_size = quota_size_.load();
      gpr_log(GPR_INFO,
              "RQ: %s reclamation complete. Available free bytes: %f, "
              "total quota_size: %zu",
              name_.c_str(), free, quota_size);
    }
    waker.Wakeup();
  }
}

void BasicMemoryQuota::Return(size_t amount) {
  free_bytes_.fetch_add(amount, std::memory_order_relaxed);
}

void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Adding allocator %p", allocator);
  }

  AllocatorBucket::Shard& shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&shard.shard_mu);
    shard.allocators.emplace(allocator);
  }
}

void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Removing allocator %p", allocator);
  }

  AllocatorBucket::Shard& small_shard =
      small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&small_shard.shard_mu);
    if (small_shard.allocators.erase(allocator) == 1) {
      return;
    }
  }

  AllocatorBucket::Shard& big_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&big_shard.shard_mu);
    big_shard.allocators.erase(allocator);
  }
}

void BasicMemoryQuota::MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
                                          size_t old_free_bytes,
                                          size_t new_free_bytes) {
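  // Added note (not in the original source): the gap between
  // kSmallAllocatorThreshold and kBigAllocatorThreshold acts as hysteresis -
  // allocators whose free bytes sit between the two thresholds stay in
  // whichever bucket they already occupy, avoiding churn from small
  // fluctuations.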
  while (true) {
    if (new_free_bytes < kSmallAllocatorThreshold) {
      // Still in small bucket. No move.
      if (old_free_bytes < kSmallAllocatorThreshold) return;
      MaybeMoveAllocatorBigToSmall(allocator);
    } else if (new_free_bytes > kBigAllocatorThreshold) {
      // Still in big bucket. No move.
      if (old_free_bytes > kBigAllocatorThreshold) return;
      MaybeMoveAllocatorSmallToBig(allocator);
    } else {
      // Somewhere between thresholds. No move.
      return;
    }

    // Loop to make sure move is eventually stable.
    old_free_bytes = new_free_bytes;
    new_free_bytes = allocator->GetFreeBytes();
  }
}

void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
    GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Moving allocator %p to small", allocator);
  }

  AllocatorBucket::Shard& old_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&old_shard.shard_mu);
    if (old_shard.allocators.erase(allocator) == 0) return;
  }

  AllocatorBucket::Shard& new_shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}

void BasicMemoryQuota::MaybeMoveAllocatorSmallToBig(
    GrpcMemoryAllocatorImpl* allocator) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "Moving allocator %p to big", allocator);
  }

  AllocatorBucket::Shard& old_shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&old_shard.shard_mu);
    if (old_shard.allocators.erase(allocator) == 0) return;
  }

  AllocatorBucket::Shard& new_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}

BasicMemoryQuota::PressureInfo BasicMemoryQuota::GetPressureInfo() {
  double free = free_bytes_.load();
  if (free < 0) free = 0;
  size_t quota_size = quota_size_.load();
  double size = quota_size;
  if (size < 1) return PressureInfo{1, 1, 1};
  PressureInfo pressure_info;
  pressure_info.instantaneous_pressure = std::max(0.0, (size - free) / size);
  if (IsMemoryPressureControllerEnabled()) {
    pressure_info.pressure_control_value =
        pressure_tracker_.AddSampleAndGetControlValue(
            pressure_info.instantaneous_pressure);
  } else {
    pressure_info.pressure_control_value =
        std::min(pressure_info.instantaneous_pressure, 1.0);
  }
  pressure_info.max_recommended_allocation_size = quota_size / 16;
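  // Illustrative arithmetic (added comment, not in the original source): a
  // 64 MiB quota yields a max_recommended_allocation_size of 4 MiB, which
  // TryReserve uses above to cap (or zero out) the flexible part of large
  // requests.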
  return pressure_info;
}

//
// PressureTracker
//

namespace memory_quota_detail {

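// Illustrative behaviour (added comment, not in the original source): if
// pressure stays above the set point for many consecutive ticks, max_ is
// pulled towards 1.0 via max_ = (1.0 + max_) / 2.0 (e.g. 0.5 -> 0.75 -> 0.875)
// and reported; a single low tick after a run of high ticks instead raises
// min_ halfway towards max_ and reports that, so the control value settles
// near the point where pressure flips between low and high.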
double PressureController::Update(double error) {
  bool is_low = error < 0;
  bool was_low = std::exchange(last_was_low_, is_low);
  double new_control;  // leave unset so the compiler can flag branches that
                       // fail to set it
  if (is_low && was_low) {
    // Memory pressure is too low this round, and was last round too.
    // If we reported the min value last time, then we will report the same
    // value again this time and can start to increase the ticks_same_ counter.
    if (last_control_ == min_) {
      ticks_same_++;
      if (ticks_same_ >= max_ticks_same_) {
        // If it's been the same for too long, reduce the min reported value
        // down towards zero.
        min_ /= 2.0;
        ticks_same_ = 0;
      }
    }
    // Target the min reporting value.
    new_control = min_;
  } else if (!is_low && !was_low) {
    // Memory pressure is high, and was high previously.
    ticks_same_++;
    if (ticks_same_ >= max_ticks_same_) {
      // It's been high for too long, increase the max reporting value up
      // towards 1.0.
      max_ = (1.0 + max_) / 2.0;
      ticks_same_ = 0;
    }
    // Target the max reporting value.
    new_control = max_;
  } else if (is_low) {
    // Memory pressure is low, but was high last round.
    // Target the min reporting value, but first update it to be closer to the
    // current max (that we've been reporting lately).
    // In this way the min will gradually climb towards the max as we find a
    // stable point.
    // If this is too high, then we'll eventually move it back towards zero.
    ticks_same_ = 0;
    min_ = (min_ + max_) / 2.0;
    new_control = min_;
  } else {
    // Memory pressure is high, but was low last round.
    // Target the max reporting value, but first update it to be closer to the
    // last reported value.
    // The first switchover will have last_control_ being 0, and max_ being 2,
    // so we'll immediately choose 1.0 which will tend to really slow down
    // progress.
    // If we end up targeting too low, we'll eventually move it back towards
    // 1.0 after max_ticks_same_ ticks.
    ticks_same_ = 0;
    max_ = (last_control_ + max_) / 2.0;
    new_control = max_;
  }
  // If the control value is decreasing we do it slowly. This avoids rapid
  // oscillations.
  // (If we want a control value that's higher than the last one we snap
  // immediately because it's likely that memory pressure is growing unchecked).
  if (new_control < last_control_) {
    new_control =
        std::max(new_control, last_control_ - max_reduction_per_tick_ / 1000.0);
  }
  last_control_ = new_control;
  return new_control;
}

std::string PressureController::DebugString() const {
  return absl::StrCat(last_was_low_ ? "low" : "high", " min=", min_,
                      " max=", max_, " ticks=", ticks_same_,
                      " last_control=", last_control_);
}

double PressureTracker::AddSampleAndGetControlValue(double sample) {
  static const double kSetPoint = 0.95;
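  // Added note (not in the original source): kSetPoint is the target memory
  // utilization; each tick below feeds (estimate - 0.95) into the controller,
  // so usage above 95% of the quota pushes the reported control value up and
  // usage below 95% pulls it back down.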

  double max_so_far = max_this_round_.load(std::memory_order_relaxed);
  if (sample > max_so_far) {
    max_this_round_.compare_exchange_weak(max_so_far, sample,
                                          std::memory_order_relaxed,
                                          std::memory_order_relaxed);
  }
  // If memory pressure is almost at its maximum, immediately hit the brakes
  // and report full memory usage.
  if (sample >= 0.99) {
    report_.store(1.0, std::memory_order_relaxed);
  }
  update_.Tick([&](Duration) {
    // Reset the round tracker with the new sample.
    const double current_estimate =
        max_this_round_.exchange(sample, std::memory_order_relaxed);
    double report;
    if (current_estimate > 0.99) {
      // Under very high memory pressure we... just max things out.
      report = controller_.Update(1e99);
    } else {
      report = controller_.Update(current_estimate - kSetPoint);
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
      gpr_log(GPR_INFO, "RQ: pressure:%lf report:%lf controller:%s",
              current_estimate, report, controller_.DebugString().c_str());
    }
    report_.store(report, std::memory_order_relaxed);
  });
  return report_.load(std::memory_order_relaxed);
}

}  // namespace memory_quota_detail

//
// MemoryQuota
//

MemoryAllocator MemoryQuota::CreateMemoryAllocator(absl::string_view name) {
  auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
      memory_quota_, absl::StrCat(memory_quota_->name(), "/allocator/", name));
  return MemoryAllocator(std::move(impl));
}

MemoryOwner MemoryQuota::CreateMemoryOwner(absl::string_view name) {
  auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(
      memory_quota_, absl::StrCat(memory_quota_->name(), "/owner/", name));
  return MemoryOwner(std::move(impl));
}

}  // namespace grpc_core