1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import logging 20import os 21import pytest 22 23from bumble.controller import Controller 24from bumble.core import BT_BR_EDR_TRANSPORT 25from bumble.link import LocalLink 26from bumble.device import Device 27from bumble.host import Host 28from bumble.transport import AsyncPipeSink 29from bumble.avdtp import ( 30 AVDTP_IDLE_STATE, 31 AVDTP_STREAMING_STATE, 32 MediaPacketPump, 33 Protocol, 34 Listener, 35 MediaCodecCapabilities, 36 MediaPacket, 37 AVDTP_AUDIO_MEDIA_TYPE, 38 AVDTP_TSEP_SNK, 39 A2DP_SBC_CODEC_TYPE, 40) 41from bumble.a2dp import ( 42 SbcMediaCodecInformation, 43 SBC_MONO_CHANNEL_MODE, 44 SBC_DUAL_CHANNEL_MODE, 45 SBC_STEREO_CHANNEL_MODE, 46 SBC_JOINT_STEREO_CHANNEL_MODE, 47 SBC_LOUDNESS_ALLOCATION_METHOD, 48 SBC_SNR_ALLOCATION_METHOD, 49) 50 51# ----------------------------------------------------------------------------- 52# Logging 53# ----------------------------------------------------------------------------- 54logger = logging.getLogger(__name__) 55 56 57# ----------------------------------------------------------------------------- 58class TwoDevices: 59 def __init__(self): 60 self.connections = [None, None] 61 62 addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0'] 63 self.link = LocalLink() 64 self.controllers = [ 65 Controller('C1', link=self.link, public_address=addresses[0]), 66 Controller('C2', link=self.link, public_address=addresses[1]), 67 ] 68 self.devices = [ 69 Device( 70 address=addresses[0], 71 host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])), 72 ), 73 Device( 74 address=addresses[1], 75 host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])), 76 ), 77 ] 78 79 self.paired = [None, None] 80 81 def on_connection(self, which, connection): 82 self.connections[which] = connection 83 84 def on_paired(self, which, keys): 85 self.paired[which] = keys 86 87 88# ----------------------------------------------------------------------------- 89@pytest.mark.asyncio 90async def test_self_connection(): 91 # Create two devices, each with a controller, attached to the same link 92 two_devices = TwoDevices() 93 94 # Attach listeners 95 two_devices.devices[0].on( 96 'connection', lambda connection: two_devices.on_connection(0, connection) 97 ) 98 two_devices.devices[1].on( 99 'connection', lambda connection: two_devices.on_connection(1, connection) 100 ) 101 102 # Enable Classic connections 103 two_devices.devices[0].classic_enabled = True 104 two_devices.devices[1].classic_enabled = True 105 106 # Start 107 await two_devices.devices[0].power_on() 108 await two_devices.devices[1].power_on() 109 110 # Connect the two devices 111 await asyncio.gather( 112 two_devices.devices[0].connect( 113 two_devices.devices[1].public_address, transport=BT_BR_EDR_TRANSPORT 114 ), 115 two_devices.devices[1].accept(two_devices.devices[0].public_address), 116 ) 117 118 # Check the post conditions 119 assert two_devices.connections[0] is not None 120 assert two_devices.connections[1] is not None 121 122 123# ----------------------------------------------------------------------------- 124def source_codec_capabilities(): 125 return MediaCodecCapabilities( 126 media_type=AVDTP_AUDIO_MEDIA_TYPE, 127 media_codec_type=A2DP_SBC_CODEC_TYPE, 128 media_codec_information=SbcMediaCodecInformation.from_discrete_values( 129 sampling_frequency=44100, 130 channel_mode=SBC_JOINT_STEREO_CHANNEL_MODE, 131 block_length=16, 132 subbands=8, 133 allocation_method=SBC_LOUDNESS_ALLOCATION_METHOD, 134 minimum_bitpool_value=2, 135 maximum_bitpool_value=53, 136 ), 137 ) 138 139 140# ----------------------------------------------------------------------------- 141def sink_codec_capabilities(): 142 return MediaCodecCapabilities( 143 media_type=AVDTP_AUDIO_MEDIA_TYPE, 144 media_codec_type=A2DP_SBC_CODEC_TYPE, 145 media_codec_information=SbcMediaCodecInformation.from_lists( 146 sampling_frequencies=[48000, 44100, 32000, 16000], 147 channel_modes=[ 148 SBC_MONO_CHANNEL_MODE, 149 SBC_DUAL_CHANNEL_MODE, 150 SBC_STEREO_CHANNEL_MODE, 151 SBC_JOINT_STEREO_CHANNEL_MODE, 152 ], 153 block_lengths=[4, 8, 12, 16], 154 subbands=[4, 8], 155 allocation_methods=[ 156 SBC_LOUDNESS_ALLOCATION_METHOD, 157 SBC_SNR_ALLOCATION_METHOD, 158 ], 159 minimum_bitpool_value=2, 160 maximum_bitpool_value=53, 161 ), 162 ) 163 164 165# ----------------------------------------------------------------------------- 166@pytest.mark.asyncio 167async def test_source_sink_1(): 168 two_devices = TwoDevices() 169 # Enable Classic connections 170 two_devices.devices[0].classic_enabled = True 171 two_devices.devices[1].classic_enabled = True 172 await two_devices.devices[0].power_on() 173 await two_devices.devices[1].power_on() 174 175 def on_rtp_packet(packet): 176 rtp_packets.append(packet) 177 if len(rtp_packets) == rtp_packets_expected: 178 rtp_packets_fully_received.set_result(None) 179 180 sink = None 181 182 def on_avdtp_connection(server): 183 nonlocal sink 184 sink = server.add_sink(sink_codec_capabilities()) 185 sink.on('rtp_packet', on_rtp_packet) 186 187 # Create a listener to wait for AVDTP connections 188 listener = Listener.for_device(two_devices.devices[1]) 189 listener.on('connection', on_avdtp_connection) 190 191 async def make_connection(): 192 connections = await asyncio.gather( 193 two_devices.devices[0].connect( 194 two_devices.devices[1].public_address, BT_BR_EDR_TRANSPORT 195 ), 196 two_devices.devices[1].accept(two_devices.devices[0].public_address), 197 ) 198 return connections[0] 199 200 connection = await make_connection() 201 client = await Protocol.connect(connection) 202 endpoints = await client.discover_remote_endpoints() 203 assert len(endpoints) == 1 204 remote_sink = list(endpoints)[0] 205 assert remote_sink.in_use == 0 206 assert remote_sink.media_type == AVDTP_AUDIO_MEDIA_TYPE 207 assert remote_sink.tsep == AVDTP_TSEP_SNK 208 209 async def generate_packets(packet_count): 210 sequence_number = 0 211 timestamp = 0 212 for i in range(packet_count): 213 payload = bytes([sequence_number % 256]) 214 packet = MediaPacket( 215 2, 0, 0, 0, sequence_number, timestamp, 0, [], 96, payload 216 ) 217 packet.timestamp_seconds = timestamp / 44100 218 timestamp += 10 219 sequence_number += 1 220 yield packet 221 222 # Send packets using a pump object 223 rtp_packets_fully_received = asyncio.get_running_loop().create_future() 224 rtp_packets_expected = 3 225 rtp_packets = [] 226 pump = MediaPacketPump(generate_packets(3)) 227 source = client.add_source(source_codec_capabilities(), pump) 228 stream = await client.create_stream(source, remote_sink) 229 await stream.start() 230 assert stream.state == AVDTP_STREAMING_STATE 231 assert stream.local_endpoint.in_use == 1 232 assert stream.rtp_channel is not None 233 assert sink.in_use == 1 234 assert sink.stream is not None 235 assert sink.stream.state == AVDTP_STREAMING_STATE 236 await rtp_packets_fully_received 237 238 await stream.close() 239 assert stream.rtp_channel is None 240 assert source.in_use == 0 241 assert source.stream.state == AVDTP_IDLE_STATE 242 assert sink.in_use == 0 243 assert sink.stream.state == AVDTP_IDLE_STATE 244 245 # Send packets manually 246 rtp_packets_fully_received = asyncio.get_running_loop().create_future() 247 rtp_packets_expected = 3 248 rtp_packets = [] 249 source_packets = [ 250 MediaPacket(2, 0, 0, 0, i, i * 10, 0, [], 96, bytes([i])) for i in range(3) 251 ] 252 source = client.add_source(source_codec_capabilities(), None) 253 stream = await client.create_stream(source, remote_sink) 254 await stream.start() 255 assert stream.state == AVDTP_STREAMING_STATE 256 assert stream.local_endpoint.in_use == 1 257 assert stream.rtp_channel is not None 258 assert sink.in_use == 1 259 assert sink.stream is not None 260 assert sink.stream.state == AVDTP_STREAMING_STATE 261 262 stream.send_media_packet(source_packets[0]) 263 stream.send_media_packet(source_packets[1]) 264 stream.send_media_packet(source_packets[2]) 265 266 await stream.close() 267 assert stream.rtp_channel is None 268 assert len(rtp_packets) == 3 269 assert source.in_use == 0 270 assert source.stream.state == AVDTP_IDLE_STATE 271 assert sink.in_use == 0 272 assert sink.stream.state == AVDTP_IDLE_STATE 273 274 275# ----------------------------------------------------------------------------- 276async def run_test_self(): 277 await test_self_connection() 278 await test_source_sink_1() 279 280 281# ----------------------------------------------------------------------------- 282if __name__ == '__main__': 283 logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) 284 asyncio.run(run_test_self()) 285