1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_bluetooth_sapphire/internal/host/common/pipeline_monitor.h"
16
17 #include <gmock/gmock.h>
18 #include <pw_async/fake_dispatcher_fixture.h>
19
20 #include <memory>
21
22 #include "pw_bluetooth_sapphire/internal/host/common/retire_log.h"
23
24 namespace bt {
25 namespace {
26
27 using Token = PipelineMonitor::Token;
28
29 using PipelineMonitorTest = pw::async::test::FakeDispatcherFixture;
30
31 const internal::RetireLog kRetireLogDefaultParams(/*min_depth=*/1,
32 /*max_depth=*/100);
33
// Destroys the monitor while one of its tokens is still alive; the token is
// then destroyed at the end of the test body.
TEST_F(PipelineMonitorTest, TokensCanOutliveMonitor) {
  std::unique_ptr<PipelineMonitor> owned_monitor =
      std::make_unique<PipelineMonitor>(dispatcher(), kRetireLogDefaultParams);
  auto outstanding_token = owned_monitor->Issue(0);

  // Tear down the monitor first; `outstanding_token` goes out of scope after.
  owned_monitor.reset();
}
40
// Issues and retires two tokens one after the other and checks the issued,
// in-flight, and retired counters (bytes and tokens) after every step.
TEST_F(PipelineMonitorTest, SequentialTokensModifyCounts) {
  PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams);

  // One (bytes, tokens) pair of expected counter values.
  struct Counts {
    size_t bytes;
    int64_t tokens;
  };
  // Compares all six monitor counters with the expected values. `stage` is
  // attached to any failure so the failing step can be identified.
  const auto expect_counts = [&monitor](const char* stage,
                                        Counts issued,
                                        Counts in_flight,
                                        Counts retired) {
    SCOPED_TRACE(stage);
    EXPECT_EQ(issued.bytes, monitor.bytes_issued());
    EXPECT_EQ(issued.tokens, monitor.tokens_issued());
    EXPECT_EQ(in_flight.bytes, monitor.bytes_in_flight());
    EXPECT_EQ(in_flight.tokens, monitor.tokens_in_flight());
    EXPECT_EQ(retired.bytes, monitor.bytes_retired());
    EXPECT_EQ(retired.tokens, monitor.tokens_retired());
  };

  expect_counts("fresh monitor",
                /*issued=*/{0, 0},
                /*in_flight=*/{0, 0},
                /*retired=*/{0, 0});

  constexpr size_t kTokenSize = 2;
  {
    auto token = monitor.Issue(kTokenSize);
    expect_counts("first token issued",
                  /*issued=*/{kTokenSize, 1},
                  /*in_flight=*/{kTokenSize, 1},
                  /*retired=*/{0, 0});

    token.Retire();
    expect_counts("first token retired",
                  /*issued=*/{kTokenSize, 1},
                  /*in_flight=*/{0, 0},
                  /*retired=*/{kTokenSize, 1});

    // Assigning a freshly issued token to the already-retired handle shows
    // the handle can be reused; this second token is retired implicitly when
    // the enclosing scope ends.
    token = monitor.Issue(kTokenSize);
    expect_counts("second token issued",
                  /*issued=*/{2 * kTokenSize, 2},
                  /*in_flight=*/{kTokenSize, 1},
                  /*retired=*/{kTokenSize, 1});
  }

  expect_counts("second token out of scope",
                /*issued=*/{2 * kTokenSize, 2},
                /*in_flight=*/{0, 0},
                /*retired=*/{2 * kTokenSize, 2});
}
86
// Moves a token into a second handle and checks that the move itself leaves
// the counters untouched and that retiring both handles counts one retirement.
TEST_F(PipelineMonitorTest, TokensCanBeMoved) {
  PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams);

  // One (bytes, tokens) pair of expected counter values.
  struct Counts {
    size_t bytes;
    int64_t tokens;
  };
  // Compares all six monitor counters with the expected values. `stage` is
  // attached to any failure so the failing step can be identified.
  const auto expect_counts = [&monitor](const char* stage,
                                        Counts issued,
                                        Counts in_flight,
                                        Counts retired) {
    SCOPED_TRACE(stage);
    EXPECT_EQ(issued.bytes, monitor.bytes_issued());
    EXPECT_EQ(issued.tokens, monitor.tokens_issued());
    EXPECT_EQ(in_flight.bytes, monitor.bytes_in_flight());
    EXPECT_EQ(in_flight.tokens, monitor.tokens_in_flight());
    EXPECT_EQ(retired.bytes, monitor.bytes_retired());
    EXPECT_EQ(retired.tokens, monitor.tokens_retired());
  };

  expect_counts("fresh monitor",
                /*issued=*/{0, 0},
                /*in_flight=*/{0, 0},
                /*retired=*/{0, 0});

  constexpr size_t kTokenSize = 2;
  auto moved_from = monitor.Issue(kTokenSize);
  auto moved_to = std::move(moved_from);
  expect_counts("token moved to a new handle",
                /*issued=*/{kTokenSize, 1},
                /*in_flight=*/{kTokenSize, 1},
                /*retired=*/{0, 0});

  // Retire() is called on the moved-from handle on purpose: together with the
  // live handle it must account for exactly one retirement.
  moved_from.Retire();
  moved_to.Retire();
  expect_counts("both handles retired",
                /*issued=*/{kTokenSize, 1},
                /*in_flight=*/{0, 0},
                /*retired=*/{kTokenSize, 1});
}
116
// Subscribes to a MaxTokensInFlightAlert and checks that it fires only when
// the number of simultaneously in-flight tokens exceeds the limit, and that
// it fires a single time.
TEST_F(PipelineMonitorTest, SubscribeToMaxTokensAlert) {
  PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams);

  constexpr int kTokenLimit = 1;
  std::optional<PipelineMonitor::MaxTokensInFlightAlert> fired;
  monitor.SetAlert(PipelineMonitor::MaxTokensInFlightAlert{kTokenLimit},
                   [&fired](auto alert) { fired = alert; });

  // One token in flight is still within the limit.
  auto token0 = monitor.Issue(0);
  EXPECT_FALSE(fired.has_value());

  // Retire and re-issue: the cumulative issued count now exceeds the limit,
  // but only one token is in flight, so no alert is expected.
  token0.Retire();
  token0 = monitor.Issue(0);
  ASSERT_GT(monitor.tokens_issued(), kTokenLimit);
  EXPECT_FALSE(fired.has_value());

  // A second simultaneous token takes the in-flight count over the limit.
  auto token1 = monitor.Issue(0);
  ASSERT_TRUE(fired.has_value());
  EXPECT_EQ(kTokenLimit + 1, fired->value);

  // The subscription is spent after one firing: going further over the limit
  // must not invoke the listener again.
  fired.reset();
  auto token2 = monitor.Issue(0);
  EXPECT_FALSE(fired.has_value());
}
145
// Subscribes to a MaxBytesInFlightAlert and checks that it fires once the
// total bytes in flight exceed the limit, reporting that total.
TEST_F(PipelineMonitorTest, SubscribeToMaxBytesAlert) {
  PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams);

  constexpr size_t kByteLimit = 1;
  std::optional<PipelineMonitor::MaxBytesInFlightAlert> fired;
  monitor.SetAlert(PipelineMonitor::MaxBytesInFlightAlert{kByteLimit},
                   [&fired](auto alert) { fired = alert; });

  // Bytes in flight equal to the limit do not trigger the alert.
  auto token0 = monitor.Issue(kByteLimit);
  EXPECT_FALSE(fired.has_value());

  // One more byte in flight crosses the limit.
  auto token1 = monitor.Issue(1);
  ASSERT_TRUE(fired.has_value());
  EXPECT_EQ(kByteLimit + 1, fired->value);
}
163
// Subscribes to a MaxAgeRetiredAlert and checks that it fires when an
// over-age token is retired (not while the token is merely ageing in flight),
// reporting the token's age.
TEST_F(PipelineMonitorTest, SubscribeToMaxAgeAlert) {
  PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams);

  constexpr pw::chrono::SystemClock::duration kAgeLimit =
      std::chrono::milliseconds(500);
  // How long the token is kept in flight: twice the alert threshold.
  constexpr pw::chrono::SystemClock::duration kTokenAge = kAgeLimit * 2;

  std::optional<PipelineMonitor::MaxAgeRetiredAlert> fired;
  monitor.SetAlert(PipelineMonitor::MaxAgeRetiredAlert{kAgeLimit},
                   [&fired](auto alert) { fired = alert; });

  // The token ages past the threshold while still in flight; nothing is
  // reported yet.
  auto token0 = monitor.Issue(0);
  RunFor(kTokenAge);
  EXPECT_FALSE(fired.has_value());

  // Retiring the over-age token is what triggers the alert.
  token0.Retire();
  ASSERT_TRUE(fired.has_value());
  EXPECT_EQ(kTokenAge, fired->value);
}
183
// Registers a new alert from inside an alert handler and checks that the new
// alert is not delivered during the Issue() call that ran the handler, only
// on a later one.
TEST_F(PipelineMonitorTest, SubscribeToAlertInsideHandler) {
  PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams);

  constexpr size_t kByteLimit = 2;
  std::optional<PipelineMonitor::MaxBytesInFlightAlert> fired;

  // The first handler records nothing; it only installs a second alert with a
  // threshold one byte lower, whose listener does the recording.
  monitor.SetAlert(
      PipelineMonitor::MaxBytesInFlightAlert{kByteLimit},
      [&monitor, &fired](auto) {
        monitor.SetAlert(PipelineMonitor::MaxBytesInFlightAlert{kByteLimit - 1},
                         [&fired](auto alert) { fired = alert; });
      });

  // Crossing the limit runs the first handler. The alert it installs is also
  // already exceeded, yet it is not expected to fire within this call.
  auto token0 = monitor.Issue(kByteLimit + 1);
  EXPECT_FALSE(fired.has_value());

  // The next Issue() adds no bytes, but it delivers the re-subscribed alert
  // with the unchanged in-flight total.
  auto token1 = monitor.Issue(0);
  ASSERT_TRUE(fired.has_value());
  EXPECT_EQ(kByteLimit + 1, fired->value);
}
210
// Installs two MaxBytesInFlightAlerts with different thresholds and checks
// that each one fires independently when its own threshold is exceeded.
TEST_F(PipelineMonitorTest,
       MultipleMaxBytesInFlightAlertsWithDifferentThresholds) {
  PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams);

  constexpr size_t kLowLimit = 1;
  constexpr size_t kHighLimit = 2;

  std::optional<PipelineMonitor::MaxBytesInFlightAlert> low_alert;
  monitor.SetAlert(PipelineMonitor::MaxBytesInFlightAlert{kLowLimit},
                   [&low_alert](auto alert) { low_alert = alert; });

  std::optional<PipelineMonitor::MaxBytesInFlightAlert> high_alert;
  monitor.SetAlert(PipelineMonitor::MaxBytesInFlightAlert{kHighLimit},
                   [&high_alert](auto alert) { high_alert = alert; });

  // Bytes in flight land above the low limit but not above the high one, so
  // only the low alert fires.
  auto token0 = monitor.Issue(kLowLimit + 1);
  ASSERT_TRUE(low_alert.has_value());
  EXPECT_GT(low_alert->value, kLowLimit);
  EXPECT_LE(low_alert->value, kHighLimit);
  EXPECT_FALSE(high_alert.has_value());

  // Additional bytes carry the total past the high limit as well.
  auto token1 = monitor.Issue(kHighLimit);
  ASSERT_TRUE(high_alert.has_value());
  EXPECT_GT(high_alert->value, kHighLimit);
}
238
// Registers a single listener for two different alert kinds through
// SetAlerts() and checks that it is called once per kind, each time with the
// matching variant alternative.
TEST_F(PipelineMonitorTest, SubscribeToMultipleDissimilarAlerts) {
  PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams);

  constexpr int kTokenLimit = 1;
  constexpr size_t kByteLimit = 2;

  int total_calls = 0;
  int byte_alert_calls = 0;
  int token_alert_calls = 0;
  // Tallies every invocation and classifies it by the alternative it carries.
  auto tally_alert = [&](auto alert_value) {
    ++total_calls;
    const bool is_byte_alert =
        std::holds_alternative<PipelineMonitor::MaxBytesInFlightAlert>(
            alert_value);
    const bool is_token_alert =
        std::holds_alternative<PipelineMonitor::MaxTokensInFlightAlert>(
            alert_value);
    if (is_byte_alert) {
      ++byte_alert_calls;
    } else if (is_token_alert) {
      ++token_alert_calls;
    }
  };
  monitor.SetAlerts(tally_alert,
                    PipelineMonitor::MaxBytesInFlightAlert{kByteLimit},
                    PipelineMonitor::MaxTokensInFlightAlert{kTokenLimit});

  // One empty token: neither limit is exceeded.
  auto token0 = monitor.Issue(0);
  EXPECT_EQ(0, total_calls);

  // A second token exceeds the token limit only.
  auto token1 = monitor.Issue(0);
  EXPECT_EQ(1, total_calls);
  EXPECT_EQ(1, token_alert_calls);

  // A token large enough to exceed the byte limit triggers the other alert.
  auto token2 = monitor.Issue(kByteLimit + 1);
  EXPECT_EQ(2, total_calls);
  EXPECT_EQ(1, byte_alert_calls);
}
274
// Retires a single 1-byte token after 10 ms and checks that the monitor's
// retire log then holds one entry with that byte count and age.
TEST_F(PipelineMonitorTest, TokensRetireIntoRetireLog) {
  PipelineMonitor monitor(
      dispatcher(), internal::RetireLog(/*min_depth=*/1, /*max_depth=*/64));

  // Issuing alone records nothing in the log.
  auto token = monitor.Issue(1);
  EXPECT_EQ(0U, monitor.retire_log().depth());

  const pw::chrono::SystemClock::duration kTimeInFlight =
      std::chrono::milliseconds(10);
  RunFor(kTimeInFlight);
  token.Retire();
  EXPECT_EQ(1U, monitor.retire_log().depth());

  // With a single entry, the minimum, median, and maximum quantiles all
  // report that entry.
  const auto byte_count_quantiles =
      monitor.retire_log().ComputeByteCountQuantiles(
          std::array<double, 3>{0.0, 0.5, 1.0});
  ASSERT_TRUE(byte_count_quantiles.has_value());
  EXPECT_THAT(*byte_count_quantiles, testing::ElementsAre(1, 1, 1));

  const auto token_age_quantiles = monitor.retire_log().ComputeAgeQuantiles(
      std::array<double, 3>{0.0, 0.5, 1.0});
  ASSERT_TRUE(token_age_quantiles.has_value());
  EXPECT_THAT(
      *token_age_quantiles,
      testing::ElementsAre(kTimeInFlight, kTimeInFlight, kTimeInFlight));
}
296
// Splits a 10-byte token into ten 1-byte tokens, checking the counters after
// each split, then checks that the retire log holds one entry per split.
TEST_F(PipelineMonitorTest, TokensCanBeSplit) {
  PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams);

  const size_t kPieceCount = 10;
  Token parent = monitor.Issue(kPieceCount);

  // Age the parent before splitting; every piece is expected to be logged
  // with this age.
  const pw::chrono::SystemClock::duration kElapsed =
      std::chrono::milliseconds(10);
  RunFor(kElapsed);

  for (size_t piece = 0; piece < kPieceCount; ++piece) {
    Token fragment = parent.Split(1);

    // Each earlier fragment was retired when its loop iteration ended.
    const int64_t retired_so_far = static_cast<int64_t>(piece);
    EXPECT_EQ(monitor.tokens_retired(), retired_so_far);

    // On top of the retired fragments, `fragment` is live, and so is `parent`
    // unless this split took its final byte, in which case `parent` is
    // expected to have been moved into `fragment`.
    const bool took_final_byte = (piece == kPieceCount - 1);
    const int64_t live_tokens = took_final_byte ? 1 : 2;
    EXPECT_EQ(monitor.tokens_issued(), retired_so_far + live_tokens);

    EXPECT_EQ(monitor.bytes_retired(), piece);
    EXPECT_EQ(kPieceCount - piece, monitor.bytes_in_flight());
  }

  // kPieceCount + 1 Token objects existed in total, yet only kPieceCount
  // retirements are expected. This mirrors a PDU that is split into fragments
  // for outbound send.
  EXPECT_EQ(static_cast<int64_t>(kPieceCount), monitor.tokens_retired());
  EXPECT_EQ(kPieceCount, monitor.bytes_retired());

  ASSERT_EQ(monitor.retire_log().depth(), kPieceCount);

  // The smallest and largest logged entries are both one byte.
  std::optional<std::array<size_t, 2>> byte_extremes =
      monitor.retire_log().ComputeByteCountQuantiles(
          std::array<double, 2>{0.0, 1.0});
  ASSERT_TRUE(byte_extremes.has_value());
  EXPECT_THAT(*byte_extremes, testing::ElementsAre(1u, 1u));

  // The youngest and oldest logged entries both have the elapsed age.
  std::optional<std::array<pw::chrono::SystemClock::duration, 2>>
      age_extremes = monitor.retire_log().ComputeAgeQuantiles(
          std::array<double, 2>{0.0, 1.0});
  ASSERT_TRUE(age_extremes.has_value());
  EXPECT_THAT(*age_extremes, testing::ElementsAre(kElapsed, kElapsed));
}
345
346 using PipelineMonitorDeathTest = PipelineMonitorTest;
347
TEST_F(PipelineMonitorDeathTest,SplittingTokenIntoMoreThanConstituentBytes)348 TEST_F(PipelineMonitorDeathTest, SplittingTokenIntoMoreThanConstituentBytes) {
349 PipelineMonitor monitor(dispatcher(), kRetireLogDefaultParams);
350
351 auto token_main = monitor.Issue(1);
352
353 EXPECT_DEATH(token_main.Split(2), "byte");
354 }
355
356 } // namespace
357 } // namespace bt
358