1 // Copyright 2023 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #pragma once 16 #include <lib/fit/function.h> 17 #include <lib/stdcompat/type_traits.h> 18 #include <pw_async/dispatcher.h> 19 20 #include <functional> 21 #include <limits> 22 #include <queue> 23 #include <tuple> 24 #include <unordered_map> 25 #include <utility> 26 #include <variant> 27 28 #include "pw_bluetooth_sapphire/internal/host/common/assert.h" 29 #include "pw_bluetooth_sapphire/internal/host/common/retire_log.h" 30 #include "pw_bluetooth_sapphire/internal/host/common/weak_self.h" 31 32 namespace bt { 33 34 // In-process data pipeline monitoring service. This issues tokens that 35 // accompany data packets through various buffers. When tokens are destroyed, 36 // their lifetimes are recorded. 37 // 38 // Clients may subscribe to alerts for when various values exceed specified 39 // thresholds. 40 // 41 // TODO(fxbug.dev/42150683): Produce pollable statistics about retired tokens 42 // TODO(fxbug.dev/42150684): Timestamp stages of each token 43 // TODO(fxbug.dev/42150684): Provide mechanism to split/merge tokens through 44 // chunking/de-chunking 45 class PipelineMonitor final { 46 private: 47 using TokenId = uint64_t; 48 49 public: 50 // Each Token is created when the monitor "issues" it, at which point it is 51 // "in-flight" until the token is "retired" (either explicitly with |Retire()| 52 // or by destruction). Tokens model a unique moveable resource. Moved-from 53 // Token objects are valid and can take on newly issued tokens from the same 54 // PipelineMonitor instance. 55 // 56 // Tokens can also be created by splitting an existing token in order to model 57 // data chunking through e.g. segmentation or fragmentation. Splitting tokens 58 // will result in more retirements (and hence logged retirements) than issues. 59 // 60 // Tokens that outlive their issuing PipelineMonitor have no effect when 61 // retired or destroyed. 62 class Token { 63 public: Token(Token && other)64 Token(Token&& other) noexcept : parent_(other.parent_) { 65 *this = std::move(other); 66 } 67 68 Token& operator=(Token&& other) noexcept { 69 PW_CHECK(&parent_.get() == &other.parent_.get()); 70 id_ = std::exchange(other.id_, kInvalidTokenId); 71 return *this; 72 } 73 74 Token() = delete; 75 Token(const Token&) = delete; 76 Token& operator=(const Token&) = delete; 77 ~Token()78 ~Token() { Retire(); } 79 80 // Explicitly retire to its monitor. This has no effect if the token has 81 // already been retired. Retire()82 void Retire() { 83 if (!parent_.is_alive()) { 84 return; 85 } 86 if (id_ != kInvalidTokenId) { 87 parent_->Retire(this); 88 } 89 id_ = kInvalidTokenId; 90 } 91 92 // If this token is valid, subtract |bytes_to_take| from its bookkeeping and 93 // track it under a new token. This does count towards token issue threshold 94 // alerts and will result in one more retirement logged. |bytes_to_take| 95 // must be no greater than the bytes issued for this token. If 96 // |bytes_to_take| is exactly equal, this is effectively a token move. Split(size_t bytes_to_take)97 Token Split(size_t bytes_to_take) { 98 if (!parent_.is_alive()) { 99 return Token(parent_, kInvalidTokenId); 100 } 101 PW_CHECK(id_ != kInvalidTokenId); 102 return parent_->Split(this, bytes_to_take); 103 } 104 105 private: 106 friend class PipelineMonitor; Token(WeakSelf<PipelineMonitor>::WeakPtr parent,TokenId id)107 Token(WeakSelf<PipelineMonitor>::WeakPtr parent, TokenId id) 108 : parent_(std::move(parent)), id_(id) {} 109 110 const WeakSelf<PipelineMonitor>::WeakPtr parent_; 111 TokenId id_ = kInvalidTokenId; 112 }; 113 114 // Alert types used for |SetAlert|. These are used as dimensioned value 115 // wrappers whose types identify what kind of value they hold. 116 117 // Alert for max_bytes_in_flight. Fires upon issuing the first token that 118 // exceeds the threshold. 119 struct MaxBytesInFlightAlert { 120 size_t value; 121 }; 122 123 // Alert for max_bytes_in_flight. Fires upon issuing the first token that 124 // exceeds the threshold. 125 struct MaxTokensInFlightAlert { 126 int64_t value; 127 }; 128 129 // Alert for token age (duration from issue to retirement). Fires upon 130 // retiring the first token that exceeds the threshold. 131 struct MaxAgeRetiredAlert { 132 pw::chrono::SystemClock::duration value; 133 }; 134 135 // Create a data chunk-tracking service that uses |pw_dispatcher| for timing. 136 // |retire_log| is copied into the class in order to store statistics about 137 // retired tokens. Note that the "live" internal log is readable with the 138 // |retire_log()| method, not the original instance passed to the ctor (which 139 // will not be logged into). PipelineMonitor(pw::async::Dispatcher & pw_dispatcher,const internal::RetireLog & retire_log)140 explicit PipelineMonitor(pw::async::Dispatcher& pw_dispatcher, 141 const internal::RetireLog& retire_log) 142 : dispatcher_(pw_dispatcher), retire_log_(retire_log) {} 143 bytes_issued()144 [[nodiscard]] size_t bytes_issued() const { return bytes_issued_; } tokens_issued()145 [[nodiscard]] int64_t tokens_issued() const { return tokens_issued_; } bytes_in_flight()146 [[nodiscard]] size_t bytes_in_flight() const { return bytes_in_flight_; } tokens_in_flight()147 [[nodiscard]] int64_t tokens_in_flight() const { 148 PW_MODIFY_DIAGNOSTICS_PUSH(); 149 PW_MODIFY_DIAGNOSTIC(ignored, 150 "-Wtautological-constant-out-of-range-compare"); 151 PW_CHECK(issued_tokens_.size() <= std::numeric_limits<int64_t>::max()); 152 PW_MODIFY_DIAGNOSTICS_POP(); 153 return issued_tokens_.size(); 154 } bytes_retired()155 [[nodiscard]] size_t bytes_retired() const { 156 PW_CHECK(bytes_issued() >= bytes_in_flight()); 157 return bytes_issued() - bytes_in_flight(); 158 } tokens_retired()159 [[nodiscard]] int64_t tokens_retired() const { 160 return tokens_issued() - tokens_in_flight(); 161 } 162 retire_log()163 const internal::RetireLog& retire_log() const { return retire_log_; } 164 165 // Start tracking |byte_count| bytes of data. This issues a token that is now 166 // considered "in-flight" until it is retired. Issue(size_t byte_count)167 [[nodiscard]] Token Issue(size_t byte_count) { 168 // For consistency, complete all token map and counter modifications before 169 // processing alerts. 170 const auto id = MakeTokenId(); 171 issued_tokens_.insert_or_assign(id, 172 TokenInfo{dispatcher_.now(), byte_count}); 173 bytes_issued_ += byte_count; 174 tokens_issued_++; 175 bytes_in_flight_ += byte_count; 176 177 // Process alerts. 178 SignalAlertValue<MaxBytesInFlightAlert>(bytes_in_flight()); 179 SignalAlertValue<MaxTokensInFlightAlert>(tokens_in_flight()); 180 return Token(weak_self_.GetWeakPtr(), id); 181 } 182 183 // Moves bytes tracked from one issued token to a new token, up to all of the 184 // bytes in |token|. Split(Token * token,size_t bytes_to_take)185 [[nodiscard]] Token Split(Token* token, size_t bytes_to_take) { 186 // For consistency, complete all token map and counter modifications before 187 // processing alerts. 188 PW_CHECK(this == &token->parent_.get()); 189 auto iter = issued_tokens_.find(token->id_); 190 PW_CHECK(iter != issued_tokens_.end()); 191 TokenInfo& token_info = iter->second; 192 PW_CHECK(bytes_to_take <= token_info.byte_count); 193 if (token_info.byte_count == bytes_to_take) { 194 return std::move(*token); 195 } 196 197 token_info.byte_count -= bytes_to_take; 198 199 const TokenId id = MakeTokenId(); 200 issued_tokens_.insert_or_assign( 201 id, TokenInfo{token_info.issue_time, bytes_to_take}); 202 tokens_issued_++; 203 204 // Process alerts. 205 SignalAlertValue<MaxTokensInFlightAlert>(tokens_in_flight()); 206 return Token(weak_self_.GetWeakPtr(), id); 207 } 208 209 // Subscribes to an alert that fires when the watched value strictly exceeds 210 // |threshold|. When that happens, |listener| is called with the alert type 211 // containing the actual value and the alert trigger is removed. 212 // 213 // New alerts will not be signaled until the next event that can change the 214 // value (token issued, retired, etc), so |listener| can re-subscribe (but 215 // likely at a different threshold to avoid a tight loop of re-subscriptions). 216 // 217 // For example, 218 // monitor.SetAlert(MaxBytesInFlightAlert{kMaxBytes}, 219 // [](MaxBytesInFlightAlert value) { /* value.value = 220 // bytes_in_flight() */ }); 221 template <typename AlertType> SetAlert(AlertType threshold,fit::callback<void (decltype (threshold))> listener)222 void SetAlert(AlertType threshold, 223 fit::callback<void(decltype(threshold))> listener) { 224 GetAlertList<AlertType>().push( 225 AlertInfo<AlertType>{threshold, std::move(listener)}); 226 } 227 228 // Convenience function to set a single listener for all of |alerts|, called 229 // if any of the alerts defined by |thresholds| are triggered. 230 template <typename... AlertTypes> SetAlerts(fit::function<void (std::variant<cpp20::type_identity_t<AlertTypes>...>)> listener,AlertTypes...thresholds)231 void SetAlerts( 232 fit::function<void(std::variant<cpp20::type_identity_t<AlertTypes>...>)> 233 listener, 234 AlertTypes... thresholds) { 235 // This is a fold expression over the comma (,) operator with SetAlert. 236 (SetAlert<AlertTypes>(thresholds, listener.share()), ...); 237 } 238 239 private: 240 // Tracks information for each Token issued out-of-line so that Tokens can be 241 // kept small. 242 struct TokenInfo { 243 pw::chrono::SystemClock::time_point issue_time = 244 pw::chrono::SystemClock::time_point::max(); 245 size_t byte_count = 0; 246 }; 247 248 // Records alerts and their subscribers. Removed when fired. 249 template <typename AlertType> 250 struct AlertInfo { 251 AlertType threshold; 252 fit::callback<void(AlertType)> listener; 253 254 // Used by std::priority_queue through std::less. The logic is intentionally 255 // inverted so that the AlertInfo with the smallest threshold appears first. 256 bool operator<(const AlertInfo<AlertType>& other) const { 257 return this->threshold.value > other.threshold.value; 258 } 259 }; 260 261 // Store subscribers so that the earliest and most likely to fire threshold is 262 // highest priority to make testing values against thresholds constant time 263 // and fast. 264 template <typename T> 265 using AlertList = std::priority_queue<AlertInfo<T>>; 266 267 template <typename... AlertTypes> 268 using AlertRegistry = std::tuple<AlertList<AlertTypes>...>; 269 270 // Used in Token to represent an inactive Token object (one that does not 271 // represent an in-flight token in the monitor). 272 static constexpr TokenId kInvalidTokenId = 273 std::numeric_limits<TokenId>::max(); 274 275 template <typename AlertType> GetAlertList()276 AlertList<AlertType>& GetAlertList() { 277 return std::get<AlertList<AlertType>>(alert_registry_); 278 } 279 280 // Signal a change in the value watched by |AlertType| and test it against the 281 // subscribed alert thresholds. Any thresholds strict exceeded (with 282 // std::greater) cause its subscribed listener to be removed and invoked. 283 template <typename AlertType> SignalAlertValue(decltype (AlertType::value)value)284 void SignalAlertValue(decltype(AlertType::value) value) { 285 auto& alert_list = GetAlertList<AlertType>(); 286 std::vector<decltype(AlertInfo<AlertType>::listener)> listeners; 287 while (!alert_list.empty()) { 288 auto& top = alert_list.top(); 289 if (!std::greater()(value, top.threshold.value)) { 290 break; 291 } 292 293 // std::priority_queue intentionally has const access to top() in order to 294 // avoid breaking heap constraints. This cast to remove const and modify 295 // top respects that design intent because (1) it doesn't modify element 296 // order (2) the intent is to pop the top anyways. It is important to call 297 // |listener| after pop in case that call re-subscribes to this call 298 // (which could modify the heap top). 299 listeners.push_back( 300 std::move(const_cast<AlertInfo<AlertType>&>(top).listener)); 301 alert_list.pop(); 302 } 303 304 // Deferring the call to after filtering helps prevent infinite alert loops. 305 for (auto& listener : listeners) { 306 listener(AlertType{value}); 307 } 308 } 309 MakeTokenId()310 TokenId MakeTokenId() { 311 return std::exchange(next_token_id_, 312 (next_token_id_ + 1) % kInvalidTokenId); 313 } 314 Retire(Token * token)315 void Retire(Token* token) { 316 // For consistency, complete all token map and counter modifications before 317 // processing alerts. 318 PW_CHECK(this == &token->parent_.get()); 319 auto node = issued_tokens_.extract(token->id_); 320 PW_CHECK(bool{node}); 321 const TokenInfo& token_info = node.mapped(); 322 bytes_in_flight_ -= token_info.byte_count; 323 const pw::chrono::SystemClock::duration age = 324 dispatcher_.now() - token_info.issue_time; 325 retire_log_.Retire(token_info.byte_count, age); 326 327 // Process alerts. 328 SignalAlertValue<MaxAgeRetiredAlert>(age); 329 } 330 331 pw::async::Dispatcher& dispatcher_; 332 333 internal::RetireLog retire_log_; 334 335 // This is likely not the best choice for memory efficiency and insertion 336 // latency (allocation and rehashing are both concerning). A slotmap is likely 337 // a good choice here with some tweaks to Token invalidation and an eye for 338 // implementation (SG14 slot_map may have O(N) insertion). 339 std::unordered_map<TokenId, TokenInfo> issued_tokens_; 340 341 TokenId next_token_id_ = 0; 342 size_t bytes_issued_ = 0; 343 int64_t tokens_issued_ = 0; 344 size_t bytes_in_flight_ = 0; 345 346 // Use a single variable to store all of the alert subscribers. This can be 347 // split by type using std::get (see GetAlertList). 348 AlertRegistry<MaxBytesInFlightAlert, 349 MaxTokensInFlightAlert, 350 MaxAgeRetiredAlert> 351 alert_registry_; 352 353 WeakSelf<PipelineMonitor> weak_self_{this}; 354 }; 355 356 } // namespace bt 357