# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import logging import os import pytest from bumble.core import UUID, BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID from bumble.sdp import ( DataElement, ServiceAttribute, Client, Server, SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, SDP_PUBLIC_BROWSE_ROOT, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, ) from .test_utils import TwoDevices # ----------------------------------------------------------------------------- # pylint: disable=invalid-name # ----------------------------------------------------------------------------- # ----------------------------------------------------------------------------- def basic_check(x: DataElement) -> None: serialized = bytes(x) if len(serialized) < 500: print('Original:', x) print('Serialized:', serialized.hex()) parsed = DataElement.from_bytes(serialized) if len(serialized) < 500: print('Parsed:', parsed) parsed_bytes = bytes(parsed) if len(serialized) < 500: print('Parsed Bytes:', parsed_bytes.hex()) assert parsed_bytes == serialized x_str = str(x) parsed_str = str(parsed) assert x_str == parsed_str # ----------------------------------------------------------------------------- def test_data_elements() -> None: e = DataElement(DataElement.NIL, None) basic_check(e) e = DataElement(DataElement.UNSIGNED_INTEGER, 12, 1) basic_check(e) e = DataElement(DataElement.UNSIGNED_INTEGER, 1234, 2) basic_check(e) e = DataElement(DataElement.UNSIGNED_INTEGER, 0x123456, 4) basic_check(e) e = DataElement(DataElement.UNSIGNED_INTEGER, 0x123456789, 8) basic_check(e) e = DataElement(DataElement.UNSIGNED_INTEGER, 0x0000FFFF, value_size=4) basic_check(e) e = DataElement(DataElement.SIGNED_INTEGER, -12, 1) basic_check(e) e = DataElement(DataElement.SIGNED_INTEGER, -1234, 2) basic_check(e) e = DataElement(DataElement.SIGNED_INTEGER, -0x123456, 4) basic_check(e) e = DataElement(DataElement.SIGNED_INTEGER, -0x123456789, 8) basic_check(e) e = DataElement(DataElement.SIGNED_INTEGER, 0x0000FFFF, value_size=4) basic_check(e) e = DataElement(DataElement.UUID, UUID.from_16_bits(1234)) basic_check(e) e = DataElement(DataElement.UUID, UUID.from_32_bits(123456789)) basic_check(e) e = DataElement(DataElement.UUID, UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')) basic_check(e) e = DataElement(DataElement.TEXT_STRING, b'hello') basic_check(e) e = DataElement(DataElement.TEXT_STRING, b'hello' * 60) basic_check(e) e = DataElement(DataElement.TEXT_STRING, b'hello' * 20000) basic_check(e) e = DataElement(DataElement.BOOLEAN, True) basic_check(e) e = DataElement(DataElement.BOOLEAN, False) basic_check(e) e = DataElement(DataElement.SEQUENCE, [DataElement(DataElement.BOOLEAN, True)]) basic_check(e) e = DataElement( DataElement.SEQUENCE, [ DataElement(DataElement.BOOLEAN, True), DataElement(DataElement.TEXT_STRING, b'hello'), ], ) basic_check(e) e = DataElement(DataElement.ALTERNATIVE, [DataElement(DataElement.BOOLEAN, True)]) basic_check(e) e = DataElement( DataElement.ALTERNATIVE, [ DataElement(DataElement.BOOLEAN, True), DataElement(DataElement.TEXT_STRING, b'hello'), ], ) basic_check(e) e = DataElement(DataElement.URL, 'http://example.com') e = DataElement.nil() e = DataElement.unsigned_integer(1234, 2) basic_check(e) e = DataElement.signed_integer(-1234, 2) basic_check(e) e = DataElement.uuid(UUID.from_16_bits(1234)) basic_check(e) e = DataElement.text_string(b'hello') basic_check(e) e = DataElement.boolean(True) basic_check(e) e = DataElement.sequence( [DataElement.signed_integer(0, 1), DataElement.text_string(b'hello')] ) basic_check(e) e = DataElement.alternative( [DataElement.signed_integer(0, 1), DataElement.text_string(b'hello')] ) basic_check(e) e = DataElement.url('http://foobar.com') basic_check(e) # ----------------------------------------------------------------------------- def sdp_records(): return { 0x00010001: [ ServiceAttribute( SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, DataElement.unsigned_integer_32(0x00010001), ), ServiceAttribute( SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), ), ServiceAttribute( SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, DataElement.sequence( [DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))] ), ), ServiceAttribute( SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, DataElement.sequence( [ DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), ] ), ), ] } # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_service_search(): # Setup connections devices = TwoDevices() await devices.setup_connection() assert devices.connections[0] assert devices.connections[1] # Register SDP service devices.devices[0].sdp_server.service_records.update(sdp_records()) # Search for service client = Client(devices.connections[1]) await client.connect() services = await client.search_services( [UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')] ) # Then assert services[0] == 0x00010001 # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_service_attribute(): # Setup connections devices = TwoDevices() await devices.setup_connection() # Register SDP service devices.devices[0].sdp_server.service_records.update(sdp_records()) # Search for service client = Client(devices.connections[1]) await client.connect() attributes = await client.get_attributes( 0x00010001, [SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID] ) # Then assert attributes[0].value.value == sdp_records()[0x00010001][0].value.value # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_service_search_attribute(): # Setup connections devices = TwoDevices() await devices.setup_connection() # Register SDP service devices.devices[0].sdp_server.service_records.update(sdp_records()) # Search for service client = Client(devices.connections[1]) await client.connect() attributes = await client.search_attributes( [UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [(0x0000FFFF, 8)] ) # Then for expect, actual in zip(attributes, sdp_records().values()): assert expect.id == actual.id assert expect.value == actual.value # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_client_async_context(): devices = TwoDevices() await devices.setup_connection() client = Client(devices.connections[1]) async with client: assert client.channel is not None assert client.channel is None # ----------------------------------------------------------------------------- async def run(): test_data_elements() await test_service_attribute() await test_service_search() await test_service_search_attribute() # ----------------------------------------------------------------------------- if __name__ == '__main__': logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) asyncio.run(run())