# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import pytest import functools import pytest_asyncio import logging import sys from bumble import att, device from bumble.profiles import hap from .test_utils import TwoDevices from bumble.keys import PairingKeys # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) foo_preset = hap.PresetRecord(1, "foo preset") bar_preset = hap.PresetRecord(50, "bar preset") foobar_preset = hap.PresetRecord(5, "foobar preset") unavailable_preset = hap.PresetRecord( 78, "foobar preset", hap.PresetRecord.Property( hap.PresetRecord.Property.Writable.CANNOT_BE_WRITTEN, hap.PresetRecord.Property.IsAvailable.IS_UNAVAILABLE, ), ) server_features = hap.HearingAidFeatures( hap.HearingAidType.MONAURAL_HEARING_AID, hap.PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED, hap.IndependentPresets.IDENTICAL_PRESET_RECORD, hap.DynamicPresets.PRESET_RECORDS_DOES_NOT_CHANGE, hap.WritablePresetsSupport.WRITABLE_PRESET_RECORDS_SUPPORTED, ) TIMEOUT = 0.1 async def assert_queue_is_empty(queue: asyncio.Queue): assert queue.empty() # Check that nothing is being added during TIMEOUT secondes if sys.version_info >= (3, 11): with pytest.raises(TimeoutError): await asyncio.wait_for(queue.get(), TIMEOUT) else: with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(queue.get(), TIMEOUT) # ----------------------------------------------------------------------------- @pytest_asyncio.fixture async def hap_client(): devices = TwoDevices() devices[0].add_service( hap.HearingAccessService( devices[0], server_features, [foo_preset, bar_preset, foobar_preset, unavailable_preset], ) ) await devices.setup_connection() # TODO negotiate MTU > 49 to not truncate preset names # Mock encryption. devices.connections[0].encryption = 1 # type: ignore devices.connections[1].encryption = 1 # type: ignore devices[0].on_pairing( devices.connections[0], devices.connections[0].peer_address, PairingKeys(), True ) peer = device.Peer(devices.connections[1]) # type: ignore hap_client = await peer.discover_service_and_create_proxy( hap.HearingAccessServiceProxy ) assert hap_client await hap_client.setup_subscription() yield hap_client # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_init_service(hap_client: hap.HearingAccessServiceProxy): assert ( hap.HearingAidFeatures_from_bytes(await hap_client.server_features.read_value()) == server_features ) assert (await hap_client.active_preset_index.read_value()) == (foo_preset.index) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_read_all_presets(hap_client: hap.HearingAccessServiceProxy): await hap_client.hearing_aid_preset_control_point.write_value( bytes([hap.HearingAidPresetControlPointOpcode.READ_PRESETS_REQUEST, 1, 0xFF]) ) assert (await hap_client.preset_control_point_indications.get()) == bytes( [hap.HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE, 0] ) + bytes(foo_preset) assert (await hap_client.preset_control_point_indications.get()) == bytes( [hap.HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE, 0] ) + bytes(foobar_preset) assert (await hap_client.preset_control_point_indications.get()) == bytes( [hap.HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE, 0] ) + bytes(bar_preset) assert (await hap_client.preset_control_point_indications.get()) == bytes( [hap.HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE, 1] ) + bytes(unavailable_preset) await assert_queue_is_empty(hap_client.preset_control_point_indications) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_read_partial_presets(hap_client: hap.HearingAccessServiceProxy): await hap_client.hearing_aid_preset_control_point.write_value( bytes([hap.HearingAidPresetControlPointOpcode.READ_PRESETS_REQUEST, 3, 2]) ) assert (await hap_client.preset_control_point_indications.get())[2:] == bytes( foobar_preset ) assert (await hap_client.preset_control_point_indications.get())[2:] == bytes( bar_preset ) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_set_active_preset_valid(hap_client: hap.HearingAccessServiceProxy): await hap_client.hearing_aid_preset_control_point.write_value( bytes( [hap.HearingAidPresetControlPointOpcode.SET_ACTIVE_PRESET, bar_preset.index] ) ) assert (await hap_client.active_preset_index_notification.get()) == bar_preset.index assert (await hap_client.active_preset_index.read_value()) == (bar_preset.index) await assert_queue_is_empty(hap_client.active_preset_index_notification) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_set_active_preset_invalid(hap_client: hap.HearingAccessServiceProxy): with pytest.raises(att.ATT_Error) as e: await hap_client.hearing_aid_preset_control_point.write_value( bytes( [ hap.HearingAidPresetControlPointOpcode.SET_ACTIVE_PRESET, unavailable_preset.index, ] ), with_response=True, ) assert e.value.error_code == hap.ErrorCode.PRESET_OPERATION_NOT_POSSIBLE # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_set_next_preset(hap_client: hap.HearingAccessServiceProxy): await hap_client.hearing_aid_preset_control_point.write_value( bytes([hap.HearingAidPresetControlPointOpcode.SET_NEXT_PRESET]) ) assert ( await hap_client.active_preset_index_notification.get() ) == foobar_preset.index assert (await hap_client.active_preset_index.read_value()) == (foobar_preset.index) await assert_queue_is_empty(hap_client.active_preset_index_notification) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_set_next_preset_will_loop_to_first( hap_client: hap.HearingAccessServiceProxy, ): async def go_next(new_preset: hap.PresetRecord): await hap_client.hearing_aid_preset_control_point.write_value( bytes([hap.HearingAidPresetControlPointOpcode.SET_NEXT_PRESET]) ) assert ( await hap_client.active_preset_index_notification.get() ) == new_preset.index assert (await hap_client.active_preset_index.read_value()) == (new_preset.index) await go_next(foobar_preset) await go_next(bar_preset) await go_next(foo_preset) # Note that there is a invalid preset in the preset record of the server await assert_queue_is_empty(hap_client.active_preset_index_notification) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_set_previous_preset_will_loop_to_last( hap_client: hap.HearingAccessServiceProxy, ): await hap_client.hearing_aid_preset_control_point.write_value( bytes([hap.HearingAidPresetControlPointOpcode.SET_PREVIOUS_PRESET]) ) assert (await hap_client.active_preset_index_notification.get()) == bar_preset.index assert (await hap_client.active_preset_index.read_value()) == (bar_preset.index) await assert_queue_is_empty(hap_client.active_preset_index_notification)