1# Copyright 2024 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the 'License'); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an 'AS IS' BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""HFP grpc interface.""" 15 16import math 17 18from floss.pandora.floss import utils 19from floss.pandora.server import bluetooth as bluetooth_module 20from google.protobuf import empty_pb2 21import grpc 22from pandora_experimental import hfp_grpc_aio 23from pandora_experimental import hfp_pb2 24 25 26class HFPService(hfp_grpc_aio.HFPServicer): 27 """Service to trigger Bluetooth HFP procedures. 28 29 This class implements the Pandora bluetooth test interfaces, 30 where the meta class definition is automatically generated by the protobuf. 31 The interface definition can be found in: 32 https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/hfp.proto 33 """ 34 35 def __init__(self, bluetooth: bluetooth_module.Bluetooth): 36 self.bluetooth = bluetooth 37 38 def enable_phone_for_testing(self): 39 self.bluetooth.set_phone_ops_enabled(True) 40 self.bluetooth.set_mps_qualification_enabled(True) 41 42 async def EnableSlc(self, request: hfp_pb2.EnableSlcRequest, context: grpc.ServicerContext) -> empty_pb2.Empty: 43 self.enable_phone_for_testing() 44 address = utils.connection_from(request.connection).address 45 self.bluetooth.connect_device(address) 46 return empty_pb2.Empty() 47 48 async def EnableSlcAsHandsfree(self, request: hfp_pb2.EnableSlcAsHandsfreeRequest, 49 context: grpc.ServicerContext) -> empty_pb2.Empty: 50 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 51 context.set_details("Method not implemented!") # type: ignore 52 raise NotImplementedError("Method not implemented!") 53 54 async def DisableSlc(self, request: hfp_pb2.DisableSlcRequest, context: grpc.ServicerContext) -> empty_pb2.Empty: 55 address = utils.connection_from(request.connection).address 56 self.bluetooth.disconnect_device(address) 57 return empty_pb2.Empty() 58 59 async def DisableSlcAsHandsfree(self, request: hfp_pb2.DisableSlcAsHandsfreeRequest, 60 context: grpc.ServicerContext) -> empty_pb2.Empty: 61 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 62 context.set_details("Method not implemented!") # type: ignore 63 raise NotImplementedError("Method not implemented!") 64 65 async def DeclineCall(self, request: hfp_pb2.DeclineCallRequest, 66 context: grpc.ServicerContext) -> hfp_pb2.DeclineCallResponse: 67 self.enable_phone_for_testing() 68 self.bluetooth.hangup_call() 69 return hfp_pb2.DeclineCallResponse() 70 71 async def DeclineCallAsHandsfree(self, request: hfp_pb2.DeclineCallAsHandsfreeRequest, 72 context: grpc.ServicerContext) -> hfp_pb2.DeclineCallAsHandsfreeResponse: 73 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 74 context.set_details("Method not implemented!") # type: ignore 75 raise NotImplementedError("Method not implemented!") 76 77 async def SetBatteryLevel(self, request: hfp_pb2.SetBatteryLevelRequest, 78 context: grpc.ServicerContext) -> (empty_pb2.Empty): 79 self.enable_phone_for_testing() 80 if request.battery_percentage > 100 or request.battery_percentage < 0: 81 await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Wrong battery percentage.') 82 self.bluetooth.set_battery_level(math.floor((request.battery_percentage / 100) * 5)) 83 return empty_pb2.Empty() 84 85 async def ConnectToAudioAsHandsfree(self, request: hfp_pb2.ConnectToAudioAsHandsfreeRequest, 86 context: grpc.ServicerContext) -> hfp_pb2.ConnectToAudioAsHandsfreeResponse: 87 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 88 context.set_details("Method not implemented!") # type: ignore 89 raise NotImplementedError("Method not implemented!") 90 91 async def DisconnectFromAudioAsHandsfree( 92 self, request: hfp_pb2.DisconnectFromAudioAsHandsfreeRequest, 93 context: grpc.ServicerContext) -> hfp_pb2.DisconnectFromAudioAsHandsfreeResponse: 94 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 95 context.set_details("Method not implemented!") # type: ignore 96 raise NotImplementedError("Method not implemented!") 97 98 async def SetVoiceRecognition(self, request: hfp_pb2.SetVoiceRecognitionRequest, 99 context: grpc.ServicerContext) -> hfp_pb2.SetVoiceRecognitionResponse: 100 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 101 context.set_details("Method not implemented!") # type: ignore 102 raise NotImplementedError("Method not implemented!") 103 104 async def SetVoiceRecognitionAsHandsfree( 105 self, request: hfp_pb2.SetVoiceRecognitionAsHandsfreeRequest, 106 context: grpc.ServicerContext) -> hfp_pb2.SetVoiceRecognitionAsHandsfreeResponse: 107 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 108 context.set_details("Method not implemented!") # type: ignore 109 raise NotImplementedError("Method not implemented!") 110 111 async def MakeCall(self, request: hfp_pb2.MakeCallRequest, 112 context: grpc.ServicerContext) -> hfp_pb2.MakeCallResponse: 113 self.enable_phone_for_testing() 114 number = request.number 115 if number is None or len(number) == 0: 116 await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Cannot call empty number.') 117 call_result = self.bluetooth.dial_call(number) 118 if call_result is None or not call_result: 119 await context.abort(grpc.StatusCode.INTERNAL, 'Failed to make a call.') 120 return hfp_pb2.MakeCallResponse() 121 122 async def MakeCallAsHandsfree(self, request: hfp_pb2.MakeCallAsHandsfreeRequest, 123 context: grpc.ServicerContext) -> hfp_pb2.MakeCallAsHandsfreeResponse: 124 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 125 context.set_details('Method not implemented!') # type: ignore 126 raise NotImplementedError('Method not implemented!') 127 128 async def AnswerCall(self, request: hfp_pb2.AnswerCallRequest, 129 context: grpc.ServicerContext) -> hfp_pb2.AnswerCallResponse: 130 self.enable_phone_for_testing() 131 answer_result = self.bluetooth.answer_call() 132 if answer_result is None or not answer_result: 133 await context.abort(grpc.StatusCode.INTERNAL, 'Failed to answer call.') 134 return hfp_pb2.AnswerCallResponse() 135 136 async def AnswerCallAsHandsfree(self, request: hfp_pb2.AnswerCallAsHandsfreeRequest, 137 context: grpc.ServicerContext) -> hfp_pb2.AnswerCallAsHandsfreeResponse: 138 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 139 context.set_details('Method not implemented!') # type: ignore 140 raise NotImplementedError('Method not implemented!') 141 142 async def SetAudioPath(self, request: hfp_pb2.SetAudioPathRequest, 143 context: grpc.ServicerContext) -> hfp_pb2.SetAudioPathResponse: 144 self.enable_phone_for_testing() 145 connected_devices = self.bluetooth.get_connected_audio_devices() 146 if len(connected_devices) == 0: 147 await context.abort(grpc.StatusCode.INTERNAL, 'No connected devices.') 148 if request.audio_path == hfp_pb2.AUDIO_PATH_SPEAKERS: 149 self.bluetooth.audio_disconnect(connected_devices[0]) 150 elif request.audio_path == hfp_pb2.AUDIO_PATH_HANDSFREE: 151 self.bluetooth.audio_connect(connected_devices[0]) 152 return hfp_pb2.SetAudioPathResponse() 153 154 async def SwapActiveCall(self, request: hfp_pb2.SwapActiveCallRequest, 155 context: grpc.ServicerContext) -> hfp_pb2.SwapActiveCallResponse: 156 self.enable_phone_for_testing() 157 swap_result = self.bluetooth.swap_active_call() 158 if swap_result is None or not swap_result: 159 await context.abort(grpc.StatusCode.INTERNAL, 'Failed to swap active call.') 160 return hfp_pb2.SwapActiveCallResponse() 161 162 async def SetInBandRingtone(self, request: hfp_pb2.SetInBandRingtoneRequest, 163 context: grpc.ServicerContext) -> hfp_pb2.SetInBandRingtoneResponse: 164 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 165 context.set_details('Method not implemented!') # type: ignore 166 raise NotImplementedError('Method not implemented!') 167 168 async def ClearCallHistory(self, request: hfp_pb2.ClearCallHistoryRequest, 169 context: grpc.ServicerContext) -> hfp_pb2.ClearCallHistoryResponse: 170 self.enable_phone_for_testing() 171 if not all((self.bluetooth.set_memory_call(), self.bluetooth.set_last_call())): 172 await context.abort(grpc.StatusCode.INTERNAL, 'Failed to clear hall history.') 173 return hfp_pb2.ClearCallHistoryResponse() 174 175 async def EndCallAsHandsfree(self, request: hfp_pb2.EndCallAsHandsfreeRequest, 176 context: grpc.ServicerContext) -> hfp_pb2.EndCallAsHandsfreeResponse: 177 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 178 context.set_details('Method not implemented!') # type: ignore 179 raise NotImplementedError('Method not implemented!') 180 181 async def CallTransferAsHandsfree(self, request: hfp_pb2.CallTransferAsHandsfreeRequest, 182 context: grpc.ServicerContext) -> hfp_pb2.CallTransferAsHandsfreeResponse: 183 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 184 context.set_details('Method not implemented!') # type: ignore 185 raise NotImplementedError('Method not implemented!') 186 187 async def SendDtmfFromHandsfree(self, request: hfp_pb2.SendDtmfFromHandsfreeRequest, 188 context: grpc.ServicerContext) -> hfp_pb2.SendDtmfFromHandsfreeResponse: 189 context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore 190 context.set_details('Method not implemented!') # type: ignore 191 raise NotImplementedError('Method not implemented!') 192