1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_bluetooth_sapphire/internal/host/l2cap/enhanced_retransmission_mode_tx_engine.h"
16
17 #include <limits>
18
19 #include "pw_bluetooth_sapphire/internal/host/common/assert.h"
20 #include "pw_bluetooth_sapphire/internal/host/common/log.h"
21 #include "pw_bluetooth_sapphire/internal/host/l2cap/frame_headers.h"
22
23 namespace bt::l2cap::internal {
24
25 using Engine = EnhancedRetransmissionModeTxEngine;
26
27 namespace {
28
29 // Returns the number of frames within the range from |low| to |high|, inclusive
30 // of |low|, but exclusive of |high|. Returns zero if |low == high|.
NumFramesBetween(uint8_t low,uint8_t high)31 uint8_t NumFramesBetween(uint8_t low, uint8_t high) {
32 if (high < low) {
33 high += (EnhancedControlField::kMaxSeqNum + 1);
34 }
35 return high - low;
36 }
37
38 } // namespace
39
EnhancedRetransmissionModeTxEngine(ChannelId channel_id,uint16_t max_tx_sdu_size,uint8_t max_transmissions,uint8_t n_frames_in_tx_window,TxChannel & channel,ConnectionFailureCallback connection_failure_callback,pw::async::Dispatcher & dispatcher)40 Engine::EnhancedRetransmissionModeTxEngine(
41 ChannelId channel_id,
42 uint16_t max_tx_sdu_size,
43 uint8_t max_transmissions,
44 uint8_t n_frames_in_tx_window,
45 TxChannel& channel,
46 ConnectionFailureCallback connection_failure_callback,
47 pw::async::Dispatcher& dispatcher)
48 : TxEngine(channel_id, max_tx_sdu_size, channel),
49 pw_dispatcher_(dispatcher),
50 max_transmissions_(max_transmissions),
51 n_frames_in_tx_window_(n_frames_in_tx_window),
52 connection_failure_callback_(std::move(connection_failure_callback)),
53 expected_ack_seq_(0),
54 next_tx_seq_(0),
55 last_tx_seq_(0),
56 req_seqnum_(0),
57 retransmitted_range_during_poll_(false),
58 n_receiver_ready_polls_sent_(0),
59 remote_is_busy_(false) {
60 PW_DCHECK(n_frames_in_tx_window_);
61 receiver_ready_poll_task_.set_function(
62 [this](pw::async::Context /*ctx*/, pw::Status status) {
63 if (!status.ok()) {
64 return;
65 }
66 SendReceiverReadyPoll();
67 StartMonitorTimer();
68 });
69 monitor_task_.set_function(
70 [this](pw::async::Context /*ctx*/, pw::Status status) {
71 if (!status.ok()) {
72 return;
73 }
74 if (max_transmissions_ == 0 ||
75 n_receiver_ready_polls_sent_ < max_transmissions_) {
76 SendReceiverReadyPoll();
77 StartMonitorTimer();
78 } else {
79 connection_failure_callback_(); // May invalidate |self|.
80 }
81 });
82 }
83
NotifySduQueued()84 void Engine::NotifySduQueued() {
85 std::optional<ByteBufferPtr> sdu = channel().GetNextQueuedSdu();
86 PW_CHECK(sdu);
87 ProcessSdu(std::move(*sdu));
88 }
89
ProcessSdu(ByteBufferPtr sdu)90 void Engine::ProcessSdu(ByteBufferPtr sdu) {
91 PW_CHECK(sdu);
92 // TODO(fxbug.dev/42054330): Add support for segmentation
93 if (sdu->size() > max_tx_sdu_size()) {
94 bt_log(INFO,
95 "l2cap",
96 "SDU size exceeds channel TxMTU (channel-id: %#.4x)",
97 channel_id());
98 return;
99 }
100
101 const auto seq_num = GetNextTxSeq();
102 SimpleInformationFrameHeader header(seq_num);
103 DynamicByteBuffer frame(sizeof(header) + sdu->size());
104 auto body = frame.mutable_view(sizeof(header));
105 frame.WriteObj(header);
106 sdu->Copy(&body);
107
108 // TODO(fxbug.dev/42086227): Limit the size of the queue.
109 pending_pdus_.push_back(std::move(frame));
110 MaybeSendQueuedData();
111 }
112
UpdateAckSeq(uint8_t new_seq,bool is_poll_response)113 void Engine::UpdateAckSeq(uint8_t new_seq, bool is_poll_response) {
114 // TODO(quiche): Reconsider this assertion if we allow reconfiguration of the
115 // TX window.
116 PW_DCHECK(NumUnackedFrames() <= n_frames_in_tx_window_,
117 "(NumUnackedFrames() = %u, n_frames_in_tx_window_ = %u, "
118 "expected_ack_seq_ = %u, last_tx_seq_ = %u)",
119 NumUnackedFrames(),
120 n_frames_in_tx_window_,
121 expected_ack_seq_,
122 last_tx_seq_);
123
124 const auto n_frames_acked = NumFramesBetween(expected_ack_seq_, new_seq);
125 if (n_frames_acked > NumUnackedFrames()) {
126 // Peer acknowledgment of our outbound data (ReqSeq) exceeds the sequence
127 // numbers of yet-acknowledged data that we've sent to that peer. See
128 // conditions "With-Invalid-ReqSeq" and "With-Invalid-ReqSeq-Retrans" in
129 // Core Spec v5.0 Vol 3 Part A Sec 8.6.5.5.
130 bt_log(WARN,
131 "l2cap",
132 "Received acknowledgment for %hhu frames but only %hhu frames are "
133 "pending",
134 n_frames_acked,
135 NumUnackedFrames());
136 connection_failure_callback_(); // May invalidate |self|.
137 return;
138 }
139
140 // Perform Stop-MonitorTimer when "F = 1," per Core Spec v5.0, Vol 3, Part A,
141 // Sec 8.6.5.7–8.
142 if (is_poll_response) {
143 monitor_task_.Cancel();
144 }
145
146 PW_CHECK(!(range_request_.has_value() && single_request_.has_value()));
147 if (ProcessSingleRetransmitRequest(new_seq, is_poll_response) ==
148 UpdateAckSeqAction::kConsumeAckSeq) {
149 return;
150 }
151
152 auto n_frames_to_discard = n_frames_acked;
153 while (n_frames_to_discard) {
154 PW_DCHECK(!pending_pdus_.empty());
155 pending_pdus_.pop_front();
156 --n_frames_to_discard;
157 }
158
159 expected_ack_seq_ = new_seq;
160 if (expected_ack_seq_ == next_tx_seq_) {
161 receiver_ready_poll_task_.Cancel();
162 }
163
164 const auto range_request = std::exchange(range_request_, std::nullopt);
165
166 // RemoteBusy is cleared as the first action to take when receiving a REJ per
167 // Core Spec v5.0 Vol 3, Part A, Sec 8.6.5.9–11, so their corresponding member
168 // variables shouldn't be both set.
169 PW_CHECK(!(range_request.has_value() && remote_is_busy_));
170 bool should_retransmit = range_request.has_value();
171
172 // This implements the logic for RejActioned in the Recv {I,RR,REJ} (F=1)
173 // event for all of the receiver states (Core Spec v5.0 Vol 3, Part A,
174 // Sec 8.6.5.9–11).
175 if (is_poll_response) {
176 if (retransmitted_range_during_poll_) {
177 should_retransmit = false;
178 retransmitted_range_during_poll_ = false;
179 } else {
180 should_retransmit = true;
181 }
182 }
183
184 if (remote_is_busy_) {
185 return;
186 }
187
188 // This implements the logic for PbitOutstanding in the Recv REJ (F=0) event
189 // for all of the receiver states (Core Spec v5.0 Vol 3, Part A,
190 // Sec 8.6.5.9–11).
191 if (range_request.has_value() && !is_poll_response &&
192 monitor_task_.is_pending()) {
193 retransmitted_range_during_poll_ = true;
194 }
195
196 if (should_retransmit) {
197 const bool set_is_poll_response =
198 range_request.value_or(RangeRetransmitRequest{}).is_poll_request;
199 if (!RetransmitUnackedData(std::nullopt, set_is_poll_response)) {
200 return;
201 }
202 }
203
204 MaybeSendQueuedData();
205
206 // TODO(quiche): Restart the receiver_ready_poll_task_, if there's any
207 // remaining unacknowledged data.
208 }
209
UpdateReqSeq(uint8_t new_seq)210 void Engine::UpdateReqSeq(uint8_t new_seq) { req_seqnum_ = new_seq; }
211
ClearRemoteBusy()212 void Engine::ClearRemoteBusy() {
213 // TODO(quiche): Maybe clear backpressure on the Channel (subject to TxWindow
214 // contraints).
215 remote_is_busy_ = false;
216 }
217
SetRemoteBusy()218 void Engine::SetRemoteBusy() {
219 // TODO(fxbug.dev/42086338): Signal backpressure to the Channel.
220 remote_is_busy_ = true;
221 receiver_ready_poll_task_.Cancel();
222 }
223
SetSingleRetransmit(bool is_poll_request)224 void Engine::SetSingleRetransmit(bool is_poll_request) {
225 PW_CHECK(!single_request_.has_value());
226 PW_CHECK(!range_request_.has_value());
227 // Store SREJ state for UpdateAckSeq to handle.
228 single_request_ = SingleRetransmitRequest{.is_poll_request = is_poll_request};
229 }
230
SetRangeRetransmit(bool is_poll_request)231 void Engine::SetRangeRetransmit(bool is_poll_request) {
232 PW_CHECK(!single_request_.has_value());
233 PW_CHECK(!range_request_.has_value());
234 // Store REJ state for UpdateAckSeq to handle.
235 range_request_ = RangeRetransmitRequest{.is_poll_request = is_poll_request};
236 }
237
MaybeSendQueuedData()238 void Engine::MaybeSendQueuedData() {
239 if (remote_is_busy_ || monitor_task_.is_pending()) {
240 return;
241 }
242
243 // Find the first PDU that has not already been transmitted (if any).
244 // * This is not necessarily the first PDU, because that may have been
245 // transmited already, and is just pending acknowledgement.
246 // * This is not necessarily the last PDU, because earlier PDUs may have been
247 // queued without having been sent over-the-air (due, e.g., to tx_window
248 // constraints).
249 //
250 // TODO(quiche): Consider if there's a way to do this that isn't O(n).
251 auto it = std::find_if(
252 pending_pdus_.begin(), pending_pdus_.end(), [](const auto& pending_pdu) {
253 return pending_pdu.tx_count == 0;
254 });
255
256 while (it != pending_pdus_.end() &&
257 NumUnackedFrames() < n_frames_in_tx_window_) {
258 PW_DCHECK(it->tx_count == 0);
259 SendPdu(&*it);
260 last_tx_seq_ = it->buf.To<SimpleInformationFrameHeader>().tx_seq();
261 ++it;
262 }
263 }
264
ProcessSingleRetransmitRequest(uint8_t new_seq,bool is_poll_response)265 Engine::UpdateAckSeqAction Engine::ProcessSingleRetransmitRequest(
266 uint8_t new_seq, bool is_poll_response) {
267 const auto single_request = std::exchange(single_request_, std::nullopt);
268 PW_CHECK(!(single_request.has_value() && remote_is_busy_));
269 if (!single_request.has_value()) {
270 return UpdateAckSeqAction::kDiscardAcknowledged;
271 }
272
273 // This implements the logic for SrejActioned=TRUE in the Recv SREJ (P=0)
274 // (F=1) event for all of the receiver states (Core Spec v5.0 Vol 3, Part A,
275 // Sec 8.6.5.9–11).
276 if (is_poll_response && retransmitted_single_during_poll_.has_value() &&
277 new_seq == *retransmitted_single_during_poll_) {
278 // Only a SREJ (F=1) with a matching AckSeq can clear this state, so if this
279 // duplicate suppression isn't performed it sticks around even when we sent
280 // the next poll request. AckSeq is modulo kMaxSeqNum + 1 so it'll roll over
281 // and potentially suppress retransmission of a different (non-duplicate)
282 // packet in a later poll request-response cycle. Unfortunately this isn't
283 // fixed as of Core Spec v5.2 so it's implemented as written.
284 retransmitted_single_during_poll_.reset();
285
286 // Return early to suppress duplicate retransmission.
287 return UpdateAckSeqAction::kConsumeAckSeq;
288 }
289
290 // This implements the logic for PbitOutstanding in the Recv SREJ (P=0) (F=0)
291 // and Recv SREJ(P=1) events for all of the receiver states (Core Spec v5.0
292 // Vol 3, Part A, Sec 8.6.5.9–11).
293 if (!is_poll_response && monitor_task_.is_pending()) {
294 retransmitted_single_during_poll_ = new_seq;
295 }
296
297 if (!RetransmitUnackedData(new_seq, single_request->is_poll_request)) {
298 return UpdateAckSeqAction::kConsumeAckSeq;
299 }
300
301 // Only "single requests" that are poll requests acknowledge previous I-Frames
302 // and cause initial transmission of queued SDUs, per Core Spec v5.0, Vol 3,
303 // Part A, Sec 8.6.1.4.
304 if (single_request->is_poll_request) {
305 return UpdateAckSeqAction::kDiscardAcknowledged;
306 }
307 return UpdateAckSeqAction::kConsumeAckSeq;
308 }
309
StartReceiverReadyPollTimer()310 void Engine::StartReceiverReadyPollTimer() {
311 PW_DCHECK(!monitor_task_.is_pending());
312 n_receiver_ready_polls_sent_ = 0;
313 receiver_ready_poll_task_.Cancel();
314 receiver_ready_poll_task_.PostAfter(kErtmReceiverReadyPollTimerDuration);
315 }
316
StartMonitorTimer()317 void Engine::StartMonitorTimer() {
318 PW_DCHECK(!receiver_ready_poll_task_.is_pending());
319 monitor_task_.Cancel();
320 monitor_task_.PostAfter(kErtmMonitorTimerDuration);
321 }
322
SendReceiverReadyPoll()323 void Engine::SendReceiverReadyPoll() {
324 SimpleReceiverReadyFrame frame;
325 frame.set_receive_seq_num(req_seqnum_);
326 frame.set_is_poll_request();
327 ++n_receiver_ready_polls_sent_;
328 PW_CHECK(max_transmissions_ == 0 ||
329 n_receiver_ready_polls_sent_ <= max_transmissions_,
330 "(n_receiver_ready_polls_sent_ = %u, "
331 "max_transmissions = %u)",
332 n_receiver_ready_polls_sent_,
333 max_transmissions_);
334 channel().SendFrame(
335 std::make_unique<DynamicByteBuffer>(BufferView(&frame, sizeof(frame))));
336 }
337
GetNextTxSeq()338 uint8_t Engine::GetNextTxSeq() {
339 auto ret = next_tx_seq_;
340 ++next_tx_seq_;
341 if (next_tx_seq_ > EnhancedControlField::kMaxSeqNum) {
342 next_tx_seq_ = 0;
343 }
344 return ret;
345 }
346
NumUnackedFrames()347 uint8_t Engine::NumUnackedFrames() {
348 if (pending_pdus_.empty()) {
349 // Initially, |ack_seqnum_ == last_tx_seq_ == 0|, but the number of
350 // unacknowledged frames is 0, not 1.
351 return 0;
352 } else if (pending_pdus_.front().tx_count == 0) {
353 // While we have some data queued, none of that data has been sent
354 // over-the-air. This might happen, e.g., transiently in QueueSdu().
355 return 0;
356 } else {
357 // Having ascertained that some data _is_ in flight, the number of frames in
358 // flight is given by the expression below.
359 return NumFramesBetween(
360 expected_ack_seq_,
361 last_tx_seq_ + 1 // Include frame with |last_tx_seq_| in count
362 );
363 }
364 }
365
SendPdu(PendingPdu * pdu)366 void Engine::SendPdu(PendingPdu* pdu) {
367 PW_DCHECK(pdu);
368 pdu->buf.AsMutable<SimpleInformationFrameHeader>()->set_receive_seq_num(
369 req_seqnum_);
370
371 // Prevent tx_count from overflowing to zero, as that would be
372 // indistinguishable from "never transmitted." This is only possible when
373 // configured for infinite retransmissions, so there is no benefit to having
374 // an accurate tx_count after each frame's initial transmission.
375 if (pdu->tx_count != std::numeric_limits<decltype(pdu->tx_count)>::max()) {
376 pdu->tx_count++;
377 }
378 StartReceiverReadyPollTimer();
379 channel().SendFrame(std::make_unique<DynamicByteBuffer>(pdu->buf));
380 }
381
RetransmitUnackedData(std::optional<uint8_t> only_with_seq,bool set_is_poll_response)382 bool Engine::RetransmitUnackedData(std::optional<uint8_t> only_with_seq,
383 bool set_is_poll_response) {
384 // The receive engine should have cleared the remote busy condition before
385 // calling any method that would cause us (the transmit engine) to retransmit
386 // unacked data. See, e.g., Core Spec v5.0, Volume 3, Part A, Table 8.6, row
387 // "Recv REJ (F=0)".
388 PW_DCHECK(!remote_is_busy_);
389
390 // Any peer actions that cause retransmission indicate the peer is alive. This
391 // is in conflict with Core Spec v5.0, Vol 3, Part A, Sec 8.6.5.8, which only
392 // stops the MonitorTimer when a poll response is received and omits what to
393 // do when REJ or SREJ cause retransmission. However, this behavior of
394 // canceling the MonitorTimer "early" is in line with Sequence Diagram
395 // Fig 4.94, L2CAP Test Spec v5.0.2 Section 4.9.7.24, among others that show
396 // REJ or SREJ causing the receiver to cancel its MonitorTimer. We follow the
397 // latter behavior because (1) it's less ambiguous (2) it makes sense and (3)
398 // we need to pass those tests.
399 monitor_task_.Cancel();
400
401 const auto n_to_send = NumUnackedFrames();
402 PW_CHECK(n_to_send <= n_frames_in_tx_window_);
403 PW_DCHECK(n_to_send <= pending_pdus_.size());
404
405 auto cur_frame = pending_pdus_.begin();
406 auto last_frame = std::next(cur_frame, n_to_send);
407 for (; cur_frame != last_frame; cur_frame++) {
408 PW_DCHECK(cur_frame != pending_pdus_.end());
409
410 const auto control_field =
411 cur_frame->buf.To<SimpleInformationFrameHeader>();
412 if (only_with_seq.has_value() && control_field.tx_seq() != *only_with_seq) {
413 continue;
414 }
415
416 // Core Spec v5.0, Vol 3, Part A, Sec 5.4: "In Enhanced Retransmission mode
417 // a value of zero for MaxTransmit means infinite retransmissions."
418 if (max_transmissions_ != 0 && cur_frame->tx_count >= max_transmissions_) {
419 PW_CHECK(cur_frame->tx_count == max_transmissions_,
420 "%hhu != %hhu",
421 cur_frame->tx_count,
422 max_transmissions_);
423 connection_failure_callback_();
424 return false;
425 }
426
427 if (set_is_poll_response) {
428 cur_frame->buf.AsMutable<EnhancedControlField>()->set_is_poll_response();
429
430 // Per "Retransmit-I-frames" of Core Spec v5.0 Vol 3, Part A, Sec 8.6.5.6,
431 // "the F-bit of all other [than the first] unacknowledged I-frames sent
432 // shall be 0," so clear this for subsequent iterations.
433 set_is_poll_response = false;
434 }
435
436 // TODO(fxbug.dev/42087625): If the task is already running, we should not
437 // restart it.
438 SendPdu(&*cur_frame);
439 *cur_frame->buf.AsMutable<EnhancedControlField>() = control_field;
440 }
441
442 return true;
443 }
444
445 } // namespace bt::l2cap::internal
446