1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import logging
20import os
21import pytest
22import pytest_asyncio
23
24from typing import Tuple, Optional
25
26from .test_utils import TwoDevices
27from bumble import core
28from bumble import hfp
29from bumble import rfcomm
30from bumble import hci
31
32
33# -----------------------------------------------------------------------------
34# Logging
35# -----------------------------------------------------------------------------
36logger = logging.getLogger(__name__)
37
38
39# -----------------------------------------------------------------------------
def _default_hf_configuration() -> hfp.HfConfiguration:
    """Build the Hands-Free configuration shared by the tests in this module."""
    hf_features = [
        hfp.HfFeature.CODEC_NEGOTIATION,
        hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
        hfp.HfFeature.HF_INDICATORS,
        hfp.HfFeature.ENHANCED_CALL_STATUS,
        hfp.HfFeature.THREE_WAY_CALLING,
        hfp.HfFeature.CLI_PRESENTATION_CAPABILITY,
    ]
    hf_indicators = [
        hfp.HfIndicator.ENHANCED_SAFETY,
        hfp.HfIndicator.BATTERY_LEVEL,
    ]
    audio_codecs = [hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC]

    return hfp.HfConfiguration(
        supported_hf_features=hf_features,
        supported_hf_indicators=hf_indicators,
        supported_audio_codecs=audio_codecs,
    )
59
60
61# -----------------------------------------------------------------------------
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
    """Return the SDP feature flags expected for the default HF configuration."""
    sdp_features = hfp.HfSdpFeature.WIDE_BAND
    sdp_features = sdp_features | hfp.HfSdpFeature.THREE_WAY_CALLING
    sdp_features = sdp_features | hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY
    return sdp_features
68
69
70# -----------------------------------------------------------------------------
def _default_ag_configuration() -> hfp.AgConfiguration:
    """Build the Audio-Gateway configuration shared by the tests in this module.

    The indicator list contains each AG indicator exactly once: call, service,
    callsetup, callheld, signal, roam and battchg.
    """
    return hfp.AgConfiguration(
        supported_ag_features=[
            hfp.AgFeature.HF_INDICATORS,
            hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
            hfp.AgFeature.REJECT_CALL,
            hfp.AgFeature.CODEC_NEGOTIATION,
            hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
            hfp.AgFeature.ENHANCED_CALL_STATUS,
            hfp.AgFeature.THREE_WAY_CALLING,
        ],
        supported_ag_indicators=[
            hfp.AgIndicatorState.call(),
            hfp.AgIndicatorState.service(),
            hfp.AgIndicatorState.callsetup(),
            # Previously `callsetup()` was listed twice, which left `callheld`
            # out of the advertised indicators and duplicated `callsetup`.
            hfp.AgIndicatorState.callheld(),
            hfp.AgIndicatorState.signal(),
            hfp.AgIndicatorState.roam(),
            hfp.AgIndicatorState.battchg(),
        ],
        supported_hf_indicators=[
            hfp.HfIndicator.ENHANCED_SAFETY,
            hfp.HfIndicator.BATTERY_LEVEL,
        ],
        supported_ag_call_hold_operations=[
            hfp.CallHoldOperation.ADD_HELD_CALL,
            hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS,
            hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT,
            hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS,
            hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS,
            hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL,
            hfp.CallHoldOperation.CONNECT_TWO_CALLS,
        ],
        supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
    )
106
107
108# -----------------------------------------------------------------------------
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
    """Return the SDP feature flags expected for the default AG configuration."""
    sdp_features = hfp.AgSdpFeature.WIDE_BAND
    sdp_features = sdp_features | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
    sdp_features = sdp_features | hfp.AgSdpFeature.THREE_WAY_CALLING
    return sdp_features
115
116
117# -----------------------------------------------------------------------------
async def make_hfp_connections(
    hf_config: Optional[hfp.HfConfiguration] = None,
    ag_config: Optional[hfp.AgConfiguration] = None,
) -> Tuple[hfp.HfProtocol, hfp.AgProtocol]:
    """Connect two devices over RFCOMM and bring up an HFP service-level connection.

    Args:
        hf_config: Hands-Free configuration; the module default is used when
            ``None``.
        ag_config: Audio-Gateway configuration; the module default is used when
            ``None``.

    Returns:
        The ``(hf, ag)`` protocol pair, returned after ``hf.initiate_slc()`` has
        completed.
    """
    # Only substitute the defaults when the caller passed nothing at all; an
    # explicitly supplied configuration is never replaced.
    if hf_config is None:
        hf_config = _default_hf_configuration()
    if ag_config is None:
        ag_config = _default_ag_configuration()

    # Setup devices
    devices = TwoDevices()
    await devices.setup_connection()
    # Fail fast (and narrow the Optional types) before touching RFCOMM.
    assert devices.connections[0]
    assert devices.connections[1]

    # Setup RFCOMM channel: device 0 listens, device 1 opens the DLC.
    wait_dlc = asyncio.get_running_loop().create_future()
    rfcomm_channel = rfcomm.Server(devices.devices[0]).listen(wait_dlc.set_result)
    client_mux = await rfcomm.Client(devices.connections[1]).start()

    client_dlc = await client_mux.open_dlc(rfcomm_channel)
    server_dlc = await wait_dlc

    # Setup HFP connection: HF runs on the client DLC, AG on the server DLC.
    hf = hfp.HfProtocol(client_dlc, hf_config)
    ag = hfp.AgProtocol(server_dlc, ag_config)

    await hf.initiate_slc()
    return (hf, ag)
147
148
149# -----------------------------------------------------------------------------
@pytest_asyncio.fixture
async def hfp_connections():
    """Yield an ``(hf, ag)`` pair with ``hf.run()`` scheduled as a background task."""
    hf, ag = await make_hfp_connections()
    run_task = asyncio.create_task(hf.run())

    try:
        yield hf, ag
    finally:
        # Queue a None on the unsolicited queue, then wait for the run task to
        # finish (presumably None is the sentinel that ends `hf.run()` — confirm
        # against the HfProtocol implementation).
        hf.unsolicited_queue.put_nowait(None)
        await run_task
161
162
163# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc_with_minimal_features():
    """SLC setup works with empty feature sets and a single AG indicator."""
    hf, ag = await make_hfp_connections(
        hfp.HfConfiguration(
            supported_audio_codecs=[],
            supported_hf_features=[],
            supported_hf_indicators=[],
        ),
        hfp.AgConfiguration(
            supported_ag_call_hold_operations=[],
            supported_ag_features=[],
            supported_ag_indicators=[
                hfp.AgIndicatorState(
                    indicator=hfp.AgIndicator.CALL,
                    supported_values={0, 1},
                    current_status=0,
                )
            ],
            supported_hf_indicators=[],
            supported_audio_codecs=[],
        ),
    )

    assert hf.supported_ag_features == ag.supported_ag_features
    assert hf.supported_hf_features == ag.supported_hf_features
    assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations
    # zip() stops at the shorter sequence, so without this check the loop below
    # would pass vacuously if the HF side learned fewer (or no) indicators.
    assert len(hf.ag_indicators) == len(ag.ag_indicators)
    for a, b in zip(hf.ag_indicators, ag.ag_indicators):
        assert a.indicator == b.indicator
        assert a.current_status == b.current_status
193
194
195# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
    """After SLC setup, HF and AG agree on features, hold operations and indicators."""
    hf, ag = hfp_connections

    assert hf.supported_ag_features == ag.supported_ag_features
    assert hf.supported_hf_features == ag.supported_hf_features
    assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations
    # zip() stops at the shorter sequence, so without this check the loop below
    # would pass vacuously if the HF side learned fewer (or no) indicators.
    assert len(hf.ag_indicators) == len(ag.ag_indicators)
    for a, b in zip(hf.ag_indicators, ag.ag_indicators):
        assert a.indicator == b.indicator
        assert a.current_status == b.current_status
206
207
208# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
    """An AG indicator update is delivered to the HF as an 'ag_indicator' event."""
    hf, ag = hfp_connections
    loop = asyncio.get_running_loop()

    received = loop.create_future()
    hf.on('ag_indicator', received.set_result)

    ag.update_ag_indicator(hfp.AgIndicator.CALL, 1)

    state: hfp.AgIndicatorState = await received
    assert state.current_status == 1
    assert state.indicator == hfp.AgIndicator.CALL
221
222
223# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
    """An AT+BIEV command from the HF raises an 'hf_indicator' event on the AG."""
    hf, ag = hfp_connections
    loop = asyncio.get_running_loop()

    received = loop.create_future()
    ag.on('hf_indicator', received.set_result)

    await hf.execute_command('AT+BIEV=2,100')

    state: hfp.HfIndicatorState = await received
    assert state.current_status == 100
235
236
237# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_codec_negotiation(
    hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
    """Both sides emit 'codec_negotiation' with equal payloads."""
    hf, ag = hfp_connections
    loop = asyncio.get_running_loop()

    hf_negotiated = loop.create_future()
    ag_negotiated = loop.create_future()
    hf.on('codec_negotiation', hf_negotiated.set_result)
    ag.on('codec_negotiation', ag_negotiated.set_result)

    await ag.negotiate_codec(hfp.AudioCodec.MSBC)

    hf_result = await hf_negotiated
    ag_result = await ag_negotiated
    assert hf_result == ag_result
253
254
255# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
    """An ATD command from the HF raises a 'dial' event carrying the dialed number."""
    hf, ag = hfp_connections
    # Only the digits: the "ATD" prefix is added when the command is built.
    # The previous value already contained "ATD", so the command actually sent
    # was "ATDATD123456789".
    NUMBER = '123456789'

    future = asyncio.get_running_loop().create_future()
    ag.on('dial', future.set_result)
    await hf.execute_command(f'ATD{NUMBER}')

    number: str = await future
    assert number == NUMBER
267
268
269# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
    """Answering on the HF raises an 'answer' event on the AG."""
    hf, ag = hfp_connections
    answered = asyncio.get_running_loop().create_future()

    def on_answer():
        answered.set_result(None)

    ag.on('answer', on_answer)
    await hf.answer_incoming_call()

    # Completes only once the AG has emitted 'answer'.
    await answered
279
280
281# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_reject_incoming_call(
    hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
    """Rejecting an incoming call on the HF raises 'hang_up' on the AG."""
    hf, ag = hfp_connections
    hung_up = asyncio.get_running_loop().create_future()

    def on_hang_up():
        hung_up.set_result(None)

    ag.on('hang_up', on_hang_up)
    await hf.reject_incoming_call()

    # Completes only once the AG has emitted 'hang_up'.
    await hung_up
293
294
295# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
    """Terminating a call on the HF raises 'hang_up' on the AG."""
    hf, ag = hfp_connections
    hung_up = asyncio.get_running_loop().create_future()

    def on_hang_up():
        hung_up.set_result(None)

    ag.on('hang_up', on_hang_up)
    await hf.terminate_call()

    # Completes only once the AG has emitted 'hang_up'.
    await hung_up
305
306
307# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_query_calls_without_calls(
    hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
    """Querying current calls returns an empty list when no call was added."""
    hf, ag = hfp_connections

    current_calls = await hf.query_current_calls()
    assert current_calls == []
315
316
317# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_query_calls_with_calls(
    hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
    """Querying current calls on the HF returns the calls registered on the AG."""
    hf, ag = hfp_connections

    active_call = hfp.CallInfo(
        index=1,
        direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL,
        status=hfp.CallInfoStatus.ACTIVE,
        mode=hfp.CallInfoMode.VOICE,
        multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
        number='123456789',
    )
    ag.calls.append(active_call)

    reported_calls = await hf.query_current_calls()
    assert reported_calls == ag.calls
335
336
337# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "operation,",
    (
        hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS,
        hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS,
        hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS,
        hfp.CallHoldOperation.ADD_HELD_CALL,
        hfp.CallHoldOperation.CONNECT_TWO_CALLS,
    ),
)
async def test_hold_call_without_call_index(
    hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol],
    operation: hfp.CallHoldOperation,
):
    """AT+CHLD without a call index raises 'call_hold' with a None index on the AG."""
    hf, ag = hfp_connections
    held = asyncio.get_running_loop().create_future()

    def on_call_hold(op, index):
        held.set_result((op, index))

    ag.on("call_hold", on_call_hold)

    await hf.execute_command(f"AT+CHLD={operation.value}")

    reported = await held
    assert reported == (operation, None)
360
361
362# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "operation,",
    (
        hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL,
        hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT,
    ),
)
async def test_hold_call_with_call_index(
    hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol],
    operation: hfp.CallHoldOperation,
):
    """AT+CHLD with a call index raises 'call_hold' with that index on the AG."""
    hf, ag = hfp_connections
    held = asyncio.get_running_loop().create_future()

    def on_call_hold(op, index):
        held.set_result((op, index))

    ag.on("call_hold", on_call_hold)

    existing_call = hfp.CallInfo(
        index=1,
        direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL,
        status=hfp.CallInfoStatus.ACTIVE,
        mode=hfp.CallInfoMode.VOICE,
        multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE,
        number='123456789',
    )
    ag.calls.append(existing_call)

    # The operation value carries an 'x' placeholder; substitute call index 1.
    await hf.execute_command(f"AT+CHLD={operation.value.replace('x', '1')}")

    reported = await held
    assert reported == (operation, 1)
392
393
394# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ring(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
    """A ring sent by the AG raises a 'ring' event on the HF."""
    hf, ag = hfp_connections
    rang = asyncio.get_running_loop().create_future()

    def on_ring():
        rang.set_result(None)

    hf.on("ring", on_ring)

    ag.send_ring()

    # Completes only once the HF has emitted 'ring'.
    await rang
404
405
406# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_speaker_volume(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
    """A speaker volume set by the AG is reported to the HF."""
    hf, ag = hfp_connections
    loop = asyncio.get_running_loop()
    volume_received = loop.create_future()
    hf.on("speaker_volume", volume_received.set_result)

    ag.set_speaker_volume(10)

    reported_volume = await volume_received
    assert reported_volume == 10
416
417
418# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_microphone_volume(
    hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
    """A microphone volume set by the AG is reported to the HF."""
    hf, ag = hfp_connections
    loop = asyncio.get_running_loop()
    volume_received = loop.create_future()
    hf.on("microphone_volume", volume_received.set_result)

    ag.set_microphone_volume(10)

    reported_volume = await volume_received
    assert reported_volume == 10
430
431
432# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cli_notification(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
    """A CLI notification from the AG reaches the HF with the quotes stripped."""
    hf, ag = hfp_connections
    loop = asyncio.get_running_loop()
    notified = loop.create_future()
    hf.on("cli_notification", notified.set_result)

    # The AG is handed values that include literal double quotes.
    sent = hfp.CallLineIdentification(
        number="\"123456789\"", type=129, alpha="\"Bumble\""
    )
    ag.send_cli_notification(sent)

    received = await notified
    assert received == hfp.CallLineIdentification(
        number="123456789", type=129, alpha="Bumble", subaddr="", satype=None
    )
446
447
448# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_voice_recognition_from_hf(
    hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
    """AT+BVRA=1 from the HF raises 'voice_recognition' (ENABLE) on the AG."""
    hf, ag = hfp_connections
    loop = asyncio.get_running_loop()
    state_received = loop.create_future()
    ag.on("voice_recognition", state_received.set_result)

    await hf.execute_command("AT+BVRA=1")

    state = await state_received
    assert state == hfp.VoiceRecognitionState.ENABLE
460
461
462# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_voice_recognition_from_ag(
    hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
    """A '+BVRA: 1' response from the AG raises 'voice_recognition' (ENABLE) on the HF."""
    hf, ag = hfp_connections
    loop = asyncio.get_running_loop()
    state_received = loop.create_future()
    hf.on("voice_recognition", state_received.set_result)

    ag.send_response("+BVRA: 1")

    state = await state_received
    assert state == hfp.VoiceRecognitionState.ENABLE
474
475
476# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_sdp_record():
    """An HF SDP record published by device 0 is found from the peer connection."""
    devices = TwoDevices()
    await devices.setup_connection()

    # The second argument (2) is the value expected back as the first element
    # of the lookup result — presumably the RFCOMM channel; confirm in bumble.hfp.
    hf_records = hfp.make_hf_sdp_records(
        1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8
    )
    devices[0].sdp_service_records[1] = hf_records

    found = await hfp.find_hf_sdp_record(devices.connections[1])
    expected = (2, hfp.ProfileVersion.V1_8, _default_hf_sdp_features())
    assert found == expected
491
492
493# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_sdp_record():
    """An AG SDP record published by device 0 is found from the peer connection."""
    devices = TwoDevices()
    await devices.setup_connection()

    # The second argument (2) is the value expected back as the first element
    # of the lookup result — presumably the RFCOMM channel; confirm in bumble.hfp.
    ag_records = hfp.make_ag_sdp_records(
        1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8
    )
    devices[0].sdp_service_records[1] = ag_records

    found = await hfp.find_ag_sdp_record(devices.connections[1])
    expected = (2, hfp.ProfileVersion.V1_8, _default_ag_sdp_features())
    assert found == expected
508
509
510# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_sco_setup():
    """Set up a synchronous connection between two Classic devices, then drop it."""
    devices = TwoDevices()
    loop = asyncio.get_running_loop()

    # Enable Classic connections on both devices before powering them on.
    devices[0].classic_enabled = True
    devices[1].classic_enabled = True

    await devices[0].power_on()
    await devices[1].power_on()

    # Device 0 initiates the BR/EDR link while device 1 accepts it.
    acl_links = await asyncio.gather(
        devices[0].connect(
            devices[1].public_address, transport=core.BT_BR_EDR_TRANSPORT
        ),
        devices[1].accept(devices[0].public_address),
    )

    def on_sco_request(_connection, _link_type: int):
        # Accept the incoming request with the same CVSD S1 parameters that the
        # initiator uses below.
        esco_parameters = hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S1]
        accept = hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(
            bd_addr=acl_links[1].peer_address,
            **esco_parameters.asdict(),
        )
        acl_links[1].abort_on('disconnection', devices[1].send_command(accept))

    devices[1].on('sco_request', on_sco_request)

    # One future per device, resolved with that device's 'sco_connection' payload.
    sco_established = [loop.create_future() for _ in range(2)]
    for device, established in zip(devices, sco_established):
        device.on('sco_connection', established.set_result)

    setup = hci.HCI_Enhanced_Setup_Synchronous_Connection_Command(
        connection_handle=acl_links[0].handle,
        **hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S1].asdict(),
    )
    await devices[0].send_command(setup)
    sco_links = await asyncio.gather(*sco_established)

    # One future per SCO link, resolved when that link reports 'disconnection'.
    sco_closed = [loop.create_future() for _ in range(2)]
    for closed, sco_link in zip(sco_closed, sco_links):
        sco_link.on('disconnection', closed.set_result)

    await sco_links[0].disconnect()
    await asyncio.gather(*sco_closed)
570
571
572# -----------------------------------------------------------------------------
async def run():
    """Run the SLC test directly, without going through pytest.

    ``test_slc`` takes the ``(hf, ag)`` pair that pytest normally injects via
    the ``hfp_connections`` fixture. Calling it with no argument raised
    ``TypeError``, so the pair is built here instead. ``test_slc`` only compares
    attributes of the two protocol objects, so the background ``hf.run()`` task
    that the fixture starts is not created here.
    """
    await test_slc(await make_hfp_connections())
575
576
577# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # Log level comes from BUMBLE_LOGLEVEL (case-insensitive), defaulting to INFO.
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)
    asyncio.run(run())
581