# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example HFP Audio Gateway bridged to a websocket client.

The script registers an RFCOMM server exposing an HFP AG SDP record, optionally
connects to a peer Hands-Free device, and serves a websocket on port 8888.
HFP/SCO events are forwarded to the websocket client as JSON messages (SCO
packet payloads are forwarded as-is), and JSON commands received from the
client drive the AG protocol.
"""

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import json
import sys
import os
import io
import logging
import websockets

from typing import Any, Coroutine, Optional, Set

import bumble.core
from bumble.device import Device, ScoLink
from bumble.transport import open_transport_or_link
from bumble.core import (
    BT_BR_EDR_TRANSPORT,
)
from bumble import hci, rfcomm, hfp


logger = logging.getLogger(__name__)

# Most recently connected websocket client, or None when no client is connected.
ws: Optional[websockets.WebSocketServerProtocol] = None
# AG protocol bound to the current RFCOMM DLC, set by `on_dlc` inside `main`.
ag_protocol: Optional[hfp.AgProtocol] = None
# Optional audio source whose bytes are sent back over SCO in `on_sco_packet`.
source_file: Optional[io.BufferedReader] = None

# Strong references to in-flight websocket sends: the event loop only keeps
# weak references to tasks, so an unreferenced task may be collected before it
# completes.
_background_tasks: Set[asyncio.Task] = set()


def _default_configuration() -> hfp.AgConfiguration:
    """Build the AG configuration used for both the SDP record and the protocol."""
    return hfp.AgConfiguration(
        supported_ag_features=[
            hfp.AgFeature.HF_INDICATORS,
            hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
            hfp.AgFeature.REJECT_CALL,
            hfp.AgFeature.CODEC_NEGOTIATION,
            hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
            hfp.AgFeature.ENHANCED_CALL_STATUS,
        ],
        supported_ag_indicators=[
            hfp.AgIndicatorState.call(),
            hfp.AgIndicatorState.callsetup(),
            hfp.AgIndicatorState.callheld(),
            hfp.AgIndicatorState.service(),
            hfp.AgIndicatorState.signal(),
            hfp.AgIndicatorState.roam(),
            hfp.AgIndicatorState.battchg(),
        ],
        supported_hf_indicators=[
            hfp.HfIndicator.ENHANCED_SAFETY,
            hfp.HfIndicator.BATTERY_LEVEL,
        ],
        supported_ag_call_hold_operations=[],
        supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
    )


def _on_background_task_done(task: asyncio.Task) -> None:
    """Forget a finished background task and log its failure, if any."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    # Retrieving the exception also prevents the "Task exception was never
    # retrieved" warning when a send fails (e.g. the client went away).
    if (error := task.exception()) is not None:
        logger.warning('websocket send failed: %s', error)


def _spawn(coroutine: Coroutine[Any, Any, Any]) -> None:
    """Schedule `coroutine` as a task and keep it referenced until it is done."""
    task = asyncio.create_task(coroutine)
    _background_tasks.add(task)
    task.add_done_callback(_on_background_task_done)


def send_message(type: str, **kwargs) -> None:
    """Send a JSON object to the websocket client, if one is connected.

    Args:
        type: Value stored under the 'type' key of the message.
        **kwargs: Additional JSON-serializable fields of the message.

    Raises:
        TypeError: If a field is not JSON-serializable (raised synchronously by
            `json.dumps`, before the send is scheduled).
    """
    if ws:
        _spawn(ws.send(json.dumps({'type': type, **kwargs})))


def on_speaker_volume(level: int):
    """Forward a 'speaker_volume' event from the AG protocol to the client."""
    send_message(type='speaker_volume', level=level)


def on_microphone_volume(level: int):
    """Forward a 'microphone_volume' event from the AG protocol to the client."""
    send_message(type='microphone_volume', level=level)


def on_sco_state_change(codec: int):
    """Notify the client of the SCO sample rate implied by `codec`.

    CVSD maps to 8000, mSBC to 16000, and anything else (including the 0 passed
    on SCO disconnection) to 0.
    """
    if codec == hfp.AudioCodec.CVSD:
        sample_rate = 8000
    elif codec == hfp.AudioCodec.MSBC:
        sample_rate = 16000
    else:
        sample_rate = 0

    send_message(type='sco_state_change', sample_rate=sample_rate)


def on_sco_packet(packet: hci.HCI_SynchronousDataPacket):
    """Handle an incoming SCO packet.

    The packet payload is forwarded to the websocket client (if any). When a
    source file is open, the same number of bytes as the incoming packet is
    read from it and sent back on the same SCO connection; nothing is sent once
    the file is exhausted.
    """
    if ws:
        _spawn(ws.send(packet.data))
    if source_file and (pcm_data := source_file.read(packet.data_total_length)):
        assert ag_protocol
        host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host
        host.send_hci_packet(
            hci.HCI_SynchronousDataPacket(
                connection_handle=packet.connection_handle,
                packet_status=0,
                data_total_length=len(pcm_data),
                data=pcm_data,
            )
        )


def on_hfp_state_change(connected: bool):
    """Notify the client that the HFP connection was opened or closed."""
    send_message(type='hfp_state_change', connected=connected)


async def _handle_ws_message(protocol: hfp.AgProtocol, message) -> None:
    """Decode one websocket command and apply it to `protocol`.

    Unknown message types are ignored. Any decoding or execution error
    propagates to the caller.
    """
    json_message = json.loads(message)
    message_type = json_message['type']
    connection = protocol.dlc.multiplexer.l2cap_channel.connection
    device = connection.device

    if message_type == 'at_response':
        protocol.send_response(json_message['response'])
    elif message_type == 'ag_indicator':
        protocol.update_ag_indicator(
            hfp.AgIndicator(json_message['indicator']),
            int(json_message['value']),
        )
    elif message_type == 'negotiate_codec':
        codec = hfp.AudioCodec(int(json_message['codec']))
        await protocol.negotiate_codec(codec)
    elif message_type == 'connect_sco':
        if protocol.active_codec == hfp.AudioCodec.CVSD:
            esco_param = hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S4]
        elif protocol.active_codec == hfp.AudioCodec.MSBC:
            esco_param = hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_MSBC_T2]
        else:
            # Report the codec that is actually active; no local `codec` is
            # bound in this branch.
            raise ValueError(f'Unsupported codec {protocol.active_codec}')

        await device.send_command(
            hci.HCI_Enhanced_Setup_Synchronous_Connection_Command(
                connection_handle=connection.handle, **esco_param.asdict()
            )
        )
    elif message_type == 'disconnect_sco':
        # Copy the values to avoid iteration error.
        for sco_link in list(device.sco_links.values()):
            await sco_link.disconnect()
    elif message_type == 'update_calls':
        protocol.calls = [
            hfp.CallInfo(
                index=int(call['index']),
                direction=hfp.CallInfoDirection(int(call['direction'])),
                status=hfp.CallInfoStatus(int(call['status'])),
                number=call['number'],
                multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
                mode=hfp.CallInfoMode.VOICE,
            )
            for call in json_message['calls']
        ]


async def ws_server(ws_client: websockets.WebSocketServerProtocol, path: str):
    """Websocket connection handler.

    Registers `ws_client` as the current client, then processes its messages
    until it disconnects. Messages received while no AG protocol exists are
    dropped. A failure while handling a message is reported back to the client
    as an 'error' message and does not terminate the connection handler.
    """
    del path
    global ws
    ws = ws_client

    try:
        async for message in ws_client:
            if not ag_protocol:
                continue

            try:
                await _handle_ws_message(ag_protocol, message)
            except Exception as e:
                # Exceptions are not JSON-serializable: send their text instead.
                send_message(type='error', message=str(e))
    finally:
        # Stop sending events to this client once it is gone, unless a newer
        # client has already replaced it.
        if ws is ws_client:
            ws = None


# -----------------------------------------------------------------------------
async def main() -> None:
    """Run the gateway until the HCI transport terminates.

    Command line: <device-config> <transport-spec> [bluetooth-address]
    [wav-file-for-source]. With a bluetooth address, the script connects to that
    peer, authenticates, encrypts and opens the HFP RFCOMM channel itself;
    otherwise it only listens for an incoming RFCOMM connection.
    """
    global source_file

    if len(sys.argv) < 3:
        print(
            'Usage: run_hfp_gateway.py <device-config> <transport-spec> '
            '[bluetooth-address] [wav-file-for-source]'
        )
        print(
            'example: run_hfp_gateway.py hfp_gateway.json usb:0 E1:CA:72:48:C4:E8 sample.wav'
        )
        return

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
        print('<<< connected')

        # Create a device
        device = Device.from_config_file_with_hci(
            sys.argv[1], hci_transport.source, hci_transport.sink
        )
        device.classic_enabled = True
        await device.power_on()

        rfcomm_server = rfcomm.Server(device)
        configuration = _default_configuration()

        def on_dlc(dlc: rfcomm.DLC):
            # A new RFCOMM DLC replaces the current AG protocol instance.
            global ag_protocol
            ag_protocol = hfp.AgProtocol(dlc, configuration)
            ag_protocol.on('speaker_volume', on_speaker_volume)
            ag_protocol.on('microphone_volume', on_microphone_volume)
            on_hfp_state_change(True)
            dlc.multiplexer.l2cap_channel.on(
                'close', lambda: on_hfp_state_change(False)
            )

        channel = rfcomm_server.listen(on_dlc)
        device.sdp_service_records = {
            1: hfp.make_ag_sdp_records(1, channel, configuration)
        }

        def on_sco_connection(sco_link: ScoLink):
            assert ag_protocol
            on_sco_state_change(ag_protocol.active_codec)
            sco_link.on('disconnection', lambda _: on_sco_state_change(0))
            sco_link.sink = on_sco_packet

        device.on('sco_connection', on_sco_connection)
        if len(sys.argv) >= 4:
            # Connect to a peer
            target_address = sys.argv[3]
            print(f'=== Connecting to {target_address}...')
            connection = await device.connect(
                target_address, transport=BT_BR_EDR_TRANSPORT
            )
            print(f'=== Connected to {connection.peer_address}!')

            # Get a list of all the Handsfree services (should only be 1)
            if not (hfp_record := await hfp.find_hf_sdp_record(connection)):
                print('!!! no service found')
                return

            # Pick the first one
            channel, version, hf_sdp_features = hfp_record
            print(f'HF version: {version}')
            print(f'HF features: {hf_sdp_features}')

            # Request authentication
            print('*** Authenticating...')
            await connection.authenticate()
            print('*** Authenticated')

            # Enable encryption
            print('*** Enabling encryption...')
            await connection.encrypt()
            print('*** Encryption on')

            # Create a client and start it
            print('@@@ Starting to RFCOMM client...')
            rfcomm_client = rfcomm.Client(connection)
            rfcomm_mux = await rfcomm_client.start()
            print('@@@ Started')

            print(f'### Opening session for channel {channel}...')
            try:
                session = await rfcomm_mux.open_dlc(channel)
                print('### Session open', session)
            except bumble.core.ConnectionError as error:
                print(f'### Session open failed: {error}')
                await rfcomm_mux.disconnect()
                print('@@@ Disconnected from RFCOMM server')
                return

            on_dlc(session)

        await websockets.serve(ws_server, port=8888)

        try:
            if len(sys.argv) >= 5:
                source_file = open(sys.argv[4], 'rb')
                # Skip the first 44 bytes (presumably a canonical WAV header;
                # files with additional header chunks are not handled).
                source_file.seek(44)

            await hci_transport.source.terminated
        finally:
            # Release the audio source once the transport is done.
            if source_file is not None:
                source_file.close()
                source_file = None


# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())