1# Copyright (C) 2024 The Android Open Source Project
2# Licensed under the Apache License, Version 2.0 (the "License");
3# you may not use this file except in compliance with the License.
4# You may obtain a copy of the License at
5#
6# http://www.apache.org/licenses/LICENSE-2.0
7#
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS,
10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11# See the License for the specific language governing permissions and
12# limitations under the License.
13
14import asyncio
15import logging
16from typing import Dict, Optional
17
18from bumble import core
19from bumble.device import Device
20from bumble.hci import Address
21from bumble.rfcomm import (
22    Client,
23    DLC,
24    make_service_sdp_records,
25    find_rfcomm_channel_with_uuid,
26    Server,
27)
28from bumble.pandora import utils
29import grpc
30from pandora_experimental.rfcomm_grpc_aio import RFCOMMServicer
31from pandora_experimental.rfcomm_pb2 import (
32    AcceptConnectionRequest,
33    AcceptConnectionResponse,
34    ConnectionRequest,
35    ConnectionResponse,
36    DisconnectionRequest,
37    DisconnectionResponse,
38    RfcommConnection,
39    RxRequest,
40    RxResponse,
41    ServerId,
42    StartServerRequest,
43    StartServerResponse,
44    StopServerRequest,
45    StopServerResponse,
46    TxRequest,
47    TxResponse,
48)
49
# Base value for SDP service record handles. StartServer adds the RFCOMM
# channel number returned by Server.listen() to this base to derive the
# handle under which that server's SDP record is stored on the device.
FIRST_SERVICE_RECORD_HANDLE = 0x00010010
51
52
class RFCOMMService(RFCOMMServicer):
    """gRPC servicer exposing RFCOMM client and server operations of a Bumble device.

    Client side: ConnectToServer looks up the RFCOMM channel for a UUID on the
    remote device and opens a DLC to it.
    Server side: StartServer listens on a channel and publishes an SDP record,
    AcceptConnection hands out the next incoming DLC, StopServer removes the
    port again.
    Every open DLC is tracked in ``self.connections`` under an integer id that
    is returned to the gRPC client and used by Send / Receive / Disconnect.
    """

    device: Device

    def __init__(self, device: Device) -> None:
        super().__init__()
        self.server: Optional[Server] = None  # created lazily by the first StartServer call
        self.device = device
        self.server_ports: Dict[int, "RFCOMMService.ServerPort"] = {}  # key = channel, value = ServerPort
        self.connections: Dict[int, "RFCOMMService.Connection"] = {}  # key = connection id, value = Connection
        self.next_conn_id = 1
        self.next_scn = 7  # channel number passed to Server.listen() by the next StartServer call

    class Connection:
        """An open DLC, the queue its received data is delivered to, and (client side only) its Client."""

        client: Optional[Client]

        def __init__(self, dlc, client=None):
            self.dlc = dlc
            self.data_queue = asyncio.Queue()
            # None for connections obtained through AcceptConnection.
            self.client = client

    class ServerPort:
        """State of one listening RFCOMM channel.

        A DLC that arrives before AcceptConnection asked for one is parked in
        ``saved_dlc`` (only the most recent one is kept); otherwise it resolves
        the ``wait_dlc`` future that AcceptConnection is awaiting.
        """

        def __init__(self, name, uuid, wait_dlc):
            self.name = name
            self.uuid = uuid
            self.wait_dlc = wait_dlc
            self.accepted = False
            self.saved_dlc = None

        def accept(self):
            """Mark the port as accepting and deliver a parked DLC, if there is one."""
            self.accepted = True
            # A future can only be resolved once; never touch one that is already done.
            if self.saved_dlc is not None and not self.wait_dlc.done():
                self.wait_dlc.set_result(self.saved_dlc)
                self.saved_dlc = None

        def acceptor(self, dlc):
            """Callback registered with Server.listen(); called with each incoming DLC."""
            if self.accepted and not self.wait_dlc.done():
                self.wait_dlc.set_result(dlc)
            else:
                # Nobody is waiting (or the pending accept was already satisfied):
                # keep the DLC for the next AcceptConnection call.
                self.saved_dlc = dlc

        def rearm(self):
            """Install a fresh future so the port can serve another AcceptConnection call."""
            self.wait_dlc = asyncio.get_running_loop().create_future()
            self.accepted = False

    def _register_connection(self, dlc: DLC, client: Optional[Client] = None) -> int:
        """Track `dlc` under a new connection id, route its received data to a queue, and return the id."""
        connection_id = self.next_conn_id
        self.next_conn_id += 1
        connection = self.Connection(dlc=dlc, client=client)
        # Everything the DLC receives is queued until a Receive call picks it up.
        dlc.sink = connection.data_queue.put_nowait
        self.connections[connection_id] = connection
        return connection_id

    @utils.rpc
    async def ConnectToServer(self, request: ConnectionRequest, context: grpc.ServicerContext) -> ConnectionResponse:
        """Open an RFCOMM DLC to the service identified by `request.uuid` on `request.address`.

        Reuses an existing BR/EDR ACL connection to the address when there is
        one, otherwise creates it.

        Raises:
            ValueError: if no RFCOMM channel is advertised for the UUID.
        """
        logging.info("ConnectToServer")
        # The request's address bytes are reversed before being handed to Address
        # (presumably big-endian on the wire vs. little-endian in Bumble - TODO confirm).
        address = Address(address=bytes(reversed(request.address)), address_type=Address.PUBLIC_DEVICE_ADDRESS)
        acl_connection = self.device.find_connection_by_bd_addr(address, transport=0)  # BR/EDR
        if acl_connection is None:
            acl_connection = await self.device.connect(address, transport=0)  # BR/EDR transport

        channel = await find_rfcomm_channel_with_uuid(acl_connection, request.uuid)
        if channel is None:
            # Fail before starting a multiplexer that could never be used.
            raise ValueError(f"No RFCOMM channel found for UUID {request.uuid}")

        client = Client(acl_connection)
        mux = await client.start()
        assert mux is not None

        dlc = await mux.open_dlc(channel)
        connection_id = self._register_connection(dlc, client)
        return ConnectionResponse(connection=RfcommConnection(id=connection_id))

    @utils.rpc
    async def StartServer(self, request: StartServerRequest, context: grpc.ServicerContext) -> StartServerResponse:
        """Listen on a new RFCOMM channel for `request.uuid` and publish its SDP record.

        If a port for the same UUID already exists, its id is returned instead
        of starting a second one. The returned server id is the channel number.
        """
        uuid = core.UUID(request.uuid)
        logging.info("StartServer %s", uuid)

        if self.server is None:
            self.server = Server(self.device)

        for existing_id, port in self.server_ports.items():
            if port.uuid == uuid:
                logging.warning("Server port already started for %s, returning existing port", uuid)
                return StartServerResponse(server=ServerId(id=existing_id))

        wait_dlc = asyncio.get_running_loop().create_future()
        server_port = self.ServerPort(name=request.name, uuid=uuid, wait_dlc=wait_dlc)
        open_channel = self.server.listen(acceptor=server_port.acceptor, channel=self.next_scn)
        self.next_scn += 1
        handle = FIRST_SERVICE_RECORD_HANDLE + open_channel
        self.device.sdp_service_records[handle] = make_service_sdp_records(handle, open_channel, uuid)
        self.server_ports[open_channel] = server_port
        return StartServerResponse(server=ServerId(id=open_channel))

    @utils.rpc
    async def AcceptConnection(self, request: AcceptConnectionRequest,
                               context: grpc.ServicerContext) -> AcceptConnectionResponse:
        """Wait for (or pick up an already parked) incoming DLC on the given server port.

        Raises:
            KeyError: if `request.server.id` is not a started server port.
        """
        logging.info("AcceptConnection")
        server_port = self.server_ports[request.server.id]
        server_port.accept()
        try:
            dlc = await server_port.wait_dlc
        finally:
            # The future is single-use (resolved or cancelled by now); replace it
            # so a later AcceptConnection waits for a new DLC instead of seeing
            # this one again, and so the acceptor never resolves a done future.
            server_port.rearm()
        connection_id = self._register_connection(dlc)
        return AcceptConnectionResponse(connection=RfcommConnection(id=connection_id))

    @utils.rpc
    async def Disconnect(self, request: DisconnectionRequest, context: grpc.ServicerContext) -> DisconnectionResponse:
        """Forget the connection and, for client-side connections, shut down its RFCOMM client.

        Raises:
            KeyError: if `request.connection.id` is not a known connection.
        """
        logging.info("Disconnect")
        # Remove the entry from the table itself; deleting only a local name
        # would leave the closed connection registered forever.
        rfcomm_connection = self.connections.pop(request.connection.id)
        if rfcomm_connection.client is not None:
            await rfcomm_connection.client.shutdown()
        # NOTE(review): connections obtained through AcceptConnection have no client,
        # so their DLC is only forgotten here, not actively closed - confirm intended.
        return DisconnectionResponse()

    @utils.rpc
    async def StopServer(self, request: StopServerRequest, context: grpc.ServicerContext) -> StopServerResponse:
        """Remove the server port and the SDP record StartServer published for it.

        Raises:
            KeyError: if `request.server.id` is not a started server port.
        """
        logging.info("StopServer")
        channel = request.server.id
        del self.server_ports[channel]
        # Withdraw the SDP record as well, otherwise the stopped service stays
        # discoverable and a restart with the same UUID leaves a stale record.
        self.device.sdp_service_records.pop(FIRST_SERVICE_RECORD_HANDLE + channel, None)
        # NOTE(review): the acceptor registered through Server.listen() is not
        # unregistered here - check whether the Bumble version in use offers an API for it.

        return StopServerResponse()

    @utils.rpc
    async def Send(self, request: TxRequest, context: grpc.ServicerContext) -> TxResponse:
        """Write `request.data` to the connection's DLC.

        Raises:
            KeyError: if `request.connection.id` is not a known connection.
        """
        logging.info("Send")
        rfcomm_connection = self.connections[request.connection.id]
        rfcomm_connection.dlc.write(request.data)
        return TxResponse()

    @utils.rpc
    async def Receive(self, request: RxRequest, context: grpc.ServicerContext) -> RxResponse:
        """Return the next chunk of data received on the connection, waiting until one is available.

        Raises:
            KeyError: if `request.connection.id` is not a known connection.
        """
        logging.info("Receive")
        rfcomm_connection = self.connections[request.connection.id]
        received_data = await rfcomm_connection.data_queue.get()
        return RxResponse(data=received_data)
181