1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# ATT - Attribute Protocol 17# 18# See Bluetooth spec @ Vol 3, Part F 19# 20# ----------------------------------------------------------------------------- 21 22# ----------------------------------------------------------------------------- 23# Imports 24# ----------------------------------------------------------------------------- 25from __future__ import annotations 26 27import enum 28import functools 29import inspect 30import struct 31from typing import ( 32 Any, 33 Awaitable, 34 Callable, 35 Dict, 36 List, 37 Optional, 38 Type, 39 Union, 40 TYPE_CHECKING, 41) 42 43from pyee import EventEmitter 44 45from bumble import utils 46from bumble.core import UUID, name_or_number, ProtocolError 47from bumble.hci import HCI_Object, key_with_value 48from bumble.colors import color 49 50if TYPE_CHECKING: 51 from bumble.device import Connection 52 53# ----------------------------------------------------------------------------- 54# Constants 55# ----------------------------------------------------------------------------- 56# fmt: off 57# pylint: disable=line-too-long 58 59ATT_CID = 0x04 60 61ATT_ERROR_RESPONSE = 0x01 62ATT_EXCHANGE_MTU_REQUEST = 0x02 63ATT_EXCHANGE_MTU_RESPONSE = 0x03 64ATT_FIND_INFORMATION_REQUEST = 0x04 65ATT_FIND_INFORMATION_RESPONSE = 0x05 66ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06 
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST        = 0x08
ATT_READ_BY_TYPE_RESPONSE       = 0x09
ATT_READ_REQUEST                = 0x0A
ATT_READ_RESPONSE               = 0x0B
ATT_READ_BLOB_REQUEST           = 0x0C
ATT_READ_BLOB_RESPONSE          = 0x0D
ATT_READ_MULTIPLE_REQUEST       = 0x0E
ATT_READ_MULTIPLE_RESPONSE      = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST  = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_WRITE_REQUEST               = 0x12
ATT_WRITE_RESPONSE              = 0x13
ATT_WRITE_COMMAND               = 0x52
ATT_SIGNED_WRITE_COMMAND        = 0xD2
ATT_PREPARE_WRITE_REQUEST       = 0x16
ATT_PREPARE_WRITE_RESPONSE      = 0x17
ATT_EXECUTE_WRITE_REQUEST       = 0x18
ATT_EXECUTE_WRITE_RESPONSE      = 0x19
ATT_HANDLE_VALUE_NOTIFICATION   = 0x1B
ATT_HANDLE_VALUE_INDICATION     = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION   = 0x1E

# Opcode -> symbolic name. Also used in the reverse direction by
# ATT_PDU.subclass to find the opcode that matches a PDU class name.
ATT_PDU_NAMES = {
    ATT_ERROR_RESPONSE:              'ATT_ERROR_RESPONSE',
    ATT_EXCHANGE_MTU_REQUEST:        'ATT_EXCHANGE_MTU_REQUEST',
    ATT_EXCHANGE_MTU_RESPONSE:       'ATT_EXCHANGE_MTU_RESPONSE',
    ATT_FIND_INFORMATION_REQUEST:    'ATT_FIND_INFORMATION_REQUEST',
    ATT_FIND_INFORMATION_RESPONSE:   'ATT_FIND_INFORMATION_RESPONSE',
    ATT_FIND_BY_TYPE_VALUE_REQUEST:  'ATT_FIND_BY_TYPE_VALUE_REQUEST',
    ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE',
    ATT_READ_BY_TYPE_REQUEST:        'ATT_READ_BY_TYPE_REQUEST',
    ATT_READ_BY_TYPE_RESPONSE:       'ATT_READ_BY_TYPE_RESPONSE',
    ATT_READ_REQUEST:                'ATT_READ_REQUEST',
    ATT_READ_RESPONSE:               'ATT_READ_RESPONSE',
    ATT_READ_BLOB_REQUEST:           'ATT_READ_BLOB_REQUEST',
    ATT_READ_BLOB_RESPONSE:          'ATT_READ_BLOB_RESPONSE',
    ATT_READ_MULTIPLE_REQUEST:       'ATT_READ_MULTIPLE_REQUEST',
    ATT_READ_MULTIPLE_RESPONSE:      'ATT_READ_MULTIPLE_RESPONSE',
    ATT_READ_BY_GROUP_TYPE_REQUEST:  'ATT_READ_BY_GROUP_TYPE_REQUEST',
    ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE',
    ATT_WRITE_REQUEST:               'ATT_WRITE_REQUEST',
    ATT_WRITE_RESPONSE:              'ATT_WRITE_RESPONSE',
    ATT_WRITE_COMMAND:               'ATT_WRITE_COMMAND',
    ATT_SIGNED_WRITE_COMMAND:        'ATT_SIGNED_WRITE_COMMAND',
    ATT_PREPARE_WRITE_REQUEST:       'ATT_PREPARE_WRITE_REQUEST',
    ATT_PREPARE_WRITE_RESPONSE:      'ATT_PREPARE_WRITE_RESPONSE',
    ATT_EXECUTE_WRITE_REQUEST:       'ATT_EXECUTE_WRITE_REQUEST',
    ATT_EXECUTE_WRITE_RESPONSE:      'ATT_EXECUTE_WRITE_RESPONSE',
    ATT_HANDLE_VALUE_NOTIFICATION:   'ATT_HANDLE_VALUE_NOTIFICATION',
    ATT_HANDLE_VALUE_INDICATION:     'ATT_HANDLE_VALUE_INDICATION',
    ATT_HANDLE_VALUE_CONFIRMATION:   'ATT_HANDLE_VALUE_CONFIRMATION'
}

# Opcodes of the request PDUs
ATT_REQUESTS = [
    ATT_EXCHANGE_MTU_REQUEST,
    ATT_FIND_INFORMATION_REQUEST,
    ATT_FIND_BY_TYPE_VALUE_REQUEST,
    ATT_READ_BY_TYPE_REQUEST,
    ATT_READ_REQUEST,
    ATT_READ_BLOB_REQUEST,
    ATT_READ_MULTIPLE_REQUEST,
    ATT_READ_BY_GROUP_TYPE_REQUEST,
    ATT_WRITE_REQUEST,
    ATT_PREPARE_WRITE_REQUEST,
    ATT_EXECUTE_WRITE_REQUEST
]

# Opcodes of the response PDUs (the error response included)
ATT_RESPONSES = [
    ATT_ERROR_RESPONSE,
    ATT_EXCHANGE_MTU_RESPONSE,
    ATT_FIND_INFORMATION_RESPONSE,
    ATT_FIND_BY_TYPE_VALUE_RESPONSE,
    ATT_READ_BY_TYPE_RESPONSE,
    ATT_READ_RESPONSE,
    ATT_READ_BLOB_RESPONSE,
    ATT_READ_MULTIPLE_RESPONSE,
    ATT_READ_BY_GROUP_TYPE_RESPONSE,
    ATT_WRITE_RESPONSE,
    ATT_PREPARE_WRITE_RESPONSE,
    ATT_EXECUTE_WRITE_RESPONSE
]

class ErrorCode(utils.OpenIntEnum):
    '''
    ATT error codes.

    See

    * Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
    * Core Specification Supplement: Common Profile And Service Error Codes
    '''
    INVALID_HANDLE                   = 0x01
    READ_NOT_PERMITTED               = 0x02
    WRITE_NOT_PERMITTED              = 0x03
    INVALID_PDU                      = 0x04
    INSUFFICIENT_AUTHENTICATION      = 0x05
    REQUEST_NOT_SUPPORTED            = 0x06
    INVALID_OFFSET                   = 0x07
    INSUFFICIENT_AUTHORIZATION       = 0x08
    PREPARE_QUEUE_FULL               = 0x09
    ATTRIBUTE_NOT_FOUND              = 0x0A
    ATTRIBUTE_NOT_LONG               = 0x0B
    INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0C
    INVALID_ATTRIBUTE_LENGTH         = 0x0D
    UNLIKELY_ERROR                   = 0x0E
    INSUFFICIENT_ENCRYPTION          = 0x0F
    UNSUPPORTED_GROUP_TYPE           = 0x10
    INSUFFICIENT_RESOURCES           = 0x11
    DATABASE_OUT_OF_SYNC             = 0x12
    VALUE_NOT_ALLOWED                = 0x13
    # 0x80 – 0x9F: Application Error
    # 0xE0 – 0xFF: Common Profile and Service Error Codes
    WRITE_REQUEST_REJECTED           = 0xFC
    CCCD_IMPROPERLY_CONFIGURED       = 0xFD
    PROCEDURE_ALREADY_IN_PROGRESS    = 0xFE
    OUT_OF_RANGE                     = 0xFF

# Backward Compatible Constants
ATT_INVALID_HANDLE_ERROR                   = ErrorCode.INVALID_HANDLE
ATT_READ_NOT_PERMITTED_ERROR               = ErrorCode.READ_NOT_PERMITTED
ATT_WRITE_NOT_PERMITTED_ERROR              = ErrorCode.WRITE_NOT_PERMITTED
ATT_INVALID_PDU_ERROR                      = ErrorCode.INVALID_PDU
ATT_INSUFFICIENT_AUTHENTICATION_ERROR      = ErrorCode.INSUFFICIENT_AUTHENTICATION
ATT_REQUEST_NOT_SUPPORTED_ERROR            = ErrorCode.REQUEST_NOT_SUPPORTED
ATT_INVALID_OFFSET_ERROR                   = ErrorCode.INVALID_OFFSET
ATT_INSUFFICIENT_AUTHORIZATION_ERROR       = ErrorCode.INSUFFICIENT_AUTHORIZATION
ATT_PREPARE_QUEUE_FULL_ERROR               = ErrorCode.PREPARE_QUEUE_FULL
ATT_ATTRIBUTE_NOT_FOUND_ERROR              = ErrorCode.ATTRIBUTE_NOT_FOUND
ATT_ATTRIBUTE_NOT_LONG_ERROR               = ErrorCode.ATTRIBUTE_NOT_LONG
ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = ErrorCode.INSUFFICIENT_ENCRYPTION_KEY_SIZE
ATT_INVALID_ATTRIBUTE_LENGTH_ERROR         = ErrorCode.INVALID_ATTRIBUTE_LENGTH
ATT_UNLIKELY_ERROR_ERROR                   = ErrorCode.UNLIKELY_ERROR
ATT_INSUFFICIENT_ENCRYPTION_ERROR          = ErrorCode.INSUFFICIENT_ENCRYPTION
ATT_UNSUPPORTED_GROUP_TYPE_ERROR           = ErrorCode.UNSUPPORTED_GROUP_TYPE
ATT_INSUFFICIENT_RESOURCES_ERROR           = ErrorCode.INSUFFICIENT_RESOURCES

ATT_DEFAULT_MTU = 23

# Field spec for 2-byte attribute handles, displayed as 0xHHHH
HANDLE_FIELD_SPEC    = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y)  # noqa: E731
# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
UUID_2_FIELD_SPEC    = lambda x, y: UUID.parse_uuid_2(x, y)  # noqa: E731

# fmt: on
# pylint: enable=line-too-long
# pylint: disable=invalid-name


# -----------------------------------------------------------------------------
# Exceptions
# -----------------------------------------------------------------------------
class ATT_Error(ProtocolError):
    '''
    Protocol error in the 'att' namespace, tagged with the handle of the
    attribute it relates to and an optional free-form message.
    '''

    def __init__(self, error_code, att_handle=0x0000, message=''):
        super().__init__(
            error_code,
            error_namespace='att',
            error_name=ATT_PDU.error_name(error_code),
        )
        self.att_handle = att_handle
        self.message = message

    def __str__(self):
        return f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X}): {self.message}'


# -----------------------------------------------------------------------------
# Attribute Protocol
# -----------------------------------------------------------------------------
class ATT_PDU:
    '''
    Base class for ATT PDUs, and registry of the concrete PDU classes.

    See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU
    '''

    # Opcode -> PDU class, populated by the `subclass` decorator
    pdu_classes: Dict[int, Type[ATT_PDU]] = {}
    op_code = 0
    name: str

    @staticmethod
    def from_bytes(pdu):
        '''
        Build a PDU object from its serialized form.

        The first byte selects the class. When no class is registered for that
        opcode, a plain ATT_PDU is returned with only `name`, `op_code` and
        `pdu` set.
        '''
        op_code = pdu[0]

        cls = ATT_PDU.pdu_classes.get(op_code)
        if cls is None:
            instance = ATT_PDU(pdu)
            instance.name = ATT_PDU.pdu_name(op_code)
            instance.op_code = op_code
            return instance

        # Bypass the subclass constructor: the fields are decoded from the
        # bytes (starting after the opcode) rather than passed as arguments.
        self = cls.__new__(cls)
        ATT_PDU.__init__(self, pdu)
        if hasattr(self, 'fields'):
            self.init_from_bytes(pdu, 1)
        return self

    @staticmethod
    def pdu_name(op_code):
        '''Return the display name for an opcode, via ATT_PDU_NAMES.'''
        return name_or_number(ATT_PDU_NAMES, op_code, 2)

    @classmethod
    def error_name(cls, error_code: int) -> str:
        '''Return the ErrorCode member name for an error code.'''
        return ErrorCode(error_code).name

    @staticmethod
    def subclass(fields):
        '''
        Class decorator that registers a PDU class.

        The upper-cased class name must be a value of ATT_PDU_NAMES; the
        matching key becomes the class `op_code`. `fields` is stored as the
        class field specification.

        Raises KeyError when the class name is not in ATT_PDU_NAMES.
        '''

        def inner(cls):
            cls.name = cls.__name__.upper()
            cls.op_code = key_with_value(ATT_PDU_NAMES, cls.name)
            if cls.op_code is None:
                raise KeyError(f'PDU name {cls.name} not found in ATT_PDU_NAMES')
            cls.fields = fields

            # Register a factory for this class
            ATT_PDU.pdu_classes[cls.op_code] = cls

            return cls

        return inner

    def __init__(self, pdu=None, **kwargs):
        '''
        Create a PDU either from serialized bytes (`pdu`) or from field values
        passed as keyword arguments, in which case the bytes are generated.
        '''
        if hasattr(self, 'fields') and kwargs:
            HCI_Object.init_from_fields(self, self.fields, kwargs)
        if pdu is None:
            pdu = bytes([self.op_code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
        self.pdu = pdu

    def init_from_bytes(self, pdu, offset):
        '''Decode this class' fields from `pdu`, starting at `offset`.'''
        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)

    def to_bytes(self):
        '''Return the serialized PDU.'''
        return self.pdu

    @property
    def is_command(self):
        '''True when bit 6 of the opcode is set.'''
        return ((self.op_code >> 6) & 1) == 1

    @property
    def has_authentication_signature(self):
        '''True when bit 7 of the opcode is set.'''
        return ((self.op_code >> 7) & 1) == 1

    def __bytes__(self):
        return self.to_bytes()

    def __str__(self):
        result = color(self.name, 'yellow')
        if fields := getattr(self, 'fields', None):
            result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
        else:
            # No field description: show the raw bytes if there is a payload
            if len(self.pdu) > 1:
                result += f': {self.pdu.hex()}'
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('request_opcode_in_error', {'size': 1, 'mapper': ATT_PDU.pdu_name}),
        ('attribute_handle_in_error', HANDLE_FIELD_SPEC),
        ('error_code', {'size': 1, 'mapper': ATT_PDU.error_name}),
    ]
)
class ATT_Error_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('client_rx_mtu', 2)])
class ATT_Exchange_MTU_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('server_rx_mtu', 2)])
class ATT_Exchange_MTU_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [('starting_handle', HANDLE_FIELD_SPEC), ('ending_handle', HANDLE_FIELD_SPEC)]
)
class ATT_Find_Information_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('format', 1), ('information_data', '*')])
class ATT_Find_Information_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response
    '''

    def parse_information_data(self):
        '''
        Split `information_data` into a list of (handle, uuid_bytes) tuples,
        stored in `self.information`.

        UUIDs are 2 bytes long when `format` is 1, 16 bytes otherwise. Trailing
        bytes that do not make up a complete entry are ignored.
        '''
        self.information = []
        offset = 0
        uuid_size = 2 if self.format == 1 else 16
        # An entry is a 2-byte handle followed by the UUID. The bound must
        # cover the whole entry, otherwise truncated trailing data would be
        # reported as an entry with a short (or empty) UUID.
        entry_size = 2 + uuid_size
        while offset + entry_size <= len(self.information_data):
            handle = struct.unpack_from('<H', self.information_data, offset)[0]
            uuid = self.information_data[2 + offset : 2 + offset + uuid_size]
            self.information.append((handle, uuid))
            offset += entry_size

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_information_data()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_information_data()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                ('format', 1),
                (
                    'information',
                    {
                        'mapper': lambda x: ', '.join(
                            [f'0x{handle:04X}:{uuid.hex()}' for handle, uuid in x]
                        )
                    },
                ),
            ],
            ' ',
        )
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('starting_handle', HANDLE_FIELD_SPEC),
        ('ending_handle', HANDLE_FIELD_SPEC),
        ('attribute_type', UUID_2_FIELD_SPEC),
        ('attribute_value', '*'),
    ]
)
class ATT_Find_By_Type_Value_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.3 Find By Type Value Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('handles_information_list', '*')])
class ATT_Find_By_Type_Value_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.3.4 Find By Type Value Response
    '''

    def parse_handles_information_list(self):
        '''
        Split `handles_information_list` into a list of
        (found_attribute_handle, group_end_handle) tuples, stored in
        `self.handles_information`. Each entry is 4 bytes; incomplete trailing
        data is ignored.
        '''
        self.handles_information = []
        offset = 0
        while offset + 4 <= len(self.handles_information_list):
            found_attribute_handle, group_end_handle = struct.unpack_from(
                '<HH', self.handles_information_list, offset
            )
            self.handles_information.append((found_attribute_handle, group_end_handle))
            offset += 4

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_handles_information_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_handles_information_list()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                (
                    'handles_information',
                    {
                        'mapper': lambda x: ', '.join(
                            [
                                f'0x{handle1:04X}-0x{handle2:04X}'
                                for handle1, handle2 in x
                            ]
                        )
                    },
                )
            ],
            ' ',
        )
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('starting_handle', HANDLE_FIELD_SPEC),
        ('ending_handle', HANDLE_FIELD_SPEC),
        ('attribute_type', UUID_2_16_FIELD_SPEC),
    ]
)
class ATT_Read_By_Type_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.1 Read By Type Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
class ATT_Read_By_Type_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.2 Read By Type Response
    '''

    def parse_attribute_data_list(self):
        '''
        Split `attribute_data_list` into a list of (handle, value_bytes)
        tuples, stored in `self.attributes`.

        Every entry is `length` bytes long: a 2-byte handle followed by the
        value. Incomplete trailing data is ignored.
        '''
        self.attributes = []
        # An entry must at least hold the 2-byte handle. A shorter declared
        # length (0 included) is malformed: it would either never advance or
        # make the handle read run past the entry, so nothing is parsed.
        if self.length < 2:
            return

        offset = 0
        while offset + self.length <= len(self.attribute_data_list):
            (attribute_handle,) = struct.unpack_from(
                '<H', self.attribute_data_list, offset
            )
            attribute_value = self.attribute_data_list[
                offset + 2 : offset + self.length
            ]
            self.attributes.append((attribute_handle, attribute_value))
            offset += self.length

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_attribute_data_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_attribute_data_list()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                ('length', 1),
                (
                    'attributes',
                    {
                        'mapper': lambda x: ', '.join(
                            [f'0x{handle:04X}:{value.hex()}' for handle, value in x]
                        )
                    },
                ),
            ],
            ' ',
        )
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC)])
class ATT_Read_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.3 Read Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_value', '*')])
class ATT_Read_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.4 Read Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('value_offset', 2)])
class ATT_Read_Blob_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.5 Read Blob Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('part_attribute_value', '*')])
class ATT_Read_Blob_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.6 Read Blob Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('set_of_handles', '*')])
class ATT_Read_Multiple_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('set_of_values', '*')])
class ATT_Read_Multiple_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.8 Read Multiple Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('starting_handle', HANDLE_FIELD_SPEC),
        ('ending_handle', HANDLE_FIELD_SPEC),
        ('attribute_group_type', UUID_2_16_FIELD_SPEC),
    ]
)
class ATT_Read_By_Group_Type_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.9 Read by Group Type Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
class ATT_Read_By_Group_Type_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.4.10 Read by Group Type Response
    '''

    def parse_attribute_data_list(self):
        '''
        Split `attribute_data_list` into a list of
        (attribute_handle, end_group_handle, value_bytes) tuples, stored in
        `self.attributes`.

        Every entry is `length` bytes long: two 2-byte handles followed by the
        value. Incomplete trailing data is ignored.
        '''
        self.attributes = []
        # An entry must at least hold the two 2-byte handles. With a shorter
        # declared length (0 included) the handle read would overlap the next
        # entry or run past the data, so nothing is parsed.
        if self.length < 4:
            return

        offset = 0
        while offset + self.length <= len(self.attribute_data_list):
            attribute_handle, end_group_handle = struct.unpack_from(
                '<HH', self.attribute_data_list, offset
            )
            attribute_value = self.attribute_data_list[
                offset + 4 : offset + self.length
            ]
            self.attributes.append(
                (attribute_handle, end_group_handle, attribute_value)
            )
            offset += self.length

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parse_attribute_data_list()

    def init_from_bytes(self, pdu, offset):
        super().init_from_bytes(pdu, offset)
        self.parse_attribute_data_list()

    def __str__(self):
        result = color(self.name, 'yellow')
        result += ':\n' + HCI_Object.format_fields(
            self.__dict__,
            [
                ('length', 1),
                (
                    'attributes',
                    {
                        'mapper': lambda x: ', '.join(
                            [
                                f'0x{handle:04X}-0x{end:04X}:{value.hex()}'
                                for handle, end, value in x
                            ]
                        )
                    },
                ),
            ],
            ' ',
        )
        return result


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.1 Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.2 Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Write_Command(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.3 Write Command
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('attribute_value', '*'),
        # ('authentication_signature', 'TODO')
    ]
)
class ATT_Signed_Write_Command(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.5.4 Signed Write Command
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('value_offset', 2),
        ('part_attribute_value', '*'),
    ]
)
class ATT_Prepare_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.1 Prepare Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass(
    [
        ('attribute_handle', HANDLE_FIELD_SPEC),
        ('value_offset', 2),
        ('part_attribute_value', '*'),
    ]
)
class ATT_Prepare_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.2 Prepare Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Request(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Response(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.6.4 Execute Write Response
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Handle_Value_Notification(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.1 Handle Value Notification
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
class ATT_Handle_Value_Indication(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.2 Handle Value Indication
    '''


# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Handle_Value_Confirmation(ATT_PDU):
    '''
    See Bluetooth spec @ Vol 3, Part F - 3.4.7.3 Handle Value Confirmation
    '''


# -----------------------------------------------------------------------------
class AttributeValue:
    '''
    Attribute value where reading and/or writing is delegated to functions
    passed as arguments to the constructor.

    Either function may be synchronous or return an awaitable.
    '''

    def __init__(
        self,
        read: Union[
            Callable[[Optional[Connection]], bytes],
            Callable[[Optional[Connection]], Awaitable[bytes]],
            None,
        ] = None,
        write: Union[
            Callable[[Optional[Connection], bytes], None],
            Callable[[Optional[Connection], bytes], Awaitable[None]],
            None,
        ] = None,
    ):
        self._read = read
        self._write = write

    def read(self, connection: Optional[Connection]) -> Union[bytes, Awaitable[bytes]]:
        '''Return the result of the read function, or b'' when there is none.'''
        return self._read(connection) if self._read else b''

    def write(
        self, connection: Optional[Connection], value: bytes
    ) -> Union[Awaitable[None], None]:
        '''Return the result of the write function, or None when there is none.'''
        if self._write:
            return self._write(connection, value)

        return None


# -----------------------------------------------------------------------------
class Attribute(EventEmitter):
    '''
    An attribute: a handle, a type (UUID), permission flags and a value.

    The value is either raw bytes or an object exposing `read` and/or `write`
    (such as AttributeValue). A 'write' event is emitted after each successful
    write_value.
    '''

    class Permissions(enum.IntFlag):
        READABLE = 0x01
        WRITEABLE = 0x02
        READ_REQUIRES_ENCRYPTION = 0x04
        WRITE_REQUIRES_ENCRYPTION = 0x08
        READ_REQUIRES_AUTHENTICATION = 0x10
        WRITE_REQUIRES_AUTHENTICATION = 0x20
        READ_REQUIRES_AUTHORIZATION = 0x40
        WRITE_REQUIRES_AUTHORIZATION = 0x80

        @classmethod
        def from_string(cls, permissions_str: str) -> Attribute.Permissions:
            '''
            Parse a list of permission names separated by ',' or '|'.

            Raises TypeError when a name is not a member of Permissions.
            '''
            try:
                return functools.reduce(
                    lambda x, y: x | Attribute.Permissions[y],
                    permissions_str.replace('|', ',').split(","),
                    Attribute.Permissions(0),
                )
            except (KeyError, TypeError) as exc:
                # Looking up an unknown member name raises KeyError, which is
                # what must be caught for the descriptive error below to be
                # reported at all.
                #
                # The check for `p.name is not None` here is needed because for
                # IntFlag enums, the .name property can be None, when the enum
                # value is 0, so the type hint for .name is Optional[str].
                enum_list: List[str] = [p.name for p in cls if p.name is not None]
                enum_list_str = ",".join(enum_list)
                raise TypeError(
                    f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {enum_list_str}\nGot: {permissions_str}"
                ) from exc

    # Permission flags(legacy-use only)
    READABLE = Permissions.READABLE
    WRITEABLE = Permissions.WRITEABLE
    READ_REQUIRES_ENCRYPTION = Permissions.READ_REQUIRES_ENCRYPTION
    WRITE_REQUIRES_ENCRYPTION = Permissions.WRITE_REQUIRES_ENCRYPTION
    READ_REQUIRES_AUTHENTICATION = Permissions.READ_REQUIRES_AUTHENTICATION
    WRITE_REQUIRES_AUTHENTICATION = Permissions.WRITE_REQUIRES_AUTHENTICATION
    READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
    WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION

    value: Union[bytes, AttributeValue]

    def __init__(
        self,
        attribute_type: Union[str, bytes, UUID],
        permissions: Union[str, Attribute.Permissions],
        value: Union[str, bytes, AttributeValue] = b'',
    ) -> None:
        '''
        Args:
            attribute_type: UUID of the attribute type, as a UUID object, a
                string or bytes.
            permissions: permission flags, or a string accepted by
                Permissions.from_string.
            value: initial value; a string is encoded as UTF-8.
        '''
        EventEmitter.__init__(self)
        self.handle = 0
        self.end_group_handle = 0
        if isinstance(permissions, str):
            self.permissions = Attribute.Permissions.from_string(permissions)
        else:
            self.permissions = permissions

        # Convert the type to a UUID object if it isn't already
        if isinstance(attribute_type, str):
            self.type = UUID(attribute_type)
        elif isinstance(attribute_type, bytes):
            self.type = UUID.from_bytes(attribute_type)
        else:
            self.type = attribute_type

        # Convert the value to a byte array
        if isinstance(value, str):
            self.value = bytes(value, 'utf-8')
        else:
            self.value = value

    def encode_value(self, value: Any) -> bytes:
        '''Convert a value to bytes. Identity here.'''
        return value

    def decode_value(self, value_bytes: bytes) -> Any:
        '''Convert bytes to a value. Identity here.'''
        return value_bytes

    async def read_value(self, connection: Optional[Connection]) -> bytes:
        '''
        Check the read permissions against the connection, then return the
        encoded value.

        The encryption and authentication checks are skipped when `connection`
        is None. An attribute that requires read authorization always fails.

        Raises ATT_Error when a permission check fails, or when the value's
        `read` raises ATT_Error (re-raised with this attribute's handle).
        '''
        if (
            (self.permissions & self.READ_REQUIRES_ENCRYPTION)
            and connection is not None
            and not connection.encryption
        ):
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
            )
        if (
            (self.permissions & self.READ_REQUIRES_AUTHENTICATION)
            and connection is not None
            and not connection.authenticated
        ):
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
            )
        if self.permissions & self.READ_REQUIRES_AUTHORIZATION:
            # TODO: handle authorization better
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
            )

        if hasattr(self.value, 'read'):
            try:
                value = self.value.read(connection)
                if inspect.isawaitable(value):
                    value = await value
            except ATT_Error as error:
                raise ATT_Error(
                    error_code=error.error_code, att_handle=self.handle
                ) from error
        else:
            value = self.value

        return self.encode_value(value)

    async def write_value(self, connection: Connection, value_bytes: bytes) -> None:
        '''
        Check the write permissions against the connection, store or delegate
        the decoded value, then emit a 'write' event with the connection and
        the decoded value.

        An attribute that requires write authorization always fails.

        Raises ATT_Error when a permission check fails, or when the value's
        `write` raises ATT_Error (re-raised with this attribute's handle).
        '''
        if (
            self.permissions & self.WRITE_REQUIRES_ENCRYPTION
        ) and not connection.encryption:
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
            )
        if (
            self.permissions & self.WRITE_REQUIRES_AUTHENTICATION
        ) and not connection.authenticated:
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
            )
        if self.permissions & self.WRITE_REQUIRES_AUTHORIZATION:
            # TODO: handle authorization better
            raise ATT_Error(
                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
            )

        value = self.decode_value(value_bytes)

        if hasattr(self.value, 'write'):
            try:
                result = self.value.write(connection, value)
                if inspect.isawaitable(result):
                    await result
            except ATT_Error as error:
                raise ATT_Error(
                    error_code=error.error_code, att_handle=self.handle
                ) from error
        else:
            self.value = value

        self.emit('write', connection, value)

    def __repr__(self):
        if isinstance(self.value, bytes):
            value_str = self.value.hex()
        else:
            value_str = str(self.value)
        if value_str:
            # Use the string computed above: the value may not be bytes, in
            # which case it has no hex() method.
            value_string = f', value={value_str}'
        else:
            value_string = ''
        return (
            f'Attribute(handle=0x{self.handle:04X}, '
            f'type={self.type}, '
            f'permissions={self.permissions}{value_string})'
        )