# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from __future__ import annotations import asyncio import contextlib import dataclasses import logging import os from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple import click import pyee from bumble.colors import color import bumble.company_ids import bumble.core import bumble.device import bumble.gatt import bumble.hci import bumble.profiles.bap import bumble.profiles.bass import bumble.profiles.pbp import bumble.transport import bumble.utils # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- # Constants # ----------------------------------------------------------------------------- AURACAST_DEFAULT_DEVICE_NAME = 'Bumble Auracast' AURACAST_DEFAULT_DEVICE_ADDRESS = bumble.hci.Address('F0:F1:F2:F3:F4:F5') AURACAST_DEFAULT_SYNC_TIMEOUT = 5.0 AURACAST_DEFAULT_ATT_MTU = 256 # ----------------------------------------------------------------------------- # Scan For Broadcasts # ----------------------------------------------------------------------------- class BroadcastScanner(pyee.EventEmitter): @dataclasses.dataclass class Broadcast(pyee.EventEmitter): name: str sync: bumble.device.PeriodicAdvertisingSync rssi: int = 0 public_broadcast_announcement: Optional[ bumble.profiles.pbp.PublicBroadcastAnnouncement ] = None broadcast_audio_announcement: Optional[ bumble.profiles.bap.BroadcastAudioAnnouncement ] = None basic_audio_announcement: Optional[ bumble.profiles.bap.BasicAudioAnnouncement ] = None appearance: Optional[bumble.core.Appearance] = None biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None manufacturer_data: Optional[Tuple[str, bytes]] = None def __post_init__(self) -> None: super().__init__() self.sync.on('establishment', self.on_sync_establishment) self.sync.on('loss', self.on_sync_loss) self.sync.on('periodic_advertisement', self.on_periodic_advertisement) self.sync.on('biginfo_advertisement', self.on_biginfo_advertisement) def update(self, advertisement: bumble.device.Advertisement) -> None: self.rssi = advertisement.rssi for service_data in advertisement.data.get_all( bumble.core.AdvertisingData.SERVICE_DATA ): assert isinstance(service_data, tuple) service_uuid, data = service_data assert isinstance(data, bytes) if ( service_uuid == bumble.gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE ): self.public_broadcast_announcement = ( bumble.profiles.pbp.PublicBroadcastAnnouncement.from_bytes(data) ) continue if ( service_uuid == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE ): self.broadcast_audio_announcement = ( bumble.profiles.bap.BroadcastAudioAnnouncement.from_bytes(data) ) continue self.appearance = advertisement.data.get( # type: ignore[assignment] bumble.core.AdvertisingData.APPEARANCE ) if manufacturer_data := advertisement.data.get( bumble.core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA ): assert isinstance(manufacturer_data, tuple) company_id = cast(int, manufacturer_data[0]) data = cast(bytes, manufacturer_data[1]) self.manufacturer_data = ( bumble.company_ids.COMPANY_IDENTIFIERS.get( company_id, f'0x{company_id:04X}' ), data, ) self.emit('update') def print(self) -> None: print( color('Broadcast:', 'yellow'), self.sync.advertiser_address, color(self.sync.state.name, 'green'), ) print(f' {color("Name", "cyan")}: {self.name}') if self.appearance: print(f' {color("Appearance", "cyan")}: {str(self.appearance)}') print(f' {color("RSSI", "cyan")}: {self.rssi}') print(f' {color("SID", "cyan")}: {self.sync.sid}') if self.manufacturer_data: print( f' {color("Manufacturer Data", "cyan")}: ' f'{self.manufacturer_data[0]} -> {self.manufacturer_data[1].hex()}' ) if self.broadcast_audio_announcement: print( f' {color("Broadcast ID", "cyan")}: ' f'{self.broadcast_audio_announcement.broadcast_id}' ) if self.public_broadcast_announcement: print( f' {color("Features", "cyan")}: ' f'{self.public_broadcast_announcement.features}' ) print( f' {color("Metadata", "cyan")}: ' f'{self.public_broadcast_announcement.metadata}' ) if self.basic_audio_announcement: print(color(' Audio:', 'cyan')) print( color(' Presentation Delay:', 'magenta'), self.basic_audio_announcement.presentation_delay, ) for subgroup in self.basic_audio_announcement.subgroups: print(color(' Subgroup:', 'magenta')) print(color(' Codec ID:', 'yellow')) print( color(' Coding Format: ', 'green'), subgroup.codec_id.coding_format.name, ) print( color(' Company ID: ', 'green'), subgroup.codec_id.company_id, ) print( color(' Vendor Specific Codec ID:', 'green'), subgroup.codec_id.vendor_specific_codec_id, ) print( color(' Codec Config:', 'yellow'), subgroup.codec_specific_configuration, ) print(color(' Metadata: ', 'yellow'), subgroup.metadata) for bis in subgroup.bis: print(color(f' BIS [{bis.index}]:', 'yellow')) print( color(' Codec Config:', 'green'), bis.codec_specific_configuration, ) if self.biginfo: print(color(' BIG:', 'cyan')) print( color(' Number of BIS:', 'magenta'), self.biginfo.num_bis, ) print( color(' PHY: ', 'magenta'), self.biginfo.phy.name, ) print( color(' Framed: ', 'magenta'), self.biginfo.framed, ) print( color(' Encrypted: ', 'magenta'), self.biginfo.encrypted, ) def on_sync_establishment(self) -> None: self.emit('sync_establishment') def on_sync_loss(self) -> None: self.basic_audio_announcement = None self.biginfo = None self.emit('sync_loss') def on_periodic_advertisement( self, advertisement: bumble.device.PeriodicAdvertisement ) -> None: if advertisement.data is None: return for service_data in advertisement.data.get_all( bumble.core.AdvertisingData.SERVICE_DATA ): assert isinstance(service_data, tuple) service_uuid, data = service_data assert isinstance(data, bytes) if service_uuid == bumble.gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE: self.basic_audio_announcement = ( bumble.profiles.bap.BasicAudioAnnouncement.from_bytes(data) ) break self.emit('change') def on_biginfo_advertisement( self, advertisement: bumble.device.BIGInfoAdvertisement ) -> None: self.biginfo = advertisement self.emit('change') def __init__( self, device: bumble.device.Device, filter_duplicates: bool, sync_timeout: float, ): super().__init__() self.device = device self.filter_duplicates = filter_duplicates self.sync_timeout = sync_timeout self.broadcasts: Dict[bumble.hci.Address, BroadcastScanner.Broadcast] = {} device.on('advertisement', self.on_advertisement) async def start(self) -> None: await self.device.start_scanning( active=False, filter_duplicates=False, ) async def stop(self) -> None: await self.device.stop_scanning() def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: if ( broadcast_name := advertisement.data.get( bumble.core.AdvertisingData.BROADCAST_NAME ) ) is None: return assert isinstance(broadcast_name, str) if broadcast := self.broadcasts.get(advertisement.address): broadcast.update(advertisement) return bumble.utils.AsyncRunner.spawn( self.on_new_broadcast(broadcast_name, advertisement) ) async def on_new_broadcast( self, name: str, advertisement: bumble.device.Advertisement ) -> None: periodic_advertising_sync = await self.device.create_periodic_advertising_sync( advertiser_address=advertisement.address, sid=advertisement.sid, sync_timeout=self.sync_timeout, filter_duplicates=self.filter_duplicates, ) broadcast = self.Broadcast( name, periodic_advertising_sync, ) broadcast.update(advertisement) self.broadcasts[advertisement.address] = broadcast periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast)) self.emit('new_broadcast', broadcast) def on_broadcast_loss(self, broadcast: Broadcast) -> None: del self.broadcasts[broadcast.sync.advertiser_address] bumble.utils.AsyncRunner.spawn(broadcast.sync.terminate()) self.emit('broadcast_loss', broadcast) class PrintingBroadcastScanner: def __init__( self, device: bumble.device.Device, filter_duplicates: bool, sync_timeout: float ) -> None: self.scanner = BroadcastScanner(device, filter_duplicates, sync_timeout) self.scanner.on('new_broadcast', self.on_new_broadcast) self.scanner.on('broadcast_loss', self.on_broadcast_loss) self.scanner.on('update', self.refresh) self.status_message = '' async def start(self) -> None: self.status_message = color('Scanning...', 'green') await self.scanner.start() def on_new_broadcast(self, broadcast: BroadcastScanner.Broadcast) -> None: self.status_message = color( f'+Found {len(self.scanner.broadcasts)} broadcasts', 'green' ) broadcast.on('change', self.refresh) broadcast.on('update', self.refresh) self.refresh() def on_broadcast_loss(self, broadcast: BroadcastScanner.Broadcast) -> None: self.status_message = color( f'-Found {len(self.scanner.broadcasts)} broadcasts', 'green' ) self.refresh() def refresh(self) -> None: # Clear the screen from the top print('\033[H') print('\033[0J') print('\033[H') # Print the status message print(self.status_message) print("==========================================") # Print all broadcasts for broadcast in self.scanner.broadcasts.values(): broadcast.print() print('------------------------------------------') # Clear the screen to the bottom print('\033[0J') @contextlib.asynccontextmanager async def create_device(transport: str) -> AsyncGenerator[bumble.device.Device, Any]: async with await bumble.transport.open_transport(transport) as ( hci_source, hci_sink, ): device_config = bumble.device.DeviceConfiguration( name=AURACAST_DEFAULT_DEVICE_NAME, address=AURACAST_DEFAULT_DEVICE_ADDRESS, keystore='JsonKeyStore', ) device = bumble.device.Device.from_config_with_hci( device_config, hci_source, hci_sink, ) await device.power_on() yield device async def find_broadcast_by_name( device: bumble.device.Device, name: Optional[str] ) -> BroadcastScanner.Broadcast: result = asyncio.get_running_loop().create_future() def on_broadcast_change(broadcast: BroadcastScanner.Broadcast) -> None: if broadcast.basic_audio_announcement and not result.done(): print(color('Broadcast basic audio announcement received', 'green')) result.set_result(broadcast) def on_new_broadcast(broadcast: BroadcastScanner.Broadcast) -> None: if name is None or broadcast.name == name: print(color('Broadcast found:', 'green'), broadcast.name) broadcast.on('change', lambda: on_broadcast_change(broadcast)) return print(color(f'Skipping broadcast {broadcast.name}')) scanner = BroadcastScanner(device, False, AURACAST_DEFAULT_SYNC_TIMEOUT) scanner.on('new_broadcast', on_new_broadcast) await scanner.start() broadcast = await result await scanner.stop() return broadcast async def run_scan( filter_duplicates: bool, sync_timeout: float, transport: str ) -> None: async with create_device(transport) as device: if not device.supports_le_periodic_advertising: print(color('Periodic advertising not supported', 'red')) return scanner = PrintingBroadcastScanner(device, filter_duplicates, sync_timeout) await scanner.start() await asyncio.get_running_loop().create_future() async def run_assist( broadcast_name: Optional[str], source_id: Optional[int], command: str, transport: str, address: str, ) -> None: async with create_device(transport) as device: if not device.supports_le_periodic_advertising: print(color('Periodic advertising not supported', 'red')) return # Connect to the server print(f'=== Connecting to {address}...') connection = await device.connect(address) peer = bumble.device.Peer(connection) print(f'=== Connected to {peer}') print("+++ Encrypting connection...") await peer.connection.encrypt() print("+++ Connection encrypted") # Request a larger MTU mtu = AURACAST_DEFAULT_ATT_MTU print(color(f'$$$ Requesting MTU={mtu}', 'yellow')) await peer.request_mtu(mtu) # Get the BASS service bass = await peer.discover_service_and_create_proxy( bumble.profiles.bass.BroadcastAudioScanServiceProxy ) # Check that the service was found if not bass: print(color('!!! Broadcast Audio Scan Service not found', 'red')) return # Subscribe to and read the broadcast receive state characteristics for i, broadcast_receive_state in enumerate(bass.broadcast_receive_states): try: await broadcast_receive_state.subscribe( lambda value, i=i: print( f"{color(f'Broadcast Receive State Update [{i}]:', 'green')} {value}" ) ) except bumble.core.ProtocolError as error: print( color( f'!!! Failed to subscribe to Broadcast Receive State characteristic:', 'red', ), error, ) value = await broadcast_receive_state.read_value() print( f'{color(f"Initial Broadcast Receive State [{i}]:", "green")} {value}' ) if command == 'monitor-state': await peer.sustain() return if command == 'add-source': # Find the requested broadcast await bass.remote_scan_started() if broadcast_name: print(color('Scanning for broadcast:', 'cyan'), broadcast_name) else: print(color('Scanning for any broadcast', 'cyan')) broadcast = await find_broadcast_by_name(device, broadcast_name) if broadcast.broadcast_audio_announcement is None: print(color('No broadcast audio announcement found', 'red')) return if ( broadcast.basic_audio_announcement is None or not broadcast.basic_audio_announcement.subgroups ): print(color('No subgroups found', 'red')) return # Add the source print(color('Adding source:', 'blue'), broadcast.sync.advertiser_address) await bass.add_source( broadcast.sync.advertiser_address, broadcast.sync.sid, broadcast.broadcast_audio_announcement.broadcast_id, bumble.profiles.bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_AVAILABLE, 0xFFFF, [ bumble.profiles.bass.SubgroupInfo( bumble.profiles.bass.SubgroupInfo.ANY_BIS, bytes(broadcast.basic_audio_announcement.subgroups[0].metadata), ) ], ) # Initiate a PA Sync Transfer await broadcast.sync.transfer(peer.connection) # Notify the sink that we're done scanning. await bass.remote_scan_stopped() await peer.sustain() return if command == 'modify-source': if source_id is None: print(color('!!! modify-source requires --source-id')) return # Find the requested broadcast await bass.remote_scan_started() if broadcast_name: print(color('Scanning for broadcast:', 'cyan'), broadcast_name) else: print(color('Scanning for any broadcast', 'cyan')) broadcast = await find_broadcast_by_name(device, broadcast_name) if broadcast.broadcast_audio_announcement is None: print(color('No broadcast audio announcement found', 'red')) return if ( broadcast.basic_audio_announcement is None or not broadcast.basic_audio_announcement.subgroups ): print(color('No subgroups found', 'red')) return # Modify the source print( color('Modifying source:', 'blue'), source_id, ) await bass.modify_source( source_id, bumble.profiles.bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_NOT_AVAILABLE, 0xFFFF, [ bumble.profiles.bass.SubgroupInfo( bumble.profiles.bass.SubgroupInfo.ANY_BIS, bytes(broadcast.basic_audio_announcement.subgroups[0].metadata), ) ], ) await peer.sustain() return if command == 'remove-source': if source_id is None: print(color('!!! remove-source requires --source-id')) return # Remove the source print(color('Removing source:', 'blue'), source_id) await bass.remove_source(source_id) await peer.sustain() return print(color(f'!!! invalid command {command}')) async def run_pair(transport: str, address: str) -> None: async with create_device(transport) as device: # Connect to the server print(f'=== Connecting to {address}...') async with device.connect_as_gatt(address) as peer: print(f'=== Connected to {peer}') print("+++ Initiating pairing...") await peer.connection.pair() print("+++ Paired") def run_async(async_command: Coroutine) -> None: try: asyncio.run(async_command) except bumble.core.ProtocolError as error: if error.error_namespace == 'att' and error.error_code in list( bumble.profiles.bass.ApplicationError ): message = bumble.profiles.bass.ApplicationError(error.error_code).name else: message = str(error) print( color('!!! An error occurred while executing the command:', 'red'), message ) # ----------------------------------------------------------------------------- # Main # ----------------------------------------------------------------------------- @click.group() @click.pass_context def auracast( ctx, ): ctx.ensure_object(dict) @auracast.command('scan') @click.option( '--filter-duplicates', is_flag=True, default=False, help='Filter duplicates' ) @click.option( '--sync-timeout', metavar='SYNC_TIMEOUT', type=float, default=AURACAST_DEFAULT_SYNC_TIMEOUT, help='Sync timeout (in seconds)', ) @click.argument('transport') @click.pass_context def scan(ctx, filter_duplicates, sync_timeout, transport): """Scan for public broadcasts""" run_async(run_scan(filter_duplicates, sync_timeout, transport)) @auracast.command('assist') @click.option( '--broadcast-name', metavar='BROADCAST_NAME', help='Broadcast Name to tune to', ) @click.option( '--source-id', metavar='SOURCE_ID', type=int, help='Source ID (for remove-source command)', ) @click.option( '--command', type=click.Choice( ['monitor-state', 'add-source', 'modify-source', 'remove-source'] ), required=True, ) @click.argument('transport') @click.argument('address') @click.pass_context def assist(ctx, broadcast_name, source_id, command, transport, address): """Scan for broadcasts on behalf of a audio server""" run_async(run_assist(broadcast_name, source_id, command, transport, address)) @auracast.command('pair') @click.argument('transport') @click.argument('address') @click.pass_context def pair(ctx, transport, address): """Pair with an audio server""" run_async(run_pair(transport, address)) def main(): logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) auracast() # ----------------------------------------------------------------------------- if __name__ == "__main__": main() # pylint: disable=no-value-for-parameter