# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""RFCOMM grpc interface."""

import asyncio
import logging
import os
import socket as socket_module
import uuid as uuid_module

from floss.pandora.floss import floss_enums
from floss.pandora.floss import socket_manager
from floss.pandora.floss import utils
from floss.pandora.server import bluetooth as bluetooth_module
from google.protobuf import empty_pb2
import grpc
from pandora_experimental import rfcomm_grpc_aio
from pandora_experimental import rfcomm_pb2


def _resolve_future(future, value, *, discard=None):
    """Completes `future` with `value` via its own event loop.

    The result is set through `call_soon_threadsafe`, so this may be called from a callback that does not run on the
    event loop thread.

    Args:
        future: The asyncio future the RPC handler is awaiting.
        value: The result to deliver.
        discard: Optional callable invoked with `value` when the future is already done (for example because the
            handler timed out and the future was cancelled) and `value` is not None. Used to release resources such
            as file descriptors that nobody will consume.
    """

    def _set_if_pending():
        if not future.done():
            future.set_result(value)
        elif discard is not None and value is not None:
            discard(value)

    future.get_loop().call_soon_threadsafe(_set_if_pending)


def _dup_fd_from_list(optional_fd, dbus_unix_fd_list, caller):
    """Duplicates the file descriptor referenced by `optional_fd` out of `dbus_unix_fd_list`.

    Args:
        optional_fd: Mapping whose 'optional_value' entry is the index of the fd inside `dbus_unix_fd_list`.
        dbus_unix_fd_list: The fd list passed along with the D-Bus callback.
        caller: Name of the calling callback, used as a prefix in error logs.

    Returns:
        A duplicated file descriptor owned by the caller, or None if no valid fd could be obtained.
    """
    if not optional_fd:
        return None

    if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
        logging.error('%s: Empty fd list', caller)
        return None

    fd_handle = optional_fd['optional_value']
    # Valid handles are indexes into the list, i.e. 0 .. length - 1.
    if not 0 <= fd_handle < dbus_unix_fd_list.get_length():
        logging.error('%s: Invalid fd handle', caller)
        return None

    # NOTE(review): if dbus_unix_fd_list is a Gio.UnixFDList, get() presumably already returns a duplicate owned by
    # the caller, in which case `fd` should be closed after os.dup() (or returned directly) - TODO confirm.
    fd = dbus_unix_fd_list.get(fd_handle)
    return os.dup(fd)


class RFCOMMService(rfcomm_grpc_aio.RFCOMMServicer):
    """Service to trigger Bluetooth RFCOMM procedures.

    This class implements the Pandora bluetooth test interfaces,
    where the meta class definition is automatically generated by the protobuf.
    The interface definition can be found in:
    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/rfcomm.proto
    """

    # Size of the buffer for data transactions.
    BUFFER_SIZE = 512

    # How long (in seconds) an RPC waits for the matching socket manager callback.
    _CALLBACK_TIMEOUT_SECONDS = 5

    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        self.bluetooth = bluetooth

        # Used by new_stream_id() to generate IDs for the RPC client to specify the stream.
        self.current_stream_id = 0x12FC0

        # key = stream_id, val = stream
        self.streams = {}

    def new_stream_id(self) -> int:
        """Returns the next unused stream ID and advances the counter."""
        stream_id = self.current_stream_id
        self.current_stream_id += 1
        return stream_id

    def _adopt_fd(self, fd: int) -> int:
        """Wraps `fd` in a socket object, stores it in `self.streams` and returns its new stream ID.

        socket.fromfd() duplicates the descriptor it is given, so `fd` is closed here once the socket object exists
        (or failed to be created); otherwise it would leak.
        """
        try:
            stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
        finally:
            os.close(fd)

        stream_id = self.new_stream_id()
        self.streams[stream_id] = stream
        return stream_id

    async def ConnectToServer(self, request: rfcomm_pb2.ConnectionRequest,
                              context: grpc.ServicerContext) -> rfcomm_pb2.ConnectionResponse:
        """Creates an outgoing insecure RFCOMM connection to `request.address` for `request.uuid`.

        Aborts the RPC with INTERNAL if the socket cannot be created or no fd is delivered. Raises
        asyncio.TimeoutError if the connection result callback does not arrive in time.

        Returns:
            A ConnectionResponse carrying the ID of the newly registered stream.
        """

        class CreateRFCOMMObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the created RFCOMM connection state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_outgoing_connection_result(self, connecting_id, result, socket, *, dbus_unix_fd_list=None):
                if connecting_id != self.task['connecting_id']:
                    return

                future = self.task['create_rfcomm_channel']
                if result is None or floss_enums.BtStatus(result) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed to create the RFCOMM channel with connecting_id: %s. Status: %s',
                                  connecting_id, result)
                    _resolve_future(future, None)
                    return

                if not socket:
                    _resolve_future(future, None)
                    return

                fd_dup = _dup_fd_from_list(socket['optional_value']['fd'], dbus_unix_fd_list,
                                           'on_outgoing_connection_result')
                # Close the duplicated fd if the RPC already gave up waiting for it.
                _resolve_future(future, fd_dup, discard=os.close)

        address = utils.address_from(request.address)
        uuid = list(uuid_module.UUID(request.uuid).bytes)

        socket_result = self.bluetooth.create_insecure_rfcomm_socket_to_service_record(address, uuid)
        if socket_result is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                'Failed to call create_insecure_rfcomm_socket_to_service_record.')

        connecting_id = socket_result['id']
        rfcomm_channel_creation = {
            'create_rfcomm_channel': asyncio.get_running_loop().create_future(),
            'connecting_id': connecting_id
        }
        observer = CreateRFCOMMObserver(rfcomm_channel_creation)
        name = utils.create_observer_name(observer)
        self.bluetooth.socket_manager.register_callback_observer(name, observer)

        # The try/finally only starts once the observer is registered, so the cleanup can never reference an
        # unbound `name`/`observer` and mask the real error.
        try:
            fd = await asyncio.wait_for(rfcomm_channel_creation['create_rfcomm_channel'],
                                        timeout=self._CALLBACK_TIMEOUT_SECONDS)
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        if fd is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                f'Failed to get the fd from RFCOMM socket with connecting_id: {connecting_id}')

        stream_id = self._adopt_fd(fd)
        return rfcomm_pb2.ConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=stream_id))

    async def Disconnect(self, request: rfcomm_pb2.DisconnectionRequest,
                         context: grpc.ServicerContext) -> rfcomm_pb2.DisconnectionResponse:
        """Shuts down and closes the stream identified by `request.connection.id`.

        An unknown ID or a failing shutdown is logged; the stream is always closed and forgotten.
        """
        # NOTE(review): returns empty_pb2.Empty() although the annotation names DisconnectionResponse - confirm
        # against the proto definition.
        stream_id = request.connection.id
        stream = self.streams.pop(stream_id, None)
        if stream is None:
            logging.error('No stream found with ID %s', stream_id)
            return empty_pb2.Empty()

        try:
            stream.shutdown(socket_module.SHUT_RDWR)
        except OSError as e:
            # E.g. the peer is already gone; the socket still has to be closed below.
            logging.error('Disconnect: failed to shut down stream %s: %s', stream_id, e)
        finally:
            stream.close()

        return empty_pb2.Empty()

    async def StopServer(self, request: rfcomm_pb2.StopServerRequest,
                         context: grpc.ServicerContext) -> rfcomm_pb2.StopServerResponse:
        """Closes the listening socket `request.server.id` and waits for the close callback.

        Aborts the RPC with INTERNAL if close_socket fails or the reported status is not SUCCESS. Raises
        asyncio.TimeoutError if the close callback does not arrive in time.
        """

        class StopRFCOMMSocket(socket_manager.SocketManagerCallbacks):
            """Observer to observe stop state of RFCOMM connection."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_incoming_socket_closed(self, listener_id, reason):
                if listener_id != self.task['listener_id']:
                    return

                if reason is None or floss_enums.BtStatus(reason) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed to stop RFCOMM channel with listener_id: %s. Status: %s', listener_id,
                                  reason)

                # The raw reason is forwarded; the handler decides whether it counts as success.
                _resolve_future(self.task['stop_rfcomm_channel'], reason)

        listener_id = request.server.id
        rfcomm_channel_stop = {
            'stop_rfcomm_channel': asyncio.get_running_loop().create_future(),
            'listener_id': listener_id
        }
        observer = StopRFCOMMSocket(rfcomm_channel_stop)
        name = utils.create_observer_name(observer)
        # Register before closing the socket so the close callback cannot be missed.
        self.bluetooth.socket_manager.register_callback_observer(name, observer)

        try:
            if not self.bluetooth.close_socket(listener_id):
                await context.abort(grpc.StatusCode.INTERNAL, 'Failed to call close_socket.')

            status = await asyncio.wait_for(rfcomm_channel_stop['stop_rfcomm_channel'],
                                            timeout=self._CALLBACK_TIMEOUT_SECONDS)
            if status != floss_enums.BtStatus.SUCCESS:
                await context.abort(
                    grpc.StatusCode.INTERNAL,
                    f'Failed to stop RFCOMM channel with listener_id: {listener_id}. Status: {status}')
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        return empty_pb2.Empty()

    async def StartServer(self, request: rfcomm_pb2.StartServerRequest,
                          context: grpc.ServicerContext) -> rfcomm_pb2.StartServerResponse:
        """Starts listening for insecure RFCOMM connections for `request.name` / `request.uuid`.

        Aborts the RPC with INTERNAL if the listen call fails. Raises asyncio.TimeoutError if the socket-ready
        callback does not arrive in time.

        Returns:
            A StartServerResponse carrying the listener ID reported by the socket-ready callback.
        """

        class StartServerObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the RFCOMM server start."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_incoming_socket_ready(self, socket, status):
                if not socket or 'id' not in socket:
                    return

                listener_id = socket['id']
                if listener_id != self.task['socket_id']:
                    return

                future = self.task['start_rfcomm_server']
                if status is None or floss_enums.BtStatus(status) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed listening to RFCOMM channel with socket_id: %s. Status: %s', listener_id,
                                  status)
                    _resolve_future(future, None)
                else:
                    _resolve_future(future, listener_id)

        uuid = list(uuid_module.UUID(request.uuid).bytes)
        socket_result = self.bluetooth.listen_using_insecure_rfcomm_with_service_record(request.name, uuid)
        if socket_result is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                'Failed to call listen_using_insecure_rfcomm_with_service_record.')

        rfcomm_channel_listener = {
            'start_rfcomm_server': asyncio.get_running_loop().create_future(),
            'socket_id': socket_result['id']
        }
        observer = StartServerObserver(rfcomm_channel_listener)
        name = utils.create_observer_name(observer)
        self.bluetooth.socket_manager.register_callback_observer(name, observer)

        # Guard only the wait: the cleanup must not run (and fail on unbound names) when the code above raises.
        try:
            listener_id = await asyncio.wait_for(rfcomm_channel_listener['start_rfcomm_server'],
                                                 timeout=self._CALLBACK_TIMEOUT_SECONDS)
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        # NOTE(review): listener_id is None when the callback reported a failure; the response is then built with
        # id=None instead of aborting - confirm this is intended.
        return rfcomm_pb2.StartServerResponse(server=rfcomm_pb2.ServerId(id=listener_id))

    async def AcceptConnection(self, request: rfcomm_pb2.AcceptConnectionRequest,
                               context: grpc.ServicerContext) -> rfcomm_pb2.AcceptConnectionResponse:
        """Accepts one incoming connection on the listening socket `request.server.id`.

        Aborts the RPC with INTERNAL if accept_socket fails or no fd is delivered. If the incoming-connection
        callback does not arrive in time, the timeout is logged and a response built with id=None is returned.

        Returns:
            An AcceptConnectionResponse carrying the ID of the newly registered stream.
        """

        class AcceptConnectionObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the accepted RFCOMM connection."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_handle_incoming_connection(self, listener_id, connection, *, dbus_unix_fd_list=None):
                if listener_id != self.task['listener_id']:
                    return

                future = self.task['accept_rfcomm_channel']
                if not connection:
                    _resolve_future(future, None)
                    return

                fd_dup = _dup_fd_from_list(connection['fd'], dbus_unix_fd_list, 'on_handle_incoming_connection')
                # Close the duplicated fd if the RPC already gave up waiting for it.
                _resolve_future(future, fd_dup, discard=os.close)

        listener_id = request.server.id
        rfcomm_channel_acceptance = {
            'accept_rfcomm_channel': asyncio.get_running_loop().create_future(),
            'listener_id': listener_id
        }
        observer = AcceptConnectionObserver(rfcomm_channel_acceptance)
        name = utils.create_observer_name(observer)
        # Register before accepting so the incoming-connection callback cannot be missed.
        self.bluetooth.socket_manager.register_callback_observer(name, observer)

        try:
            # NOTE(review): timeout_ms=5 is 5 milliseconds while the wait below is 5 seconds - confirm the unit.
            accept_socket_status = self.bluetooth.accept_socket(listener_id, timeout_ms=5)
            if accept_socket_status != floss_enums.BtStatus.SUCCESS:
                await context.abort(
                    grpc.StatusCode.INTERNAL, f'Failed to accept the RFCOMM socket with listener_id: {listener_id}. '
                    f'Status: {accept_socket_status}.')

            fd = await asyncio.wait_for(rfcomm_channel_acceptance['accept_rfcomm_channel'],
                                        timeout=self._CALLBACK_TIMEOUT_SECONDS)
        except asyncio.TimeoutError as e:
            logging.error('AcceptConnection: asyncio.TimeoutError %s', e)
            return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=None))
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        if fd is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                f'Failed to get the fd from RFCOMM socket with listener_id: {listener_id}')

        stream_id = self._adopt_fd(fd)
        return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=stream_id))

    async def Send(self, request: rfcomm_pb2.TxRequest, context: grpc.ServicerContext) -> rfcomm_pb2.TxResponse:
        """Writes `request.data` to the stream identified by `request.connection.id`.

        Failures (unknown stream, socket error) are logged and not reported to the caller.
        """
        stream_id = request.connection.id
        output_stream = self.streams.get(stream_id)
        if output_stream:
            try:
                # sendall() keeps writing until the whole payload is out; send() may stop after a partial write.
                output_stream.sendall(request.data)
            except OSError as e:
                logging.error('Exception during writing to output stream %s', e)
        else:
            logging.error('Output stream: %s not found for the stream_id: %s', output_stream, stream_id)

        return empty_pb2.Empty()

    async def Receive(self, request: rfcomm_pb2.RxRequest, context: grpc.ServicerContext) -> rfcomm_pb2.RxResponse:
        """Reads up to BUFFER_SIZE bytes from the stream identified by `request.connection.id`.

        Returns:
            An RxResponse with the received bytes, or with empty data if the stream is unknown, nothing was
            received, or reading failed (failures are logged).
        """
        stream_id = request.connection.id
        input_stream = self.streams.get(stream_id)
        if input_stream:
            try:
                data = input_stream.recv(self.BUFFER_SIZE)
                if data:
                    return rfcomm_pb2.RxResponse(data=data)
            except OSError as e:
                logging.error('Exception during reading from input stream %s', e)
        else:
            logging.error('Input stream: %s not found for the stream_id: %s', input_stream, stream_id)

        # Return an empty byte array.
        return rfcomm_pb2.RxResponse(data=b'')