1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #pragma once
16 #include <lib/fit/function.h>
17 #include <lib/stdcompat/type_traits.h>
18 #include <pw_async/dispatcher.h>
19 
20 #include <functional>
21 #include <limits>
22 #include <queue>
23 #include <tuple>
24 #include <unordered_map>
25 #include <utility>
26 #include <variant>
27 
28 #include "pw_bluetooth_sapphire/internal/host/common/assert.h"
29 #include "pw_bluetooth_sapphire/internal/host/common/retire_log.h"
30 #include "pw_bluetooth_sapphire/internal/host/common/weak_self.h"
31 
32 namespace bt {
33 
34 // In-process data pipeline monitoring service. This issues tokens that
35 // accompany data packets through various buffers. When tokens are destroyed,
36 // their lifetimes are recorded.
37 //
38 // Clients may subscribe to alerts for when various values exceed specified
39 // thresholds.
40 //
41 // TODO(fxbug.dev/42150683): Produce pollable statistics about retired tokens
42 // TODO(fxbug.dev/42150684): Timestamp stages of each token
43 // TODO(fxbug.dev/42150684): Provide mechanism to split/merge tokens through
44 // chunking/de-chunking
45 class PipelineMonitor final {
46  private:
47   using TokenId = uint64_t;
48 
49  public:
50   // Each Token is created when the monitor "issues" it, at which point it is
51   // "in-flight" until the token is "retired" (either explicitly with |Retire()|
52   // or by destruction). Tokens model a unique moveable resource. Moved-from
53   // Token objects are valid and can take on newly issued tokens from the same
54   // PipelineMonitor instance.
55   //
56   // Tokens can also be created by splitting an existing token in order to model
57   // data chunking through e.g. segmentation or fragmentation. Splitting tokens
58   // will result in more retirements (and hence logged retirements) than issues.
59   //
60   // Tokens that outlive their issuing PipelineMonitor have no effect when
61   // retired or destroyed.
62   class Token {
63    public:
Token(Token && other)64     Token(Token&& other) noexcept : parent_(other.parent_) {
65       *this = std::move(other);
66     }
67 
68     Token& operator=(Token&& other) noexcept {
69       PW_CHECK(&parent_.get() == &other.parent_.get());
70       id_ = std::exchange(other.id_, kInvalidTokenId);
71       return *this;
72     }
73 
74     Token() = delete;
75     Token(const Token&) = delete;
76     Token& operator=(const Token&) = delete;
77 
~Token()78     ~Token() { Retire(); }
79 
80     // Explicitly retire to its monitor. This has no effect if the token has
81     // already been retired.
Retire()82     void Retire() {
83       if (!parent_.is_alive()) {
84         return;
85       }
86       if (id_ != kInvalidTokenId) {
87         parent_->Retire(this);
88       }
89       id_ = kInvalidTokenId;
90     }
91 
92     // If this token is valid, subtract |bytes_to_take| from its bookkeeping and
93     // track it under a new token. This does count towards token issue threshold
94     // alerts and will result in one more retirement logged. |bytes_to_take|
95     // must be no greater than the bytes issued for this token. If
96     // |bytes_to_take| is exactly equal, this is effectively a token move.
Split(size_t bytes_to_take)97     Token Split(size_t bytes_to_take) {
98       if (!parent_.is_alive()) {
99         return Token(parent_, kInvalidTokenId);
100       }
101       PW_CHECK(id_ != kInvalidTokenId);
102       return parent_->Split(this, bytes_to_take);
103     }
104 
105    private:
106     friend class PipelineMonitor;
Token(WeakSelf<PipelineMonitor>::WeakPtr parent,TokenId id)107     Token(WeakSelf<PipelineMonitor>::WeakPtr parent, TokenId id)
108         : parent_(std::move(parent)), id_(id) {}
109 
110     const WeakSelf<PipelineMonitor>::WeakPtr parent_;
111     TokenId id_ = kInvalidTokenId;
112   };
113 
114   // Alert types used for |SetAlert|. These are used as dimensioned value
115   // wrappers whose types identify what kind of value they hold.
116 
117   // Alert for max_bytes_in_flight. Fires upon issuing the first token that
118   // exceeds the threshold.
119   struct MaxBytesInFlightAlert {
120     size_t value;
121   };
122 
123   // Alert for max_bytes_in_flight. Fires upon issuing the first token that
124   // exceeds the threshold.
125   struct MaxTokensInFlightAlert {
126     int64_t value;
127   };
128 
129   // Alert for token age (duration from issue to retirement). Fires upon
130   // retiring the first token that exceeds the threshold.
131   struct MaxAgeRetiredAlert {
132     pw::chrono::SystemClock::duration value;
133   };
134 
135   // Create a data chunk-tracking service that uses |pw_dispatcher| for timing.
136   // |retire_log| is copied into the class in order to store statistics about
137   // retired tokens. Note that the "live" internal log is readable with the
138   // |retire_log()| method, not the original instance passed to the ctor (which
139   // will not be logged into).
PipelineMonitor(pw::async::Dispatcher & pw_dispatcher,const internal::RetireLog & retire_log)140   explicit PipelineMonitor(pw::async::Dispatcher& pw_dispatcher,
141                            const internal::RetireLog& retire_log)
142       : dispatcher_(pw_dispatcher), retire_log_(retire_log) {}
143 
bytes_issued()144   [[nodiscard]] size_t bytes_issued() const { return bytes_issued_; }
tokens_issued()145   [[nodiscard]] int64_t tokens_issued() const { return tokens_issued_; }
bytes_in_flight()146   [[nodiscard]] size_t bytes_in_flight() const { return bytes_in_flight_; }
tokens_in_flight()147   [[nodiscard]] int64_t tokens_in_flight() const {
148     PW_MODIFY_DIAGNOSTICS_PUSH();
149     PW_MODIFY_DIAGNOSTIC(ignored,
150                          "-Wtautological-constant-out-of-range-compare");
151     PW_CHECK(issued_tokens_.size() <= std::numeric_limits<int64_t>::max());
152     PW_MODIFY_DIAGNOSTICS_POP();
153     return issued_tokens_.size();
154   }
bytes_retired()155   [[nodiscard]] size_t bytes_retired() const {
156     PW_CHECK(bytes_issued() >= bytes_in_flight());
157     return bytes_issued() - bytes_in_flight();
158   }
tokens_retired()159   [[nodiscard]] int64_t tokens_retired() const {
160     return tokens_issued() - tokens_in_flight();
161   }
162 
retire_log()163   const internal::RetireLog& retire_log() const { return retire_log_; }
164 
165   // Start tracking |byte_count| bytes of data. This issues a token that is now
166   // considered "in-flight" until it is retired.
Issue(size_t byte_count)167   [[nodiscard]] Token Issue(size_t byte_count) {
168     // For consistency, complete all token map and counter modifications before
169     // processing alerts.
170     const auto id = MakeTokenId();
171     issued_tokens_.insert_or_assign(id,
172                                     TokenInfo{dispatcher_.now(), byte_count});
173     bytes_issued_ += byte_count;
174     tokens_issued_++;
175     bytes_in_flight_ += byte_count;
176 
177     // Process alerts.
178     SignalAlertValue<MaxBytesInFlightAlert>(bytes_in_flight());
179     SignalAlertValue<MaxTokensInFlightAlert>(tokens_in_flight());
180     return Token(weak_self_.GetWeakPtr(), id);
181   }
182 
183   // Moves bytes tracked from one issued token to a new token, up to all of the
184   // bytes in |token|.
Split(Token * token,size_t bytes_to_take)185   [[nodiscard]] Token Split(Token* token, size_t bytes_to_take) {
186     // For consistency, complete all token map and counter modifications before
187     // processing alerts.
188     PW_CHECK(this == &token->parent_.get());
189     auto iter = issued_tokens_.find(token->id_);
190     PW_CHECK(iter != issued_tokens_.end());
191     TokenInfo& token_info = iter->second;
192     PW_CHECK(bytes_to_take <= token_info.byte_count);
193     if (token_info.byte_count == bytes_to_take) {
194       return std::move(*token);
195     }
196 
197     token_info.byte_count -= bytes_to_take;
198 
199     const TokenId id = MakeTokenId();
200     issued_tokens_.insert_or_assign(
201         id, TokenInfo{token_info.issue_time, bytes_to_take});
202     tokens_issued_++;
203 
204     // Process alerts.
205     SignalAlertValue<MaxTokensInFlightAlert>(tokens_in_flight());
206     return Token(weak_self_.GetWeakPtr(), id);
207   }
208 
209   // Subscribes to an alert that fires when the watched value strictly exceeds
210   // |threshold|. When that happens, |listener| is called with the alert type
211   // containing the actual value and the alert trigger is removed.
212   //
213   // New alerts will not be signaled until the next event that can change the
214   // value (token issued, retired, etc), so |listener| can re-subscribe (but
215   // likely at a different threshold to avoid a tight loop of re-subscriptions).
216   //
217   // For example,
218   //   monitor.SetAlert(MaxBytesInFlightAlert{kMaxBytes},
219   //                    [](MaxBytesInFlightAlert value) { /* value.value =
220   //                    bytes_in_flight() */ });
221   template <typename AlertType>
SetAlert(AlertType threshold,fit::callback<void (decltype (threshold))> listener)222   void SetAlert(AlertType threshold,
223                 fit::callback<void(decltype(threshold))> listener) {
224     GetAlertList<AlertType>().push(
225         AlertInfo<AlertType>{threshold, std::move(listener)});
226   }
227 
228   // Convenience function to set a single listener for all of |alerts|, called
229   // if any of the alerts defined by |thresholds| are triggered.
230   template <typename... AlertTypes>
SetAlerts(fit::function<void (std::variant<cpp20::type_identity_t<AlertTypes>...>)> listener,AlertTypes...thresholds)231   void SetAlerts(
232       fit::function<void(std::variant<cpp20::type_identity_t<AlertTypes>...>)>
233           listener,
234       AlertTypes... thresholds) {
235     // This is a fold expression over the comma (,) operator with SetAlert.
236     (SetAlert<AlertTypes>(thresholds, listener.share()), ...);
237   }
238 
239  private:
240   // Tracks information for each Token issued out-of-line so that Tokens can be
241   // kept small.
242   struct TokenInfo {
243     pw::chrono::SystemClock::time_point issue_time =
244         pw::chrono::SystemClock::time_point::max();
245     size_t byte_count = 0;
246   };
247 
248   // Records alerts and their subscribers. Removed when fired.
249   template <typename AlertType>
250   struct AlertInfo {
251     AlertType threshold;
252     fit::callback<void(AlertType)> listener;
253 
254     // Used by std::priority_queue through std::less. The logic is intentionally
255     // inverted so that the AlertInfo with the smallest threshold appears first.
256     bool operator<(const AlertInfo<AlertType>& other) const {
257       return this->threshold.value > other.threshold.value;
258     }
259   };
260 
261   // Store subscribers so that the earliest and most likely to fire threshold is
262   // highest priority to make testing values against thresholds constant time
263   // and fast.
264   template <typename T>
265   using AlertList = std::priority_queue<AlertInfo<T>>;
266 
267   template <typename... AlertTypes>
268   using AlertRegistry = std::tuple<AlertList<AlertTypes>...>;
269 
270   // Used in Token to represent an inactive Token object (one that does not
271   // represent an in-flight token in the monitor).
272   static constexpr TokenId kInvalidTokenId =
273       std::numeric_limits<TokenId>::max();
274 
275   template <typename AlertType>
GetAlertList()276   AlertList<AlertType>& GetAlertList() {
277     return std::get<AlertList<AlertType>>(alert_registry_);
278   }
279 
280   // Signal a change in the value watched by |AlertType| and test it against the
281   // subscribed alert thresholds. Any thresholds strict exceeded (with
282   // std::greater) cause its subscribed listener to be removed and invoked.
283   template <typename AlertType>
SignalAlertValue(decltype (AlertType::value)value)284   void SignalAlertValue(decltype(AlertType::value) value) {
285     auto& alert_list = GetAlertList<AlertType>();
286     std::vector<decltype(AlertInfo<AlertType>::listener)> listeners;
287     while (!alert_list.empty()) {
288       auto& top = alert_list.top();
289       if (!std::greater()(value, top.threshold.value)) {
290         break;
291       }
292 
293       // std::priority_queue intentionally has const access to top() in order to
294       // avoid breaking heap constraints. This cast to remove const and modify
295       // top respects that design intent because (1) it doesn't modify element
296       // order (2) the intent is to pop the top anyways. It is important to call
297       // |listener| after pop in case that call re-subscribes to this call
298       // (which could modify the heap top).
299       listeners.push_back(
300           std::move(const_cast<AlertInfo<AlertType>&>(top).listener));
301       alert_list.pop();
302     }
303 
304     // Deferring the call to after filtering helps prevent infinite alert loops.
305     for (auto& listener : listeners) {
306       listener(AlertType{value});
307     }
308   }
309 
MakeTokenId()310   TokenId MakeTokenId() {
311     return std::exchange(next_token_id_,
312                          (next_token_id_ + 1) % kInvalidTokenId);
313   }
314 
Retire(Token * token)315   void Retire(Token* token) {
316     // For consistency, complete all token map and counter modifications before
317     // processing alerts.
318     PW_CHECK(this == &token->parent_.get());
319     auto node = issued_tokens_.extract(token->id_);
320     PW_CHECK(bool{node});
321     const TokenInfo& token_info = node.mapped();
322     bytes_in_flight_ -= token_info.byte_count;
323     const pw::chrono::SystemClock::duration age =
324         dispatcher_.now() - token_info.issue_time;
325     retire_log_.Retire(token_info.byte_count, age);
326 
327     // Process alerts.
328     SignalAlertValue<MaxAgeRetiredAlert>(age);
329   }
330 
331   pw::async::Dispatcher& dispatcher_;
332 
333   internal::RetireLog retire_log_;
334 
335   // This is likely not the best choice for memory efficiency and insertion
336   // latency (allocation and rehashing are both concerning). A slotmap is likely
337   // a good choice here with some tweaks to Token invalidation and an eye for
338   // implementation (SG14 slot_map may have O(N) insertion).
339   std::unordered_map<TokenId, TokenInfo> issued_tokens_;
340 
341   TokenId next_token_id_ = 0;
342   size_t bytes_issued_ = 0;
343   int64_t tokens_issued_ = 0;
344   size_t bytes_in_flight_ = 0;
345 
346   // Use a single variable to store all of the alert subscribers. This can be
347   // split by type using std::get (see GetAlertList).
348   AlertRegistry<MaxBytesInFlightAlert,
349                 MaxTokensInFlightAlert,
350                 MaxAgeRetiredAlert>
351       alert_registry_;
352 
353   WeakSelf<PipelineMonitor> weak_self_{this};
354 };
355 
356 }  // namespace bt
357