# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Pigweed qemu frontend."""


import io
import json
import logging
import os
import re
import socket
import sys

from pathlib import Path
from typing import Any

from pw_emu.core import (
    ConfigError,
    Connector,
    Launcher,
    Error,
    InvalidChannelType,
    RunError,
    WrongEmulator,
)

_QMP_LOG = logging.getLogger('pw_qemu.qemu.qmp')


class QmpError(Error):
    """Exception for QMP errors."""

    def __init__(self, err: str):
        super().__init__(err)


class QmpClient:
    """Send qmp requests to the server."""

    def __init__(self, stream: io.RawIOBase):
        """Wrap `stream` and perform the QMP handshake on it.

        Args:
          stream: binary read/write stream connected to a QMP server.
        """
        self._stream = stream

        # Perform the QMP "capabilities negotiation" handshake.
        # https://wiki.qemu.org/Documentation/QMP#Capabilities_Negotiation
        #
        # When the QMP connection is established, QEMU first sends a greeting
        # message with its version and capabilities. Then the client sends
        # 'qmp_capabilities' to exit capabilities negotiation mode. The result
        # is an empty 'return'.
        #
        # self.request() will consume both the initial greeting and the
        # subsequent 'return' response.
        self.request('qmp_capabilities')

    def request(self, cmd: str, args: dict[str, Any] | None = None) -> Any:
        """Issue a command using the qmp interface.

        Messages read from the stream that carry neither a 'return' nor
        an 'error' key (e.g. the initial greeting) are skipped.

        Args:
          cmd: name of the QMP command to execute.
          args: optional command arguments; omitted from the request
            when empty or None.

        Returns:
          The value of the 'return' member of the response.

        Raises:
          QmpError: the server answered with an 'error' response.
          json.JSONDecodeError: a line read from the stream is not
            valid JSON.
        """

        req: dict[str, Any] = {'execute': cmd}
        if args:
            req['arguments'] = args
        payload = json.dumps(req)
        # Lazy %-formatting: the original passed a plain (non-f) string and
        # therefore logged the literal placeholder text.
        _QMP_LOG.debug(' -> %s', payload)
        self._stream.write(payload.encode('utf-8'))
        while True:
            line = self._stream.readline()
            _QMP_LOG.debug(' <- %s', line)
            resp = json.loads(line)
            if 'error' in resp:
                raise QmpError(resp['error']['desc'])
            if 'return' in resp:
                return resp['return']


class QemuLauncher(Launcher):
    """Start a new qemu process for a given target and config file."""

    def __init__(self, config_path: Path | None = None):
        super().__init__('qemu', config_path)
        self._start_cmd: list[str] = []
        # Maps the chardev labels reported by 'query-chardev' to channel
        # names. Entries for configured chardevs are added by
        # _configure_chardev_channels().
        self._chardevs_id_to_name = {
            'compat_monitor0': 'qmp',
            'compat_monitor1': 'monitor',
            'gdb': 'gdb',
        }
        self._chardevs: dict[str, Any] = {}
        # Listening socket for the initial/bootstrap qmp connection; created
        # in _pre_start() and consumed in _post_start().
        self._qmp_init_sock: socket.socket | None = None

    def _set_qemu_channel_tcp(self, name: str, filename: str) -> None:
        """Parse a TCP chardev and register it as a tcp channel.

        Format for the tcp chardev backend:

        [disconnected|isconnected:]tcp:<host>:<port>[,<options>][ <->
        <host>:<port>]

        Args:
          name: channel name to register.
          filename: chardev 'filename' as reported by 'query-chardev'.
        """

        # Drop the optional peer (' <-> <host>:<port>') and options so that
        # only [<state>:]tcp:<host>:<port> is left.
        fields = filename.split(' <-> ')[0].split(',')[0].split(':')
        # Skip 'tcp' and, when present, the connection state prefix.
        fields = fields[1:] if fields[0] == 'tcp' else fields[2:]
        # IPV6 hosts have :
        host = ':'.join(fields[:-1])
        port = fields[-1]
        self._handles.add_channel_tcp(name, host, int(port))

    def _set_qemu_channel_pty(self, name: str, filename: str) -> None:
        """Parse a PTY chardev, register it and symlink it under `name`.

        Format for the pty chardev backend: pty:<path>

        Args:
          name: channel name to register.
          filename: chardev 'filename' as reported by 'query-chardev'.
        """

        # Split only once so that a path containing ':' is kept whole.
        path = filename.split(':', 1)[1]

        self._handles.add_channel_pty(name, path)

        # Replace any stale link left at the channel path.
        link = self._path(name)
        if os.path.lexists(link):
            os.unlink(link)
        os.symlink(path, link)

    def _set_qemu_channel(self, name: str, filename: str) -> None:
        """Set up a channel from a chardev filename.

        Filenames that are neither pty nor tcp backends are ignored.
        """

        if filename.startswith('pty'):
            self._set_qemu_channel_pty(name, filename)
        elif 'tcp' in filename:
            self._set_qemu_channel_tcp(name, filename)

    def _get_channels_config(self, chan: str, opt: str) -> Any:
        """Return option `opt` for channel `chan`.

        Falls back to the emulator wide channels option when there is
        no channel specific value.
        """
        val = self._config.get_emu(['channels', chan, opt])
        if val is not None:
            return val
        return self._config.get_emu(['channels', opt])

    def _configure_default_channels(self) -> None:
        """Configure the default channels.

        Appends a '-<channel> <backend>' pair to the start command for
        the qmp, monitor and gdb channels. The channel type defaults to
        tcp.

        Raises:
          InvalidChannelType: the configured type is neither 'tcp' nor
            'pty', or it is 'pty' on win32.
        """

        # keep qmp first so that it gets the compat_monitor0 label
        for chan in ['qmp', 'monitor', 'gdb']:
            chan_type = self._get_channels_config(chan, 'type')
            if not chan_type:
                chan_type = 'tcp'
            if chan_type == 'pty':
                if sys.platform == 'win32':
                    raise InvalidChannelType(chan_type)
                backend = 'pty'
            elif chan_type == 'tcp':
                backend = 'tcp:localhost:0,ipv4=on,server=on,wait=off'
            else:
                raise InvalidChannelType(chan_type)
            self._start_cmd.extend([f'-{chan}', backend])

    def _get_chardev_config(self, name: str, opt: str) -> Any:
        """Return option `opt` for chardev `name`.

        Falls back to the channels configuration when the target has no
        (truthy) chardev specific value.
        """
        val = self._config.get_target_emu(['channels', 'chardevs', name, opt])
        if not val:
            val = self._get_channels_config(name, opt)
        return val

    def _configure_serial_channels(self, serials: dict) -> None:
        """Create "standard" serial devices.

        We can't control the serial allocation number for "standard"
        -serial devices so fill the slots for the not needed serials
        with null chardevs e.g. for serial3, serial1 generate the
        following arguments, in this order:

         -serial null -serial {backend} -serial null -serial {backend}

        Args:
          serials: maps chardev ids of the form 'serial<N>' to channel
            names.

        Raises:
          InvalidChannelType: a configured type is neither 'tcp' nor
            'pty'.
        """

        if not serials:
            return

        max_ser = max(int(serial.split('serial')[1]) for serial in serials)
        # Start at slot 0, not at the lowest configured serial: the n-th
        # -serial option always becomes serial<n-1>, so leading gaps must
        # be padded with null devices too.
        for i in range(max_ser + 1):
            name = serials.get(f'serial{i}')
            if not name:
                self._start_cmd.extend(['-serial', 'null'])
                continue
            chan_type = self._get_chardev_config(name, 'type')
            if not chan_type:
                chan_type = 'tcp'
            if chan_type == 'pty':
                backend = 'pty'
            elif chan_type == 'tcp':
                backend = 'tcp:localhost:0,ipv4=on,server=on,wait=off'
            else:
                raise InvalidChannelType(chan_type)
            self._start_cmd.extend(['-serial', backend])

    def _configure_chardev_channels(self) -> None:
        """Configure chardevs.

        Reads the target's chardevs, records their id to name mapping
        and appends the matching -chardev / -serial options to the
        start command.

        Raises:
          KeyError: a chardev entry has no 'id'.
          InvalidChannelType: a configured type is neither 'tcp' nor
            'pty'.
        """

        self._chardevs = self._config.get_target_emu(
            ['channels', 'chardevs'], True, dict
        )

        serials: dict[str, str] = {}
        for name, config in self._chardevs.items():
            chardev_id = config['id']
            self._chardevs_id_to_name[chardev_id] = name

            chardev_type = self._get_chardev_config(name, 'type')
            if chardev_type is None:
                chardev_type = 'tcp'

            if chardev_type == 'pty':
                backend = 'pty'
            elif chardev_type == 'tcp':
                backend = 'socket,host=localhost,port=0,server=on,wait=off'
            else:
                raise InvalidChannelType(chardev_type)

            # serials are configured differently
            #
            # Only exact 'serial<N>' ids qualify: ids that merely contain
            # 'serial' have no slot number to parse and are regular
            # chardevs.
            if re.fullmatch(r'serial[0-9]+', chardev_id):
                serials[chardev_id] = name
            else:
                self._start_cmd.extend(
                    ['-chardev', f'{backend},id={chardev_id}']
                )

        self._configure_serial_channels(serials)

    def _pre_start(
        self,
        target: str,
        file: Path | None = None,
        pause: bool = False,
        debug: bool = False,
        args: str | None = None,
    ) -> list[str]:
        """Build the qemu command line.

        Also opens the listening socket for the initial/bootstrap qmp
        connection that _post_start() accepts.

        Args:
          target: target name (not used directly here).
          file: optional image passed to qemu with -kernel.
          pause: add -S to the command line.
          debug: add '-d guest_errors' to the command line.
          args: extra, whitespace separated, qemu arguments.

        Returns:
          The command, as an argument list, used to start qemu.

        Raises:
          ConfigError: a chardev entry misses a required key.
        """
        qemu = self._config.get_target_emu(['executable'])
        if not qemu:
            qemu = self._config.get_emu(['executable'], optional=False)
        machine = self._config.get_target_emu(['machine'], optional=False)

        self._start_cmd = [f'{qemu}', '-nographic', '-nodefaults']
        self._start_cmd.extend(['-display', 'none'])
        self._start_cmd.extend(['-machine', f'{machine}'])

        try:
            self._configure_default_channels()
            self._configure_chardev_channels()
        except KeyError as err:
            raise ConfigError(self._config.path, str(err)) from err

        if pause:
            self._start_cmd.append('-S')
        if debug:
            self._start_cmd.extend(['-d', 'guest_errors'])

        if file:
            self._start_cmd.extend(['-kernel', str(file)])

        self._start_cmd.extend(self._config.get_emu(['args'], entry_type=list))
        self._start_cmd.extend(
            self._config.get_target_emu(['args'], entry_type=list)
        )
        if args:
            # split() rather than split(' '): repeated or surrounding
            # whitespace must not turn into empty arguments.
            self._start_cmd.extend(args.split())

        # initial/bootstrap qmp connection
        self._qmp_init_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._qmp_init_sock.bind(('localhost', 0))
        port = self._qmp_init_sock.getsockname()[1]
        self._qmp_init_sock.listen()
        self._qmp_init_sock.settimeout(30)
        self._start_cmd.extend(['-qmp', f'tcp:localhost:{port}'])

        return self._start_cmd

    def _post_start(self) -> None:
        """Discover the channels of the freshly started qemu.

        Accepts the bootstrap qmp connection, queries the chardevs and
        registers a channel for every chardev with a known label.

        Raises:
          RunError: qemu did not connect in time (or the wait was
            interrupted), or the qmp handshake failed.
        """
        assert self._qmp_init_sock is not None
        try:
            conn, _ = self._qmp_init_sock.accept()
        except (KeyboardInterrupt, socket.timeout) as err:
            raise RunError('qemu', 'qmp connection failed') from err
        finally:
            # The listening socket is single use; release it on failure too.
            self._qmp_init_sock.close()

        stream = conn.makefile('rwb', buffering=0)
        # The file object keeps the connection open until it is closed
        # itself.
        conn.close()
        try:
            try:
                qmp = QmpClient(stream)
            except json.decoder.JSONDecodeError as err:
                raise RunError('qemu', 'qmp handshake failed') from err

            resp = qmp.request('query-chardev')
            for chardev in resp:
                label = chardev['label']
                name = self._chardevs_id_to_name.get(label)
                if name:
                    self._set_qemu_channel(name, chardev['filename'])
        finally:
            stream.close()

    def _get_connector(self, wdir: Path) -> Connector:
        """Return a qemu connector for the working directory `wdir`."""
        return QemuConnector(wdir)


class QemuConnector(Connector):
    """qemu implementation for the emulator specific connector methods."""

    def __init__(self, wdir: Path) -> None:
        """Create a connector for the emulator instance in `wdir`.

        Raises:
          WrongEmulator: the instance in `wdir` is not a qemu one.
        """
        super().__init__(wdir)
        if self.get_emu() != 'qemu':
            raise WrongEmulator('qemu', self.get_emu())
        # Created on first use, see _q().
        self._qmp: QmpClient | None = None

    def _q(self) -> QmpClient:
        """Return the qmp client, connecting to the qmp channel if needed."""
        if self._qmp is None:
            self._qmp = QmpClient(self.get_channel_stream('qmp'))
        return self._qmp

    def reset(self) -> None:
        """Issue the 'system_reset' qmp command."""
        self._q().request('system_reset')

    def cont(self) -> None:
        """Issue the 'cont' qmp command."""
        self._q().request('cont')

    def set_property(self, path: str, prop: str, value: Any) -> None:
        """Set property `prop` of the object at `path` using 'qom-set'."""
        args = {
            'path': str(path),
            'property': prop,
            'value': value,
        }
        self._q().request('qom-set', args)

    def get_property(self, path: str, prop: str) -> Any:
        """Return property `prop` of the object at `path` using 'qom-get'."""
        args = {
            'path': str(path),
            'property': prop,
        }
        return self._q().request('qom-get', args)

    def list_properties(self, path: str) -> list[Any]:
        """Return the 'qom-list' response for the object at `path`."""
        args = {
            'path': str(path),
        }
        return self._q().request('qom-list', args)