1 use std::time::Duration; 2 3 use log::{trace, warn}; 4 use tokio::{ 5 sync::mpsc::{self, error::TrySendError}, 6 time::timeout, 7 }; 8 9 use crate::{gatt::ids::AttHandle, packets::att}; 10 11 use super::{ 12 att_database::{AttDatabase, StableAttDatabase}, 13 att_server_bearer::SendError, 14 }; 15 16 #[derive(Debug)] 17 /// Errors that can occur while sending an indication 18 pub enum IndicationError { 19 /// The provided data exceeds the MTU limitations 20 DataExceedsMtu { 21 /// The actual max payload size permitted 22 /// (ATT_MTU - 3, since 3 bytes are needed for the header) 23 mtu: usize, 24 }, 25 /// The indicated attribute handle does not exist 26 AttributeNotFound, 27 /// The indicated attribute does not support indications 28 IndicationsNotSupported, 29 /// Failed to send the outgoing indication packet 30 SendError(SendError), 31 /// Did not receive a confirmation in the given time (30s) 32 ConfirmationTimeout, 33 /// The connection was dropped while waiting for a confirmation 34 ConnectionDroppedWhileWaitingForConfirmation, 35 } 36 37 pub struct IndicationHandler<T> { 38 db: T, 39 pending_confirmation: mpsc::Receiver<()>, 40 } 41 42 impl<T: AttDatabase> IndicationHandler<T> { new(db: T) -> (Self, ConfirmationWatcher)43 pub fn new(db: T) -> (Self, ConfirmationWatcher) { 44 let (tx, rx) = mpsc::channel(1); 45 (Self { db, pending_confirmation: rx }, ConfirmationWatcher(tx)) 46 } 47 send( &mut self, handle: AttHandle, data: &[u8], mtu: usize, send_packet: impl FnOnce(att::Att) -> Result<(), SendError>, ) -> Result<(), IndicationError>48 pub async fn send( 49 &mut self, 50 handle: AttHandle, 51 data: &[u8], 52 mtu: usize, 53 send_packet: impl FnOnce(att::Att) -> Result<(), SendError>, 54 ) -> Result<(), IndicationError> { 55 let data_size = data.len(); 56 // As per Core Spec 5.3 Vol 3F 3.4.7.2, the indicated value must be at most 57 // ATT_MTU-3 58 if data_size > (mtu - 3) { 59 return Err(IndicationError::DataExceedsMtu { mtu: mtu - 3 }); 60 } 61 62 if !self 63 .db 64 .snapshot() 65 .find_attribute(handle) 66 .ok_or(IndicationError::AttributeNotFound)? 67 .permissions 68 .indicate() 69 { 70 warn!("cannot send indication for {handle:?} since it does not support indications"); 71 return Err(IndicationError::IndicationsNotSupported); 72 } 73 74 // flushing any confirmations that arrived before we sent the next indication 75 let _ = self.pending_confirmation.try_recv(); 76 77 send_packet( 78 att::AttHandleValueIndication { handle: handle.into(), value: data.to_vec() } 79 .try_into() 80 .unwrap(), 81 ) 82 .map_err(IndicationError::SendError)?; 83 84 match timeout(Duration::from_secs(30), self.pending_confirmation.recv()).await { 85 Ok(Some(())) => Ok(()), 86 Ok(None) => { 87 warn!("connection dropped while waiting for indication confirmation"); 88 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation) 89 } 90 Err(_) => { 91 warn!("Sent indication but received no response for 30s"); 92 Err(IndicationError::ConfirmationTimeout) 93 } 94 } 95 } 96 } 97 98 pub struct ConfirmationWatcher(mpsc::Sender<()>); 99 100 impl ConfirmationWatcher { on_confirmation(&self)101 pub fn on_confirmation(&self) { 102 match self.0.try_send(()) { 103 Ok(_) => { 104 trace!("Got AttHandleValueConfirmation") 105 } 106 Err(TrySendError::Full(_)) => { 107 warn!("Got a second AttHandleValueConfirmation before the first was processed, dropping it") 108 } 109 Err(TrySendError::Closed(_)) => { 110 warn!("Got an AttHandleValueConfirmation while no indications are outstanding, dropping it") 111 } 112 } 113 } 114 } 115 116 #[cfg(test)] 117 mod test { 118 use crate::packets::att; 119 use tokio::{sync::oneshot, task::spawn_local, time::Instant}; 120 121 use crate::{ 122 core::uuid::Uuid, 123 gatt::server::{ 124 att_database::AttAttribute, gatt_database::AttPermissions, 125 test::test_att_db::TestAttDatabase, 126 }, 127 utils::task::block_on_locally, 128 }; 129 130 use super::*; 131 132 const HANDLE: AttHandle = AttHandle(1); 133 const NONEXISTENT_HANDLE: AttHandle = AttHandle(2); 134 const NON_INDICATE_HANDLE: AttHandle = AttHandle(3); 135 const MTU: usize = 32; 136 const DATA: [u8; 3] = [1, 2, 3]; 137 get_att_database() -> TestAttDatabase138 fn get_att_database() -> TestAttDatabase { 139 TestAttDatabase::new(vec![ 140 ( 141 AttAttribute { 142 handle: HANDLE, 143 type_: Uuid::new(123), 144 permissions: AttPermissions::INDICATE, 145 }, 146 vec![], 147 ), 148 ( 149 AttAttribute { 150 handle: NON_INDICATE_HANDLE, 151 type_: Uuid::new(123), 152 permissions: AttPermissions::READABLE, 153 }, 154 vec![], 155 ), 156 ]) 157 } 158 159 #[test] test_indication_sent()160 fn test_indication_sent() { 161 block_on_locally(async move { 162 // arrange 163 let (mut indication_handler, _confirmation_watcher) = 164 IndicationHandler::new(get_att_database()); 165 let (tx, rx) = oneshot::channel(); 166 167 // act: send an indication 168 spawn_local(async move { 169 indication_handler 170 .send(HANDLE, &DATA, MTU, move |packet| { 171 tx.send(packet).unwrap(); 172 Ok(()) 173 }) 174 .await 175 }); 176 177 // assert: that an AttHandleValueIndication was sent on the channel 178 let indication = rx.await.unwrap(); 179 assert_eq!( 180 Ok(indication), 181 att::AttHandleValueIndication { handle: HANDLE.into(), value: DATA.to_vec() } 182 .try_into() 183 ); 184 }); 185 } 186 187 #[test] test_invalid_handle()188 fn test_invalid_handle() { 189 block_on_locally(async move { 190 // arrange 191 let (mut indication_handler, _confirmation_watcher) = 192 IndicationHandler::new(get_att_database()); 193 194 // act: send an indication on a nonexistent handle 195 let ret = indication_handler 196 .send(NONEXISTENT_HANDLE, &DATA, MTU, move |_| unreachable!()) 197 .await; 198 199 // assert: that we failed with IndicationError::AttributeNotFound 200 assert!(matches!(ret, Err(IndicationError::AttributeNotFound))); 201 }); 202 } 203 204 #[test] test_unsupported_permission()205 fn test_unsupported_permission() { 206 block_on_locally(async move { 207 // arrange 208 let (mut indication_handler, _confirmation_watcher) = 209 IndicationHandler::new(get_att_database()); 210 211 // act: send an indication on an attribute that does not support indications 212 let ret = indication_handler 213 .send(NON_INDICATE_HANDLE, &DATA, MTU, move |_| unreachable!()) 214 .await; 215 216 // assert: that we failed with IndicationError::IndicationsNotSupported 217 assert!(matches!(ret, Err(IndicationError::IndicationsNotSupported))); 218 }); 219 } 220 221 #[test] test_confirmation_handled()222 fn test_confirmation_handled() { 223 block_on_locally(async move { 224 // arrange 225 let (mut indication_handler, confirmation_watcher) = 226 IndicationHandler::new(get_att_database()); 227 let (tx, rx) = oneshot::channel(); 228 229 // act: send an indication 230 let pending_result = spawn_local(async move { 231 indication_handler 232 .send(HANDLE, &DATA, MTU, move |packet| { 233 tx.send(packet).unwrap(); 234 Ok(()) 235 }) 236 .await 237 }); 238 // when the indication is sent, send a confirmation in response 239 rx.await.unwrap(); 240 confirmation_watcher.on_confirmation(); 241 242 // assert: the indication was successfully sent 243 assert!(matches!(pending_result.await.unwrap(), Ok(()))); 244 }); 245 } 246 247 #[test] test_unblock_on_disconnect()248 fn test_unblock_on_disconnect() { 249 block_on_locally(async move { 250 // arrange 251 let (mut indication_handler, confirmation_watcher) = 252 IndicationHandler::new(get_att_database()); 253 let (tx, rx) = oneshot::channel(); 254 255 // act: send an indication 256 let pending_result = spawn_local(async move { 257 indication_handler 258 .send(HANDLE, &DATA, MTU, move |packet| { 259 tx.send(packet).unwrap(); 260 Ok(()) 261 }) 262 .await 263 }); 264 // when the indication is sent, drop the confirmation watcher (as would happen 265 // upon a disconnection) 266 rx.await.unwrap(); 267 drop(confirmation_watcher); 268 269 // assert: we get the appropriate error 270 assert!(matches!( 271 pending_result.await.unwrap(), 272 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation) 273 )); 274 }); 275 } 276 277 #[test] test_spurious_confirmations()278 fn test_spurious_confirmations() { 279 block_on_locally(async move { 280 // arrange: send a few confirmations in advance 281 let (mut indication_handler, confirmation_watcher) = 282 IndicationHandler::new(get_att_database()); 283 let (tx, rx) = oneshot::channel(); 284 confirmation_watcher.on_confirmation(); 285 confirmation_watcher.on_confirmation(); 286 287 // act: send an indication 288 let pending_result = spawn_local(async move { 289 indication_handler 290 .send(HANDLE, &DATA, MTU, move |packet| { 291 tx.send(packet).unwrap(); 292 Ok(()) 293 }) 294 .await 295 }); 296 // when the indication is sent, drop the confirmation watcher (so we won't block 297 // forever) 298 rx.await.unwrap(); 299 drop(confirmation_watcher); 300 301 // assert: we get the appropriate error, rather than an Ok(()) 302 // (which would have been the case if we had processed the spurious 303 // confirmations) 304 assert!(matches!( 305 pending_result.await.unwrap(), 306 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation) 307 )); 308 }); 309 } 310 311 #[test] test_indication_timeout()312 fn test_indication_timeout() { 313 block_on_locally(async move { 314 // arrange: send a few confirmations in advance 315 let (mut indication_handler, confirmation_watcher) = 316 IndicationHandler::new(get_att_database()); 317 let (tx, rx) = oneshot::channel(); 318 confirmation_watcher.on_confirmation(); 319 confirmation_watcher.on_confirmation(); 320 321 // act: send an indication 322 let time_sent = Instant::now(); 323 let pending_result = spawn_local(async move { 324 indication_handler 325 .send(HANDLE, &DATA, MTU, move |packet| { 326 tx.send(packet).unwrap(); 327 Ok(()) 328 }) 329 .await 330 }); 331 // after it is sent, wait for the timer to fire 332 rx.await.unwrap(); 333 334 // assert: we get the appropriate error 335 assert!(matches!( 336 pending_result.await.unwrap(), 337 Err(IndicationError::ConfirmationTimeout) 338 )); 339 // after the appropriate interval 340 // note: this is not really timing-dependent, since we are using a simulated 341 // clock TODO(aryarahul) - why is this not exactly 30s? 342 let time_slept = Instant::now().duration_since(time_sent); 343 assert!(time_slept > Duration::from_secs(29)); 344 assert!(time_slept < Duration::from_secs(31)); 345 }); 346 } 347 348 #[test] test_mtu_exceeds()349 fn test_mtu_exceeds() { 350 block_on_locally(async move { 351 // arrange 352 let (mut indication_handler, _confirmation_watcher) = 353 IndicationHandler::new(get_att_database()); 354 355 // act: send an indication with an ATT_MTU of 4 and data length of 3 356 let res = indication_handler.send(HANDLE, &DATA, 4, move |_| unreachable!()).await; 357 358 // assert: that we got the expected error, indicating the max data size (not the 359 // ATT_MTU, but ATT_MTU-3) 360 assert!(matches!(res, Err(IndicationError::DataExceedsMtu { mtu: 1 }))); 361 }); 362 } 363 } 364