# Copyright (C) 2024 The Android Open Source Project
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
from typing import Dict, Optional

from bumble import core
from bumble.device import Device
from bumble.hci import Address
from bumble.rfcomm import (
    Client,
    DLC,
    make_service_sdp_records,
    find_rfcomm_channel_with_uuid,
    Server,
)
from bumble.pandora import utils
import grpc
from pandora_experimental.rfcomm_grpc_aio import RFCOMMServicer
from pandora_experimental.rfcomm_pb2 import (
    AcceptConnectionRequest,
    AcceptConnectionResponse,
    ConnectionRequest,
    ConnectionResponse,
    DisconnectionRequest,
    DisconnectionResponse,
    RfcommConnection,
    RxRequest,
    RxResponse,
    ServerId,
    StartServerRequest,
    StartServerResponse,
    StopServerRequest,
    StopServerResponse,
    TxRequest,
    TxResponse,
)

# SDP record handles are allocated as FIRST_SERVICE_RECORD_HANDLE + <RFCOMM channel>.
FIRST_SERVICE_RECORD_HANDLE = 0x00010010

# Value passed as `transport=` to the Device connection APIs to select BR/EDR.
_BR_EDR_TRANSPORT = 0


class RFCOMMService(RFCOMMServicer):
    """gRPC servicer exposing RFCOMM client and server operations on a Bumble device.

    The service keeps two registries:

    * ``server_ports``: RFCOMM channel number -> ``ServerPort`` for every
      server started through ``StartServer``.
    * ``connections``: service-allocated connection id -> ``Connection`` for
      every DLC opened through ``ConnectToServer`` or ``AcceptConnection``.
    """

    device: Device

    class Connection:
        """An open RFCOMM DLC together with the queue of data received on it."""

        # Set only for connections initiated locally via ConnectToServer;
        # None for connections accepted on a server port.
        client: Optional[Client]

        def __init__(self, dlc, client=None):
            self.dlc = dlc
            # Filled by the DLC sink, drained by the Receive RPC.
            self.data_queue = asyncio.Queue()
            self.client = client

    class ServerPort:
        """State of one listening RFCOMM channel.

        An incoming DLC is only handed to ``wait_dlc`` once ``accept()`` has
        been called; a DLC that arrives earlier is parked in ``saved_dlc``.

        NOTE(review): ``wait_dlc`` is a single future, so a port can deliver
        one DLC only; a second ``set_result`` would raise
        ``asyncio.InvalidStateError``.
        """

        def __init__(self, name, uuid, wait_dlc):
            self.name = name
            self.uuid = uuid
            self.wait_dlc = wait_dlc
            self.accepted = False
            self.saved_dlc = None

        def accept(self):
            """Mark the port as accepting and deliver an already-parked DLC, if any."""
            self.accepted = True
            if self.saved_dlc is not None:
                self.wait_dlc.set_result(self.saved_dlc)

        def acceptor(self, dlc):
            """Callback registered with the RFCOMM server for incoming DLCs."""
            if self.accepted:
                self.wait_dlc.set_result(dlc)
            else:
                # Nobody is waiting yet: keep the DLC until accept() is called.
                self.saved_dlc = dlc

    def __init__(self, device: Device) -> None:
        super().__init__()
        # Created lazily by the first StartServer call.
        self.server: Optional[Server] = None
        self.device = device
        # key = RFCOMM channel, value = ServerPort
        self.server_ports: Dict[int, "RFCOMMService.ServerPort"] = {}
        # key = connection id, value = Connection
        self.connections: Dict[int, "RFCOMMService.Connection"] = {}
        self.next_conn_id = 1
        # Next RFCOMM server channel number handed to Server.listen().
        self.next_scn = 7

    def _register_connection(self, dlc: DLC, client: Optional[Client] = None) -> int:
        """Store a new connection, route its incoming data to a queue, return its id."""
        connection_id = self.next_conn_id
        self.next_conn_id += 1
        connection = self.Connection(dlc=dlc, client=client)
        # Every chunk received on the DLC is queued for the Receive RPC.
        dlc.sink = connection.data_queue.put_nowait
        self.connections[connection_id] = connection
        return connection_id

    @utils.rpc
    async def ConnectToServer(self, request: ConnectionRequest, context: grpc.ServicerContext) -> ConnectionResponse:
        """Open an RFCOMM DLC to the service identified by ``request.uuid``.

        Reuses an existing BR/EDR ACL connection to ``request.address`` when
        there is one, otherwise establishes it first. The channel is looked
        up with ``find_rfcomm_channel_with_uuid``.

        Returns:
            A ConnectionResponse carrying the id of the new connection.
        """
        logging.info("ConnectToServer")
        # The request carries the address bytes in the reverse order of what Address expects.
        address = Address(address=bytes(reversed(request.address)), address_type=Address.PUBLIC_DEVICE_ADDRESS)
        acl_connection = self.device.find_connection_by_bd_addr(address, transport=_BR_EDR_TRANSPORT)
        if acl_connection is None:
            acl_connection = await self.device.connect(address, transport=_BR_EDR_TRANSPORT)

        channel = await find_rfcomm_channel_with_uuid(acl_connection, request.uuid)

        client = Client(acl_connection)
        mux = await client.start()
        assert mux is not None

        dlc = await mux.open_dlc(channel)
        connection_id = self._register_connection(dlc, client=client)
        return ConnectionResponse(connection=RfcommConnection(id=connection_id))

    @utils.rpc
    async def StartServer(self, request: StartServerRequest, context: grpc.ServicerContext) -> StartServerResponse:
        """Start listening on a new RFCOMM channel and publish its SDP record.

        If a server port with the same UUID is already registered, its id is
        returned instead of starting a second one.

        Returns:
            A StartServerResponse whose server id is the RFCOMM channel number.
        """
        uuid = core.UUID(request.uuid)
        logging.info("StartServer %s", uuid)

        if self.server is None:
            self.server = Server(self.device)

        for existing_id, port in self.server_ports.items():
            if port.uuid == uuid:
                logging.warning("Server port already started for %s, returning existing port", uuid)
                return StartServerResponse(server=ServerId(id=existing_id))

        wait_dlc = asyncio.get_running_loop().create_future()
        server_port = self.ServerPort(name=request.name, uuid=uuid, wait_dlc=wait_dlc)
        open_channel = self.server.listen(acceptor=server_port.acceptor, channel=self.next_scn)
        self.next_scn += 1
        handle = FIRST_SERVICE_RECORD_HANDLE + open_channel
        self.device.sdp_service_records[handle] = make_service_sdp_records(handle, open_channel, uuid)
        self.server_ports[open_channel] = server_port
        return StartServerResponse(server=ServerId(id=open_channel))

    @utils.rpc
    async def AcceptConnection(self, request: AcceptConnectionRequest,
                               context: grpc.ServicerContext) -> AcceptConnectionResponse:
        """Wait for an incoming DLC on a started server port and register it.

        Raises:
            KeyError: if ``request.server.id`` is not a started server port.
        """
        logging.info("AcceptConnection")
        server_port = self.server_ports[request.server.id]
        server_port.accept()
        dlc = await server_port.wait_dlc
        connection_id = self._register_connection(dlc)
        return AcceptConnectionResponse(connection=RfcommConnection(id=connection_id))

    @utils.rpc
    async def Disconnect(self, request: DisconnectionRequest, context: grpc.ServicerContext) -> DisconnectionResponse:
        """Close a connection and remove it from the connection registry.

        Raises:
            KeyError: if ``request.connection.id`` is not a known connection.
        """
        logging.info("Disconnect")
        rfcomm_connection = self.connections[request.connection.id]
        if rfcomm_connection.client is not None:
            await rfcomm_connection.client.shutdown()
        # NOTE(review): connections accepted on a server port have no client,
        # so nothing is torn down for them here - confirm this is intended.
        # Drop the registry entry itself; `del` on the local name would leave
        # the connection reachable through self.connections.
        self.connections.pop(request.connection.id, None)
        return DisconnectionResponse()

    @utils.rpc
    async def StopServer(self, request: StopServerRequest, context: grpc.ServicerContext) -> StopServerResponse:
        """Unregister a server port and withdraw the SDP record published for it.

        Raises:
            KeyError: if ``request.server.id`` is not a started server port.
        """
        logging.info("StopServer")
        server_id = request.server.id
        del self.server_ports[server_id]
        # StartServer published the record at this handle; remove it so the
        # stopped channel is no longer discoverable through SDP.
        self.device.sdp_service_records.pop(FIRST_SERVICE_RECORD_HANDLE + server_id, None)

        return StopServerResponse()

    @utils.rpc
    async def Send(self, request: TxRequest, context: grpc.ServicerContext) -> TxResponse:
        """Write ``request.data`` to the DLC of the given connection.

        Raises:
            KeyError: if ``request.connection.id`` is not a known connection.
        """
        logging.info("Send")
        rfcomm_connection = self.connections[request.connection.id]
        rfcomm_connection.dlc.write(request.data)
        return TxResponse()

    @utils.rpc
    async def Receive(self, request: RxRequest, context: grpc.ServicerContext) -> RxResponse:
        """Wait for and return the next chunk of data received on a connection.

        Raises:
            KeyError: if ``request.connection.id`` is not a known connection.
        """
        logging.info("Receive")
        rfcomm_connection = self.connections[request.connection.id]
        received_data = await rfcomm_connection.data_queue.get()
        return RxResponse(data=received_data)