1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the 'License');
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an 'AS IS' BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""HFP grpc interface."""
15
16import math
17
18from floss.pandora.floss import utils
19from floss.pandora.server import bluetooth as bluetooth_module
20from google.protobuf import empty_pb2
21import grpc
22from pandora_experimental import hfp_grpc_aio
23from pandora_experimental import hfp_pb2
24
25
class HFPService(hfp_grpc_aio.HFPServicer):
    """Service to trigger Bluetooth HFP procedures.

    This class implements the Pandora bluetooth test interfaces,
    where the meta class definition is automatically generated by the protobuf.
    The interface definition can be found in:
    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/hfp.proto

    Handlers for RPCs that this server does not support set the
    ``UNIMPLEMENTED`` status on the context and raise ``NotImplementedError``.
    """

    # Text used both as the gRPC status details and as the exception message
    # of every unsupported RPC.
    _NOT_IMPLEMENTED_MESSAGE = 'Method not implemented!'

    # Top of the level scale handed to Bluetooth.set_battery_level(); a request
    # of 100 percent maps to this value.
    # NOTE(review): presumably the HFP battery indicator range (0-5) - confirm.
    _MAX_BATTERY_LEVEL = 5

    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        """Stores the Bluetooth facade every handler delegates to.

        Args:
            bluetooth: Facade used to drive the local Bluetooth stack.
        """
        self.bluetooth = bluetooth

    def enable_phone_for_testing(self):
        """Turns on phone ops and MPS qualification on the Bluetooth facade."""
        self.bluetooth.set_phone_ops_enabled(True)
        self.bluetooth.set_mps_qualification_enabled(True)

    def _not_implemented(self, context: grpc.ServicerContext) -> NotImplementedError:
        """Marks the RPC as unimplemented and builds the error to raise.

        The exception is returned rather than raised so that each handler keeps
        an explicit ``raise`` statement.

        Args:
            context: Context of the RPC being rejected.

        Returns:
            The NotImplementedError the calling handler must raise.
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details(self._NOT_IMPLEMENTED_MESSAGE)  # type: ignore
        return NotImplementedError(self._NOT_IMPLEMENTED_MESSAGE)

    async def EnableSlc(self, request: hfp_pb2.EnableSlcRequest, context: grpc.ServicerContext) -> empty_pb2.Empty:
        """Connects to the device referenced by ``request.connection``.

        Args:
            request: Carries the connection whose address is connected to.
            context: RPC context (unused).

        Returns:
            An empty message.
        """
        self.enable_phone_for_testing()
        address = utils.connection_from(request.connection).address
        self.bluetooth.connect_device(address)
        return empty_pb2.Empty()

    async def EnableSlcAsHandsfree(self, request: hfp_pb2.EnableSlcAsHandsfreeRequest,
                                   context: grpc.ServicerContext) -> empty_pb2.Empty:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def DisableSlc(self, request: hfp_pb2.DisableSlcRequest, context: grpc.ServicerContext) -> empty_pb2.Empty:
        """Disconnects the device referenced by ``request.connection``.

        Args:
            request: Carries the connection whose address is disconnected.
            context: RPC context (unused).

        Returns:
            An empty message.
        """
        address = utils.connection_from(request.connection).address
        self.bluetooth.disconnect_device(address)
        return empty_pb2.Empty()

    async def DisableSlcAsHandsfree(self, request: hfp_pb2.DisableSlcAsHandsfreeRequest,
                                    context: grpc.ServicerContext) -> empty_pb2.Empty:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def DeclineCall(self, request: hfp_pb2.DeclineCallRequest,
                          context: grpc.ServicerContext) -> hfp_pb2.DeclineCallResponse:
        """Hangs up the call through the Bluetooth facade.

        Args:
            request: Unused.
            context: RPC context (unused).

        Returns:
            An empty DeclineCallResponse.
        """
        self.enable_phone_for_testing()
        # NOTE(review): unlike AnswerCall, the result of hangup_call() is not
        # checked - confirm whether a failure should abort the RPC.
        self.bluetooth.hangup_call()
        return hfp_pb2.DeclineCallResponse()

    async def DeclineCallAsHandsfree(self, request: hfp_pb2.DeclineCallAsHandsfreeRequest,
                                     context: grpc.ServicerContext) -> hfp_pb2.DeclineCallAsHandsfreeResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def SetBatteryLevel(self, request: hfp_pb2.SetBatteryLevelRequest,
                              context: grpc.ServicerContext) -> (empty_pb2.Empty):
        """Scales a battery percentage to a level and applies it.

        Args:
            request: Carries ``battery_percentage``, expected within [0, 100].
            context: RPC context, aborted with INVALID_ARGUMENT when the
                percentage is out of range.

        Returns:
            An empty message.
        """
        self.enable_phone_for_testing()
        percentage = request.battery_percentage
        if not 0 <= percentage <= 100:
            # abort() raises, so nothing below runs for an invalid request.
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Wrong battery percentage.')
        # Multiply before dividing so the only rounding step is the final
        # division; the floor then maps the percentage onto 0.._MAX_BATTERY_LEVEL.
        level = math.floor(percentage * self._MAX_BATTERY_LEVEL / 100)
        self.bluetooth.set_battery_level(level)
        return empty_pb2.Empty()

    async def ConnectToAudioAsHandsfree(self, request: hfp_pb2.ConnectToAudioAsHandsfreeRequest,
                                        context: grpc.ServicerContext) -> hfp_pb2.ConnectToAudioAsHandsfreeResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def DisconnectFromAudioAsHandsfree(
            self, request: hfp_pb2.DisconnectFromAudioAsHandsfreeRequest,
            context: grpc.ServicerContext) -> hfp_pb2.DisconnectFromAudioAsHandsfreeResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def SetVoiceRecognition(self, request: hfp_pb2.SetVoiceRecognitionRequest,
                                  context: grpc.ServicerContext) -> hfp_pb2.SetVoiceRecognitionResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def SetVoiceRecognitionAsHandsfree(
            self, request: hfp_pb2.SetVoiceRecognitionAsHandsfreeRequest,
            context: grpc.ServicerContext) -> hfp_pb2.SetVoiceRecognitionAsHandsfreeResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def MakeCall(self, request: hfp_pb2.MakeCallRequest,
                       context: grpc.ServicerContext) -> hfp_pb2.MakeCallResponse:
        """Dials ``request.number`` through the Bluetooth facade.

        Args:
            request: Carries the number to dial.
            context: RPC context, aborted with INVALID_ARGUMENT for an empty
                number and with INTERNAL when dialing fails.

        Returns:
            An empty MakeCallResponse.
        """
        self.enable_phone_for_testing()
        number = request.number
        if not number:
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Cannot call empty number.')
        # A falsy result (None included) is treated as a failed dial.
        if not self.bluetooth.dial_call(number):
            await context.abort(grpc.StatusCode.INTERNAL, 'Failed to make a call.')
        return hfp_pb2.MakeCallResponse()

    async def MakeCallAsHandsfree(self, request: hfp_pb2.MakeCallAsHandsfreeRequest,
                                  context: grpc.ServicerContext) -> hfp_pb2.MakeCallAsHandsfreeResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def AnswerCall(self, request: hfp_pb2.AnswerCallRequest,
                         context: grpc.ServicerContext) -> hfp_pb2.AnswerCallResponse:
        """Answers the call through the Bluetooth facade.

        Args:
            request: Unused.
            context: RPC context, aborted with INTERNAL when answering fails.

        Returns:
            An empty AnswerCallResponse.
        """
        self.enable_phone_for_testing()
        # A falsy result (None included) is treated as a failure.
        if not self.bluetooth.answer_call():
            await context.abort(grpc.StatusCode.INTERNAL, 'Failed to answer call.')
        return hfp_pb2.AnswerCallResponse()

    async def AnswerCallAsHandsfree(self, request: hfp_pb2.AnswerCallAsHandsfreeRequest,
                                    context: grpc.ServicerContext) -> hfp_pb2.AnswerCallAsHandsfreeResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def SetAudioPath(self, request: hfp_pb2.SetAudioPathRequest,
                           context: grpc.ServicerContext) -> hfp_pb2.SetAudioPathResponse:
        """Switches audio for the first connected audio device.

        ``AUDIO_PATH_SPEAKERS`` disconnects audio from the device and
        ``AUDIO_PATH_HANDSFREE`` connects it; any other value is a no-op.

        Args:
            request: Carries the requested ``audio_path``.
            context: RPC context, aborted with INTERNAL when no audio device
                is connected.

        Returns:
            An empty SetAudioPathResponse.
        """
        self.enable_phone_for_testing()
        connected_devices = self.bluetooth.get_connected_audio_devices()
        # Truthiness covers an empty collection as well as a None result.
        if not connected_devices:
            await context.abort(grpc.StatusCode.INTERNAL, 'No connected devices.')
        device = connected_devices[0]
        if request.audio_path == hfp_pb2.AUDIO_PATH_SPEAKERS:
            self.bluetooth.audio_disconnect(device)
        elif request.audio_path == hfp_pb2.AUDIO_PATH_HANDSFREE:
            self.bluetooth.audio_connect(device)
        return hfp_pb2.SetAudioPathResponse()

    async def SwapActiveCall(self, request: hfp_pb2.SwapActiveCallRequest,
                             context: grpc.ServicerContext) -> hfp_pb2.SwapActiveCallResponse:
        """Swaps the active call through the Bluetooth facade.

        Args:
            request: Unused.
            context: RPC context, aborted with INTERNAL when the swap fails.

        Returns:
            An empty SwapActiveCallResponse.
        """
        self.enable_phone_for_testing()
        # A falsy result (None included) is treated as a failure.
        if not self.bluetooth.swap_active_call():
            await context.abort(grpc.StatusCode.INTERNAL, 'Failed to swap active call.')
        return hfp_pb2.SwapActiveCallResponse()

    async def SetInBandRingtone(self, request: hfp_pb2.SetInBandRingtoneRequest,
                                context: grpc.ServicerContext) -> hfp_pb2.SetInBandRingtoneResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def ClearCallHistory(self, request: hfp_pb2.ClearCallHistoryRequest,
                               context: grpc.ServicerContext) -> hfp_pb2.ClearCallHistoryResponse:
        """Resets the memory-call and last-call entries.

        Calls ``set_memory_call()`` and ``set_last_call()`` without arguments
        (presumably clearing the stored numbers - confirm against the facade).

        Args:
            request: Unused.
            context: RPC context, aborted with INTERNAL when either call
                reports a falsy result.

        Returns:
            An empty ClearCallHistoryResponse.
        """
        self.enable_phone_for_testing()
        # Both setters always run, even when the first one fails.
        memory_call_reset = self.bluetooth.set_memory_call()
        last_call_reset = self.bluetooth.set_last_call()
        if not (memory_call_reset and last_call_reset):
            await context.abort(grpc.StatusCode.INTERNAL, 'Failed to clear call history.')
        return hfp_pb2.ClearCallHistoryResponse()

    async def EndCallAsHandsfree(self, request: hfp_pb2.EndCallAsHandsfreeRequest,
                                 context: grpc.ServicerContext) -> hfp_pb2.EndCallAsHandsfreeResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def CallTransferAsHandsfree(self, request: hfp_pb2.CallTransferAsHandsfreeRequest,
                                      context: grpc.ServicerContext) -> hfp_pb2.CallTransferAsHandsfreeResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)

    async def SendDtmfFromHandsfree(self, request: hfp_pb2.SendDtmfFromHandsfreeRequest,
                                    context: grpc.ServicerContext) -> hfp_pb2.SendDtmfFromHandsfreeResponse:
        """Unsupported RPC: reports UNIMPLEMENTED and raises NotImplementedError."""
        raise self._not_implemented(context)
192