1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import logging 20import os 21import pytest 22import pytest_asyncio 23 24from typing import Tuple, Optional 25 26from .test_utils import TwoDevices 27from bumble import core 28from bumble import hfp 29from bumble import rfcomm 30from bumble import hci 31 32 33# ----------------------------------------------------------------------------- 34# Logging 35# ----------------------------------------------------------------------------- 36logger = logging.getLogger(__name__) 37 38 39# ----------------------------------------------------------------------------- 40def _default_hf_configuration() -> hfp.HfConfiguration: 41 return hfp.HfConfiguration( 42 supported_hf_features=[ 43 hfp.HfFeature.CODEC_NEGOTIATION, 44 hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED, 45 hfp.HfFeature.HF_INDICATORS, 46 hfp.HfFeature.ENHANCED_CALL_STATUS, 47 hfp.HfFeature.THREE_WAY_CALLING, 48 hfp.HfFeature.CLI_PRESENTATION_CAPABILITY, 49 ], 50 supported_hf_indicators=[ 51 hfp.HfIndicator.ENHANCED_SAFETY, 52 hfp.HfIndicator.BATTERY_LEVEL, 53 ], 54 supported_audio_codecs=[ 55 hfp.AudioCodec.CVSD, 56 hfp.AudioCodec.MSBC, 57 ], 58 ) 59 60 61# ----------------------------------------------------------------------------- 62def _default_hf_sdp_features() -> hfp.HfSdpFeature: 63 return ( 64 hfp.HfSdpFeature.WIDE_BAND 65 | hfp.HfSdpFeature.THREE_WAY_CALLING 66 | hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY 67 ) 68 69 70# ----------------------------------------------------------------------------- 71def _default_ag_configuration() -> hfp.AgConfiguration: 72 return hfp.AgConfiguration( 73 supported_ag_features=[ 74 hfp.AgFeature.HF_INDICATORS, 75 hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY, 76 hfp.AgFeature.REJECT_CALL, 77 hfp.AgFeature.CODEC_NEGOTIATION, 78 hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED, 79 hfp.AgFeature.ENHANCED_CALL_STATUS, 80 hfp.AgFeature.THREE_WAY_CALLING, 81 ], 82 supported_ag_indicators=[ 83 hfp.AgIndicatorState.call(), 84 hfp.AgIndicatorState.service(), 85 hfp.AgIndicatorState.callsetup(), 86 hfp.AgIndicatorState.callsetup(), 87 hfp.AgIndicatorState.signal(), 88 hfp.AgIndicatorState.roam(), 89 hfp.AgIndicatorState.battchg(), 90 ], 91 supported_hf_indicators=[ 92 hfp.HfIndicator.ENHANCED_SAFETY, 93 hfp.HfIndicator.BATTERY_LEVEL, 94 ], 95 supported_ag_call_hold_operations=[ 96 hfp.CallHoldOperation.ADD_HELD_CALL, 97 hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS, 98 hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT, 99 hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS, 100 hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS, 101 hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL, 102 hfp.CallHoldOperation.CONNECT_TWO_CALLS, 103 ], 104 supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC], 105 ) 106 107 108# ----------------------------------------------------------------------------- 109def _default_ag_sdp_features() -> hfp.AgSdpFeature: 110 return ( 111 hfp.AgSdpFeature.WIDE_BAND 112 | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY 113 | hfp.AgSdpFeature.THREE_WAY_CALLING 114 ) 115 116 117# ----------------------------------------------------------------------------- 118async def make_hfp_connections( 119 hf_config: Optional[hfp.HfConfiguration] = None, 120 ag_config: Optional[hfp.AgConfiguration] = None, 121): 122 if not hf_config: 123 hf_config = _default_hf_configuration() 124 if not ag_config: 125 ag_config = _default_ag_configuration() 126 127 # Setup devices 128 devices = TwoDevices() 129 await devices.setup_connection() 130 131 # Setup RFCOMM channel 132 wait_dlc = asyncio.get_running_loop().create_future() 133 rfcomm_channel = rfcomm.Server(devices.devices[0]).listen(wait_dlc.set_result) 134 assert devices.connections[0] 135 assert devices.connections[1] 136 client_mux = await rfcomm.Client(devices.connections[1]).start() 137 138 client_dlc = await client_mux.open_dlc(rfcomm_channel) 139 server_dlc = await wait_dlc 140 141 # Setup HFP connection 142 hf = hfp.HfProtocol(client_dlc, hf_config) 143 ag = hfp.AgProtocol(server_dlc, ag_config) 144 145 await hf.initiate_slc() 146 return (hf, ag) 147 148 149# ----------------------------------------------------------------------------- 150@pytest_asyncio.fixture 151async def hfp_connections(): 152 hf, ag = await make_hfp_connections() 153 hf_loop_task = asyncio.create_task(hf.run()) 154 155 try: 156 yield (hf, ag) 157 finally: 158 # Close the coroutine. 159 hf.unsolicited_queue.put_nowait(None) 160 await hf_loop_task 161 162 163# ----------------------------------------------------------------------------- 164@pytest.mark.asyncio 165async def test_slc_with_minimal_features(): 166 hf, ag = await make_hfp_connections( 167 hfp.HfConfiguration( 168 supported_audio_codecs=[], 169 supported_hf_features=[], 170 supported_hf_indicators=[], 171 ), 172 hfp.AgConfiguration( 173 supported_ag_call_hold_operations=[], 174 supported_ag_features=[], 175 supported_ag_indicators=[ 176 hfp.AgIndicatorState( 177 indicator=hfp.AgIndicator.CALL, 178 supported_values={0, 1}, 179 current_status=0, 180 ) 181 ], 182 supported_hf_indicators=[], 183 supported_audio_codecs=[], 184 ), 185 ) 186 187 assert hf.supported_ag_features == ag.supported_ag_features 188 assert hf.supported_hf_features == ag.supported_hf_features 189 assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations 190 for a, b in zip(hf.ag_indicators, ag.ag_indicators): 191 assert a.indicator == b.indicator 192 assert a.current_status == b.current_status 193 194 195# ----------------------------------------------------------------------------- 196@pytest.mark.asyncio 197async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): 198 hf, ag = hfp_connections 199 200 assert hf.supported_ag_features == ag.supported_ag_features 201 assert hf.supported_hf_features == ag.supported_hf_features 202 assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations 203 for a, b in zip(hf.ag_indicators, ag.ag_indicators): 204 assert a.indicator == b.indicator 205 assert a.current_status == b.current_status 206 207 208# ----------------------------------------------------------------------------- 209@pytest.mark.asyncio 210async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): 211 hf, ag = hfp_connections 212 213 future = asyncio.get_running_loop().create_future() 214 hf.on('ag_indicator', future.set_result) 215 216 ag.update_ag_indicator(hfp.AgIndicator.CALL, 1) 217 218 indicator: hfp.AgIndicatorState = await future 219 assert indicator.current_status == 1 220 assert indicator.indicator == hfp.AgIndicator.CALL 221 222 223# ----------------------------------------------------------------------------- 224@pytest.mark.asyncio 225async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): 226 hf, ag = hfp_connections 227 228 future = asyncio.get_running_loop().create_future() 229 ag.on('hf_indicator', future.set_result) 230 231 await hf.execute_command('AT+BIEV=2,100') 232 233 indicator: hfp.HfIndicatorState = await future 234 assert indicator.current_status == 100 235 236 237# ----------------------------------------------------------------------------- 238@pytest.mark.asyncio 239async def test_codec_negotiation( 240 hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] 241): 242 hf, ag = hfp_connections 243 244 futures = [ 245 asyncio.get_running_loop().create_future(), 246 asyncio.get_running_loop().create_future(), 247 ] 248 hf.on('codec_negotiation', futures[0].set_result) 249 ag.on('codec_negotiation', futures[1].set_result) 250 await ag.negotiate_codec(hfp.AudioCodec.MSBC) 251 252 assert await futures[0] == await futures[1] 253 254 255# ----------------------------------------------------------------------------- 256@pytest.mark.asyncio 257async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): 258 hf, ag = hfp_connections 259 NUMBER = 'ATD123456789' 260 261 future = asyncio.get_running_loop().create_future() 262 ag.on('dial', future.set_result) 263 await hf.execute_command(f'ATD{NUMBER}') 264 265 number: str = await future 266 assert number == NUMBER 267 268 269# ----------------------------------------------------------------------------- 270@pytest.mark.asyncio 271async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): 272 hf, ag = hfp_connections 273 274 future = asyncio.get_running_loop().create_future() 275 ag.on('answer', lambda: future.set_result(None)) 276 await hf.answer_incoming_call() 277 278 await future 279 280 281# ----------------------------------------------------------------------------- 282@pytest.mark.asyncio 283async def test_reject_incoming_call( 284 hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] 285): 286 hf, ag = hfp_connections 287 288 future = asyncio.get_running_loop().create_future() 289 ag.on('hang_up', lambda: future.set_result(None)) 290 await hf.reject_incoming_call() 291 292 await future 293 294 295# ----------------------------------------------------------------------------- 296@pytest.mark.asyncio 297async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): 298 hf, ag = hfp_connections 299 300 future = asyncio.get_running_loop().create_future() 301 ag.on('hang_up', lambda: future.set_result(None)) 302 await hf.terminate_call() 303 304 await future 305 306 307# ----------------------------------------------------------------------------- 308@pytest.mark.asyncio 309async def test_query_calls_without_calls( 310 hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] 311): 312 hf, ag = hfp_connections 313 314 assert await hf.query_current_calls() == [] 315 316 317# ----------------------------------------------------------------------------- 318@pytest.mark.asyncio 319async def test_query_calls_with_calls( 320 hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] 321): 322 hf, ag = hfp_connections 323 ag.calls.append( 324 hfp.CallInfo( 325 index=1, 326 direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL, 327 status=hfp.CallInfoStatus.ACTIVE, 328 mode=hfp.CallInfoMode.VOICE, 329 multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE, 330 number='123456789', 331 ) 332 ) 333 334 assert await hf.query_current_calls() == ag.calls 335 336 337# ----------------------------------------------------------------------------- 338@pytest.mark.asyncio 339@pytest.mark.parametrize( 340 "operation,", 341 ( 342 hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS, 343 hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS, 344 hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS, 345 hfp.CallHoldOperation.ADD_HELD_CALL, 346 hfp.CallHoldOperation.CONNECT_TWO_CALLS, 347 ), 348) 349async def test_hold_call_without_call_index( 350 hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol], 351 operation: hfp.CallHoldOperation, 352): 353 hf, ag = hfp_connections 354 call_hold_future = asyncio.get_running_loop().create_future() 355 ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index))) 356 357 await hf.execute_command(f"AT+CHLD={operation.value}") 358 359 assert (await call_hold_future) == (operation, None) 360 361 362# ----------------------------------------------------------------------------- 363@pytest.mark.asyncio 364@pytest.mark.parametrize( 365 "operation,", 366 ( 367 hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL, 368 hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT, 369 ), 370) 371async def test_hold_call_with_call_index( 372 hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol], 373 operation: hfp.CallHoldOperation, 374): 375 hf, ag = hfp_connections 376 call_hold_future = asyncio.get_running_loop().create_future() 377 ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index))) 378 ag.calls.append( 379 hfp.CallInfo( 380 index=1, 381 direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL, 382 status=hfp.CallInfoStatus.ACTIVE, 383 mode=hfp.CallInfoMode.VOICE, 384 multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE, 385 number='123456789', 386 ) 387 ) 388 389 await hf.execute_command(f"AT+CHLD={operation.value.replace('x', '1')}") 390 391 assert (await call_hold_future) == (operation, 1) 392 393 394# ----------------------------------------------------------------------------- 395@pytest.mark.asyncio 396async def test_ring(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): 397 hf, ag = hfp_connections 398 ring_future = asyncio.get_running_loop().create_future() 399 hf.on("ring", lambda: ring_future.set_result(None)) 400 401 ag.send_ring() 402 403 await ring_future 404 405 406# ----------------------------------------------------------------------------- 407@pytest.mark.asyncio 408async def test_speaker_volume(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): 409 hf, ag = hfp_connections 410 speaker_volume_future = asyncio.get_running_loop().create_future() 411 hf.on("speaker_volume", speaker_volume_future.set_result) 412 413 ag.set_speaker_volume(10) 414 415 assert await speaker_volume_future == 10 416 417 418# ----------------------------------------------------------------------------- 419@pytest.mark.asyncio 420async def test_microphone_volume( 421 hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] 422): 423 hf, ag = hfp_connections 424 microphone_volume_future = asyncio.get_running_loop().create_future() 425 hf.on("microphone_volume", microphone_volume_future.set_result) 426 427 ag.set_microphone_volume(10) 428 429 assert await microphone_volume_future == 10 430 431 432# ----------------------------------------------------------------------------- 433@pytest.mark.asyncio 434async def test_cli_notification(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): 435 hf, ag = hfp_connections 436 cli_notification_future = asyncio.get_running_loop().create_future() 437 hf.on("cli_notification", cli_notification_future.set_result) 438 439 ag.send_cli_notification( 440 hfp.CallLineIdentification(number="\"123456789\"", type=129, alpha="\"Bumble\"") 441 ) 442 443 assert await cli_notification_future == hfp.CallLineIdentification( 444 number="123456789", type=129, alpha="Bumble", subaddr="", satype=None 445 ) 446 447 448# ----------------------------------------------------------------------------- 449@pytest.mark.asyncio 450async def test_voice_recognition_from_hf( 451 hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] 452): 453 hf, ag = hfp_connections 454 voice_recognition_future = asyncio.get_running_loop().create_future() 455 ag.on("voice_recognition", voice_recognition_future.set_result) 456 457 await hf.execute_command("AT+BVRA=1") 458 459 assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE 460 461 462# ----------------------------------------------------------------------------- 463@pytest.mark.asyncio 464async def test_voice_recognition_from_ag( 465 hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] 466): 467 hf, ag = hfp_connections 468 voice_recognition_future = asyncio.get_running_loop().create_future() 469 hf.on("voice_recognition", voice_recognition_future.set_result) 470 471 ag.send_response("+BVRA: 1") 472 473 assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE 474 475 476# ----------------------------------------------------------------------------- 477@pytest.mark.asyncio 478async def test_hf_sdp_record(): 479 devices = TwoDevices() 480 await devices.setup_connection() 481 482 devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records( 483 1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8 484 ) 485 486 assert await hfp.find_hf_sdp_record(devices.connections[1]) == ( 487 2, 488 hfp.ProfileVersion.V1_8, 489 _default_hf_sdp_features(), 490 ) 491 492 493# ----------------------------------------------------------------------------- 494@pytest.mark.asyncio 495async def test_ag_sdp_record(): 496 devices = TwoDevices() 497 await devices.setup_connection() 498 499 devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records( 500 1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8 501 ) 502 503 assert await hfp.find_ag_sdp_record(devices.connections[1]) == ( 504 2, 505 hfp.ProfileVersion.V1_8, 506 _default_ag_sdp_features(), 507 ) 508 509 510# ----------------------------------------------------------------------------- 511@pytest.mark.asyncio 512async def test_sco_setup(): 513 devices = TwoDevices() 514 515 # Enable Classic connections 516 devices[0].classic_enabled = True 517 devices[1].classic_enabled = True 518 519 # Start 520 await devices[0].power_on() 521 await devices[1].power_on() 522 523 connections = await asyncio.gather( 524 devices[0].connect( 525 devices[1].public_address, transport=core.BT_BR_EDR_TRANSPORT 526 ), 527 devices[1].accept(devices[0].public_address), 528 ) 529 530 def on_sco_request(_connection, _link_type: int): 531 connections[1].abort_on( 532 'disconnection', 533 devices[1].send_command( 534 hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command( 535 bd_addr=connections[1].peer_address, 536 **hfp.ESCO_PARAMETERS[ 537 hfp.DefaultCodecParameters.ESCO_CVSD_S1 538 ].asdict(), 539 ) 540 ), 541 ) 542 543 devices[1].on('sco_request', on_sco_request) 544 545 sco_connection_futures = [ 546 asyncio.get_running_loop().create_future(), 547 asyncio.get_running_loop().create_future(), 548 ] 549 550 for device, future in zip(devices, sco_connection_futures): 551 device.on('sco_connection', future.set_result) 552 553 await devices[0].send_command( 554 hci.HCI_Enhanced_Setup_Synchronous_Connection_Command( 555 connection_handle=connections[0].handle, 556 **hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S1].asdict(), 557 ) 558 ) 559 sco_connections = await asyncio.gather(*sco_connection_futures) 560 561 sco_disconnection_futures = [ 562 asyncio.get_running_loop().create_future(), 563 asyncio.get_running_loop().create_future(), 564 ] 565 for future, sco_connection in zip(sco_disconnection_futures, sco_connections): 566 sco_connection.on('disconnection', future.set_result) 567 568 await sco_connections[0].disconnect() 569 await asyncio.gather(*sco_disconnection_futures) 570 571 572# ----------------------------------------------------------------------------- 573async def run(): 574 await test_slc() 575 576 577# ----------------------------------------------------------------------------- 578if __name__ == '__main__': 579 logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) 580 asyncio.run(run()) 581