xref: /aosp_15_r20/external/pigweed/pw_emu/py/pw_emu/qemu.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2023 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Pigweed qemu frontend."""
15
16
17import io
18import json
19import logging
20import os
21import re
22import socket
23import sys
24
25from pathlib import Path
26from typing import Any
27
28from pw_emu.core import (
29    ConfigError,
30    Connector,
31    Launcher,
32    Error,
33    InvalidChannelType,
34    RunError,
35    WrongEmulator,
36)
37
# Logger used by QmpClient to trace QMP requests and responses at debug level.
# NOTE(review): the logger name says 'pw_qemu' while this module imports from
# the 'pw_emu' package -- confirm the name is intentional before relying on it
# in logging configuration.
_QMP_LOG = logging.getLogger('pw_qemu.qemu.qmp')
39
40
class QmpError(Error):
    """Error reported while talking to QEMU over the QMP interface.

    Raised by QmpClient.request() when the server's response carries an
    'error' member; the message is that error's description.
    """

    def __init__(self, err: str):
        """Create the exception.

        Args:
            err: Human readable description of the QMP failure.
        """
        super().__init__(err)
46
47
class QmpClient:
    """Send QMP requests to the server.

    Constructing the client performs the QMP capabilities negotiation, so
    a successfully constructed instance is ready to issue commands.
    """

    def __init__(self, stream: io.RawIOBase):
        """Wrap a stream and complete the QMP handshake.

        Args:
            stream: Binary stream connected to the QMP server. Requests are
                written to it and responses are read from it line by line.

        Raises:
            QmpError: If the server rejects the handshake command.
            json.JSONDecodeError: If the server sends a line that is not
                valid JSON.
        """
        self._stream = stream

        # Perform the QMP "capabilities negotiation" handshake.
        # https://wiki.qemu.org/Documentation/QMP#Capabilities_Negotiation
        #
        # When the QMP connection is established, QEMU first sends a greeting
        # message with its version and capabilities. Then the client sends
        # 'qmp_capabilities' to exit capabilities negotiation mode. The result
        # is an empty 'return'.
        #
        # self.request() will consume both the initial greeting and the
        # subsequent 'return' response.
        self.request('qmp_capabilities')

    def request(self, cmd: str, args: dict[str, Any] | None = None) -> Any:
        """Issue a command using the qmp interface.

        Lines are read from the stream until one of them contains either a
        'return' or an 'error' member; any other message (such as the
        initial greeting) is skipped.

        Args:
            cmd: Name of the QMP command to execute.
            args: Optional command arguments. They are omitted from the
                request when None or empty.

        Returns:
            The value of the 'return' member of the server's response.

        Raises:
            QmpError: If the response contains an 'error' member.
            json.JSONDecodeError: If a line read from the stream is not
                valid JSON.
        """

        req: dict[str, Any] = {'execute': cmd}
        if args:
            req['arguments'] = args
        payload = json.dumps(req)
        # Use lazy %-style arguments so the actual request and response are
        # logged (the previous messages lacked the f-string prefix and
        # logged the placeholder text verbatim).
        _QMP_LOG.debug(' -> %s', payload)
        self._stream.write(payload.encode('utf-8'))
        while True:
            line = self._stream.readline()
            _QMP_LOG.debug(' <- %s', line)
            resp = json.loads(line)
            if 'error' in resp:
                raise QmpError(resp['error']['desc'])
            if 'return' in resp:
                return resp['return']
87
88
class QemuLauncher(Launcher):
    """Start a new qemu process for a given target and config file."""

    # Backend used for tcp channels given directly to -qmp, -monitor, -gdb
    # and -serial. Port 0 lets the OS pick a free port.
    _TCP_BACKEND = 'tcp:localhost:0,ipv4=on,server=on,wait=off'
    # Equivalent backend in -chardev syntax.
    _TCP_CHARDEV_BACKEND = 'socket,host=localhost,port=0,server=on,wait=off'

    def __init__(self, config_path: Path | None = None):
        """Create a launcher for the 'qemu' emulator.

        Args:
            config_path: Optional path to the configuration file, passed
                through to the base Launcher.
        """
        super().__init__('qemu', config_path)
        self._start_cmd: list[str] = []
        # Maps chardev labels, as reported by the 'query-chardev' QMP
        # command, to channel names. Target specific chardevs are added by
        # _configure_chardev_channels().
        self._chardevs_id_to_name = {
            'compat_monitor0': 'qmp',
            'compat_monitor1': 'monitor',
            'gdb': 'gdb',
        }
        self._chardevs: dict[str, Any] = {}
        # Listening socket for the bootstrap QMP connection; created in
        # _pre_start() and consumed by _post_start().
        self._qmp_init_sock: socket.socket | None = None

    @staticmethod
    def _backend_for_type(chan_type: str, tcp_backend: str) -> str:
        """Map a channel type to the qemu backend string.

        Args:
            chan_type: Channel type, either 'tcp' or 'pty'.
            tcp_backend: Backend string to use for 'tcp' channels.

        Returns:
            The backend string to pass to qemu.

        Raises:
            InvalidChannelType: If the type is unknown, or if it is 'pty'
                and the host platform is win32.
        """
        if chan_type == 'tcp':
            return tcp_backend
        if chan_type == 'pty' and sys.platform != 'win32':
            return 'pty'
        raise InvalidChannelType(chan_type)

    def _set_qemu_channel_tcp(self, name: str, filename: str) -> None:
        """Parse a TCP chardev filename and register the channel.

        Format for the tcp chardev backend:

        [disconnected|isconnected:]tcp:<host>:<port>[,<options>][ <->
        <host>:<port>]

        Args:
            name: Channel name to register.
            filename: Chardev filename as reported by 'query-chardev'.
        """

        # Drop the optional peer address first, then the options, so that
        # the last ':' separated field is always the local port.
        local = filename.split(' <-> ')[0].split(',')[0]
        fields = local.split(':')
        # Skip the 'tcp' tag and, when present, the connection state prefix.
        fields = fields[1:] if fields[0] == 'tcp' else fields[2:]
        # IPV6 hosts have :
        host = ':'.join(fields[:-1])
        port = fields[-1]
        self._handles.add_channel_tcp(name, host, int(port))

    def _set_qemu_channel_pty(self, name: str, filename: str) -> None:
        """Parse a PTY chardev filename and register the channel.

        Besides registering the channel, a symlink to the pty is created
        at the path the launcher associates with the channel name,
        replacing any existing entry.

        Format for the pty chardev backend: pty:<path>

        Args:
            name: Channel name to register.
            filename: Chardev filename as reported by 'query-chardev'.
        """

        # Split only once so that a ':' inside the path is preserved.
        path = filename.split(':', 1)[1]

        self._handles.add_channel_pty(name, path)

        link = self._path(name)
        if os.path.lexists(link):
            os.unlink(link)
        os.symlink(path, link)

    def _set_qemu_channel(self, name: str, filename: str) -> None:
        """Register a channel according to its chardev backend.

        Filenames that are neither pty nor tcp backends are ignored.

        Args:
            name: Channel name to register.
            filename: Chardev filename as reported by 'query-chardev'.
        """

        if filename.startswith('pty'):
            self._set_qemu_channel_pty(name, filename)
        elif 'tcp' in filename:
            self._set_qemu_channel_tcp(name, filename)

    def _get_channels_config(self, chan: str, opt: str) -> Any:
        """Return a channel option from the emulator configuration.

        The channel specific entry is preferred; when it is not set the
        option is looked up directly under 'channels'.
        """
        val = self._config.get_emu(['channels', chan, opt])
        if val is not None:
            return val
        return self._config.get_emu(['channels', opt])

    def _configure_default_channels(self) -> None:
        """Configure the default channels (qmp, monitor and gdb).

        Raises:
            InvalidChannelType: If a channel has an unsupported type.
        """

        # keep qmp first so that it gets the compat_monitor0 label
        for chan in ['qmp', 'monitor', 'gdb']:
            chan_type = self._get_channels_config(chan, 'type') or 'tcp'
            backend = self._backend_for_type(chan_type, self._TCP_BACKEND)
            self._start_cmd.extend([f'-{chan}', backend])

    def _get_chardev_config(self, name: str, opt: str) -> Any:
        """Return a chardev option.

        The target specific chardev entry is preferred, falling back to
        the emulator level channel configuration.
        """
        val = self._config.get_target_emu(['channels', 'chardevs', name, opt])
        if not val:
            val = self._get_channels_config(name, opt)
        return val

    def _configure_serial_channels(self, serials: dict) -> None:
        """Create "standard" serial devices.

        We can't control the serial allocation number for "standard"
        -serial devices so fill the slots for the not needed serials
        with null chardevs e.g. for serial3, serial1 generate the
        following arguments, in this order:

         -serial null -serial {backend} -serial null -serial {backend}

        Args:
            serials: Map from 'serial<N>' chardev ids to channel names.

        Raises:
            InvalidChannelType: If a serial channel has an unsupported type.
        """

        last = max(
            (int(serial.split('serial')[1]) for serial in serials),
            default=-1,
        )
        # Start at slot 0: qemu numbers -serial options by their position
        # on the command line, so every unused slot below the highest
        # requested serial has to be filled with a null device.
        for i in range(last + 1):
            name = serials.get(f'serial{i}')
            if not name:
                self._start_cmd.extend(['-serial', 'null'])
                continue
            chan_type = self._get_chardev_config(name, 'type') or 'tcp'
            backend = self._backend_for_type(chan_type, self._TCP_BACKEND)
            self._start_cmd.extend(['-serial', backend])

    def _configure_chardev_channels(self) -> None:
        """Configure the target specific chardevs.

        Chardevs whose id is 'serial<N>' are created with -serial options,
        all the others with -chardev options.

        Raises:
            KeyError: If a chardev entry has no 'id'.
            InvalidChannelType: If a chardev has an unsupported type.
        """

        self._chardevs = self._config.get_target_emu(
            ['channels', 'chardevs'], True, dict
        )

        serials = {}
        for name, config in self._chardevs.items():
            chardev_id = config['id']
            self._chardevs_id_to_name[chardev_id] = name

            chardev_type = self._get_chardev_config(name, 'type')
            if chardev_type is None:
                chardev_type = 'tcp'
            backend = self._backend_for_type(
                chardev_type, self._TCP_CHARDEV_BACKEND
            )

            # serials are configured differently; require the exact
            # 'serial<N>' form so that the serial number can be parsed.
            if re.fullmatch(r'serial[0-9]+', chardev_id):
                serials[chardev_id] = name
            else:
                self._start_cmd.extend(
                    ['-chardev', f'{backend},id={chardev_id}']
                )

        self._configure_serial_channels(serials)

    def _pre_start(
        self,
        target: str,
        file: Path | None = None,
        pause: bool = False,
        debug: bool = False,
        args: str | None = None,
    ) -> list[str]:
        """Build the qemu command line and open the bootstrap QMP socket.

        Args:
            target: Target name; not used directly by this method.
            file: Optional image passed to qemu with -kernel.
            pause: Start qemu with the machine paused (-S).
            debug: Enable logging of guest errors (-d guest_errors).
            args: Extra whitespace separated arguments appended to the
                command line.

        Returns:
            The command line used to start qemu.

        Raises:
            ConfigError: If the channels configuration is missing a key.
            InvalidChannelType: If a channel has an unsupported type.
        """
        qemu = self._config.get_target_emu(['executable'])
        if not qemu:
            qemu = self._config.get_emu(['executable'], optional=False)
        machine = self._config.get_target_emu(['machine'], optional=False)

        self._start_cmd = [f'{qemu}', '-nographic', '-nodefaults']
        self._start_cmd.extend(['-display', 'none'])
        self._start_cmd.extend(['-machine', f'{machine}'])

        try:
            self._configure_default_channels()
            self._configure_chardev_channels()
        except KeyError as err:
            raise ConfigError(self._config.path, str(err)) from err

        if pause:
            self._start_cmd.append('-S')
        if debug:
            self._start_cmd.extend(['-d', 'guest_errors'])

        if file:
            self._start_cmd.extend(['-kernel', str(file)])

        self._start_cmd.extend(self._config.get_emu(['args'], entry_type=list))
        self._start_cmd.extend(
            self._config.get_target_emu(['args'], entry_type=list)
        )
        if args:
            # Split on any run of whitespace so that repeated spaces do not
            # turn into empty command line arguments.
            self._start_cmd.extend(args.split())

        # initial/bootstrap qmp connection: qemu connects back to this
        # socket, which _post_start() uses to discover the channels.
        self._qmp_init_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._qmp_init_sock.bind(('localhost', 0))
        port = self._qmp_init_sock.getsockname()[1]
        self._qmp_init_sock.listen()
        self._qmp_init_sock.settimeout(30)
        self._start_cmd.extend(['-qmp', f'tcp:localhost:{port}'])

        return self._start_cmd

    def _post_start(self) -> None:
        """Discover the channels of the started qemu instance.

        Accepts the bootstrap QMP connection, queries the chardevs and
        registers a channel for every chardev with a known label.

        Raises:
            RunError: If qemu does not connect or the handshake fails.
        """
        assert self._qmp_init_sock is not None
        try:
            conn, _ = self._qmp_init_sock.accept()
        except (KeyboardInterrupt, socket.timeout) as err:
            raise RunError('qemu', 'qmp connection failed') from err
        finally:
            # The listening socket is only needed for a single connection;
            # release it on failure as well.
            self._qmp_init_sock.close()

        # Close the bootstrap connection on every exit path.
        with conn, conn.makefile('rwb', buffering=0) as stream:
            try:
                qmp = QmpClient(stream)
            except json.decoder.JSONDecodeError as err:
                raise RunError('qemu', 'qmp handshake failed') from err

            for chardev in qmp.request('query-chardev'):
                name = self._chardevs_id_to_name.get(chardev['label'])
                if name:
                    self._set_qemu_channel(name, chardev['filename'])

    def _get_connector(self, wdir: Path) -> Connector:
        """Return a QemuConnector for the given working directory."""
        return QemuConnector(wdir)
314
315
class QemuConnector(Connector):
    """Emulator specific connector methods, implemented for qemu via QMP."""

    def __init__(self, wdir: Path) -> None:
        """Attach to the emulator instance stored in a working directory.

        Raises:
            WrongEmulator: If the instance was not started with qemu.
        """
        super().__init__(wdir)
        if self.get_emu() != 'qemu':
            raise WrongEmulator('qemu', self.get_emu())
        # The QMP client is created on first use, see _q().
        self._qmp: QmpClient | None = None

    def _q(self) -> QmpClient:
        """Return the QMP client, connecting to the 'qmp' channel if needed."""
        if self._qmp:
            return self._qmp
        self._qmp = QmpClient(self.get_channel_stream('qmp'))
        return self._qmp

    def reset(self) -> None:
        """Send the 'system_reset' QMP command."""
        self._q().request('system_reset')

    def cont(self) -> None:
        """Send the 'cont' QMP command."""
        self._q().request('cont')

    def set_property(self, path: str, prop: str, value: Any) -> None:
        """Set an object property with the 'qom-set' QMP command."""
        params = dict(path=f'{path}', property=prop, value=value)
        self._q().request('qom-set', params)

    def get_property(self, path: str, prop: str) -> Any:
        """Return an object property read with the 'qom-get' QMP command."""
        params = dict(path=f'{path}', property=prop)
        return self._q().request('qom-get', params)

    def list_properties(self, path: str) -> list[Any]:
        """Return the result of the 'qom-list' QMP command for a path."""
        params = dict(path=f'{path}')
        return self._q().request('qom-list', params)
356