1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# ATT - Attribute Protocol
17#
18# See Bluetooth spec @ Vol 3, Part F
19#
20# -----------------------------------------------------------------------------
21
22# -----------------------------------------------------------------------------
23# Imports
24# -----------------------------------------------------------------------------
25from __future__ import annotations
26
27import enum
28import functools
29import inspect
30import struct
31from typing import (
32    Any,
33    Awaitable,
34    Callable,
35    Dict,
36    List,
37    Optional,
38    Type,
39    Union,
40    TYPE_CHECKING,
41)
42
43from pyee import EventEmitter
44
45from bumble import utils
46from bumble.core import UUID, name_or_number, ProtocolError
47from bumble.hci import HCI_Object, key_with_value
48from bumble.colors import color
49
50if TYPE_CHECKING:
51    from bumble.device import Connection
52
53# -----------------------------------------------------------------------------
54# Constants
55# -----------------------------------------------------------------------------
56# fmt: off
57# pylint: disable=line-too-long
58
59ATT_CID = 0x04
60
61ATT_ERROR_RESPONSE              = 0x01
62ATT_EXCHANGE_MTU_REQUEST        = 0x02
63ATT_EXCHANGE_MTU_RESPONSE       = 0x03
64ATT_FIND_INFORMATION_REQUEST    = 0x04
65ATT_FIND_INFORMATION_RESPONSE   = 0x05
66ATT_FIND_BY_TYPE_VALUE_REQUEST  = 0x06
67ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
68ATT_READ_BY_TYPE_REQUEST        = 0x08
69ATT_READ_BY_TYPE_RESPONSE       = 0x09
70ATT_READ_REQUEST                = 0x0A
71ATT_READ_RESPONSE               = 0x0B
72ATT_READ_BLOB_REQUEST           = 0x0C
73ATT_READ_BLOB_RESPONSE          = 0x0D
74ATT_READ_MULTIPLE_REQUEST       = 0x0E
75ATT_READ_MULTIPLE_RESPONSE      = 0x0F
76ATT_READ_BY_GROUP_TYPE_REQUEST  = 0x10
77ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
78ATT_WRITE_REQUEST               = 0x12
79ATT_WRITE_RESPONSE              = 0x13
80ATT_WRITE_COMMAND               = 0x52
81ATT_SIGNED_WRITE_COMMAND        = 0xD2
82ATT_PREPARE_WRITE_REQUEST       = 0x16
83ATT_PREPARE_WRITE_RESPONSE      = 0x17
84ATT_EXECUTE_WRITE_REQUEST       = 0x18
85ATT_EXECUTE_WRITE_RESPONSE      = 0x19
86ATT_HANDLE_VALUE_NOTIFICATION   = 0x1B
87ATT_HANDLE_VALUE_INDICATION     = 0x1D
88ATT_HANDLE_VALUE_CONFIRMATION   = 0x1E
89
90ATT_PDU_NAMES = {
91    ATT_ERROR_RESPONSE:              'ATT_ERROR_RESPONSE',
92    ATT_EXCHANGE_MTU_REQUEST:        'ATT_EXCHANGE_MTU_REQUEST',
93    ATT_EXCHANGE_MTU_RESPONSE:       'ATT_EXCHANGE_MTU_RESPONSE',
94    ATT_FIND_INFORMATION_REQUEST:    'ATT_FIND_INFORMATION_REQUEST',
95    ATT_FIND_INFORMATION_RESPONSE:   'ATT_FIND_INFORMATION_RESPONSE',
96    ATT_FIND_BY_TYPE_VALUE_REQUEST:  'ATT_FIND_BY_TYPE_VALUE_REQUEST',
97    ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE',
98    ATT_READ_BY_TYPE_REQUEST:        'ATT_READ_BY_TYPE_REQUEST',
99    ATT_READ_BY_TYPE_RESPONSE:       'ATT_READ_BY_TYPE_RESPONSE',
100    ATT_READ_REQUEST:                'ATT_READ_REQUEST',
101    ATT_READ_RESPONSE:               'ATT_READ_RESPONSE',
102    ATT_READ_BLOB_REQUEST:           'ATT_READ_BLOB_REQUEST',
103    ATT_READ_BLOB_RESPONSE:          'ATT_READ_BLOB_RESPONSE',
104    ATT_READ_MULTIPLE_REQUEST:       'ATT_READ_MULTIPLE_REQUEST',
105    ATT_READ_MULTIPLE_RESPONSE:      'ATT_READ_MULTIPLE_RESPONSE',
106    ATT_READ_BY_GROUP_TYPE_REQUEST:  'ATT_READ_BY_GROUP_TYPE_REQUEST',
107    ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE',
108    ATT_WRITE_REQUEST:               'ATT_WRITE_REQUEST',
109    ATT_WRITE_RESPONSE:              'ATT_WRITE_RESPONSE',
110    ATT_WRITE_COMMAND:               'ATT_WRITE_COMMAND',
111    ATT_SIGNED_WRITE_COMMAND:        'ATT_SIGNED_WRITE_COMMAND',
112    ATT_PREPARE_WRITE_REQUEST:       'ATT_PREPARE_WRITE_REQUEST',
113    ATT_PREPARE_WRITE_RESPONSE:      'ATT_PREPARE_WRITE_RESPONSE',
114    ATT_EXECUTE_WRITE_REQUEST:       'ATT_EXECUTE_WRITE_REQUEST',
115    ATT_EXECUTE_WRITE_RESPONSE:      'ATT_EXECUTE_WRITE_RESPONSE',
116    ATT_HANDLE_VALUE_NOTIFICATION:   'ATT_HANDLE_VALUE_NOTIFICATION',
117    ATT_HANDLE_VALUE_INDICATION:     'ATT_HANDLE_VALUE_INDICATION',
118    ATT_HANDLE_VALUE_CONFIRMATION:   'ATT_HANDLE_VALUE_CONFIRMATION'
119}
120
121ATT_REQUESTS = [
122    ATT_EXCHANGE_MTU_REQUEST,
123    ATT_FIND_INFORMATION_REQUEST,
124    ATT_FIND_BY_TYPE_VALUE_REQUEST,
125    ATT_READ_BY_TYPE_REQUEST,
126    ATT_READ_REQUEST,
127    ATT_READ_BLOB_REQUEST,
128    ATT_READ_MULTIPLE_REQUEST,
129    ATT_READ_BY_GROUP_TYPE_REQUEST,
130    ATT_WRITE_REQUEST,
131    ATT_PREPARE_WRITE_REQUEST,
132    ATT_EXECUTE_WRITE_REQUEST
133]
134
135ATT_RESPONSES = [
136    ATT_ERROR_RESPONSE,
137    ATT_EXCHANGE_MTU_RESPONSE,
138    ATT_FIND_INFORMATION_RESPONSE,
139    ATT_FIND_BY_TYPE_VALUE_RESPONSE,
140    ATT_READ_BY_TYPE_RESPONSE,
141    ATT_READ_RESPONSE,
142    ATT_READ_BLOB_RESPONSE,
143    ATT_READ_MULTIPLE_RESPONSE,
144    ATT_READ_BY_GROUP_TYPE_RESPONSE,
145    ATT_WRITE_RESPONSE,
146    ATT_PREPARE_WRITE_RESPONSE,
147    ATT_EXECUTE_WRITE_RESPONSE
148]
149
150class ErrorCode(utils.OpenIntEnum):
151    '''
152    See
153
154    * Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
155    * Core Specification Supplement: Common Profile And Service Error Codes
156    '''
157    INVALID_HANDLE                   = 0x01
158    READ_NOT_PERMITTED               = 0x02
159    WRITE_NOT_PERMITTED              = 0x03
160    INVALID_PDU                      = 0x04
161    INSUFFICIENT_AUTHENTICATION      = 0x05
162    REQUEST_NOT_SUPPORTED            = 0x06
163    INVALID_OFFSET                   = 0x07
164    INSUFFICIENT_AUTHORIZATION       = 0x08
165    PREPARE_QUEUE_FULL               = 0x09
166    ATTRIBUTE_NOT_FOUND              = 0x0A
167    ATTRIBUTE_NOT_LONG               = 0x0B
168    INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0C
169    INVALID_ATTRIBUTE_LENGTH         = 0x0D
170    UNLIKELY_ERROR                   = 0x0E
171    INSUFFICIENT_ENCRYPTION          = 0x0F
172    UNSUPPORTED_GROUP_TYPE           = 0x10
173    INSUFFICIENT_RESOURCES           = 0x11
174    DATABASE_OUT_OF_SYNC             = 0x12
175    VALUE_NOT_ALLOWED                = 0x13
176    # 0x80 – 0x9F: Application Error
177    # 0xE0 – 0xFF: Common Profile and Service Error Codes
178    WRITE_REQUEST_REJECTED           = 0xFC
179    CCCD_IMPROPERLY_CONFIGURED       = 0xFD
180    PROCEDURE_ALREADY_IN_PROGRESS    = 0xFE
181    OUT_OF_RANGE                     = 0xFF
182
183# Backward Compatible Constants
184ATT_INVALID_HANDLE_ERROR                   = ErrorCode.INVALID_HANDLE
185ATT_READ_NOT_PERMITTED_ERROR               = ErrorCode.READ_NOT_PERMITTED
186ATT_WRITE_NOT_PERMITTED_ERROR              = ErrorCode.WRITE_NOT_PERMITTED
187ATT_INVALID_PDU_ERROR                      = ErrorCode.INVALID_PDU
188ATT_INSUFFICIENT_AUTHENTICATION_ERROR      = ErrorCode.INSUFFICIENT_AUTHENTICATION
189ATT_REQUEST_NOT_SUPPORTED_ERROR            = ErrorCode.REQUEST_NOT_SUPPORTED
190ATT_INVALID_OFFSET_ERROR                   = ErrorCode.INVALID_OFFSET
191ATT_INSUFFICIENT_AUTHORIZATION_ERROR       = ErrorCode.INSUFFICIENT_AUTHORIZATION
192ATT_PREPARE_QUEUE_FULL_ERROR               = ErrorCode.PREPARE_QUEUE_FULL
193ATT_ATTRIBUTE_NOT_FOUND_ERROR              = ErrorCode.ATTRIBUTE_NOT_FOUND
194ATT_ATTRIBUTE_NOT_LONG_ERROR               = ErrorCode.ATTRIBUTE_NOT_LONG
195ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = ErrorCode.INSUFFICIENT_ENCRYPTION_KEY_SIZE
196ATT_INVALID_ATTRIBUTE_LENGTH_ERROR         = ErrorCode.INVALID_ATTRIBUTE_LENGTH
197ATT_UNLIKELY_ERROR_ERROR                   = ErrorCode.UNLIKELY_ERROR
198ATT_INSUFFICIENT_ENCRYPTION_ERROR          = ErrorCode.INSUFFICIENT_ENCRYPTION
199ATT_UNSUPPORTED_GROUP_TYPE_ERROR           = ErrorCode.UNSUPPORTED_GROUP_TYPE
200ATT_INSUFFICIENT_RESOURCES_ERROR           = ErrorCode.INSUFFICIENT_RESOURCES
201
202ATT_DEFAULT_MTU = 23
203
204HANDLE_FIELD_SPEC    = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
205# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
206UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y)
207# pylint: disable-next=unnecessary-lambda-assignment,unnecessary-lambda
208UUID_2_FIELD_SPEC    = lambda x, y: UUID.parse_uuid_2(x, y)  # noqa: E731
209
210# fmt: on
211# pylint: enable=line-too-long
212# pylint: disable=invalid-name
213
214
215# -----------------------------------------------------------------------------
216# Exceptions
217# -----------------------------------------------------------------------------
218class ATT_Error(ProtocolError):
219    def __init__(self, error_code, att_handle=0x0000, message=''):
220        super().__init__(
221            error_code,
222            error_namespace='att',
223            error_name=ATT_PDU.error_name(error_code),
224        )
225        self.att_handle = att_handle
226        self.message = message
227
228    def __str__(self):
229        return f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X}): {self.message}'
230
231
232# -----------------------------------------------------------------------------
233# Attribute Protocol
234# -----------------------------------------------------------------------------
235class ATT_PDU:
236    '''
237    See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU
238    '''
239
240    pdu_classes: Dict[int, Type[ATT_PDU]] = {}
241    op_code = 0
242    name: str
243
244    @staticmethod
245    def from_bytes(pdu):
246        op_code = pdu[0]
247
248        cls = ATT_PDU.pdu_classes.get(op_code)
249        if cls is None:
250            instance = ATT_PDU(pdu)
251            instance.name = ATT_PDU.pdu_name(op_code)
252            instance.op_code = op_code
253            return instance
254        self = cls.__new__(cls)
255        ATT_PDU.__init__(self, pdu)
256        if hasattr(self, 'fields'):
257            self.init_from_bytes(pdu, 1)
258        return self
259
260    @staticmethod
261    def pdu_name(op_code):
262        return name_or_number(ATT_PDU_NAMES, op_code, 2)
263
264    @classmethod
265    def error_name(cls, error_code: int) -> str:
266        return ErrorCode(error_code).name
267
268    @staticmethod
269    def subclass(fields):
270        def inner(cls):
271            cls.name = cls.__name__.upper()
272            cls.op_code = key_with_value(ATT_PDU_NAMES, cls.name)
273            if cls.op_code is None:
274                raise KeyError(f'PDU name {cls.name} not found in ATT_PDU_NAMES')
275            cls.fields = fields
276
277            # Register a factory for this class
278            ATT_PDU.pdu_classes[cls.op_code] = cls
279
280            return cls
281
282        return inner
283
284    def __init__(self, pdu=None, **kwargs):
285        if hasattr(self, 'fields') and kwargs:
286            HCI_Object.init_from_fields(self, self.fields, kwargs)
287        if pdu is None:
288            pdu = bytes([self.op_code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
289        self.pdu = pdu
290
291    def init_from_bytes(self, pdu, offset):
292        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
293
294    def to_bytes(self):
295        return self.pdu
296
297    @property
298    def is_command(self):
299        return ((self.op_code >> 6) & 1) == 1
300
301    @property
302    def has_authentication_signature(self):
303        return ((self.op_code >> 7) & 1) == 1
304
305    def __bytes__(self):
306        return self.to_bytes()
307
308    def __str__(self):
309        result = color(self.name, 'yellow')
310        if fields := getattr(self, 'fields', None):
311            result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, '  ')
312        else:
313            if len(self.pdu) > 1:
314                result += f': {self.pdu.hex()}'
315        return result
316
317
318# -----------------------------------------------------------------------------
319@ATT_PDU.subclass(
320    [
321        ('request_opcode_in_error', {'size': 1, 'mapper': ATT_PDU.pdu_name}),
322        ('attribute_handle_in_error', HANDLE_FIELD_SPEC),
323        ('error_code', {'size': 1, 'mapper': ATT_PDU.error_name}),
324    ]
325)
326class ATT_Error_Response(ATT_PDU):
327    '''
328    See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
329    '''
330
331
332# -----------------------------------------------------------------------------
333@ATT_PDU.subclass([('client_rx_mtu', 2)])
334class ATT_Exchange_MTU_Request(ATT_PDU):
335    '''
336    See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request
337    '''
338
339
340# -----------------------------------------------------------------------------
341@ATT_PDU.subclass([('server_rx_mtu', 2)])
342class ATT_Exchange_MTU_Response(ATT_PDU):
343    '''
344    See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response
345    '''
346
347
348# -----------------------------------------------------------------------------
349@ATT_PDU.subclass(
350    [('starting_handle', HANDLE_FIELD_SPEC), ('ending_handle', HANDLE_FIELD_SPEC)]
351)
352class ATT_Find_Information_Request(ATT_PDU):
353    '''
354    See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request
355    '''
356
357
358# -----------------------------------------------------------------------------
359@ATT_PDU.subclass([('format', 1), ('information_data', '*')])
360class ATT_Find_Information_Response(ATT_PDU):
361    '''
362    See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response
363    '''
364
365    def parse_information_data(self):
366        self.information = []
367        offset = 0
368        uuid_size = 2 if self.format == 1 else 16
369        while offset + uuid_size <= len(self.information_data):
370            handle = struct.unpack_from('<H', self.information_data, offset)[0]
371            uuid = self.information_data[2 + offset : 2 + offset + uuid_size]
372            self.information.append((handle, uuid))
373            offset += 2 + uuid_size
374
375    def __init__(self, *args, **kwargs):
376        super().__init__(*args, **kwargs)
377        self.parse_information_data()
378
379    def init_from_bytes(self, pdu, offset):
380        super().init_from_bytes(pdu, offset)
381        self.parse_information_data()
382
383    def __str__(self):
384        result = color(self.name, 'yellow')
385        result += ':\n' + HCI_Object.format_fields(
386            self.__dict__,
387            [
388                ('format', 1),
389                (
390                    'information',
391                    {
392                        'mapper': lambda x: ', '.join(
393                            [f'0x{handle:04X}:{uuid.hex()}' for handle, uuid in x]
394                        )
395                    },
396                ),
397            ],
398            '  ',
399        )
400        return result
401
402
403# -----------------------------------------------------------------------------
404@ATT_PDU.subclass(
405    [
406        ('starting_handle', HANDLE_FIELD_SPEC),
407        ('ending_handle', HANDLE_FIELD_SPEC),
408        ('attribute_type', UUID_2_FIELD_SPEC),
409        ('attribute_value', '*'),
410    ]
411)
412class ATT_Find_By_Type_Value_Request(ATT_PDU):
413    '''
414    See Bluetooth spec @ Vol 3, Part F - 3.4.3.3 Find By Type Value Request
415    '''
416
417
418# -----------------------------------------------------------------------------
419@ATT_PDU.subclass([('handles_information_list', '*')])
420class ATT_Find_By_Type_Value_Response(ATT_PDU):
421    '''
422    See Bluetooth spec @ Vol 3, Part F - 3.4.3.4 Find By Type Value Response
423    '''
424
425    def parse_handles_information_list(self):
426        self.handles_information = []
427        offset = 0
428        while offset + 4 <= len(self.handles_information_list):
429            found_attribute_handle, group_end_handle = struct.unpack_from(
430                '<HH', self.handles_information_list, offset
431            )
432            self.handles_information.append((found_attribute_handle, group_end_handle))
433            offset += 4
434
435    def __init__(self, *args, **kwargs):
436        super().__init__(*args, **kwargs)
437        self.parse_handles_information_list()
438
439    def init_from_bytes(self, pdu, offset):
440        super().init_from_bytes(pdu, offset)
441        self.parse_handles_information_list()
442
443    def __str__(self):
444        result = color(self.name, 'yellow')
445        result += ':\n' + HCI_Object.format_fields(
446            self.__dict__,
447            [
448                (
449                    'handles_information',
450                    {
451                        'mapper': lambda x: ', '.join(
452                            [
453                                f'0x{handle1:04X}-0x{handle2:04X}'
454                                for handle1, handle2 in x
455                            ]
456                        )
457                    },
458                )
459            ],
460            '  ',
461        )
462        return result
463
464
465# -----------------------------------------------------------------------------
466@ATT_PDU.subclass(
467    [
468        ('starting_handle', HANDLE_FIELD_SPEC),
469        ('ending_handle', HANDLE_FIELD_SPEC),
470        ('attribute_type', UUID_2_16_FIELD_SPEC),
471    ]
472)
473class ATT_Read_By_Type_Request(ATT_PDU):
474    '''
475    See Bluetooth spec @ Vol 3, Part F - 3.4.4.1 Read By Type Request
476    '''
477
478
479# -----------------------------------------------------------------------------
480@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
481class ATT_Read_By_Type_Response(ATT_PDU):
482    '''
483    See Bluetooth spec @ Vol 3, Part F - 3.4.4.2 Read By Type Response
484    '''
485
486    def parse_attribute_data_list(self):
487        self.attributes = []
488        offset = 0
489        while self.length != 0 and offset + self.length <= len(
490            self.attribute_data_list
491        ):
492            (attribute_handle,) = struct.unpack_from(
493                '<H', self.attribute_data_list, offset
494            )
495            attribute_value = self.attribute_data_list[
496                offset + 2 : offset + self.length
497            ]
498            self.attributes.append((attribute_handle, attribute_value))
499            offset += self.length
500
501    def __init__(self, *args, **kwargs):
502        super().__init__(*args, **kwargs)
503        self.parse_attribute_data_list()
504
505    def init_from_bytes(self, pdu, offset):
506        super().init_from_bytes(pdu, offset)
507        self.parse_attribute_data_list()
508
509    def __str__(self):
510        result = color(self.name, 'yellow')
511        result += ':\n' + HCI_Object.format_fields(
512            self.__dict__,
513            [
514                ('length', 1),
515                (
516                    'attributes',
517                    {
518                        'mapper': lambda x: ', '.join(
519                            [f'0x{handle:04X}:{value.hex()}' for handle, value in x]
520                        )
521                    },
522                ),
523            ],
524            '  ',
525        )
526        return result
527
528
529# -----------------------------------------------------------------------------
530@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC)])
531class ATT_Read_Request(ATT_PDU):
532    '''
533    See Bluetooth spec @ Vol 3, Part F - 3.4.4.3 Read Request
534    '''
535
536
537# -----------------------------------------------------------------------------
538@ATT_PDU.subclass([('attribute_value', '*')])
539class ATT_Read_Response(ATT_PDU):
540    '''
541    See Bluetooth spec @ Vol 3, Part F - 3.4.4.4 Read Response
542    '''
543
544
545# -----------------------------------------------------------------------------
546@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('value_offset', 2)])
547class ATT_Read_Blob_Request(ATT_PDU):
548    '''
549    See Bluetooth spec @ Vol 3, Part F - 3.4.4.5 Read Blob Request
550    '''
551
552
553# -----------------------------------------------------------------------------
554@ATT_PDU.subclass([('part_attribute_value', '*')])
555class ATT_Read_Blob_Response(ATT_PDU):
556    '''
557    See Bluetooth spec @ Vol 3, Part F - 3.4.4.6 Read Blob Response
558    '''
559
560
561# -----------------------------------------------------------------------------
562@ATT_PDU.subclass([('set_of_handles', '*')])
563class ATT_Read_Multiple_Request(ATT_PDU):
564    '''
565    See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
566    '''
567
568
569# -----------------------------------------------------------------------------
570@ATT_PDU.subclass([('set_of_values', '*')])
571class ATT_Read_Multiple_Response(ATT_PDU):
572    '''
573    See Bluetooth spec @ Vol 3, Part F - 3.4.4.8 Read Multiple Response
574    '''
575
576
577# -----------------------------------------------------------------------------
578@ATT_PDU.subclass(
579    [
580        ('starting_handle', HANDLE_FIELD_SPEC),
581        ('ending_handle', HANDLE_FIELD_SPEC),
582        ('attribute_group_type', UUID_2_16_FIELD_SPEC),
583    ]
584)
585class ATT_Read_By_Group_Type_Request(ATT_PDU):
586    '''
587    See Bluetooth spec @ Vol 3, Part F - 3.4.4.9 Read by Group Type Request
588    '''
589
590
591# -----------------------------------------------------------------------------
592@ATT_PDU.subclass([('length', 1), ('attribute_data_list', '*')])
593class ATT_Read_By_Group_Type_Response(ATT_PDU):
594    '''
595    See Bluetooth spec @ Vol 3, Part F - 3.4.4.10 Read by Group Type Response
596    '''
597
598    def parse_attribute_data_list(self):
599        self.attributes = []
600        offset = 0
601        while self.length != 0 and offset + self.length <= len(
602            self.attribute_data_list
603        ):
604            attribute_handle, end_group_handle = struct.unpack_from(
605                '<HH', self.attribute_data_list, offset
606            )
607            attribute_value = self.attribute_data_list[
608                offset + 4 : offset + self.length
609            ]
610            self.attributes.append(
611                (attribute_handle, end_group_handle, attribute_value)
612            )
613            offset += self.length
614
615    def __init__(self, *args, **kwargs):
616        super().__init__(*args, **kwargs)
617        self.parse_attribute_data_list()
618
619    def init_from_bytes(self, pdu, offset):
620        super().init_from_bytes(pdu, offset)
621        self.parse_attribute_data_list()
622
623    def __str__(self):
624        result = color(self.name, 'yellow')
625        result += ':\n' + HCI_Object.format_fields(
626            self.__dict__,
627            [
628                ('length', 1),
629                (
630                    'attributes',
631                    {
632                        'mapper': lambda x: ', '.join(
633                            [
634                                f'0x{handle:04X}-0x{end:04X}:{value.hex()}'
635                                for handle, end, value in x
636                            ]
637                        )
638                    },
639                ),
640            ],
641            '  ',
642        )
643        return result
644
645
646# -----------------------------------------------------------------------------
647@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
648class ATT_Write_Request(ATT_PDU):
649    '''
650    See Bluetooth spec @ Vol 3, Part F - 3.4.5.1 Write Request
651    '''
652
653
654# -----------------------------------------------------------------------------
655@ATT_PDU.subclass([])
656class ATT_Write_Response(ATT_PDU):
657    '''
658    See Bluetooth spec @ Vol 3, Part F - 3.4.5.2 Write Response
659    '''
660
661
662# -----------------------------------------------------------------------------
663@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
664class ATT_Write_Command(ATT_PDU):
665    '''
666    See Bluetooth spec @ Vol 3, Part F - 3.4.5.3 Write Command
667    '''
668
669
670# -----------------------------------------------------------------------------
671@ATT_PDU.subclass(
672    [
673        ('attribute_handle', HANDLE_FIELD_SPEC),
674        ('attribute_value', '*'),
675        # ('authentication_signature', 'TODO')
676    ]
677)
678class ATT_Signed_Write_Command(ATT_PDU):
679    '''
680    See Bluetooth spec @ Vol 3, Part F - 3.4.5.4 Signed Write Command
681    '''
682
683
684# -----------------------------------------------------------------------------
685@ATT_PDU.subclass(
686    [
687        ('attribute_handle', HANDLE_FIELD_SPEC),
688        ('value_offset', 2),
689        ('part_attribute_value', '*'),
690    ]
691)
692class ATT_Prepare_Write_Request(ATT_PDU):
693    '''
694    See Bluetooth spec @ Vol 3, Part F - 3.4.6.1 Prepare Write Request
695    '''
696
697
698# -----------------------------------------------------------------------------
699@ATT_PDU.subclass(
700    [
701        ('attribute_handle', HANDLE_FIELD_SPEC),
702        ('value_offset', 2),
703        ('part_attribute_value', '*'),
704    ]
705)
706class ATT_Prepare_Write_Response(ATT_PDU):
707    '''
708    See Bluetooth spec @ Vol 3, Part F - 3.4.6.2 Prepare Write Response
709    '''
710
711
712# -----------------------------------------------------------------------------
713@ATT_PDU.subclass([])
714class ATT_Execute_Write_Request(ATT_PDU):
715    '''
716    See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request
717    '''
718
719
720# -----------------------------------------------------------------------------
721@ATT_PDU.subclass([])
722class ATT_Execute_Write_Response(ATT_PDU):
723    '''
724    See Bluetooth spec @ Vol 3, Part F - 3.4.6.4 Execute Write Response
725    '''
726
727
728# -----------------------------------------------------------------------------
729@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
730class ATT_Handle_Value_Notification(ATT_PDU):
731    '''
732    See Bluetooth spec @ Vol 3, Part F - 3.4.7.1 Handle Value Notification
733    '''
734
735
736# -----------------------------------------------------------------------------
737@ATT_PDU.subclass([('attribute_handle', HANDLE_FIELD_SPEC), ('attribute_value', '*')])
738class ATT_Handle_Value_Indication(ATT_PDU):
739    '''
740    See Bluetooth spec @ Vol 3, Part F - 3.4.7.2 Handle Value Indication
741    '''
742
743
744# -----------------------------------------------------------------------------
745@ATT_PDU.subclass([])
746class ATT_Handle_Value_Confirmation(ATT_PDU):
747    '''
748    See Bluetooth spec @ Vol 3, Part F - 3.4.7.3 Handle Value Confirmation
749    '''
750
751
752# -----------------------------------------------------------------------------
753class AttributeValue:
754    '''
755    Attribute value where reading and/or writing is delegated to functions
756    passed as arguments to the constructor.
757    '''
758
759    def __init__(
760        self,
761        read: Union[
762            Callable[[Optional[Connection]], bytes],
763            Callable[[Optional[Connection]], Awaitable[bytes]],
764            None,
765        ] = None,
766        write: Union[
767            Callable[[Optional[Connection], bytes], None],
768            Callable[[Optional[Connection], bytes], Awaitable[None]],
769            None,
770        ] = None,
771    ):
772        self._read = read
773        self._write = write
774
775    def read(self, connection: Optional[Connection]) -> Union[bytes, Awaitable[bytes]]:
776        return self._read(connection) if self._read else b''
777
778    def write(
779        self, connection: Optional[Connection], value: bytes
780    ) -> Union[Awaitable[None], None]:
781        if self._write:
782            return self._write(connection, value)
783
784        return None
785
786
787# -----------------------------------------------------------------------------
788class Attribute(EventEmitter):
789    class Permissions(enum.IntFlag):
790        READABLE = 0x01
791        WRITEABLE = 0x02
792        READ_REQUIRES_ENCRYPTION = 0x04
793        WRITE_REQUIRES_ENCRYPTION = 0x08
794        READ_REQUIRES_AUTHENTICATION = 0x10
795        WRITE_REQUIRES_AUTHENTICATION = 0x20
796        READ_REQUIRES_AUTHORIZATION = 0x40
797        WRITE_REQUIRES_AUTHORIZATION = 0x80
798
799        @classmethod
800        def from_string(cls, permissions_str: str) -> Attribute.Permissions:
801            try:
802                return functools.reduce(
803                    lambda x, y: x | Attribute.Permissions[y],
804                    permissions_str.replace('|', ',').split(","),
805                    Attribute.Permissions(0),
806                )
807            except TypeError as exc:
808                # The check for `p.name is not None` here is needed because for InFlag
809                # enums, the .name property can be None, when the enum value is 0,
810                # so the type hint for .name is Optional[str].
811                enum_list: List[str] = [p.name for p in cls if p.name is not None]
812                enum_list_str = ",".join(enum_list)
813                raise TypeError(
814                    f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {enum_list_str}\nGot: {permissions_str}"
815                ) from exc
816
817    # Permission flags(legacy-use only)
818    READABLE = Permissions.READABLE
819    WRITEABLE = Permissions.WRITEABLE
820    READ_REQUIRES_ENCRYPTION = Permissions.READ_REQUIRES_ENCRYPTION
821    WRITE_REQUIRES_ENCRYPTION = Permissions.WRITE_REQUIRES_ENCRYPTION
822    READ_REQUIRES_AUTHENTICATION = Permissions.READ_REQUIRES_AUTHENTICATION
823    WRITE_REQUIRES_AUTHENTICATION = Permissions.WRITE_REQUIRES_AUTHENTICATION
824    READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
825    WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION
826
827    value: Union[bytes, AttributeValue]
828
829    def __init__(
830        self,
831        attribute_type: Union[str, bytes, UUID],
832        permissions: Union[str, Attribute.Permissions],
833        value: Union[str, bytes, AttributeValue] = b'',
834    ) -> None:
835        EventEmitter.__init__(self)
836        self.handle = 0
837        self.end_group_handle = 0
838        if isinstance(permissions, str):
839            self.permissions = Attribute.Permissions.from_string(permissions)
840        else:
841            self.permissions = permissions
842
843        # Convert the type to a UUID object if it isn't already
844        if isinstance(attribute_type, str):
845            self.type = UUID(attribute_type)
846        elif isinstance(attribute_type, bytes):
847            self.type = UUID.from_bytes(attribute_type)
848        else:
849            self.type = attribute_type
850
851        # Convert the value to a byte array
852        if isinstance(value, str):
853            self.value = bytes(value, 'utf-8')
854        else:
855            self.value = value
856
857    def encode_value(self, value: Any) -> bytes:
858        return value
859
860    def decode_value(self, value_bytes: bytes) -> Any:
861        return value_bytes
862
863    async def read_value(self, connection: Optional[Connection]) -> bytes:
864        if (
865            (self.permissions & self.READ_REQUIRES_ENCRYPTION)
866            and connection is not None
867            and not connection.encryption
868        ):
869            raise ATT_Error(
870                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
871            )
872        if (
873            (self.permissions & self.READ_REQUIRES_AUTHENTICATION)
874            and connection is not None
875            and not connection.authenticated
876        ):
877            raise ATT_Error(
878                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
879            )
880        if self.permissions & self.READ_REQUIRES_AUTHORIZATION:
881            # TODO: handle authorization better
882            raise ATT_Error(
883                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
884            )
885
886        if hasattr(self.value, 'read'):
887            try:
888                value = self.value.read(connection)
889                if inspect.isawaitable(value):
890                    value = await value
891            except ATT_Error as error:
892                raise ATT_Error(
893                    error_code=error.error_code, att_handle=self.handle
894                ) from error
895        else:
896            value = self.value
897
898        return self.encode_value(value)
899
900    async def write_value(self, connection: Connection, value_bytes: bytes) -> None:
901        if (
902            self.permissions & self.WRITE_REQUIRES_ENCRYPTION
903        ) and not connection.encryption:
904            raise ATT_Error(
905                error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
906            )
907        if (
908            self.permissions & self.WRITE_REQUIRES_AUTHENTICATION
909        ) and not connection.authenticated:
910            raise ATT_Error(
911                error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
912            )
913        if self.permissions & self.WRITE_REQUIRES_AUTHORIZATION:
914            # TODO: handle authorization better
915            raise ATT_Error(
916                error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
917            )
918
919        value = self.decode_value(value_bytes)
920
921        if hasattr(self.value, 'write'):
922            try:
923                result = self.value.write(connection, value)
924                if inspect.isawaitable(result):
925                    await result
926            except ATT_Error as error:
927                raise ATT_Error(
928                    error_code=error.error_code, att_handle=self.handle
929                ) from error
930        else:
931            self.value = value
932
933        self.emit('write', connection, value)
934
935    def __repr__(self):
936        if isinstance(self.value, bytes):
937            value_str = self.value.hex()
938        else:
939            value_str = str(self.value)
940        if value_str:
941            value_string = f', value={self.value.hex()}'
942        else:
943            value_string = ''
944        return (
945            f'Attribute(handle=0x{self.handle:04X}, '
946            f'type={self.type}, '
947            f'permissions={self.permissions}{value_string})'
948        )
949