# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import logging import os import random import pytest from bumble.core import ProtocolError from bumble.l2cap import ( L2CAP_Connection_Request, ClassicChannelSpec, LeCreditBasedChannelSpec, ) from .test_utils import TwoDevices # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- # ----------------------------------------------------------------------------- def test_helpers(): psm = L2CAP_Connection_Request.serialize_psm(0x01) assert psm == bytes([0x01, 0x00]) psm = L2CAP_Connection_Request.serialize_psm(0x1023) assert psm == bytes([0x23, 0x10]) psm = L2CAP_Connection_Request.serialize_psm(0x242311) assert psm == bytes([0x11, 0x23, 0x24]) (offset, psm) = L2CAP_Connection_Request.parse_psm( bytes([0x00, 0x01, 0x00, 0x44]), 1 ) assert offset == 3 assert psm == 0x01 (offset, psm) = L2CAP_Connection_Request.parse_psm( bytes([0x00, 0x23, 0x10, 0x44]), 1 ) assert offset == 3 assert psm == 0x1023 (offset, psm) = L2CAP_Connection_Request.parse_psm( bytes([0x00, 0x11, 0x23, 0x24, 0x44]), 1 ) assert offset == 4 assert psm == 0x242311 rq = L2CAP_Connection_Request(psm=0x01, source_cid=0x44) brq = bytes(rq) srq = L2CAP_Connection_Request.from_bytes(brq) assert srq.psm == rq.psm assert srq.source_cid == rq.source_cid # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_basic_connection(): devices = TwoDevices() await devices.setup_connection() psm = 1234 # Check that if there's no one listening, we can't connect with pytest.raises(ProtocolError): l2cap_channel = await devices.connections[0].create_l2cap_channel( spec=LeCreditBasedChannelSpec(psm) ) # Now add a listener incoming_channel = None received = [] def on_coc(channel): nonlocal incoming_channel incoming_channel = channel def on_data(data): received.append(data) channel.sink = on_data devices.devices[1].create_l2cap_server( spec=LeCreditBasedChannelSpec(psm=1234), handler=on_coc ) l2cap_channel = await devices.connections[0].create_l2cap_channel( spec=LeCreditBasedChannelSpec(psm) ) messages = (bytes([1, 2, 3]), bytes([4, 5, 6]), bytes(10000)) for message in messages: l2cap_channel.write(message) await asyncio.sleep(0) await l2cap_channel.drain() # Test closing closed = [False, False] closed_event = asyncio.Event() def on_close(which, event): closed[which] = True if event: event.set() l2cap_channel.on('close', lambda: on_close(0, None)) incoming_channel.on('close', lambda: on_close(1, closed_event)) await l2cap_channel.disconnect() assert closed == [True, True] await closed_event.wait() sent_bytes = b''.join(messages) received_bytes = b''.join(received) assert sent_bytes == received_bytes # ----------------------------------------------------------------------------- async def transfer_payload(max_credits, mtu, mps): devices = TwoDevices() await devices.setup_connection() received = [] def on_coc(channel): def on_data(data): received.append(data) channel.sink = on_data server = devices.devices[1].create_l2cap_server( spec=LeCreditBasedChannelSpec(max_credits=max_credits, mtu=mtu, mps=mps), handler=on_coc, ) l2cap_channel = await devices.connections[0].create_l2cap_channel( spec=LeCreditBasedChannelSpec(server.psm) ) messages = [bytes([1, 2, 3, 4, 5, 6, 7]) * x for x in (3, 10, 100, 789)] for message in messages: l2cap_channel.write(message) await asyncio.sleep(0) if random.randint(0, 5) == 1: await l2cap_channel.drain() await l2cap_channel.drain() await l2cap_channel.disconnect() sent_bytes = b''.join(messages) received_bytes = b''.join(received) assert sent_bytes == received_bytes @pytest.mark.asyncio async def test_transfer(): for max_credits in (1, 10, 100, 10000): for mtu in (50, 255, 256, 1000): for mps in (50, 255, 256, 1000): # print(max_credits, mtu, mps) await transfer_payload(max_credits, mtu, mps) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_bidirectional_transfer(): devices = TwoDevices() await devices.setup_connection() client_received = [] server_received = [] server_channel = None def on_server_coc(channel): nonlocal server_channel server_channel = channel def on_server_data(data): server_received.append(data) channel.sink = on_server_data def on_client_data(data): client_received.append(data) server = devices.devices[1].create_l2cap_server( spec=LeCreditBasedChannelSpec(), handler=on_server_coc ) client_channel = await devices.connections[0].create_l2cap_channel( spec=LeCreditBasedChannelSpec(server.psm) ) client_channel.sink = on_client_data messages = [bytes([1, 2, 3, 4, 5, 6, 7]) * x for x in (3, 10, 100)] for message in messages: client_channel.write(message) await client_channel.drain() await asyncio.sleep(0) server_channel.write(message) await server_channel.drain() await client_channel.disconnect() message_bytes = b''.join(messages) client_received_bytes = b''.join(client_received) server_received_bytes = b''.join(server_received) assert client_received_bytes == message_bytes assert server_received_bytes == message_bytes # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_mtu(): devices = TwoDevices() await devices.setup_connection() def on_channel_open(channel): assert channel.peer_mtu == 456 def on_channel(channel): channel.on('open', lambda: on_channel_open(channel)) server = devices.devices[1].create_l2cap_server( spec=ClassicChannelSpec(mtu=345), handler=on_channel ) client_channel = await devices.connections[0].create_l2cap_channel( spec=ClassicChannelSpec(server.psm, mtu=456) ) assert client_channel.peer_mtu == 345 # ----------------------------------------------------------------------------- async def run(): test_helpers() await test_basic_connection() await test_transfer() await test_bidirectional_transfer() await test_mtu() # ----------------------------------------------------------------------------- if __name__ == '__main__': logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) asyncio.run(run())