1 use std::time::Duration;
2 
3 use log::{trace, warn};
4 use tokio::{
5     sync::mpsc::{self, error::TrySendError},
6     time::timeout,
7 };
8 
9 use crate::{gatt::ids::AttHandle, packets::att};
10 
11 use super::{
12     att_database::{AttDatabase, StableAttDatabase},
13     att_server_bearer::SendError,
14 };
15 
16 #[derive(Debug)]
17 /// Errors that can occur while sending an indication
18 pub enum IndicationError {
19     /// The provided data exceeds the MTU limitations
20     DataExceedsMtu {
21         /// The actual max payload size permitted
22         /// (ATT_MTU - 3, since 3 bytes are needed for the header)
23         mtu: usize,
24     },
25     /// The indicated attribute handle does not exist
26     AttributeNotFound,
27     /// The indicated attribute does not support indications
28     IndicationsNotSupported,
29     /// Failed to send the outgoing indication packet
30     SendError(SendError),
31     /// Did not receive a confirmation in the given time (30s)
32     ConfirmationTimeout,
33     /// The connection was dropped while waiting for a confirmation
34     ConnectionDroppedWhileWaitingForConfirmation,
35 }
36 
37 pub struct IndicationHandler<T> {
38     db: T,
39     pending_confirmation: mpsc::Receiver<()>,
40 }
41 
42 impl<T: AttDatabase> IndicationHandler<T> {
new(db: T) -> (Self, ConfirmationWatcher)43     pub fn new(db: T) -> (Self, ConfirmationWatcher) {
44         let (tx, rx) = mpsc::channel(1);
45         (Self { db, pending_confirmation: rx }, ConfirmationWatcher(tx))
46     }
47 
send( &mut self, handle: AttHandle, data: &[u8], mtu: usize, send_packet: impl FnOnce(att::Att) -> Result<(), SendError>, ) -> Result<(), IndicationError>48     pub async fn send(
49         &mut self,
50         handle: AttHandle,
51         data: &[u8],
52         mtu: usize,
53         send_packet: impl FnOnce(att::Att) -> Result<(), SendError>,
54     ) -> Result<(), IndicationError> {
55         let data_size = data.len();
56         // As per Core Spec 5.3 Vol 3F 3.4.7.2, the indicated value must be at most
57         // ATT_MTU-3
58         if data_size > (mtu - 3) {
59             return Err(IndicationError::DataExceedsMtu { mtu: mtu - 3 });
60         }
61 
62         if !self
63             .db
64             .snapshot()
65             .find_attribute(handle)
66             .ok_or(IndicationError::AttributeNotFound)?
67             .permissions
68             .indicate()
69         {
70             warn!("cannot send indication for {handle:?} since it does not support indications");
71             return Err(IndicationError::IndicationsNotSupported);
72         }
73 
74         // flushing any confirmations that arrived before we sent the next indication
75         let _ = self.pending_confirmation.try_recv();
76 
77         send_packet(
78             att::AttHandleValueIndication { handle: handle.into(), value: data.to_vec() }
79                 .try_into()
80                 .unwrap(),
81         )
82         .map_err(IndicationError::SendError)?;
83 
84         match timeout(Duration::from_secs(30), self.pending_confirmation.recv()).await {
85             Ok(Some(())) => Ok(()),
86             Ok(None) => {
87                 warn!("connection dropped while waiting for indication confirmation");
88                 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
89             }
90             Err(_) => {
91                 warn!("Sent indication but received no response for 30s");
92                 Err(IndicationError::ConfirmationTimeout)
93             }
94         }
95     }
96 }
97 
98 pub struct ConfirmationWatcher(mpsc::Sender<()>);
99 
100 impl ConfirmationWatcher {
on_confirmation(&self)101     pub fn on_confirmation(&self) {
102         match self.0.try_send(()) {
103             Ok(_) => {
104                 trace!("Got AttHandleValueConfirmation")
105             }
106             Err(TrySendError::Full(_)) => {
107                 warn!("Got a second AttHandleValueConfirmation before the first was processed, dropping it")
108             }
109             Err(TrySendError::Closed(_)) => {
110                 warn!("Got an AttHandleValueConfirmation while no indications are outstanding, dropping it")
111             }
112         }
113     }
114 }
115 
116 #[cfg(test)]
117 mod test {
118     use crate::packets::att;
119     use tokio::{sync::oneshot, task::spawn_local, time::Instant};
120 
121     use crate::{
122         core::uuid::Uuid,
123         gatt::server::{
124             att_database::AttAttribute, gatt_database::AttPermissions,
125             test::test_att_db::TestAttDatabase,
126         },
127         utils::task::block_on_locally,
128     };
129 
130     use super::*;
131 
132     const HANDLE: AttHandle = AttHandle(1);
133     const NONEXISTENT_HANDLE: AttHandle = AttHandle(2);
134     const NON_INDICATE_HANDLE: AttHandle = AttHandle(3);
135     const MTU: usize = 32;
136     const DATA: [u8; 3] = [1, 2, 3];
137 
get_att_database() -> TestAttDatabase138     fn get_att_database() -> TestAttDatabase {
139         TestAttDatabase::new(vec![
140             (
141                 AttAttribute {
142                     handle: HANDLE,
143                     type_: Uuid::new(123),
144                     permissions: AttPermissions::INDICATE,
145                 },
146                 vec![],
147             ),
148             (
149                 AttAttribute {
150                     handle: NON_INDICATE_HANDLE,
151                     type_: Uuid::new(123),
152                     permissions: AttPermissions::READABLE,
153                 },
154                 vec![],
155             ),
156         ])
157     }
158 
159     #[test]
test_indication_sent()160     fn test_indication_sent() {
161         block_on_locally(async move {
162             // arrange
163             let (mut indication_handler, _confirmation_watcher) =
164                 IndicationHandler::new(get_att_database());
165             let (tx, rx) = oneshot::channel();
166 
167             // act: send an indication
168             spawn_local(async move {
169                 indication_handler
170                     .send(HANDLE, &DATA, MTU, move |packet| {
171                         tx.send(packet).unwrap();
172                         Ok(())
173                     })
174                     .await
175             });
176 
177             // assert: that an AttHandleValueIndication was sent on the channel
178             let indication = rx.await.unwrap();
179             assert_eq!(
180                 Ok(indication),
181                 att::AttHandleValueIndication { handle: HANDLE.into(), value: DATA.to_vec() }
182                     .try_into()
183             );
184         });
185     }
186 
187     #[test]
test_invalid_handle()188     fn test_invalid_handle() {
189         block_on_locally(async move {
190             // arrange
191             let (mut indication_handler, _confirmation_watcher) =
192                 IndicationHandler::new(get_att_database());
193 
194             // act: send an indication on a nonexistent handle
195             let ret = indication_handler
196                 .send(NONEXISTENT_HANDLE, &DATA, MTU, move |_| unreachable!())
197                 .await;
198 
199             // assert: that we failed with IndicationError::AttributeNotFound
200             assert!(matches!(ret, Err(IndicationError::AttributeNotFound)));
201         });
202     }
203 
204     #[test]
test_unsupported_permission()205     fn test_unsupported_permission() {
206         block_on_locally(async move {
207             // arrange
208             let (mut indication_handler, _confirmation_watcher) =
209                 IndicationHandler::new(get_att_database());
210 
211             // act: send an indication on an attribute that does not support indications
212             let ret = indication_handler
213                 .send(NON_INDICATE_HANDLE, &DATA, MTU, move |_| unreachable!())
214                 .await;
215 
216             // assert: that we failed with IndicationError::IndicationsNotSupported
217             assert!(matches!(ret, Err(IndicationError::IndicationsNotSupported)));
218         });
219     }
220 
221     #[test]
test_confirmation_handled()222     fn test_confirmation_handled() {
223         block_on_locally(async move {
224             // arrange
225             let (mut indication_handler, confirmation_watcher) =
226                 IndicationHandler::new(get_att_database());
227             let (tx, rx) = oneshot::channel();
228 
229             // act: send an indication
230             let pending_result = spawn_local(async move {
231                 indication_handler
232                     .send(HANDLE, &DATA, MTU, move |packet| {
233                         tx.send(packet).unwrap();
234                         Ok(())
235                     })
236                     .await
237             });
238             // when the indication is sent, send a confirmation in response
239             rx.await.unwrap();
240             confirmation_watcher.on_confirmation();
241 
242             // assert: the indication was successfully sent
243             assert!(matches!(pending_result.await.unwrap(), Ok(())));
244         });
245     }
246 
247     #[test]
test_unblock_on_disconnect()248     fn test_unblock_on_disconnect() {
249         block_on_locally(async move {
250             // arrange
251             let (mut indication_handler, confirmation_watcher) =
252                 IndicationHandler::new(get_att_database());
253             let (tx, rx) = oneshot::channel();
254 
255             // act: send an indication
256             let pending_result = spawn_local(async move {
257                 indication_handler
258                     .send(HANDLE, &DATA, MTU, move |packet| {
259                         tx.send(packet).unwrap();
260                         Ok(())
261                     })
262                     .await
263             });
264             // when the indication is sent, drop the confirmation watcher (as would happen
265             // upon a disconnection)
266             rx.await.unwrap();
267             drop(confirmation_watcher);
268 
269             // assert: we get the appropriate error
270             assert!(matches!(
271                 pending_result.await.unwrap(),
272                 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
273             ));
274         });
275     }
276 
277     #[test]
test_spurious_confirmations()278     fn test_spurious_confirmations() {
279         block_on_locally(async move {
280             // arrange: send a few confirmations in advance
281             let (mut indication_handler, confirmation_watcher) =
282                 IndicationHandler::new(get_att_database());
283             let (tx, rx) = oneshot::channel();
284             confirmation_watcher.on_confirmation();
285             confirmation_watcher.on_confirmation();
286 
287             // act: send an indication
288             let pending_result = spawn_local(async move {
289                 indication_handler
290                     .send(HANDLE, &DATA, MTU, move |packet| {
291                         tx.send(packet).unwrap();
292                         Ok(())
293                     })
294                     .await
295             });
296             // when the indication is sent, drop the confirmation watcher (so we won't block
297             // forever)
298             rx.await.unwrap();
299             drop(confirmation_watcher);
300 
301             // assert: we get the appropriate error, rather than an Ok(())
302             // (which would have been the case if we had processed the spurious
303             // confirmations)
304             assert!(matches!(
305                 pending_result.await.unwrap(),
306                 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
307             ));
308         });
309     }
310 
311     #[test]
test_indication_timeout()312     fn test_indication_timeout() {
313         block_on_locally(async move {
314             // arrange: send a few confirmations in advance
315             let (mut indication_handler, confirmation_watcher) =
316                 IndicationHandler::new(get_att_database());
317             let (tx, rx) = oneshot::channel();
318             confirmation_watcher.on_confirmation();
319             confirmation_watcher.on_confirmation();
320 
321             // act: send an indication
322             let time_sent = Instant::now();
323             let pending_result = spawn_local(async move {
324                 indication_handler
325                     .send(HANDLE, &DATA, MTU, move |packet| {
326                         tx.send(packet).unwrap();
327                         Ok(())
328                     })
329                     .await
330             });
331             // after it is sent, wait for the timer to fire
332             rx.await.unwrap();
333 
334             // assert: we get the appropriate error
335             assert!(matches!(
336                 pending_result.await.unwrap(),
337                 Err(IndicationError::ConfirmationTimeout)
338             ));
339             // after the appropriate interval
340             // note: this is not really timing-dependent, since we are using a simulated
341             // clock TODO(aryarahul) - why is this not exactly 30s?
342             let time_slept = Instant::now().duration_since(time_sent);
343             assert!(time_slept > Duration::from_secs(29));
344             assert!(time_slept < Duration::from_secs(31));
345         });
346     }
347 
348     #[test]
test_mtu_exceeds()349     fn test_mtu_exceeds() {
350         block_on_locally(async move {
351             // arrange
352             let (mut indication_handler, _confirmation_watcher) =
353                 IndicationHandler::new(get_att_database());
354 
355             // act: send an indication with an ATT_MTU of 4 and data length of 3
356             let res = indication_handler.send(HANDLE, &DATA, 4, move |_| unreachable!()).await;
357 
358             // assert: that we got the expected error, indicating the max data size (not the
359             // ATT_MTU, but ATT_MTU-3)
360             assert!(matches!(res, Err(IndicationError::DataExceedsMtu { mtu: 1 })));
361         });
362     }
363 }
364