1# Copyright 2024 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19import asyncio 20import functools 21from bumble import att, gatt, gatt_client 22from bumble.core import InvalidArgumentError, InvalidStateError 23from bumble.device import Device, Connection 24from bumble.utils import AsyncRunner, OpenIntEnum 25from bumble.hci import Address 26from dataclasses import dataclass, field 27import logging 28from typing import Any, Dict, List, Optional, Set, Union 29 30 31# ----------------------------------------------------------------------------- 32# Constants 33# ----------------------------------------------------------------------------- 34class ErrorCode(OpenIntEnum): 35 '''See Hearing Access Service 2.4. Attribute Profile error codes.''' 36 37 INVALID_OPCODE = 0x80 38 WRITE_NAME_NOT_ALLOWED = 0x81 39 PRESET_SYNCHRONIZATION_NOT_SUPPORTED = 0x82 40 PRESET_OPERATION_NOT_POSSIBLE = 0x83 41 INVALID_PARAMETERS_LENGTH = 0x84 42 43 44class HearingAidType(OpenIntEnum): 45 '''See Hearing Access Service 3.1. Hearing Aid Features.''' 46 47 BINAURAL_HEARING_AID = 0b00 48 MONAURAL_HEARING_AID = 0b01 49 BANDED_HEARING_AID = 0b10 50 51 52class PresetSynchronizationSupport(OpenIntEnum): 53 '''See Hearing Access Service 3.1. 
Hearing Aid Features.''' 54 55 PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED = 0b0 56 PRESET_SYNCHRONIZATION_IS_SUPPORTED = 0b1 57 58 59class IndependentPresets(OpenIntEnum): 60 '''See Hearing Access Service 3.1. Hearing Aid Features.''' 61 62 IDENTICAL_PRESET_RECORD = 0b0 63 DIFFERENT_PRESET_RECORD = 0b1 64 65 66class DynamicPresets(OpenIntEnum): 67 '''See Hearing Access Service 3.1. Hearing Aid Features.''' 68 69 PRESET_RECORDS_DOES_NOT_CHANGE = 0b0 70 PRESET_RECORDS_MAY_CHANGE = 0b1 71 72 73class WritablePresetsSupport(OpenIntEnum): 74 '''See Hearing Access Service 3.1. Hearing Aid Features.''' 75 76 WRITABLE_PRESET_RECORDS_NOT_SUPPORTED = 0b0 77 WRITABLE_PRESET_RECORDS_SUPPORTED = 0b1 78 79 80class HearingAidPresetControlPointOpcode(OpenIntEnum): 81 '''See Hearing Access Service 3.3.1 Hearing Aid Preset Control Point operation requirements.''' 82 83 # fmt: off 84 READ_PRESETS_REQUEST = 0x01 85 READ_PRESET_RESPONSE = 0x02 86 PRESET_CHANGED = 0x03 87 WRITE_PRESET_NAME = 0x04 88 SET_ACTIVE_PRESET = 0x05 89 SET_NEXT_PRESET = 0x06 90 SET_PREVIOUS_PRESET = 0x07 91 SET_ACTIVE_PRESET_SYNCHRONIZED_LOCALLY = 0x08 92 SET_NEXT_PRESET_SYNCHRONIZED_LOCALLY = 0x09 93 SET_PREVIOUS_PRESET_SYNCHRONIZED_LOCALLY = 0x0A 94 95 96@dataclass 97class HearingAidFeatures: 98 '''See Hearing Access Service 3.1. 
Hearing Aid Features.''' 99 100 hearing_aid_type: HearingAidType 101 preset_synchronization_support: PresetSynchronizationSupport 102 independent_presets: IndependentPresets 103 dynamic_presets: DynamicPresets 104 writable_presets_support: WritablePresetsSupport 105 106 def __bytes__(self) -> bytes: 107 return bytes( 108 [ 109 (self.hearing_aid_type << 0) 110 | (self.preset_synchronization_support << 2) 111 | (self.independent_presets << 3) 112 | (self.dynamic_presets << 4) 113 | (self.writable_presets_support << 5) 114 ] 115 ) 116 117 118def HearingAidFeatures_from_bytes(data: int) -> HearingAidFeatures: 119 return HearingAidFeatures( 120 HearingAidType(data & 0b11), 121 PresetSynchronizationSupport(data >> 2 & 0b1), 122 IndependentPresets(data >> 3 & 0b1), 123 DynamicPresets(data >> 4 & 0b1), 124 WritablePresetsSupport(data >> 5 & 0b1), 125 ) 126 127 128@dataclass 129class PresetChangedOperation: 130 '''See Hearing Access Service 3.2.2.2. Preset Changed operation.''' 131 132 class ChangeId(OpenIntEnum): 133 # fmt: off 134 GENERIC_UPDATE = 0x00 135 PRESET_RECORD_DELETED = 0x01 136 PRESET_RECORD_AVAILABLE = 0x02 137 PRESET_RECORD_UNAVAILABLE = 0x03 138 139 @dataclass 140 class Generic: 141 prev_index: int 142 preset_record: PresetRecord 143 144 def __bytes__(self) -> bytes: 145 return bytes([self.prev_index]) + bytes(self.preset_record) 146 147 change_id: ChangeId 148 additional_parameters: Union[Generic, int] 149 150 def to_bytes(self, is_last: bool) -> bytes: 151 if isinstance(self.additional_parameters, PresetChangedOperation.Generic): 152 additional_parameters_bytes = bytes(self.additional_parameters) 153 else: 154 additional_parameters_bytes = bytes([self.additional_parameters]) 155 156 return ( 157 bytes( 158 [ 159 HearingAidPresetControlPointOpcode.PRESET_CHANGED, 160 self.change_id, 161 is_last, 162 ] 163 ) 164 + additional_parameters_bytes 165 ) 166 167 168class PresetChangedOperationDeleted(PresetChangedOperation): 169 def __init__(self, index) -> None: 
170 self.change_id = PresetChangedOperation.ChangeId.PRESET_RECORD_DELETED 171 self.additional_parameters = index 172 173 174class PresetChangedOperationAvailable(PresetChangedOperation): 175 def __init__(self, index) -> None: 176 self.change_id = PresetChangedOperation.ChangeId.PRESET_RECORD_AVAILABLE 177 self.additional_parameters = index 178 179 180class PresetChangedOperationUnavailable(PresetChangedOperation): 181 def __init__(self, index) -> None: 182 self.change_id = PresetChangedOperation.ChangeId.PRESET_RECORD_UNAVAILABLE 183 self.additional_parameters = index 184 185 186@dataclass 187class PresetRecord: 188 '''See Hearing Access Service 2.8. Preset record.''' 189 190 @dataclass 191 class Property: 192 class Writable(OpenIntEnum): 193 CANNOT_BE_WRITTEN = 0b0 194 CAN_BE_WRITTEN = 0b1 195 196 class IsAvailable(OpenIntEnum): 197 IS_UNAVAILABLE = 0b0 198 IS_AVAILABLE = 0b1 199 200 writable: Writable = Writable.CAN_BE_WRITTEN 201 is_available: IsAvailable = IsAvailable.IS_AVAILABLE 202 203 def __bytes__(self) -> bytes: 204 return bytes([self.writable | (self.is_available << 1)]) 205 206 index: int 207 name: str 208 properties: Property = field(default_factory=Property) 209 210 def __bytes__(self) -> bytes: 211 return bytes([self.index]) + bytes(self.properties) + self.name.encode('utf-8') 212 213 def is_available(self) -> bool: 214 return ( 215 self.properties.is_available 216 == PresetRecord.Property.IsAvailable.IS_AVAILABLE 217 ) 218 219 220# ----------------------------------------------------------------------------- 221# Server 222# ----------------------------------------------------------------------------- 223class HearingAccessService(gatt.TemplateService): 224 UUID = gatt.GATT_HEARING_ACCESS_SERVICE 225 226 hearing_aid_features_characteristic: gatt.Characteristic 227 hearing_aid_preset_control_point: gatt.Characteristic 228 active_preset_index_characteristic: gatt.Characteristic 229 active_preset_index: int 230 active_preset_index_per_device: 
Dict[Address, int] 231 232 device: Device 233 234 server_features: HearingAidFeatures 235 preset_records: Dict[int, PresetRecord] # key is the preset index 236 read_presets_request_in_progress: bool 237 238 preset_changed_operations_history_per_device: Dict[ 239 Address, List[PresetChangedOperation] 240 ] 241 242 # Keep an updated list of connected client to send notification to 243 currently_connected_clients: Set[Connection] 244 245 def __init__( 246 self, device: Device, features: HearingAidFeatures, presets: List[PresetRecord] 247 ) -> None: 248 self.active_preset_index_per_device = {} 249 self.read_presets_request_in_progress = False 250 self.preset_changed_operations_history_per_device = {} 251 self.currently_connected_clients = set() 252 253 self.device = device 254 self.server_features = features 255 if len(presets) < 1: 256 raise InvalidArgumentError(f'Invalid presets: {presets}') 257 258 self.preset_records = {} 259 for p in presets: 260 if len(p.name.encode()) < 1 or len(p.name.encode()) > 40: 261 raise InvalidArgumentError(f'Invalid name: {p.name}') 262 263 self.preset_records[p.index] = p 264 265 # associate the lowest index as the current active preset at startup 266 self.active_preset_index = sorted(self.preset_records.keys())[0] 267 268 @device.on('connection') # type: ignore 269 def on_connection(connection: Connection) -> None: 270 @connection.on('disconnection') # type: ignore 271 def on_disconnection(_reason) -> None: 272 self.currently_connected_clients.remove(connection) 273 274 @connection.on('pairing') # type: ignore 275 def on_pairing(*_: Any) -> None: 276 self.on_incoming_paired_connection(connection) 277 278 if connection.peer_resolvable_address: 279 self.on_incoming_paired_connection(connection) 280 281 self.hearing_aid_features_characteristic = gatt.Characteristic( 282 uuid=gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC, 283 properties=gatt.Characteristic.Properties.READ, 284 
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, 285 value=bytes(self.server_features), 286 ) 287 self.hearing_aid_preset_control_point = gatt.Characteristic( 288 uuid=gatt.GATT_HEARING_AID_PRESET_CONTROL_POINT_CHARACTERISTIC, 289 properties=( 290 gatt.Characteristic.Properties.WRITE 291 | gatt.Characteristic.Properties.INDICATE 292 ), 293 permissions=gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION, 294 value=gatt.CharacteristicValue( 295 write=self._on_write_hearing_aid_preset_control_point 296 ), 297 ) 298 self.active_preset_index_characteristic = gatt.Characteristic( 299 uuid=gatt.GATT_ACTIVE_PRESET_INDEX_CHARACTERISTIC, 300 properties=( 301 gatt.Characteristic.Properties.READ 302 | gatt.Characteristic.Properties.NOTIFY 303 ), 304 permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, 305 value=gatt.CharacteristicValue(read=self._on_read_active_preset_index), 306 ) 307 308 super().__init__( 309 [ 310 self.hearing_aid_features_characteristic, 311 self.hearing_aid_preset_control_point, 312 self.active_preset_index_characteristic, 313 ] 314 ) 315 316 def on_incoming_paired_connection(self, connection: Connection): 317 '''Setup initial operations to handle a remote bonded HAP device''' 318 # TODO Should we filter on HAP device only ? 
319 self.currently_connected_clients.add(connection) 320 if ( 321 connection.peer_address 322 not in self.preset_changed_operations_history_per_device 323 ): 324 self.preset_changed_operations_history_per_device[ 325 connection.peer_address 326 ] = [] 327 return 328 329 async def on_connection_async() -> None: 330 # Send all the PresetChangedOperation that occur when not connected 331 await self._preset_changed_operation(connection) 332 # Update the active preset index if needed 333 await self.notify_active_preset_for_connection(connection) 334 335 connection.abort_on('disconnection', on_connection_async()) 336 337 def _on_read_active_preset_index( 338 self, __connection__: Optional[Connection] 339 ) -> bytes: 340 return bytes([self.active_preset_index]) 341 342 # TODO this need to be triggered when device is unbonded 343 def on_forget(self, addr: Address) -> None: 344 self.preset_changed_operations_history_per_device.pop(addr) 345 346 async def _on_write_hearing_aid_preset_control_point( 347 self, connection: Optional[Connection], value: bytes 348 ): 349 assert connection 350 351 opcode = HearingAidPresetControlPointOpcode(value[0]) 352 handler = getattr(self, '_on_' + opcode.name.lower()) 353 await handler(connection, value) 354 355 async def _on_read_presets_request( 356 self, connection: Optional[Connection], value: bytes 357 ): 358 assert connection 359 if connection.att_mtu < 49: # 2.5. 
GATT sub-procedure requirements 360 logging.warning(f'HAS require MTU >= 49: {connection}') 361 362 if self.read_presets_request_in_progress: 363 raise att.ATT_Error(att.ErrorCode.PROCEDURE_ALREADY_IN_PROGRESS) 364 self.read_presets_request_in_progress = True 365 366 start_index = value[1] 367 if start_index == 0x00: 368 raise att.ATT_Error(att.ErrorCode.OUT_OF_RANGE) 369 370 num_presets = value[2] 371 if num_presets == 0x00: 372 raise att.ATT_Error(att.ErrorCode.OUT_OF_RANGE) 373 374 # Sending `num_presets` presets ordered by increasing index field, starting from start_index 375 presets = [ 376 self.preset_records[key] 377 for key in sorted(self.preset_records.keys()) 378 if self.preset_records[key].index >= start_index 379 ] 380 del presets[num_presets:] 381 if len(presets) == 0: 382 raise att.ATT_Error(att.ErrorCode.OUT_OF_RANGE) 383 384 AsyncRunner.spawn(self._read_preset_response(connection, presets)) 385 386 async def _read_preset_response( 387 self, connection: Connection, presets: List[PresetRecord] 388 ): 389 # If the ATT bearer is terminated before all notifications or indications are sent, then the server shall consider the Read Presets Request operation aborted and shall not either continue or restart the operation when the client reconnects. 390 try: 391 for i, preset in enumerate(presets): 392 await connection.device.indicate_subscriber( 393 connection, 394 self.hearing_aid_preset_control_point, 395 value=bytes( 396 [ 397 HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE, 398 i == len(presets) - 1, 399 ] 400 ) 401 + bytes(preset), 402 ) 403 404 finally: 405 # indicate_subscriber can raise a TimeoutError, we need to gracefully terminate the operation 406 self.read_presets_request_in_progress = False 407 408 async def generic_update(self, op: PresetChangedOperation) -> None: 409 '''Server API to perform a generic update. 
It is the responsibility of the caller to modify the preset_records to match the PresetChangedOperation being sent''' 410 await self._notifyPresetOperations(op) 411 412 async def delete_preset(self, index: int) -> None: 413 '''Server API to delete a preset. It should not be the current active preset''' 414 415 if index == self.active_preset_index: 416 raise InvalidStateError('Cannot delete active preset') 417 418 del self.preset_records[index] 419 await self._notifyPresetOperations(PresetChangedOperationDeleted(index)) 420 421 async def available_preset(self, index: int) -> None: 422 '''Server API to make a preset available''' 423 424 preset = self.preset_records[index] 425 preset.properties.is_available = PresetRecord.Property.IsAvailable.IS_AVAILABLE 426 await self._notifyPresetOperations(PresetChangedOperationAvailable(index)) 427 428 async def unavailable_preset(self, index: int) -> None: 429 '''Server API to make a preset unavailable. It should not be the current active preset''' 430 431 if index == self.active_preset_index: 432 raise InvalidStateError('Cannot set active preset as unavailable') 433 434 preset = self.preset_records[index] 435 preset.properties.is_available = ( 436 PresetRecord.Property.IsAvailable.IS_UNAVAILABLE 437 ) 438 await self._notifyPresetOperations(PresetChangedOperationUnavailable(index)) 439 440 async def _preset_changed_operation(self, connection: Connection) -> None: 441 '''Send all PresetChangedOperation saved for a given connection''' 442 op_list = self.preset_changed_operations_history_per_device.get( 443 connection.peer_address, [] 444 ) 445 446 # Notification will be sent in index order 447 def get_op_index(op: PresetChangedOperation) -> int: 448 if isinstance(op.additional_parameters, PresetChangedOperation.Generic): 449 return op.additional_parameters.prev_index 450 return op.additional_parameters 451 452 op_list.sort(key=get_op_index) 453 # If the ATT bearer is terminated before all notifications or indications are sent, 
then the server shall consider the Preset Changed operation aborted and shall continue the operation when the client reconnects. 454 while len(op_list) > 0: 455 try: 456 await connection.device.indicate_subscriber( 457 connection, 458 self.hearing_aid_preset_control_point, 459 value=op_list[0].to_bytes(len(op_list) == 1), 460 ) 461 # Remove item once sent, and keep the non sent item in the list 462 op_list.pop(0) 463 except TimeoutError: 464 break 465 466 async def _notifyPresetOperations(self, op: PresetChangedOperation) -> None: 467 for historyList in self.preset_changed_operations_history_per_device.values(): 468 historyList.append(op) 469 470 for connection in self.currently_connected_clients: 471 await self._preset_changed_operation(connection) 472 473 async def _on_write_preset_name( 474 self, connection: Optional[Connection], value: bytes 475 ): 476 assert connection 477 478 if self.read_presets_request_in_progress: 479 raise att.ATT_Error(att.ErrorCode.PROCEDURE_ALREADY_IN_PROGRESS) 480 481 index = value[1] 482 preset = self.preset_records.get(index, None) 483 if ( 484 not preset 485 or preset.properties.writable 486 == PresetRecord.Property.Writable.CANNOT_BE_WRITTEN 487 ): 488 raise att.ATT_Error(ErrorCode.WRITE_NAME_NOT_ALLOWED) 489 490 name = value[2:].decode('utf-8') 491 if not name or len(name) > 40: 492 raise att.ATT_Error(ErrorCode.INVALID_PARAMETERS_LENGTH) 493 494 preset.name = name 495 496 await self.generic_update( 497 PresetChangedOperation( 498 PresetChangedOperation.ChangeId.GENERIC_UPDATE, 499 PresetChangedOperation.Generic(index, preset), 500 ) 501 ) 502 503 async def notify_active_preset_for_connection(self, connection: Connection) -> None: 504 if ( 505 self.active_preset_index_per_device.get(connection.peer_address, 0x00) 506 == self.active_preset_index 507 ): 508 # Nothing to do, peer is already updated 509 return 510 511 await connection.device.notify_subscriber( 512 connection, 513 attribute=self.active_preset_index_characteristic, 514 
value=bytes([self.active_preset_index]), 515 ) 516 self.active_preset_index_per_device[connection.peer_address] = ( 517 self.active_preset_index 518 ) 519 520 async def notify_active_preset(self) -> None: 521 for connection in self.currently_connected_clients: 522 await self.notify_active_preset_for_connection(connection) 523 524 async def set_active_preset( 525 self, connection: Optional[Connection], value: bytes 526 ) -> None: 527 assert connection 528 index = value[1] 529 preset = self.preset_records.get(index, None) 530 if ( 531 not preset 532 or preset.properties.is_available 533 != PresetRecord.Property.IsAvailable.IS_AVAILABLE 534 ): 535 raise att.ATT_Error(ErrorCode.PRESET_OPERATION_NOT_POSSIBLE) 536 537 if index == self.active_preset_index: 538 # Already at correct value 539 return 540 541 self.active_preset_index = index 542 await self.notify_active_preset() 543 544 async def _on_set_active_preset( 545 self, connection: Optional[Connection], value: bytes 546 ): 547 await self.set_active_preset(connection, value) 548 549 async def set_next_or_previous_preset( 550 self, connection: Optional[Connection], is_previous 551 ): 552 '''Set the next or the previous preset as active''' 553 assert connection 554 555 if self.active_preset_index == 0x00: 556 raise att.ATT_Error(ErrorCode.PRESET_OPERATION_NOT_POSSIBLE) 557 558 first_preset: Optional[PresetRecord] = None # To loop to first preset 559 next_preset: Optional[PresetRecord] = None 560 for index, record in sorted(self.preset_records.items(), reverse=is_previous): 561 if not record.is_available(): 562 continue 563 if first_preset == None: 564 first_preset = record 565 if is_previous: 566 if index >= self.active_preset_index: 567 continue 568 elif index <= self.active_preset_index: 569 continue 570 next_preset = record 571 break 572 573 if not first_preset: # If no other preset are available 574 raise att.ATT_Error(ErrorCode.PRESET_OPERATION_NOT_POSSIBLE) 575 576 if next_preset: 577 self.active_preset_index = 
next_preset.index 578 else: 579 self.active_preset_index = first_preset.index 580 await self.notify_active_preset() 581 582 async def _on_set_next_preset( 583 self, connection: Optional[Connection], __value__: bytes 584 ) -> None: 585 await self.set_next_or_previous_preset(connection, False) 586 587 async def _on_set_previous_preset( 588 self, connection: Optional[Connection], __value__: bytes 589 ) -> None: 590 await self.set_next_or_previous_preset(connection, True) 591 592 async def _on_set_active_preset_synchronized_locally( 593 self, connection: Optional[Connection], value: bytes 594 ): 595 if ( 596 self.server_features.preset_synchronization_support 597 == PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_SUPPORTED 598 ): 599 raise att.ATT_Error(ErrorCode.PRESET_SYNCHRONIZATION_NOT_SUPPORTED) 600 await self.set_active_preset(connection, value) 601 # TODO (low priority) inform other server of the change 602 603 async def _on_set_next_preset_synchronized_locally( 604 self, connection: Optional[Connection], __value__: bytes 605 ): 606 if ( 607 self.server_features.preset_synchronization_support 608 == PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_SUPPORTED 609 ): 610 raise att.ATT_Error(ErrorCode.PRESET_SYNCHRONIZATION_NOT_SUPPORTED) 611 await self.set_next_or_previous_preset(connection, False) 612 # TODO (low priority) inform other server of the change 613 614 async def _on_set_previous_preset_synchronized_locally( 615 self, connection: Optional[Connection], __value__: bytes 616 ): 617 if ( 618 self.server_features.preset_synchronization_support 619 == PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_SUPPORTED 620 ): 621 raise att.ATT_Error(ErrorCode.PRESET_SYNCHRONIZATION_NOT_SUPPORTED) 622 await self.set_next_or_previous_preset(connection, True) 623 # TODO (low priority) inform other server of the change 624 625 626# ----------------------------------------------------------------------------- 627# Client 628# 
# -----------------------------------------------------------------------------
class HearingAccessServiceProxy(gatt_client.ProfileServiceProxy):
    '''Client-side proxy for a remote Hearing Access Service.

    Resolves the three service characteristics at construction time;
    `setup_subscription` then routes incoming control point indications and
    active preset index notifications into asyncio queues.
    '''

    SERVICE_CLASS = HearingAccessService

    hearing_aid_preset_control_point: gatt_client.CharacteristicProxy
    preset_control_point_indications: asyncio.Queue

    def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
        self.service_proxy = service_proxy

        def first_characteristic(uuid):
            # Take the first characteristic the service reports for this UUID
            return service_proxy.get_characteristics_by_uuid(uuid)[0]

        # Single-octet values are exposed through a 'B' packed adapter
        features_characteristic = first_characteristic(
            gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC
        )
        self.server_features = gatt.PackedCharacteristicAdapter(
            features_characteristic, 'B'
        )

        self.hearing_aid_preset_control_point = first_characteristic(
            gatt.GATT_HEARING_AID_PRESET_CONTROL_POINT_CHARACTERISTIC
        )

        active_index_characteristic = first_characteristic(
            gatt.GATT_ACTIVE_PRESET_INDEX_CHARACTERISTIC
        )
        self.active_preset_index = gatt.PackedCharacteristicAdapter(
            active_index_characteristic, 'B'
        )

    async def setup_subscription(self):
        '''Create the receive queues and subscribe to both characteristics.

        The control point is subscribed with `prefer_notify=False`; the active
        preset index uses the subscribe default.
        '''
        self.preset_control_point_indications = asyncio.Queue()
        self.active_preset_index_notification = asyncio.Queue()

        def enqueue_control_point_indication(data: bytes):
            self.preset_control_point_indications.put_nowait(data)

        def enqueue_active_preset_index(data: bytes):
            self.active_preset_index_notification.put_nowait(data)

        control_point_subscriber = functools.partial(enqueue_control_point_indication)
        await self.hearing_aid_preset_control_point.subscribe(
            control_point_subscriber, prefer_notify=False
        )

        active_index_subscriber = functools.partial(enqueue_active_preset_index)
        await self.active_preset_index.subscribe(active_index_subscriber)