1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import logging
20import os
21import struct
22import pytest
23from unittest.mock import AsyncMock, Mock, ANY
24
25from bumble.controller import Controller
26from bumble.gatt_client import CharacteristicProxy
27from bumble.link import LocalLink
28from bumble.device import Device, Peer
29from bumble.host import Host
30from bumble.gatt import (
31    GATT_BATTERY_LEVEL_CHARACTERISTIC,
32    GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
33    CharacteristicAdapter,
34    DelegatedCharacteristicAdapter,
35    PackedCharacteristicAdapter,
36    MappedCharacteristicAdapter,
37    UTF8CharacteristicAdapter,
38    Service,
39    Characteristic,
40    CharacteristicValue,
41    Descriptor,
42)
43from bumble.transport import AsyncPipeSink
44from bumble.core import UUID
45from bumble.att import (
46    Attribute,
47    ATT_EXCHANGE_MTU_REQUEST,
48    ATT_ATTRIBUTE_NOT_FOUND_ERROR,
49    ATT_PDU,
50    ATT_Error,
51    ATT_Error_Response,
52    ATT_Read_By_Group_Type_Request,
53    ErrorCode,
54)
55from .test_utils import async_barrier
56
57
58# -----------------------------------------------------------------------------
def basic_check(x):
    """Check that a PDU survives a serialization round trip.

    The PDU is serialized with ``to_bytes()``, parsed back with
    ``ATT_PDU.from_bytes()``, and the string forms of the original and of the
    parsed object are compared.
    """
    round_tripped = ATT_PDU.from_bytes(x.to_bytes())
    assert str(x) == str(round_tripped)
65
66
67# -----------------------------------------------------------------------------
def test_UUID():
    """Exercise UUID construction, string formatting and byte round trips."""
    long_form = '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'

    # Short forms are rendered with a size prefix
    assert str(UUID.from_16_bits(0x7788)) == 'UUID-16:7788'
    assert str(UUID.from_32_bits(0x11223344)) == 'UUID-32:11223344'

    # Full-length form: built from a string, re-parsed from its own string
    # form, then rebuilt from its byte form
    from_string = UUID(long_form)
    assert str(from_string) == long_form
    reparsed = UUID(str(from_string))
    assert str(reparsed) == long_form
    from_raw = UUID.from_bytes(reparsed.to_bytes())
    assert str(from_raw) == long_form

    # A 16-bit UUID serialized with force_128 compares equal once parsed back
    short = UUID.from_16_bits(0x1234)
    expanded = UUID.from_bytes(short.to_bytes(force_128=True))
    assert short == expanded

    # This 16-bit value is rendered together with a name
    assert str(UUID.from_16_bits(0x180A)) == 'UUID-16:180A (Device Information)'
87
88
89# -----------------------------------------------------------------------------
def test_ATT_Error_Response():
    """Round-trip an ATT_Error_Response PDU through basic_check()."""
    basic_check(
        ATT_Error_Response(
            request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST,
            attribute_handle_in_error=0x0000,
            error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR,
        )
    )
97
98
99# -----------------------------------------------------------------------------
def test_ATT_Read_By_Group_Type_Request():
    """Round-trip an ATT_Read_By_Group_Type_Request PDU through basic_check()."""
    group_type = UUID.from_16_bits(0x2800)
    basic_check(
        ATT_Read_By_Group_Type_Request(
            starting_handle=0x0001,
            ending_handle=0xFFFF,
            attribute_group_type=group_type,
        )
    )
107
108
109# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_characteristic_encoding():
    """Check value encoding/decoding on characteristics, proxies and adapters."""

    # Characteristic subclass that converts between an int and a single byte
    class Foo(Characteristic):
        def encode_value(self, value):
            return bytes([value])

        def decode_value(self, value_bytes):
            return value_bytes[0]

    local_char = Foo(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.Properties.READ,
        Characteristic.READABLE,
        123,
    )
    assert await local_char.read_value(None) == bytes([123])
    await local_char.write_value(None, bytes([122]))
    assert local_char.value == 122

    # Proxy subclass applying the same int <-> single byte conversion
    class FooProxy(CharacteristicProxy):
        def __init__(self, characteristic):
            super().__init__(
                characteristic.client,
                characteristic.handle,
                characteristic.end_group_handle,
                characteristic.uuid,
                characteristic.properties,
            )

        def encode_value(self, value):
            return bytes([value])

        def decode_value(self, value_bytes):
            return value_bytes[0]

    linked = LinkedDevices()
    client, server = linked.devices[0], linked.devices[1]

    byte_char = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )

    async def async_read(connection):
        return 0x05060708

    packed_char = PackedCharacteristicAdapter(
        Characteristic(
            '2AB7E91B-43E8-4F73-AC3B-80C1683B47F9',
            Characteristic.Properties.READ,
            Characteristic.READABLE,
            CharacteristicValue(read=async_read),
        ),
        '>I',
    )

    server.add_service(
        Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [byte_char, packed_char])
    )

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    matches = peer.get_characteristics_by_uuid(byte_char.uuid)
    assert len(matches) == 1
    remote_char = matches[0]
    proxy = FooProxy(remote_char)

    # Reads and writes through the proxy use ints, the server holds bytes
    assert await proxy.read_value() == 123
    await proxy.write_value(124)
    await async_barrier()
    assert byte_char.value == bytes([124])

    assert await proxy.read_value() == 124
    await proxy.write_value(125, with_response=True)
    await async_barrier()
    assert byte_char.value == bytes([125])

    # Delegated adapter with a custom encoder
    halving = DelegatedCharacteristicAdapter(
        remote_char, encode=lambda x: bytes([x // 2])
    )
    await halving.write_value(100, with_response=True)
    await async_barrier()
    assert byte_char.value == bytes([50])

    # Packed adapter reading the value produced by the async read function
    matches = peer.get_characteristics_by_uuid(packed_char.uuid)
    assert len(matches) == 1
    packed_remote = PackedCharacteristicAdapter(matches[0], ">I")
    assert await packed_remote.read_value() == 0x05060708

    latest = None

    def on_change(value):
        nonlocal latest
        latest = value

    # Subscribing on the raw remote characteristic delivers bytes
    await remote_char.subscribe(on_change)
    await server.notify_subscribers(byte_char)
    await async_barrier()
    assert latest == byte_char.value
    latest = None

    await server.notify_subscribers(byte_char, value=bytes([125]))
    await async_barrier()
    assert latest == bytes([125])
    latest = None

    await remote_char.unsubscribe(on_change)
    await server.notify_subscribers(byte_char)
    await async_barrier()
    assert latest is None

    # Subscribing on the proxy delivers decoded values
    await proxy.subscribe(on_change)
    await server.notify_subscribers(byte_char)
    await async_barrier()
    assert latest == byte_char.value[0]
    latest = None

    await server.notify_subscribers(byte_char, value=bytes([126]))
    await async_barrier()
    assert latest == 126
    latest = None

    await proxy.unsubscribe(on_change)
    await server.notify_subscribers(byte_char)
    await async_barrier()
    assert latest is None

    # Subscribing on a delegated adapter applies its decoder
    first_byte = DelegatedCharacteristicAdapter(remote_char, decode=lambda x: x[0])
    await first_byte.subscribe(on_change)
    await server.notify_subscribers(byte_char)
    await async_barrier()
    assert latest == byte_char.value[0]
    latest = None

    await first_byte.unsubscribe(on_change)
    await server.notify_subscribers(byte_char)
    await async_barrier()
    assert latest is None
260
261
262# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_attribute_getters():
    """Check the GATT server attribute getters and the ordering of handles."""
    linked = LinkedDevices()
    client, server = linked.devices[0], linked.devices[1]

    characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
    service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829')

    characteristic = Characteristic(
        characteristic_uuid,
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )
    server.add_service(Service(service_uuid, [characteristic]))

    gatt_server = server.gatt_server

    service_attribute = gatt_server.get_service_attribute(service_uuid)
    assert service_attribute

    declaration, value_attribute = gatt_server.get_characteristic_attributes(
        service_uuid, characteristic_uuid
    )
    assert declaration and value_attribute

    descriptor_attribute = gatt_server.get_descriptor_attribute(
        service_uuid,
        characteristic_uuid,
        GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
    )
    assert descriptor_attribute

    # Handles must be strictly increasing, with the descriptor closing the
    # service group
    assert (
        service_attribute.handle
        < declaration.handle
        < value_attribute.handle
        < descriptor_attribute.handle
        == service_attribute.end_group_handle
    )
    # The value attribute comes immediately after the declaration attribute
    assert declaration.handle + 1 == value_attribute.handle
309
310
311# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicAdapter():
    """Run each characteristic adapter flavor through a read and a write."""
    # The CharacteristicAdapter base class must be transparent
    initial = bytes([1, 2, 3])
    wrapped = Characteristic(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.Properties.READ,
        Characteristic.READABLE,
        initial,
    )
    adapter = CharacteristicAdapter(wrapped)
    assert await adapter.read_value(None) == initial

    updated = bytes([3, 4, 5])
    await adapter.write_value(None, updated)
    assert wrapped.value == updated

    # Simple delegated adapter: bytes are reversed in both directions
    def flip(data):
        return bytes(reversed(data))

    adapter = DelegatedCharacteristicAdapter(wrapped, flip, flip)
    assert await adapter.read_value(None) == flip(updated)

    updated = bytes([3, 4, 5])
    await adapter.write_value(None, updated)
    assert adapter.value == flip(updated)

    # Packed adapter with single element format
    number = 1234
    packed = struct.pack('>H', number)
    wrapped.value = number
    adapter = PackedCharacteristicAdapter(wrapped, '>H')

    assert await adapter.read_value(None) == packed
    wrapped.value = None
    await adapter.write_value(None, packed)
    assert adapter.value == number

    # Packed adapter with multi-element format
    first, second = 1234, 5678
    packed = struct.pack('>HH', first, second)
    wrapped.value = (first, second)
    adapter = PackedCharacteristicAdapter(wrapped, '>HH')

    assert await adapter.read_value(None) == packed
    wrapped.value = None
    await adapter.write_value(None, packed)
    assert adapter.value == (first, second)

    # Mapped adapter
    first, second = 1234, 5678
    packed = struct.pack('>HH', first, second)
    as_mapping = {'v1': first, 'v2': second}
    wrapped.value = as_mapping
    adapter = MappedCharacteristicAdapter(wrapped, '>HH', ('v1', 'v2'))

    assert await adapter.read_value(None) == packed
    wrapped.value = None
    await adapter.write_value(None, packed)
    assert adapter.value == as_mapping

    # UTF-8 adapter
    text = 'Hello π'
    encoded = text.encode('utf-8')
    wrapped.value = text
    adapter = UTF8CharacteristicAdapter(wrapped)

    assert await adapter.read_value(None) == encoded
    wrapped.value = None
    await adapter.write_value(None, encoded)
    assert adapter.value == text
393
394
395# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicValue():
    """Check CharacteristicValue with an async reader and a synchronous writer."""
    payload = bytes([1, 2, 3])

    async def read_value(connection):
        return payload

    readable = CharacteristicValue(read=read_value)
    assert await readable.read(None) == payload

    # The write function must be called with exactly the arguments it was given
    writer = Mock()
    writable = CharacteristicValue(write=writer)
    sentinel = object()
    writable.write(sentinel, payload)
    writer.assert_called_once_with(sentinel, payload)
412
413
414# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicValue_async():
    """Check CharacteristicValue with an async reader and an async writer."""
    payload = bytes([1, 2, 3])

    async def read_value(connection):
        return payload

    readable = CharacteristicValue(read=read_value)
    assert await readable.read(None) == payload

    # With an async write function, the result of write() is awaited
    writer = AsyncMock()
    writable = CharacteristicValue(write=writer)
    sentinel = object()
    await writable.write(sentinel, payload)
    writer.assert_called_once_with(sentinel, payload)
431
432
433# -----------------------------------------------------------------------------
class LinkedDevices:
    """Three devices whose controllers are all attached to one LocalLink.

    Attributes:
        connections: three-slot list, initially all None.
        link: the LocalLink shared by the controllers.
        controllers: controllers named 'C1', 'C2' and 'C3'.
        devices: one Device per controller, each with a fixed address.
        paired: three-slot list, initially all None.
    """

    def __init__(self):
        addresses = (
            'F0:F1:F2:F3:F4:F5',
            'F1:F2:F3:F4:F5:F6',
            'F2:F3:F4:F5:F6:F7',
        )
        count = len(addresses)

        self.connections = [None] * count

        self.link = LocalLink()
        self.controllers = [
            Controller(f'C{index}', link=self.link) for index in range(1, count + 1)
        ]
        # Each host uses its controller both as controller and as the target
        # of its AsyncPipeSink
        self.devices = [
            Device(address=address, host=Host(controller, AsyncPipeSink(controller)))
            for address, controller in zip(addresses, self.controllers)
        ]

        self.paired = [None] * count
460
461
462# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
    """Check reads and writes on a stored-value and a callback-backed characteristic.

    characteristic1 holds its value directly and reports writes through its
    'write' event; characteristic2 delegates reads and writes to functions via
    a CharacteristicValue.
    """
    [client, server] = LinkedDevices().devices[:2]

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
    )

    def on_characteristic1_write(connection, value):
        characteristic1._last_value = (connection, value)

    characteristic1.on('write', on_characteristic1_write)

    def on_characteristic2_read(connection):
        # bytes() needs an explicit encoding when given a str
        return bytes(str(connection.peer_address), 'utf-8')

    def on_characteristic2_write(connection, value):
        characteristic2._last_value = (connection, value)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.Properties.READ | Characteristic.Properties.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        CharacteristicValue(
            read=on_characteristic2_read, write=on_characteristic2_write
        ),
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2]
    )
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]

    # Stored-value characteristic: starts empty, then reflects writes
    v1 = await peer.read_value(c1)
    assert v1 == b''
    b = bytes([1, 2, 3])
    await peer.write_value(c1, b)
    await async_barrier()
    assert characteristic1.value == b
    v1 = await peer.read_value(c1)
    assert v1 == b
    # The 'write' handler must have recorded a (connection, value) tuple
    assert isinstance(characteristic1._last_value, tuple)
    assert len(characteristic1._last_value) == 2
    assert str(characteristic1._last_value[0].peer_address) == str(
        client.random_address
    )
    assert characteristic1._last_value[1] == b
    # A value set on the server side is visible to the client
    bb = bytes([3, 4, 5, 6])
    characteristic1.value = bb
    v1 = await peer.read_value(c1)
    assert v1 == bb

    # Callback-backed characteristic: the write function receives the write
    await peer.write_value(c2, b)
    await async_barrier()
    assert isinstance(characteristic2._last_value, tuple)
    assert len(characteristic2._last_value) == 2
    assert str(characteristic2._last_value[0].peer_address) == str(
        client.random_address
    )
    assert characteristic2._last_value[1] == b
539
540
541# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
    """Read and write one characteristic both raw and through a packed adapter."""
    linked = LinkedDevices()
    client, server = linked.devices[0], linked.devices[1]

    initial = bytes([0x11, 0x22, 0x33, 0x44])
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        value=initial,
    )

    service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1])
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Discover the service first, then its characteristics through the
    # service proxy
    await peer.discover_services()
    found_services = peer.get_services_by_uuid(service1.uuid)
    assert len(found_services) == 1
    remote_service = found_services[0]
    await remote_service.discover_characteristics()
    found_characteristics = remote_service.get_characteristics_by_uuid(
        characteristic1.uuid
    )
    assert len(found_characteristics) == 1
    remote_char = found_characteristics[0]

    # Raw read returns the bytes as stored
    assert await remote_char.read_value() == initial

    # The packed adapter exposes the same bytes as a big-endian 32-bit integer
    packed = PackedCharacteristicAdapter(remote_char, '>I')
    (initial_number,) = struct.unpack('>I', initial)
    assert await packed.read_value() == initial_number

    replacement = bytes([0x55, 0x66, 0x77, 0x88])
    (replacement_number,) = struct.unpack('>I', replacement)
    await packed.write_value(replacement_number)
    await async_barrier()
    assert characteristic1.value == replacement
    assert await packed.read_value() == replacement_number
584
585
586# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
    """Check notification/indication delivery around subscribe and unsubscribe.

    Three characteristics are used: one with NOTIFY, one with INDICATE, and
    one with both. For each, the test checks which client callbacks fire
    while subscribed and that none fire after unsubscribing.
    """
    [client, server] = LinkedDevices().devices[:2]

    def record_subscriptions(characteristic):
        # Keep the arguments of the latest 'subscription' event on the
        # characteristic itself, so they can be inspected later in the test.
        def on_subscription(connection, notify_enabled, indicate_enabled):
            characteristic._last_subscription = (
                connection,
                notify_enabled,
                indicate_enabled,
            )

        characteristic.on('subscription', on_subscription)

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )
    record_subscriptions(characteristic1)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
        Characteristic.READABLE,
        bytes([4, 5, 6]),
    )
    record_subscriptions(characteristic2)

    characteristic3 = Characteristic(
        'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
        Characteristic.Properties.READ
        | Characteristic.Properties.NOTIFY
        | Characteristic.Properties.INDICATE,
        Characteristic.READABLE,
        bytes([7, 8, 9]),
    )
    record_subscriptions(characteristic3)

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [characteristic1, characteristic2, characteristic3],
    )
    server.add_services([service1])

    def on_characteristic_subscription(
        connection, characteristic, notify_enabled, indicate_enabled
    ):
        server._last_subscription = (
            connection,
            characteristic,
            notify_enabled,
            indicate_enabled,
        )

    server.on('characteristic_subscription', on_characteristic_subscription)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic3.uuid)
    assert len(c) == 1
    c3 = c[0]

    # --- characteristic1: NOTIFY, observed through the 'update' event ---
    c1._called = False
    c1._last_update = None

    def on_c1_update(value):
        c1._called = True
        c1._last_update = value

    c1.on('update', on_c1_update)
    await peer.subscribe(c1)
    await async_barrier()
    assert server._last_subscription[1] == characteristic1
    assert server._last_subscription[2]
    assert not server._last_subscription[3]
    assert characteristic1._last_subscription[1]
    assert not characteristic1._last_subscription[2]
    # Subscribed for notifications only: an indication must not be delivered
    await server.indicate_subscribers(characteristic1)
    await async_barrier()
    assert not c1._called
    await server.notify_subscribers(characteristic1)
    await async_barrier()
    assert c1._called
    assert c1._last_update == characteristic1.value

    # Notifying with an explicit value must not change the stored value
    c1._called = False
    c1._last_update = None
    c1_value = characteristic1.value
    await server.notify_subscribers(characteristic1, bytes([0, 1, 2]))
    await async_barrier()
    assert c1._called
    assert c1._last_update == bytes([0, 1, 2])
    assert characteristic1.value == c1_value

    c1._called = False
    await peer.unsubscribe(c1)
    await server.notify_subscribers(characteristic1)
    # Give any (unexpected) notification a chance to arrive before checking
    await async_barrier()
    assert not c1._called

    # --- characteristic2: INDICATE, observed through a callback ---
    c2._called = False
    c2._last_update = None

    def on_c2_update(value):
        c2._called = True
        c2._last_update = value

    await peer.subscribe(c2, on_c2_update)
    await async_barrier()
    await server.notify_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert not c2._called
    await server.indicate_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert c2._called
    assert c2._last_update == characteristic2.value

    c2._called = False
    await peer.unsubscribe(c2, on_c2_update)
    await server.indicate_subscriber(
        characteristic2._last_subscription[0], characteristic2
    )
    await async_barrier()
    assert not c2._called

    # --- characteristic3: NOTIFY and INDICATE ---
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    c3._last_update = None
    c3._last_update_2 = None
    c3._last_update_3 = None

    def on_c3_update(value):
        c3._called = True
        c3._last_update = value

    def on_c3_update_2(value):  # for notify
        c3._called_2 = True
        c3._last_update_2 = value

    def on_c3_update_3(value):  # for indicate
        c3._called_3 = True
        c3._last_update_3 = value

    c3.on('update', on_c3_update)
    await peer.subscribe(c3, on_c3_update_2)
    await async_barrier()
    await server.notify_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert c3._called
    assert c3._last_update == characteristic3.value
    assert c3._called_2
    assert c3._last_update_2 == characteristic3.value
    assert not c3._called_3

    # Re-subscribe, this time preferring indications
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    await peer.unsubscribe(c3)
    await peer.subscribe(c3, on_c3_update_3, prefer_notify=False)
    await async_barrier()
    characteristic3.value = bytes([1, 2, 3])
    await server.indicate_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert c3._called
    assert c3._last_update == characteristic3.value
    assert not c3._called_2
    assert c3._called_3
    assert c3._last_update_3 == characteristic3.value

    # Once unsubscribed, neither notifications nor indications are delivered
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    await peer.unsubscribe(c3)
    await server.notify_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await server.indicate_subscriber(
        characteristic3._last_subscription[0], characteristic3
    )
    await async_barrier()
    assert not c3._called
    assert not c3._called_2
    assert not c3._called_3
805
806
807# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_unsubscribe():
    """Check the 'subscription' events seen by the server on (un)subscribe."""
    linked = LinkedDevices()
    client, server = linked.devices[0], linked.devices[1]

    notify_properties = (
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
    )
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        notify_properties,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )
    characteristic2 = Characteristic(
        '3234C4F4-3F34-4616-8935-45A50EE05DEB',
        notify_properties,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )

    server.add_services(
        [
            Service(
                '3A657F47-D34F-46B3-B1EC-698E29B6B829',
                [characteristic1, characteristic2],
            )
        ]
    )

    # One mock per characteristic records the 'subscription' events
    listener1 = Mock()
    characteristic1.on('subscription', listener1)
    listener2 = Mock()
    characteristic2.on('subscription', listener2)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    found = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(found) == 1
    remote1 = found[0]
    found = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(found) == 1
    remote2 = found[0]

    # Subscribing reports notify enabled, indicate disabled
    await remote1.subscribe()
    await async_barrier()
    listener1.assert_called_once_with(ANY, True, False)

    await remote2.subscribe()
    await async_barrier()
    listener2.assert_called_once_with(ANY, True, False)

    # Unsubscribing reports both disabled
    listener1.reset_mock()
    await remote1.unsubscribe()
    await async_barrier()
    listener1.assert_called_once_with(ANY, False, False)

    listener2.reset_mock()
    await remote2.unsubscribe()
    await async_barrier()
    listener2.assert_called_once_with(ANY, False, False)

    # Unsubscribing again produces no event
    listener1.reset_mock()
    await remote1.unsubscribe()
    await async_barrier()
    listener1.assert_not_called()

    listener2.reset_mock()
    await remote2.unsubscribe()
    await async_barrier()
    listener2.assert_not_called()

    # With force=True an event is produced again
    listener1.reset_mock()
    await remote1.unsubscribe(force=True)
    await async_barrier()
    listener1.assert_called_once_with(ANY, False, False)
882
883
884# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_discover_all():
    """Check that discover_all() finds services, characteristics and descriptors."""
    linked = LinkedDevices()
    client, server = linked.devices[0], linked.devices[1]

    notify_properties = (
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
    )
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        notify_properties,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )

    descriptors = [
        Descriptor('2902', 'READABLE,WRITEABLE'),
        Descriptor('AAAA', 'READABLE,WRITEABLE'),
    ]
    characteristic2 = Characteristic(
        '3234C4F4-3F34-4616-8935-45A50EE05DEB',
        notify_properties,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
        descriptors=descriptors,
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [characteristic1, characteristic2],
    )
    service2 = Service('1111', [])
    server.add_services([service1, service2])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_all()
    discovered = peer.gatt_client.services
    assert len(discovered) == 3
    # service 1800 gets added automatically
    assert discovered[0].uuid == UUID('1800')
    assert discovered[1].uuid == service1.uuid
    assert discovered[2].uuid == service2.uuid

    # First service: two characteristics, the second one with two descriptors
    matching_services = peer.get_services_by_uuid(service1.uuid)
    assert len(matching_services) == 1
    assert len(matching_services[0].characteristics) == 2
    matching_characteristics = peer.get_characteristics_by_uuid(
        uuid=characteristic2.uuid, service=matching_services[0]
    )
    assert len(matching_characteristics) == 1
    assert len(matching_characteristics[0].descriptors) == 2

    # Second service: no characteristics
    matching_services = peer.get_services_by_uuid(service2.uuid)
    assert len(matching_services) == 1
    assert len(matching_services[0].characteristics) == 0
933
934
935# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mtu_exchange():
    """Check the MTU negotiated by two clients against one server capped at 100."""
    central_a, central_b, server = LinkedDevices().devices[:3]

    server.gatt_server.max_mtu = 100

    # Collect the connections as seen from the server side
    server_connections = []

    def on_server_connection(connection):
        server_connections.append(connection)

    server.on('connection', on_server_connection)

    await central_a.power_on()
    await central_b.power_on()
    await server.power_on()

    connection_a = await central_a.connect(server.random_address)
    assert len(server_connections) == 1
    assert server_connections[0] is not None

    connection_b = await central_b.connect(server.random_address)
    assert len(server_connections) == 2
    assert server_connections[1] is not None

    peer_a = Peer(connection_a)
    peer_b = Peer(connection_b)

    # Requesting more than the server maximum yields the server maximum
    assert await peer_a.request_mtu(220) == 100
    assert connection_a.att_mtu == 100

    # Requesting less than the server maximum yields the requested value
    assert await peer_b.request_mtu(50) == 50
    assert connection_b.att_mtu == 50
970
971
972# -----------------------------------------------------------------------------
def test_char_property_to_string():
    """Check ``str()`` of characteristic property flags, alone and combined."""
    props = Characteristic.Properties

    # A single flag, built from its raw value and from its named member.
    assert str(props(0x01)) == "BROADCAST"
    assert str(props.BROADCAST) == "BROADCAST"

    # Two flags, built from the raw value and from OR-ing the named members.
    assert str(props(0x03)) == "BROADCAST|READ"
    assert str(props.BROADCAST | props.READ) == "BROADCAST|READ"
984
985
986# -----------------------------------------------------------------------------
def test_characteristic_property_from_string():
    """Check that ``Properties.from_string`` parses one or several names."""
    props = Characteristic.Properties

    # A single name.
    assert props.from_string("BROADCAST") == props.BROADCAST

    # Two names: either order, separated by ',' or '|', gives the same flags.
    broadcast_and_read = props.BROADCAST | props.READ
    for text in ("BROADCAST,READ", "READ,BROADCAST", "BROADCAST|READ"):
        assert props.from_string(text) == broadcast_and_read
1007
1008
1009# -----------------------------------------------------------------------------
def test_characteristic_property_from_string_assert():
    """Check the ``TypeError`` raised by ``from_string`` for an unknown name.

    The exception text is compared verbatim against the expected message.
    """
    expected_message = """Characteristic.Properties::from_string() error:
Expected a string containing any of the keys, separated by , or |: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES
Got: BROADCAST,HELLO"""

    with pytest.raises(TypeError) as raised:
        Characteristic.Properties.from_string("BROADCAST,HELLO")

    assert str(raised.value) == expected_message
1020
1021
1022# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_server_string():
    """Check the string form of a GATT server after one service is added.

    The expected dump lists the Generic Access service (handles 0x0001 to
    0x0005) first, then the added service with its characteristic and a
    Client Characteristic Configuration descriptor.
    """
    _, server = LinkedDevices().devices[:2]

    properties = (
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY
    )
    permissions = Characteristic.READABLE | Characteristic.WRITEABLE
    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        properties,
        permissions,
        bytes([123]),
    )
    server.add_service(
        Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic])
    )

    expected_dump = """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""

    assert str(server.gatt_server) == expected_dump
1051
1052
1053# -----------------------------------------------------------------------------
async def async_main():
    """Run a fixed selection of this module's tests, one after another.

    This is the entry point used when the file is executed as a script
    rather than collected by pytest. Synchronous tests run first, then the
    coroutine tests are awaited in order.
    """
    synchronous_tests = (
        test_UUID,
        test_ATT_Error_Response,
        test_ATT_Read_By_Group_Type_Request,
    )
    coroutine_tests = (
        test_read_write,
        test_read_write2,
        test_subscribe_notify,
        test_unsubscribe,
        test_characteristic_encoding,
        test_mtu_exchange,
        test_CharacteristicValue,
        test_CharacteristicValue_async,
        test_CharacteristicAdapter,
    )

    for run_test in synchronous_tests:
        run_test()
    for run_test in coroutine_tests:
        await run_test()
1067
1068
1069# -----------------------------------------------------------------------------
def test_permissions_from_string():
    """Check the numeric value of permissions parsed from their names."""
    expected_values = {
        'READABLE': 1,
        'WRITEABLE': 2,
        'READABLE,WRITEABLE': 3,
    }
    for names, value in expected_values.items():
        assert Attribute.Permissions.from_string(names) == value
1074
1075
1076# -----------------------------------------------------------------------------
def test_characteristic_permissions():
    """Check that a characteristic accepts its permissions as a string."""
    properties = (
        Characteristic.Properties.READ
        | Characteristic.Properties.WRITE
        | Characteristic.Properties.NOTIFY
    )
    subject = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        properties,
        'READABLE,WRITEABLE',
    )
    assert subject.permissions == 3
1086
1087
1088# -----------------------------------------------------------------------------
def test_characteristic_has_properties():
    """Check ``has_properties`` against a READ|WRITE|NOTIFY characteristic.

    A mask is accepted only when every flag in it is set on the
    characteristic; any mask containing INDICATE is rejected.
    """
    props = Characteristic.Properties
    subject = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        props.READ | props.WRITE | props.NOTIFY,
        'READABLE,WRITEABLE',
    )

    # Masks fully contained in the characteristic's properties.
    for contained in (props.READ, props.READ | props.WRITE):
        assert subject.has_properties(contained)

    # Masks that include a flag the characteristic does not have.
    for not_contained in (
        props.READ | props.WRITE | props.INDICATE,
        props.INDICATE,
    ):
        assert not subject.has_properties(not_contained)
1107
1108
1109# -----------------------------------------------------------------------------
def test_descriptor_permissions():
    """Check that a descriptor accepts its permissions as a string."""
    assert Descriptor('2902', 'READABLE,WRITEABLE').permissions == 3
1113
1114
1115# -----------------------------------------------------------------------------
def test_get_attribute_group():
    """Check that attribute handles map back to their enclosing groups.

    Two services with one characteristic each are registered. The handles of
    the characteristic attributes and of the '2902' descriptor must resolve
    to the owning service, and the descriptor handle must also resolve to
    the owning characteristic.
    """
    device = Device()
    gatt_server = device.gatt_server

    # Populate the GATT server with two single-characteristic services.
    properties = Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY
    permissions = Characteristic.READABLE | Characteristic.WRITEABLE
    characteristic1 = Characteristic('1111', properties, permissions, bytes([123]))
    characteristic2 = Characteristic('2222', properties, permissions, bytes([123]))
    gatt_server.add_services(
        [Service('1212', [characteristic1]), Service('3233', [characteristic2])]
    )

    # (service UUID, characteristic UUID) for each registered pair.
    uuid_pairs = [
        (UUID('1212'), UUID('1111')),
        (UUID('3233'), UUID('2222')),
    ]

    # Look up the characteristic attributes, then the descriptors.
    characteristic_attributes = []
    for service_uuid, characteristic_uuid in uuid_pairs:
        found = gatt_server.get_characteristic_attributes(
            service_uuid, characteristic_uuid
        )
        assert found is not None
        characteristic_attributes.append(found)

    descriptors = []
    for service_uuid, characteristic_uuid in uuid_pairs:
        found = gatt_server.get_descriptor_attribute(
            service_uuid, characteristic_uuid, UUID('2902')
        )
        assert found is not None
        descriptors.append(found)

    # Every handle must resolve to the service it was registered under.
    for (service_uuid, _), attributes, descriptor in zip(
        uuid_pairs, characteristic_attributes, descriptors
    ):
        for handle in (
            attributes[0].handle,
            attributes[1].handle,
            descriptor.handle,
        ):
            assert (
                service_uuid == gatt_server.get_attribute_group(handle, Service).uuid
            )

    # Each descriptor handle must resolve to its characteristic.
    for (_, characteristic_uuid), descriptor in zip(uuid_pairs, descriptors):
        owner = gatt_server.get_attribute_group(descriptor.handle, Characteristic)
        assert characteristic_uuid == owner.uuid
1200
1201
1202# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_characteristics_by_uuid():
    """Check characteristic lookup by UUID, with and without a service filter.

    Characteristic '1234' is registered in both service 'ABCD' and service
    'FFFF', so an unfiltered lookup returns two proxies, while a lookup
    restricted to 'ABCD' (given as a UUID or as a service proxy) returns one
    and a lookup restricted to an unknown service returns none.
    """
    [client, server] = LinkedDevices().devices[:2]

    characteristic1 = Characteristic(
        '1234',
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )
    characteristic2 = Characteristic(
        '5678',
        Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )
    service1 = Service(
        'ABCD',
        [characteristic1, characteristic2],
    )
    # characteristic1 is deliberately shared so that '1234' appears twice.
    service2 = Service(
        'FFFF',
        [characteristic1],
    )

    server.add_services([service1, service2])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()

    # No service filter: one match per service containing '1234'.
    c = peer.get_characteristics_by_uuid(uuid=UUID('1234'))
    assert len(c) == 2
    assert isinstance(c[0], CharacteristicProxy)

    # Service filter given as a UUID.
    c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=UUID('ABCD'))
    assert len(c) == 1
    assert isinstance(c[0], CharacteristicProxy)
    c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=UUID('AAAA'))
    assert len(c) == 0

    # Service filter given as a discovered service proxy.
    s = peer.get_services_by_uuid(uuid=UUID('ABCD'))
    assert len(s) == 1
    c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=s[0])
    # Assert on the characteristics just looked up; the original re-checked
    # `s`, which left this lookup unverified.
    assert len(c) == 1
1250
1251
1252# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_write_return_error():
    """Check that an ATT error raised by a write handler reaches the client.

    The server-side write callback raises ``ATT_Error`` with
    ``VALUE_NOT_ALLOWED``; a write with response from the client must raise
    an ``ATT_Error`` carrying the same error code.
    """
    client, server = LinkedDevices().devices[:2]

    # Write handler that always rejects the value.
    rejecting_write = Mock(
        side_effect=ATT_Error(error_code=ErrorCode.VALUE_NOT_ALLOWED)
    )
    write_only = Characteristic(
        '1234',
        Characteristic.Properties.WRITE,
        Characteristic.Permissions.WRITEABLE,
        CharacteristicValue(write=rejecting_write),
    )
    server.add_service(Service('ABCD', [write_only]))

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)

    async with Peer(connection) as peer:
        matches = peer.get_characteristics_by_uuid(uuid=UUID('1234'))
        target = matches[0]
        with pytest.raises(ATT_Error) as raised:
            await target.write_value(b'', with_response=True)
        assert raised.value.error_code == ErrorCode.VALUE_NOT_ALLOWED
1276
1277
1278# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # Script entry point: the log level comes from BUMBLE_LOGLEVEL (default
    # INFO), then the selected tests run on a fresh event loop.
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)
    asyncio.run(async_main())
1282