1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import json
20import sys
21import os
22import io
23import logging
24import websockets
25
26from typing import Optional
27
28import bumble.core
29from bumble.device import Device, ScoLink
30from bumble.transport import open_transport_or_link
31from bumble.core import (
32    BT_BR_EDR_TRANSPORT,
33)
34from bumble import hci, rfcomm, hfp
35
36
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Shared mutable state, rebound through `global` statements further down.
# WebSocket client connection; assigned by ws_server(), None until a client connects.
ws: Optional[websockets.WebSocketServerProtocol] = None
# HFP Audio Gateway protocol; assigned by the on_dlc() callback inside main().
ag_protocol: Optional[hfp.AgProtocol] = None
# Binary file read by on_sco_packet() to produce outgoing audio; assigned in main()
# only when a fourth command-line argument is given.
source_file: Optional[io.BufferedReader] = None
42
43
def _default_configuration() -> hfp.AgConfiguration:
    """Build the Audio Gateway configuration used by this example.

    Returns:
        An `hfp.AgConfiguration` listing the AG features, AG indicators,
        HF indicators and audio codecs (CVSD and mSBC) this gateway supports.
        No call hold operations are declared.
    """
    ag_features = [
        hfp.AgFeature.HF_INDICATORS,
        hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
        hfp.AgFeature.REJECT_CALL,
        hfp.AgFeature.CODEC_NEGOTIATION,
        hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
        hfp.AgFeature.ENHANCED_CALL_STATUS,
    ]
    # Each factory call yields a fresh indicator state object.
    ag_indicators = [
        hfp.AgIndicatorState.call(),
        hfp.AgIndicatorState.callsetup(),
        hfp.AgIndicatorState.callheld(),
        hfp.AgIndicatorState.service(),
        hfp.AgIndicatorState.signal(),
        hfp.AgIndicatorState.roam(),
        hfp.AgIndicatorState.battchg(),
    ]
    hf_indicators = [
        hfp.HfIndicator.ENHANCED_SAFETY,
        hfp.HfIndicator.BATTERY_LEVEL,
    ]
    audio_codecs = [hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC]

    return hfp.AgConfiguration(
        supported_ag_features=ag_features,
        supported_ag_indicators=ag_indicators,
        supported_hf_indicators=hf_indicators,
        supported_ag_call_hold_operations=[],
        supported_audio_codecs=audio_codecs,
    )
70
71
# Strong references to in-flight send tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be garbage-collected
# before it completes.
_pending_sends: set = set()


def send_message(type: str, **kwargs) -> None:
    """Send a JSON event to the connected WebSocket client, if there is one.

    The payload is a JSON object containing `type` plus every keyword argument.
    The send is scheduled as a task on the running event loop and not awaited,
    so this function must be called while a loop is running.

    Args:
        type: Value of the `type` field of the message.
        **kwargs: Additional fields of the message; must be JSON-serializable.

    Raises:
        TypeError: If a field value cannot be serialized by `json.dumps`.
    """
    if not ws:
        # No client connected: the event is dropped.
        return

    payload = json.dumps({'type': type, **kwargs})
    task = asyncio.create_task(ws.send(payload))
    # Keep the task alive until it finishes, then forget it.
    _pending_sends.add(task)
    task.add_done_callback(_pending_sends.discard)
75
76
def on_speaker_volume(level: int):
    """Relay a `speaker_volume` event, with its level, to the WebSocket client."""
    fields = {'level': level}
    send_message('speaker_volume', **fields)
79
80
def on_microphone_volume(level: int):
    """Relay a `microphone_volume` event, with its level, to the WebSocket client."""
    fields = {'level': level}
    send_message('microphone_volume', **fields)
83
84
def on_sco_state_change(codec: int):
    """Notify the WebSocket client of the sample rate matching `codec`.

    CVSD maps to 8000, mSBC maps to 16000, and any other value (including the
    0 passed when a SCO link disconnects) maps to 0.
    """
    known_rates = (
        (hfp.AudioCodec.CVSD, 8000),
        (hfp.AudioCodec.MSBC, 16000),
    )
    # First matching codec wins; fall back to 0 when none matches.
    rate = next(
        (value for candidate, value in known_rates if codec == candidate), 0
    )
    send_message(type='sco_state_change', sample_rate=rate)
94
95
# Strong references to in-flight audio forwarding tasks. The event loop only
# keeps weak references to tasks, so an otherwise unreferenced task may be
# garbage-collected before it completes.
_pending_audio_sends: set = set()


def on_sco_packet(packet: hci.HCI_SynchronousDataPacket):
    """Handle one incoming SCO packet.

    The packet payload is forwarded to the WebSocket client (when one is
    connected). If a source file is open, a chunk of the same size as the
    incoming packet is read from it and sent back on the same connection
    handle; nothing is sent once the file is exhausted.

    Args:
        packet: The synchronous data packet received from the controller.
    """
    if ws:
        forward_task = asyncio.create_task(ws.send(packet.data))
        # Keep the task alive until it finishes, then forget it.
        _pending_audio_sends.add(forward_task)
        forward_task.add_done_callback(_pending_audio_sends.discard)

    if not source_file:
        return

    # Pace outgoing audio on incoming audio: one chunk out per packet in.
    pcm_data = source_file.read(packet.data_total_length)
    if not pcm_data:
        # End of file reached: stop sending.
        return

    # Narrows the Optional global for type checkers.
    assert ag_protocol
    host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host
    host.send_hci_packet(
        hci.HCI_SynchronousDataPacket(
            connection_handle=packet.connection_handle,
            packet_status=0,
            data_total_length=len(pcm_data),
            data=pcm_data,
        )
    )
110
111
def on_hfp_state_change(connected: bool):
    """Relay an `hfp_state_change` event carrying `connected` to the WebSocket client."""
    fields = {'connected': connected}
    send_message('hfp_state_change', **fields)
114
115
async def _handle_ws_message(protocol: hfp.AgProtocol, message) -> None:
    """Decode one JSON control message and apply it to `protocol`.

    Supported message types: `at_response`, `ag_indicator`, `negotiate_codec`,
    `connect_sco`, `disconnect_sco` and `update_calls`. Any other type is
    ignored.

    Args:
        protocol: The active Audio Gateway protocol instance.
        message: Raw WebSocket message holding a JSON object with a `type` key.

    Raises:
        ValueError: On `connect_sco` when the active codec is neither CVSD nor
            mSBC (malformed JSON also raises a `ValueError` subclass).
        KeyError: When a required field is missing from the message.
    """
    json_message = json.loads(message)
    message_type = json_message['type']
    connection = protocol.dlc.multiplexer.l2cap_channel.connection
    device = connection.device

    if message_type == 'at_response':
        protocol.send_response(json_message['response'])
    elif message_type == 'ag_indicator':
        protocol.update_ag_indicator(
            hfp.AgIndicator(json_message['indicator']),
            int(json_message['value']),
        )
    elif message_type == 'negotiate_codec':
        codec = hfp.AudioCodec(int(json_message['codec']))
        await protocol.negotiate_codec(codec)
    elif message_type == 'connect_sco':
        # Pick the eSCO parameter set from the codec currently in use.
        active_codec = protocol.active_codec
        if active_codec == hfp.AudioCodec.CVSD:
            esco_param = hfp.ESCO_PARAMETERS[
                hfp.DefaultCodecParameters.ESCO_CVSD_S4
            ]
        elif active_codec == hfp.AudioCodec.MSBC:
            esco_param = hfp.ESCO_PARAMETERS[
                hfp.DefaultCodecParameters.ESCO_MSBC_T2
            ]
        else:
            raise ValueError(f'Unsupported codec {active_codec}')

        await device.send_command(
            hci.HCI_Enhanced_Setup_Synchronous_Connection_Command(
                connection_handle=connection.handle, **esco_param.asdict()
            )
        )
    elif message_type == 'disconnect_sco':
        # Copy the values to avoid iteration error.
        for sco_link in list(device.sco_links.values()):
            await sco_link.disconnect()
    elif message_type == 'update_calls':
        protocol.calls = [
            hfp.CallInfo(
                index=int(call['index']),
                direction=hfp.CallInfoDirection(int(call['direction'])),
                status=hfp.CallInfoStatus(int(call['status'])),
                number=call['number'],
                multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
                mode=hfp.CallInfoMode.VOICE,
            )
            for call in json_message['calls']
        ]


async def ws_server(ws_client: websockets.WebSocketServerProtocol, path: str):
    """Serve one WebSocket client: register it and process its control messages.

    The client becomes the target of outgoing events for as long as it is
    connected. Messages received before the HFP protocol exists are dropped.
    A failure while handling a message is reported back to the client as an
    `error` event and does not end the session.

    Args:
        ws_client: The accepted WebSocket connection.
        path: Request path; unused.
    """
    del path
    global ws
    ws = ws_client

    try:
        async for message in ws_client:
            if not ag_protocol:
                continue

            try:
                await _handle_ws_message(ag_protocol, message)
            except Exception as e:
                # Boundary handler: one bad message must not end the session.
                logger.exception('Failed to handle WebSocket message')
                # The exception object itself is not JSON-serializable.
                send_message(type='error', message=str(e))
    finally:
        # Stop targeting this client once it is gone, unless a newer client
        # has already replaced it.
        if ws is ws_client:
            ws = None
177
178
179# -----------------------------------------------------------------------------
async def main() -> None:
    """Run the HFP Audio Gateway example.

    Command-line arguments (read from `sys.argv`):
        1. device configuration file
        2. transport specification
        3. optional Bluetooth address of a peer to connect to
        4. optional file whose contents are streamed as outgoing SCO audio

    The function powers on the device, publishes the AG SDP record, listens
    for RFCOMM connections, optionally connects to the given peer, starts a
    WebSocket server on port 8888, and then waits until the HCI transport
    terminates.
    """
    global source_file

    if len(sys.argv) < 3:
        print(
            'Usage: run_hfp_gateway.py <device-config> <transport-spec> '
            '[bluetooth-address] [wav-file-for-source]'
        )
        print(
            'example: run_hfp_gateway.py hfp_gateway.json usb:0 E1:CA:72:48:C4:E8 sample.wav'
        )
        return

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
        print('<<< connected')

        # Create a device
        device = Device.from_config_file_with_hci(
            sys.argv[1], hci_transport.source, hci_transport.sink
        )
        device.classic_enabled = True
        await device.power_on()

        rfcomm_server = rfcomm.Server(device)
        configuration = _default_configuration()

        def on_dlc(dlc: rfcomm.DLC):
            """Start the AG protocol on a newly available RFCOMM DLC."""
            global ag_protocol
            ag_protocol = hfp.AgProtocol(dlc, configuration)
            ag_protocol.on('speaker_volume', on_speaker_volume)
            ag_protocol.on('microphone_volume', on_microphone_volume)
            on_hfp_state_change(True)
            dlc.multiplexer.l2cap_channel.on(
                'close', lambda: on_hfp_state_change(False)
            )

        # Channel on which incoming RFCOMM connections are accepted; it is
        # advertised in the SDP record below.
        listen_channel = rfcomm_server.listen(on_dlc)
        device.sdp_service_records = {
            1: hfp.make_ag_sdp_records(1, listen_channel, configuration)
        }

        def on_sco_connection(sco_link: ScoLink):
            """Report the new SCO link and route its packets to on_sco_packet."""
            assert ag_protocol
            on_sco_state_change(ag_protocol.active_codec)
            sco_link.on('disconnection', lambda _: on_sco_state_change(0))
            sco_link.sink = on_sco_packet

        device.on('sco_connection', on_sco_connection)
        if len(sys.argv) >= 4:
            # Connect to a peer
            target_address = sys.argv[3]
            print(f'=== Connecting to {target_address}...')
            connection = await device.connect(
                target_address, transport=BT_BR_EDR_TRANSPORT
            )
            print(f'=== Connected to {connection.peer_address}!')

            # Get a list of all the Handsfree services (should only be 1)
            if not (hfp_record := await hfp.find_hf_sdp_record(connection)):
                print('!!! no service found')
                return

            # Pick the first one
            peer_channel, version, hf_sdp_features = hfp_record
            print(f'HF version: {version}')
            print(f'HF features: {hf_sdp_features}')

            # Request authentication
            print('*** Authenticating...')
            await connection.authenticate()
            print('*** Authenticated')

            # Enable encryption
            print('*** Enabling encryption...')
            await connection.encrypt()
            print('*** Encryption on')

            # Create a client and start it
            print('@@@ Starting to RFCOMM client...')
            rfcomm_client = rfcomm.Client(connection)
            rfcomm_mux = await rfcomm_client.start()
            print('@@@ Started')

            print(f'### Opening session for channel {peer_channel}...')
            try:
                session = await rfcomm_mux.open_dlc(peer_channel)
                print('### Session open', session)
            except bumble.core.ConnectionError as error:
                print(f'### Session open failed: {error}')
                await rfcomm_mux.disconnect()
                print('@@@ Disconnected from RFCOMM server')
                return

            # Outgoing connection: start the AG protocol on the opened DLC.
            on_dlc(session)

        await websockets.serve(ws_server, port=8888)

        try:
            if len(sys.argv) >= 5:
                source_file = open(sys.argv[4], 'rb')
                # Skip header
                # NOTE(review): 44 bytes presumably matches a canonical WAV
                # header with no extra chunks -- confirm for the files used.
                source_file.seek(44)

            await hci_transport.source.terminated
        finally:
            # Release the audio source and clear the global so that
            # on_sco_packet() never reads from a closed file.
            if source_file:
                source_file.close()
                source_file = None
283
284
285# -----------------------------------------------------------------------------
# The log level is taken from the BUMBLE_LOGLEVEL environment variable and
# defaults to DEBUG when the variable is not set.
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
# NOTE: there is no `__main__` guard, so this also runs when the module is imported.
asyncio.run(main())
288