# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import logging import os import struct import pytest from unittest.mock import AsyncMock, Mock, ANY from bumble.controller import Controller from bumble.gatt_client import CharacteristicProxy from bumble.link import LocalLink from bumble.device import Device, Peer from bumble.host import Host from bumble.gatt import ( GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, CharacteristicAdapter, DelegatedCharacteristicAdapter, PackedCharacteristicAdapter, MappedCharacteristicAdapter, UTF8CharacteristicAdapter, Service, Characteristic, CharacteristicValue, Descriptor, ) from bumble.transport import AsyncPipeSink from bumble.core import UUID from bumble.att import ( Attribute, ATT_EXCHANGE_MTU_REQUEST, ATT_ATTRIBUTE_NOT_FOUND_ERROR, ATT_PDU, ATT_Error, ATT_Error_Response, ATT_Read_By_Group_Type_Request, ErrorCode, ) from .test_utils import async_barrier # ----------------------------------------------------------------------------- def basic_check(x): pdu = x.to_bytes() parsed = ATT_PDU.from_bytes(pdu) x_str = str(x) parsed_str = str(parsed) assert x_str == parsed_str # ----------------------------------------------------------------------------- def test_UUID(): u = UUID.from_16_bits(0x7788) assert str(u) == 'UUID-16:7788' u = UUID.from_32_bits(0x11223344) assert str(u) == 'UUID-32:11223344' u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6') assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' v = UUID(str(u)) assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' w = UUID.from_bytes(v.to_bytes()) assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' u1 = UUID.from_16_bits(0x1234) b1 = u1.to_bytes(force_128=True) u2 = UUID.from_bytes(b1) assert u1 == u2 u3 = UUID.from_16_bits(0x180A) assert str(u3) == 'UUID-16:180A (Device Information)' # ----------------------------------------------------------------------------- def test_ATT_Error_Response(): pdu = ATT_Error_Response( request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST, attribute_handle_in_error=0x0000, error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR, ) basic_check(pdu) # ----------------------------------------------------------------------------- def test_ATT_Read_By_Group_Type_Request(): pdu = ATT_Read_By_Group_Type_Request( starting_handle=0x0001, ending_handle=0xFFFF, attribute_group_type=UUID.from_16_bits(0x2800), ) basic_check(pdu) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_characteristic_encoding(): class Foo(Characteristic): def encode_value(self, value): return bytes([value]) def decode_value(self, value_bytes): return value_bytes[0] c = Foo( GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, 123, ) x = await c.read_value(None) assert x == bytes([123]) await c.write_value(None, bytes([122])) assert c.value == 122 class FooProxy(CharacteristicProxy): def __init__(self, characteristic): super().__init__( characteristic.client, characteristic.handle, characteristic.end_group_handle, characteristic.uuid, characteristic.properties, ) def encode_value(self, value): return bytes([value]) def decode_value(self, value_bytes): return value_bytes[0] [client, server] = LinkedDevices().devices[:2] characteristic = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.Properties.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([123]), ) async def async_read(connection): return 0x05060708 async_characteristic = PackedCharacteristicAdapter( Characteristic( '2AB7E91B-43E8-4F73-AC3B-80C1683B47F9', Characteristic.Properties.READ, Characteristic.READABLE, CharacteristicValue(read=async_read), ), '>I', ) service = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic, async_characteristic] ) server.add_service(service) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(characteristic.uuid) assert len(c) == 1 c = c[0] cp = FooProxy(c) v = await cp.read_value() assert v == 123 await cp.write_value(124) await async_barrier() assert characteristic.value == bytes([124]) v = await cp.read_value() assert v == 124 await cp.write_value(125, with_response=True) await async_barrier() assert characteristic.value == bytes([125]) cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2])) await cd.write_value(100, with_response=True) await async_barrier() assert characteristic.value == bytes([50]) c2 = peer.get_characteristics_by_uuid(async_characteristic.uuid) assert len(c2) == 1 c2 = c2[0] cd2 = PackedCharacteristicAdapter(c2, ">I") cd2v = await cd2.read_value() assert cd2v == 0x05060708 last_change = None def on_change(value): nonlocal last_change last_change = value await c.subscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change == characteristic.value last_change = None await server.notify_subscribers(characteristic, value=bytes([125])) await async_barrier() assert last_change == bytes([125]) last_change = None await c.unsubscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change is None await cp.subscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change == characteristic.value[0] last_change = None await server.notify_subscribers(characteristic, value=bytes([126])) await async_barrier() assert last_change == 126 last_change = None await cp.unsubscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change is None cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0]) await cd.subscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change == characteristic.value[0] last_change = None await cd.unsubscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change is None # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_attribute_getters(): [client, server] = LinkedDevices().devices[:2] characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806') characteristic = Characteristic( characteristic_uuid, Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.Properties.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([123]), ) service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829') service = Service(service_uuid, [characteristic]) server.add_service(service) service_attr = server.gatt_server.get_service_attribute(service_uuid) assert service_attr ( char_decl_attr, char_value_attr, ) = server.gatt_server.get_characteristic_attributes( service_uuid, characteristic_uuid ) assert char_decl_attr and char_value_attr desc_attr = server.gatt_server.get_descriptor_attribute( service_uuid, characteristic_uuid, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, ) assert desc_attr # assert all handles are in expected order assert ( service_attr.handle < char_decl_attr.handle < char_value_attr.handle < desc_attr.handle == service_attr.end_group_handle ) # assert characteristic declarations attribute is followed by characteristic value attribute assert char_decl_attr.handle + 1 == char_value_attr.handle # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_CharacteristicAdapter(): # Check that the CharacteristicAdapter base class is transparent v = bytes([1, 2, 3]) c = Characteristic( GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, v, ) a = CharacteristicAdapter(c) value = await a.read_value(None) assert value == v v = bytes([3, 4, 5]) await a.write_value(None, v) assert c.value == v # Simple delegated adapter a = DelegatedCharacteristicAdapter( c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)) ) value = await a.read_value(None) assert value == bytes(reversed(v)) v = bytes([3, 4, 5]) await a.write_value(None, v) assert a.value == bytes(reversed(v)) # Packed adapter with single element format v = 1234 pv = struct.pack('>H', v) c.value = v a = PackedCharacteristicAdapter(c, '>H') value = await a.read_value(None) assert value == pv c.value = None await a.write_value(None, pv) assert a.value == v # Packed adapter with multi-element format v1 = 1234 v2 = 5678 pv = struct.pack('>HH', v1, v2) c.value = (v1, v2) a = PackedCharacteristicAdapter(c, '>HH') value = await a.read_value(None) assert value == pv c.value = None await a.write_value(None, pv) assert a.value == (v1, v2) # Mapped adapter v1 = 1234 v2 = 5678 pv = struct.pack('>HH', v1, v2) mapped = {'v1': v1, 'v2': v2} c.value = mapped a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2')) value = await a.read_value(None) assert value == pv c.value = None await a.write_value(None, pv) assert a.value == mapped # UTF-8 adapter v = 'Hello π' ev = v.encode('utf-8') c.value = v a = UTF8CharacteristicAdapter(c) value = await a.read_value(None) assert value == ev c.value = None await a.write_value(None, ev) assert a.value == v # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_CharacteristicValue(): b = bytes([1, 2, 3]) async def read_value(connection): return b c = CharacteristicValue(read=read_value) x = await c.read(None) assert x == b m = Mock() c = CharacteristicValue(write=m) z = object() c.write(z, b) m.assert_called_once_with(z, b) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_CharacteristicValue_async(): b = bytes([1, 2, 3]) async def read_value(connection): return b c = CharacteristicValue(read=read_value) x = await c.read(None) assert x == b m = AsyncMock() c = CharacteristicValue(write=m) z = object() await c.write(z, b) m.assert_called_once_with(z, b) # ----------------------------------------------------------------------------- class LinkedDevices: def __init__(self): self.connections = [None, None, None] self.link = LocalLink() self.controllers = [ Controller('C1', link=self.link), Controller('C2', link=self.link), Controller('C3', link=self.link), ] self.devices = [ Device( address='F0:F1:F2:F3:F4:F5', host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])), ), Device( address='F1:F2:F3:F4:F5:F6', host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])), ), Device( address='F2:F3:F4:F5:F6:F7', host=Host(self.controllers[2], AsyncPipeSink(self.controllers[2])), ), ] self.paired = [None, None, None] # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_read_write(): [client, server] = LinkedDevices().devices[:2] characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.Properties.READ | Characteristic.Properties.WRITE, Characteristic.READABLE | Characteristic.WRITEABLE, ) def on_characteristic1_write(connection, value): characteristic1._last_value = (connection, value) characteristic1.on('write', on_characteristic1_write) def on_characteristic2_read(connection): return bytes(str(connection.peer_address)) def on_characteristic2_write(connection, value): characteristic2._last_value = (connection, value) characteristic2 = Characteristic( '66DE9057-C848-4ACA-B993-D675644EBB85', Characteristic.Properties.READ | Characteristic.Properties.WRITE, Characteristic.READABLE | Characteristic.WRITEABLE, CharacteristicValue( read=on_characteristic2_read, write=on_characteristic2_write ), ) service1 = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2] ) server.add_services([service1]) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(characteristic1.uuid) assert len(c) == 1 c1 = c[0] c = peer.get_characteristics_by_uuid(characteristic2.uuid) assert len(c) == 1 c2 = c[0] v1 = await peer.read_value(c1) assert v1 == b'' b = bytes([1, 2, 3]) await peer.write_value(c1, b) await async_barrier() assert characteristic1.value == b v1 = await peer.read_value(c1) assert v1 == b assert type(characteristic1._last_value is tuple) assert len(characteristic1._last_value) == 2 assert str(characteristic1._last_value[0].peer_address) == str( client.random_address ) assert characteristic1._last_value[1] == b bb = bytes([3, 4, 5, 6]) characteristic1.value = bb v1 = await peer.read_value(c1) assert v1 == bb await peer.write_value(c2, b) await async_barrier() assert type(characteristic2._last_value is tuple) assert len(characteristic2._last_value) == 2 assert str(characteristic2._last_value[0].peer_address) == str( client.random_address ) assert characteristic2._last_value[1] == b # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_read_write2(): [client, server] = LinkedDevices().devices[:2] v = bytes([0x11, 0x22, 0x33, 0x44]) characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.Properties.READ | Characteristic.Properties.WRITE, Characteristic.READABLE | Characteristic.WRITEABLE, value=v, ) service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1]) server.add_services([service1]) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() c = peer.get_services_by_uuid(service1.uuid) assert len(c) == 1 s = c[0] await s.discover_characteristics() c = s.get_characteristics_by_uuid(characteristic1.uuid) assert len(c) == 1 c1 = c[0] v1 = await c1.read_value() assert v1 == v a1 = PackedCharacteristicAdapter(c1, '>I') v1 = await a1.read_value() assert v1 == struct.unpack('>I', v)[0] b = bytes([0x55, 0x66, 0x77, 0x88]) await a1.write_value(struct.unpack('>I', b)[0]) await async_barrier() assert characteristic1.value == b v1 = await a1.read_value() assert v1 == struct.unpack('>I', b)[0] # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_subscribe_notify(): [client, server] = LinkedDevices().devices[:2] characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, Characteristic.READABLE, bytes([1, 2, 3]), ) def on_characteristic1_subscription(connection, notify_enabled, indicate_enabled): characteristic1._last_subscription = ( connection, notify_enabled, indicate_enabled, ) characteristic1.on('subscription', on_characteristic1_subscription) characteristic2 = Characteristic( '66DE9057-C848-4ACA-B993-D675644EBB85', Characteristic.Properties.READ | Characteristic.Properties.INDICATE, Characteristic.READABLE, bytes([4, 5, 6]), ) def on_characteristic2_subscription(connection, notify_enabled, indicate_enabled): characteristic2._last_subscription = ( connection, notify_enabled, indicate_enabled, ) characteristic2.on('subscription', on_characteristic2_subscription) characteristic3 = Characteristic( 'AB5E639C-40C1-4238-B9CB-AF41F8B806E4', Characteristic.Properties.READ | Characteristic.Properties.NOTIFY | Characteristic.Properties.INDICATE, Characteristic.READABLE, bytes([7, 8, 9]), ) def on_characteristic3_subscription(connection, notify_enabled, indicate_enabled): characteristic3._last_subscription = ( connection, notify_enabled, indicate_enabled, ) characteristic3.on('subscription', on_characteristic3_subscription) service1 = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2, characteristic3], ) server.add_services([service1]) def on_characteristic_subscription( connection, characteristic, notify_enabled, indicate_enabled ): server._last_subscription = ( connection, characteristic, notify_enabled, indicate_enabled, ) server.on('characteristic_subscription', on_characteristic_subscription) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(characteristic1.uuid) assert len(c) == 1 c1 = c[0] c = peer.get_characteristics_by_uuid(characteristic2.uuid) assert len(c) == 1 c2 = c[0] c = peer.get_characteristics_by_uuid(characteristic3.uuid) assert len(c) == 1 c3 = c[0] c1._called = False c1._last_update = None def on_c1_update(value): c1._called = True c1._last_update = value c1.on('update', on_c1_update) await peer.subscribe(c1) await async_barrier() assert server._last_subscription[1] == characteristic1 assert server._last_subscription[2] assert not server._last_subscription[3] assert characteristic1._last_subscription[1] assert not characteristic1._last_subscription[2] await server.indicate_subscribers(characteristic1) await async_barrier() assert not c1._called await server.notify_subscribers(characteristic1) await async_barrier() assert c1._called assert c1._last_update == characteristic1.value c1._called = False c1._last_update = None c1_value = characteristic1.value await server.notify_subscribers(characteristic1, bytes([0, 1, 2])) await async_barrier() assert c1._called assert c1._last_update == bytes([0, 1, 2]) assert characteristic1.value == c1_value c1._called = False await peer.unsubscribe(c1) await server.notify_subscribers(characteristic1) assert not c1._called c2._called = False c2._last_update = None def on_c2_update(value): c2._called = True c2._last_update = value await peer.subscribe(c2, on_c2_update) await async_barrier() await server.notify_subscriber( characteristic2._last_subscription[0], characteristic2 ) await async_barrier() assert not c2._called await server.indicate_subscriber( characteristic2._last_subscription[0], characteristic2 ) await async_barrier() assert c2._called assert c2._last_update == characteristic2.value c2._called = False await peer.unsubscribe(c2, on_c2_update) await server.indicate_subscriber( characteristic2._last_subscription[0], characteristic2 ) await async_barrier() assert not c2._called c3._called = False c3._called_2 = False c3._called_3 = False c3._last_update = None c3._last_update_2 = None c3._last_update_3 = None def on_c3_update(value): c3._called = True c3._last_update = value def on_c3_update_2(value): # for notify c3._called_2 = True c3._last_update_2 = value def on_c3_update_3(value): # for indicate c3._called_3 = True c3._last_update_3 = value c3.on('update', on_c3_update) await peer.subscribe(c3, on_c3_update_2) await async_barrier() await server.notify_subscriber( characteristic3._last_subscription[0], characteristic3 ) await async_barrier() assert c3._called assert c3._last_update == characteristic3.value assert c3._called_2 assert c3._last_update_2 == characteristic3.value assert not c3._called_3 c3._called = False c3._called_2 = False c3._called_3 = False await peer.unsubscribe(c3) await peer.subscribe(c3, on_c3_update_3, prefer_notify=False) await async_barrier() characteristic3.value = bytes([1, 2, 3]) await server.indicate_subscriber( characteristic3._last_subscription[0], characteristic3 ) await async_barrier() assert c3._called assert c3._last_update == characteristic3.value assert not c3._called_2 assert c3._called_3 assert c3._last_update_3 == characteristic3.value c3._called = False c3._called_2 = False c3._called_3 = False await peer.unsubscribe(c3) await server.notify_subscriber( characteristic3._last_subscription[0], characteristic3 ) await server.indicate_subscriber( characteristic3._last_subscription[0], characteristic3 ) await async_barrier() assert not c3._called assert not c3._called_2 assert not c3._called_3 # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_unsubscribe(): [client, server] = LinkedDevices().devices[:2] characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, Characteristic.READABLE, bytes([1, 2, 3]), ) characteristic2 = Characteristic( '3234C4F4-3F34-4616-8935-45A50EE05DEB', Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, Characteristic.READABLE, bytes([1, 2, 3]), ) service1 = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2], ) server.add_services([service1]) mock1 = Mock() characteristic1.on('subscription', mock1) mock2 = Mock() characteristic2.on('subscription', mock2) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(characteristic1.uuid) assert len(c) == 1 c1 = c[0] c = peer.get_characteristics_by_uuid(characteristic2.uuid) assert len(c) == 1 c2 = c[0] await c1.subscribe() await async_barrier() mock1.assert_called_once_with(ANY, True, False) await c2.subscribe() await async_barrier() mock2.assert_called_once_with(ANY, True, False) mock1.reset_mock() await c1.unsubscribe() await async_barrier() mock1.assert_called_once_with(ANY, False, False) mock2.reset_mock() await c2.unsubscribe() await async_barrier() mock2.assert_called_once_with(ANY, False, False) mock1.reset_mock() await c1.unsubscribe() await async_barrier() mock1.assert_not_called() mock2.reset_mock() await c2.unsubscribe() await async_barrier() mock2.assert_not_called() mock1.reset_mock() await c1.unsubscribe(force=True) await async_barrier() mock1.assert_called_once_with(ANY, False, False) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_discover_all(): [client, server] = LinkedDevices().devices[:2] characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, Characteristic.READABLE, bytes([1, 2, 3]), ) descriptor1 = Descriptor('2902', 'READABLE,WRITEABLE') descriptor2 = Descriptor('AAAA', 'READABLE,WRITEABLE') characteristic2 = Characteristic( '3234C4F4-3F34-4616-8935-45A50EE05DEB', Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, Characteristic.READABLE, bytes([1, 2, 3]), descriptors=[descriptor1, descriptor2], ) service1 = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2], ) service2 = Service('1111', []) server.add_services([service1, service2]) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_all() assert len(peer.gatt_client.services) == 3 # service 1800 gets added automatically assert peer.gatt_client.services[0].uuid == UUID('1800') assert peer.gatt_client.services[1].uuid == service1.uuid assert peer.gatt_client.services[2].uuid == service2.uuid s = peer.get_services_by_uuid(service1.uuid) assert len(s) == 1 assert len(s[0].characteristics) == 2 c = peer.get_characteristics_by_uuid(uuid=characteristic2.uuid, service=s[0]) assert len(c) == 1 assert len(c[0].descriptors) == 2 s = peer.get_services_by_uuid(service2.uuid) assert len(s) == 1 assert len(s[0].characteristics) == 0 # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_mtu_exchange(): [d1, d2, d3] = LinkedDevices().devices[:3] d3.gatt_server.max_mtu = 100 d3_connections = [] @d3.on('connection') def on_d3_connection(connection): d3_connections.append(connection) await d1.power_on() await d2.power_on() await d3.power_on() d1_connection = await d1.connect(d3.random_address) assert len(d3_connections) == 1 assert d3_connections[0] is not None d2_connection = await d2.connect(d3.random_address) assert len(d3_connections) == 2 assert d3_connections[1] is not None d1_peer = Peer(d1_connection) d2_peer = Peer(d2_connection) d1_client_mtu = await d1_peer.request_mtu(220) assert d1_client_mtu == 100 assert d1_connection.att_mtu == 100 d2_client_mtu = await d2_peer.request_mtu(50) assert d2_client_mtu == 50 assert d2_connection.att_mtu == 50 # ----------------------------------------------------------------------------- def test_char_property_to_string(): # single assert str(Characteristic.Properties(0x01)) == "BROADCAST" assert str(Characteristic.Properties.BROADCAST) == "BROADCAST" # double assert str(Characteristic.Properties(0x03)) == "BROADCAST|READ" assert ( str(Characteristic.Properties.BROADCAST | Characteristic.Properties.READ) == "BROADCAST|READ" ) # ----------------------------------------------------------------------------- def test_characteristic_property_from_string(): # single assert ( Characteristic.Properties.from_string("BROADCAST") == Characteristic.Properties.BROADCAST ) # double assert ( Characteristic.Properties.from_string("BROADCAST,READ") == Characteristic.Properties.BROADCAST | Characteristic.Properties.READ ) assert ( Characteristic.Properties.from_string("READ,BROADCAST") == Characteristic.Properties.BROADCAST | Characteristic.Properties.READ ) assert ( Characteristic.Properties.from_string("BROADCAST|READ") == Characteristic.Properties.BROADCAST | Characteristic.Properties.READ ) # ----------------------------------------------------------------------------- def test_characteristic_property_from_string_assert(): with pytest.raises(TypeError) as e_info: Characteristic.Properties.from_string("BROADCAST,HELLO") assert ( str(e_info.value) == """Characteristic.Properties::from_string() error: Expected a string containing any of the keys, separated by , or |: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES Got: BROADCAST,HELLO""" ) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_server_string(): [_, server] = LinkedDevices().devices[:2] characteristic = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.Properties.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([123]), ) service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic]) server.add_service(service) assert ( str(server.gatt_server) == """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access)) CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), READ) Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), READ) CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), READ) Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), READ) Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829) CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY) Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY) Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)""" ) # ----------------------------------------------------------------------------- async def async_main(): test_UUID() test_ATT_Error_Response() test_ATT_Read_By_Group_Type_Request() await test_read_write() await test_read_write2() await test_subscribe_notify() await test_unsubscribe() await test_characteristic_encoding() await test_mtu_exchange() await test_CharacteristicValue() await test_CharacteristicValue_async() await test_CharacteristicAdapter() # ----------------------------------------------------------------------------- def test_permissions_from_string(): assert Attribute.Permissions.from_string('READABLE') == 1 assert Attribute.Permissions.from_string('WRITEABLE') == 2 assert Attribute.Permissions.from_string('READABLE,WRITEABLE') == 3 # ----------------------------------------------------------------------------- def test_characteristic_permissions(): characteristic = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.Properties.NOTIFY, 'READABLE,WRITEABLE', ) assert characteristic.permissions == 3 # ----------------------------------------------------------------------------- def test_characteristic_has_properties(): characteristic = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.Properties.NOTIFY, 'READABLE,WRITEABLE', ) assert characteristic.has_properties(Characteristic.Properties.READ) assert characteristic.has_properties( Characteristic.Properties.READ | Characteristic.Properties.WRITE ) assert not characteristic.has_properties( Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.Properties.INDICATE ) assert not characteristic.has_properties(Characteristic.Properties.INDICATE) # ----------------------------------------------------------------------------- def test_descriptor_permissions(): descriptor = Descriptor('2902', 'READABLE,WRITEABLE') assert descriptor.permissions == 3 # ----------------------------------------------------------------------------- def test_get_attribute_group(): device = Device() # add some services / characteristics to the gatt server characteristic1 = Characteristic( '1111', Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([123]), ) characteristic2 = Characteristic( '2222', Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([123]), ) services = [Service('1212', [characteristic1]), Service('3233', [characteristic2])] device.gatt_server.add_services(services) # get the handles from gatt server characteristic_attributes1 = device.gatt_server.get_characteristic_attributes( UUID('1212'), UUID('1111') ) assert characteristic_attributes1 is not None characteristic_attributes2 = device.gatt_server.get_characteristic_attributes( UUID('3233'), UUID('2222') ) assert characteristic_attributes2 is not None descriptor1 = device.gatt_server.get_descriptor_attribute( UUID('1212'), UUID('1111'), UUID('2902') ) assert descriptor1 is not None descriptor2 = device.gatt_server.get_descriptor_attribute( UUID('3233'), UUID('2222'), UUID('2902') ) assert descriptor2 is not None # confirm the handles map back to the service assert ( UUID('1212') == device.gatt_server.get_attribute_group( characteristic_attributes1[0].handle, Service ).uuid ) assert ( UUID('1212') == device.gatt_server.get_attribute_group( characteristic_attributes1[1].handle, Service ).uuid ) assert ( UUID('1212') == device.gatt_server.get_attribute_group(descriptor1.handle, Service).uuid ) assert ( UUID('3233') == device.gatt_server.get_attribute_group( characteristic_attributes2[0].handle, Service ).uuid ) assert ( UUID('3233') == device.gatt_server.get_attribute_group( characteristic_attributes2[1].handle, Service ).uuid ) assert ( UUID('3233') == device.gatt_server.get_attribute_group(descriptor2.handle, Service).uuid ) # confirm the handles map back to the characteristic assert ( UUID('1111') == device.gatt_server.get_attribute_group( descriptor1.handle, Characteristic ).uuid ) assert ( UUID('2222') == device.gatt_server.get_attribute_group( descriptor2.handle, Characteristic ).uuid ) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_get_characteristics_by_uuid(): [client, server] = LinkedDevices().devices[:2] characteristic1 = Characteristic( '1234', Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, Characteristic.READABLE, bytes([1, 2, 3]), ) characteristic2 = Characteristic( '5678', Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, Characteristic.READABLE, bytes([1, 2, 3]), ) service1 = Service( 'ABCD', [characteristic1, characteristic2], ) service2 = Service( 'FFFF', [characteristic1], ) server.add_services([service1, service2]) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(uuid=UUID('1234')) assert len(c) == 2 assert isinstance(c[0], CharacteristicProxy) c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=UUID('ABCD')) assert len(c) == 1 assert isinstance(c[0], CharacteristicProxy) c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=UUID('AAAA')) assert len(c) == 0 s = peer.get_services_by_uuid(uuid=UUID('ABCD')) assert len(s) == 1 c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=s[0]) assert len(s) == 1 # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_write_return_error(): [client, server] = LinkedDevices().devices[:2] on_write = Mock(side_effect=ATT_Error(error_code=ErrorCode.VALUE_NOT_ALLOWED)) characteristic = Characteristic( '1234', Characteristic.Properties.WRITE, Characteristic.Permissions.WRITEABLE, CharacteristicValue(write=on_write), ) service = Service('ABCD', [characteristic]) server.add_service(service) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) async with Peer(connection) as peer: c = peer.get_characteristics_by_uuid(uuid=UUID('1234'))[0] with pytest.raises(ATT_Error) as e: await c.write_value(b'', with_response=True) assert e.value.error_code == ErrorCode.VALUE_NOT_ALLOWED # ----------------------------------------------------------------------------- if __name__ == '__main__': logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) asyncio.run(async_main())