# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common methods and variables used by Cr-Fuchsia testing infrastructure."""

import ipaddress
import json
import logging
import os
import signal
import shutil
import socket
import subprocess
import sys
import time

from argparse import ArgumentParser
from typing import Iterable, List, Optional, Tuple
from dataclasses import dataclass, fields

from compatible_utils import get_ssh_prefix, get_host_arch


def _find_src_root() -> str:
    """Find the root of the src folder.

    Returns:
        The value of the SRC_ROOT environment variable when it is set and
        non-empty, otherwise the directory three levels above this file.
    """
    if os.environ.get('SRC_ROOT'):
        return os.environ['SRC_ROOT']
    return os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
                        os.pardir)


# The absolute path of the root folder to work on. It may not always be the
# src folder since there may not be source code at all, but it's expected to
# have folders like third_party/fuchsia-sdk in it.
DIR_SRC_ROOT = os.path.abspath(_find_src_root())


def _find_fuchsia_images_root() -> str:
    """Define the root of the fuchsia images.

    Returns:
        FUCHSIA_IMAGES_ROOT from the environment when set and non-empty,
        otherwise third_party/fuchsia-sdk/images under DIR_SRC_ROOT.
    """
    if os.environ.get('FUCHSIA_IMAGES_ROOT'):
        return os.environ['FUCHSIA_IMAGES_ROOT']
    return os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-sdk', 'images')


IMAGES_ROOT = os.path.abspath(_find_fuchsia_images_root())


def _find_fuchsia_internal_images_root() -> str:
    """Define the root of the fuchsia internal images.

    Returns:
        FUCHSIA_INTERNAL_IMAGES_ROOT from the environment when set and
        non-empty, otherwise IMAGES_ROOT with an '-internal' suffix.
    """
    if os.environ.get('FUCHSIA_INTERNAL_IMAGES_ROOT'):
        return os.environ['FUCHSIA_INTERNAL_IMAGES_ROOT']
    return IMAGES_ROOT + '-internal'


INTERNAL_IMAGES_ROOT = os.path.abspath(_find_fuchsia_internal_images_root())

REPO_ALIAS = 'fuchsia.com'


def _find_fuchsia_sdk_root() -> str:
    """Define the root of the fuchsia sdk.

    Returns:
        FUCHSIA_SDK_ROOT from the environment when set and non-empty,
        otherwise third_party/fuchsia-sdk/sdk under DIR_SRC_ROOT.
    """
    if os.environ.get('FUCHSIA_SDK_ROOT'):
        return os.environ['FUCHSIA_SDK_ROOT']
    return os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-sdk', 'sdk')


SDK_ROOT = os.path.abspath(_find_fuchsia_sdk_root())


def _find_fuchsia_gn_sdk_root() -> str:
    """Define the root of the fuchsia gn sdk.

    Returns:
        FUCHSIA_GN_SDK_ROOT from the environment when set and non-empty,
        otherwise third_party/fuchsia-gn-sdk/src under DIR_SRC_ROOT.
    """
    if os.environ.get('FUCHSIA_GN_SDK_ROOT'):
        return os.environ['FUCHSIA_GN_SDK_ROOT']
    return os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-gn-sdk', 'src')


GN_SDK_ROOT = os.path.abspath(_find_fuchsia_gn_sdk_root())

SDK_TOOLS_DIR = os.path.join(SDK_ROOT, 'tools', get_host_arch())
_FFX_TOOL = os.path.join(SDK_TOOLS_DIR, 'ffx')
# Name of the environment variable that carries the ffx isolate dir.
_FFX_ISOLATE_DIR = 'FFX_ISOLATE_DIR'


def set_ffx_isolate_dir(isolate_dir: str) -> None:
    """Sets the global environment so the following ffx calls will have the
    isolate dir being carried.

    Args:
        isolate_dir: The value to store in the FFX_ISOLATE_DIR environment
            variable of the current process.
    """
    assert not has_ffx_isolate_dir(), 'The isolate dir is already set.'
    os.environ[_FFX_ISOLATE_DIR] = isolate_dir


def get_ffx_isolate_dir() -> str:
    """Returns the global environment of the isolate dir of ffx. This function
    should only be called after set_ffx_isolate_dir.

    Raises:
        KeyError: if the isolate dir environment variable is not set.
    """
    return os.environ[_FFX_ISOLATE_DIR]


def has_ffx_isolate_dir() -> bool:
    """Returns whether the isolate dir of ffx is set."""
    return _FFX_ISOLATE_DIR in os.environ


def get_hash_from_sdk():
    """Retrieve version info from the SDK.

    Returns:
        The value of the 'id' key in meta/manifest.json under SDK_ROOT.
    """

    version_file = os.path.join(SDK_ROOT, 'meta', 'manifest.json')
    assert os.path.exists(version_file), \
           'Could not detect version file. Make sure the SDK is downloaded.'
    with open(version_file, 'r') as f:
        return json.load(f)['id']


def get_host_tool_path(tool: str) -> str:
    """Get a tool from the SDK.

    Args:
        tool: File name of the tool.

    Returns:
        The path of |tool| inside SDK_TOOLS_DIR. The path is not checked for
        existence.
    """

    return os.path.join(SDK_TOOLS_DIR, tool)


def get_host_os() -> str:
    """Get host operating system.

    Returns:
        'linux' or 'mac', derived from sys.platform.

    Raises:
        Exception: if the host platform is neither linux nor darwin.
    """

    host_platform = sys.platform
    if host_platform.startswith('linux'):
        return 'linux'
    if host_platform.startswith('darwin'):
        return 'mac'
    raise Exception('Unsupported host platform: %s' % host_platform)


def make_clean_directory(directory_name: str) -> None:
    """If the directory exists, delete it and remake with no contents."""

    if os.path.exists(directory_name):
        shutil.rmtree(directory_name)
    os.makedirs(directory_name)


def _get_daemon_status():
    """Determines daemon status via `ffx daemon socket`.

    Returns:
      dict of status of the socket. Status will have a key Running or
      NotRunning to indicate if the daemon is running.
    """
    status = json.loads(
        run_ffx_command(cmd=('daemon', 'socket'),
                        capture_output=True,
                        json_out=True).stdout.strip())
    # A missing 'pid' or 'status' entry is treated as "not running".
    return status.get('pid', {}).get('status', {'NotRunning': True})


def is_daemon_running() -> bool:
    """Returns if the daemon is running."""
    return 'Running' in _get_daemon_status()


def _wait_for_daemon(start: bool = True, timeout_seconds: int = 100) -> None:
    """Waits for daemon to reach desired state in a polling loop.

    Sleeps for 5s between polls. The daemon state is always polled at least
    once, and no sleep happens after the final poll.

    Args:
        start: bool. Indicates to wait for daemon to start up. If False,
            indicates waiting for daemon to die.
        timeout_seconds: int. Number of seconds to wait for the daemon to reach
            the desired status.
    Raises:
        TimeoutError: if the daemon does not reach the desired state in time.
    """
    wanted_status = 'start' if start else 'stop'
    sleep_period_seconds = 5
    # Always poll at least once, even when the timeout is shorter than one
    # sleep period.
    attempts = max(1, int(timeout_seconds / sleep_period_seconds))
    for i in range(attempts):
        if is_daemon_running() == start:
            return
        # Sleeping after the last poll would only delay the TimeoutError.
        if i != attempts - 1:
            logging.info('Waiting for daemon to %s...', wanted_status)
            time.sleep(sleep_period_seconds)

    raise TimeoutError(f'Daemon did not {wanted_status} in time.')


# The following two functions are temporary workarounds until
# https://fxbug.dev/92296 and https://fxbug.dev/125873 are fixed.
def start_ffx_daemon() -> None:
    """Starts the ffx daemon by using doctor --restart-daemon since daemon start
    blocks the current shell.

    Note, doctor --restart-daemon usually fails since the timeout in ffx is
    short and won't be sufficient to wait for the daemon to really start.

    Also, doctor --restart-daemon always restarts the daemon, so this function
    should be used with caution unless it's really needed to "restart" the
    daemon by explicitly calling stop daemon first.

    Raises:
        TimeoutError: if the daemon is not running after the wait period.
    """
    assert not is_daemon_running(), "Call stop_ffx_daemon first."
    # The exit code is deliberately ignored; see the note in the docstring.
    run_ffx_command(cmd=('doctor', '--restart-daemon'), check=False)
    _wait_for_daemon(start=True)


def stop_ffx_daemon() -> None:
    """Stops the ffx daemon.

    Raises:
        CalledProcessError: if `ffx daemon stop` returns a non-zero exit code.
        TimeoutError: if the daemon is still running after the wait period.
    """
    run_ffx_command(cmd=('daemon', 'stop', '-t', '10000'))
    _wait_for_daemon(start=False)


def run_ffx_command(check: bool = True,
                    capture_output: Optional[bool] = None,
                    timeout: Optional[int] = None,
                    **kwargs) -> subprocess.CompletedProcess:
    """Runs `ffx` with the given arguments, waiting for it to exit.

    **
    The arguments below are named after |subprocess.run| arguments. They are
    overloaded to avoid them from being forwarded to |subprocess.Popen|.
    **
    See run_continuous_ffx_command for additional arguments.
    Args:
        check: If True, CalledProcessError is raised if ffx returns a non-zero
            exit code.
        capture_output: Whether to capture both stdout/stderr.
        timeout: Optional timeout (in seconds). Throws
            subprocess.TimeoutExpired if process does not complete in timeout
            period; the ffx process is killed before the exception propagates.
    Returns:
        A CompletedProcess instance
    Raises:
        CalledProcessError if |check| is true and ffx exits with a non-zero
            code.
        subprocess.TimeoutExpired if |timeout| is set and exceeded.
    """
    if capture_output:
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.PIPE
    proc = None
    try:
        proc = run_continuous_ffx_command(**kwargs)
        try:
            # NOTE(review): the 'stdin' kwarg is forwarded to Popen and also
            # passed as communicate() input - confirm this is intended.
            stdout, stderr = proc.communicate(input=kwargs.get('stdin'),
                                              timeout=timeout)
        except subprocess.TimeoutExpired:
            # communicate() leaves the child running on timeout and the
            # caller has no handle to it, so kill and reap it here. wait()
            # is used rather than communicate() so that this cannot block on
            # pipes that other processes may still hold open.
            proc.kill()
            proc.wait()
            raise
        completed_proc = subprocess.CompletedProcess(
            args=proc.args,
            returncode=proc.returncode,
            stdout=stdout,
            stderr=stderr)
        if check:
            completed_proc.check_returncode()
        return completed_proc
    except subprocess.CalledProcessError as cpe:
        logging.error('%s %s failed with returncode %s.',
                      os.path.relpath(_FFX_TOOL),
                      subprocess.list2cmdline(proc.args[1:]), cpe.returncode)
        if cpe.stdout:
            logging.error('stdout of the command: %s', cpe.stdout)
        if cpe.stderr:
            logging.error('stderr of the command: %s', cpe.stderr)
        raise


def run_continuous_ffx_command(cmd: Iterable[str],
                               target_id: Optional[str] = None,
                               configs: Optional[List[str]] = None,
                               json_out: bool = False,
                               encoding: Optional[str] = 'utf-8',
                               **kwargs) -> subprocess.Popen:
    """Runs `ffx` with the given arguments, returning immediately.

    Args:
        cmd: A sequence of arguments to ffx.
        target_id: Whether to execute the command for a specific target. The
            target_id could be in the form of a nodename or an address.
        configs: A list of configs to be applied to the current command.
        json_out: Have command output returned as JSON. Must be parsed by
            caller.
        encoding: Optional, desired encoding for output/stderr pipes.
        **kwargs: Forwarded unchanged to subprocess.Popen.
    Returns:
        A subprocess.Popen instance
    """

    # Global ffx options are placed before the subcommand in |cmd|.
    ffx_cmd = [_FFX_TOOL]
    if json_out:
        ffx_cmd.extend(('--machine', 'json'))
    if target_id:
        ffx_cmd.extend(('--target', target_id))
    if configs:
        for config in configs:
            ffx_cmd.extend(('--config', config))
    ffx_cmd.extend(cmd)

    return subprocess.Popen(ffx_cmd, encoding=encoding, **kwargs)


def read_package_paths(out_dir: str, pkg_name: str) -> List[str]:
    """Reads the package metadata file generated for |pkg_name|.

    Args:
        out_dir: Build output directory, joined onto DIR_SRC_ROOT.
        pkg_name: Name of the package; the metadata is read from
            gen/package_metadata/<pkg_name>.meta under the output directory.

    Returns:
        A list of the absolute path to all FAR files the package depends on.
    """
    meta_path = os.path.join(DIR_SRC_ROOT, out_dir, 'gen', 'package_metadata',
                             f'{pkg_name}.meta')
    with open(meta_path) as meta_file:
        data = json.load(meta_file)
    return [
        os.path.join(DIR_SRC_ROOT, out_dir, package)
        for package in data['packages']
    ]


def register_common_args(parser: ArgumentParser) -> None:
    """Register commonly used arguments."""
    common_args = parser.add_argument_group('common', 'common arguments')
    common_args.add_argument(
        '--out-dir',
        '-C',
        type=os.path.realpath,
        help='Path to the directory in which build files are located. ')


def register_device_args(parser: ArgumentParser) -> None:
    """Register device arguments.

    The default of --target-id is read from the FUCHSIA_NODENAME environment
    variable.
    """
    device_args = parser.add_argument_group('device', 'device arguments')
    device_args.add_argument('--target-id',
                             default=os.environ.get('FUCHSIA_NODENAME'),
                             help=('Specify the target device. This could be '
                                   'a node-name (e.g. fuchsia-emulator) or '
                                   'an ip address along with an optional port '
                                   '(e.g. [fe80::e1c4:fd22:5ee5:878e]:22222, '
                                   '1.2.3.4, 1.2.3.4:33333). If unspecified, '
                                   'the default target in ffx will be used.'))


def register_log_args(parser: ArgumentParser) -> None:
    """Register logging arguments."""

    log_args = parser.add_argument_group('logging', 'logging arguments')
    log_args.add_argument('--logs-dir',
                          type=os.path.realpath,
                          help=('Directory to write logs to.'))


def get_component_uri(package: str) -> str:
    """Retrieve the uri for a package.

    Args:
        package: A package name, or a full fuchsia-pkg:// uri.

    Returns:
        |package| unchanged when it is already a fuchsia-pkg:// uri,
        otherwise the uri of the <package>.cm component of that package in
        the REPO_ALIAS repository.
    """
    # If the input is a full package already, do nothing
    if package.startswith('fuchsia-pkg://'):
        return package
    return f'fuchsia-pkg://{REPO_ALIAS}/{package}#meta/{package}.cm'


def ssh_run(cmd: List[str],
            target_id: Optional[str],
            check=True,
            **kwargs) -> subprocess.CompletedProcess:
    """Runs a command on the |target_id| via ssh.

    Args:
        cmd: The command and its arguments to run on the target.
        target_id: The target to resolve the ssh address for.
        check: Forwarded to subprocess.run.
        **kwargs: Forwarded to subprocess.run.

    Returns:
        The CompletedProcess returned by subprocess.run.
    """
    ssh_prefix = get_ssh_prefix(get_ssh_address(target_id))
    return subprocess.run(ssh_prefix + ['--'] + cmd, check=check, **kwargs)


def resolve_packages(packages: List[str], target_id: Optional[str]) -> None:
    """Ensure that all |packages| are installed on a device.

    Runs `pkgctl gc` once, ignoring its exit code, then `pkgctl resolve` for
    each package.

    Raises:
        CalledProcessError: if the final resolve attempt of a package fails.
    """

    ssh_run(['pkgctl', 'gc'], target_id, check=False)

    def _retry_command(cmd: List[str],
                       retries: int = 2,
                       **kwargs) -> Optional[subprocess.CompletedProcess]:
        """Helper function for retrying a subprocess.run command."""

        for i in range(retries):
            # Only the last attempt is allowed to raise on failure.
            if i == retries - 1:
                proc = ssh_run(cmd, **kwargs, check=True)
                return proc
            proc = ssh_run(cmd, **kwargs, check=False)
            if proc.returncode == 0:
                return proc
            time.sleep(3)
        return None

    for package in packages:
        resolve_cmd = [
            'pkgctl', 'resolve',
            'fuchsia-pkg://%s/%s' % (REPO_ALIAS, package)
        ]
        _retry_command(resolve_cmd, target_id=target_id)


def get_ip_address(target_id: Optional[str], ipv4_only: bool = False):
    """Determines address of the given target; returns the value from
    ipaddress.ip_address."""
    return ipaddress.ip_address(get_ssh_address(target_id, ipv4_only)[0])


def get_ssh_address(target_id: Optional[str],
                    ipv4_only: bool = False) -> Tuple[str, int]:
    """Determines SSH address for given target.

    Args:
        target_id: Optional target to pass to `ffx target list`.
        ipv4_only: Whether to pass --no-ipv6 to `ffx target list`.

    Returns:
        A tuple of (ip, ssh_port) taken from the first address of the first
        target in the output of `ffx target list`.
    """
    cmd = ['target', 'list']
    if ipv4_only:
        cmd.append('--no-ipv6')
    if target_id:
        # target list does not respect -t / --target flag.
        cmd.append(target_id)
    target = json.loads(
        run_ffx_command(cmd=cmd, json_out=True,
                        capture_output=True).stdout.strip())
    addr = target[0]['addresses'][0]
    ssh_port = int(addr['ssh_port'])
    if ssh_port == 0:
        # Returning an unset ssh_port means the default port 22.
        ssh_port = 22
    return (addr['ip'], ssh_port)


def find_in_dir(target_name: str, parent_dir: str) -> Optional[str]:
    """Finds a directory named |target_name| under |parent_dir|.

    Args:
        target_name: Name of target to find, as a string.
        parent_dir: Directory to start search in.

    Returns:
        Full path to the target, None if not found.
    """
    # Doesn't make sense to look for a full path. Only extract the basename.
    target_name = os.path.basename(target_name)
    for root, dirs, _ in os.walk(parent_dir):
        if target_name in dirs:
            return os.path.abspath(os.path.join(root, target_name))

    return None


def find_image_in_sdk(product_name: str) -> Optional[str]:
    """Finds image dir in SDK for product given.

    Args:
        product_name: Name of product's image directory to find.

    Returns:
        Full path to the 'images' directory found under the product's
        directory, None if either directory is not found.
    """
    top_image_dir = os.path.join(SDK_ROOT, os.pardir, 'images')
    path = find_in_dir(product_name, parent_dir=top_image_dir)
    if path:
        return find_in_dir('images', parent_dir=path)
    return path


def catch_sigterm() -> None:
    """Catches the kill signal and allows the process to exit cleanly."""
    def _sigterm_handler(*_):
        sys.exit(0)

    signal.signal(signal.SIGTERM, _sigterm_handler)


def wait_for_sigterm(extra_msg: str = '') -> None:
    """
    Spin-wait for either ctrl+c or sigterm. Caller can use try-finally
    statement to perform extra cleanup.

    Args:
        extra_msg: The extra message to be logged.
    """
    try:
        while True:
            # We do expect receiving either ctrl+c or sigterm, so this line
            # literally means sleep forever.
            time.sleep(10000)
    except KeyboardInterrupt:
        logging.info('Ctrl-C received; %s', extra_msg)
    except SystemExit:
        logging.info('SIGTERM received; %s', extra_msg)


@dataclass
class BuildInfo:
    """A structure replica of the output of build section in `ffx target show`.
    """
    version: Optional[str] = None
    product: Optional[str] = None
    board: Optional[str] = None
    commit: Optional[str] = None


def get_build_info(target: Optional[str] = None) -> Optional[BuildInfo]:
    """Retrieves build info from the device.

    Args:
        target: Optional target id passed to ffx.

    Returns:
        A BuildInfo struct, or None if anything goes wrong.
        Any field in BuildInfo can be None to indicate the missing of the field.
        Keys of the build section that BuildInfo does not declare are ignored.
    """
    info_cmd = run_ffx_command(cmd=('--machine', 'json', 'target', 'show'),
                               target_id=target,
                               capture_output=True,
                               check=False)
    # If the information was not retrieved, return None to indicate unknown
    # system info.
    if info_cmd.returncode != 0:
        logging.error('ffx target show returns %d', info_cmd.returncode)
        return None
    try:
        info_json = json.loads(info_cmd.stdout.strip())
    except json.decoder.JSONDecodeError as error:
        logging.error('Unexpected json string: %s, exception: %s',
                      info_cmd.stdout, error)
        return None
    build = info_json.get('build') if isinstance(info_json, dict) else None
    if not isinstance(build, dict):
        return None
    # Drop keys BuildInfo does not know about so that extra fields in the ffx
    # output cannot raise a TypeError from the constructor.
    known_fields = {field.name for field in fields(BuildInfo)}
    return BuildInfo(
        **{key: value
           for key, value in build.items() if key in known_fields})


def get_system_info(target: Optional[str] = None) -> Tuple[str, str]:
    """Retrieves installed OS version from the device.

    Returns:
        Tuple of strings, containing (product, version number), or a pair of
        empty strings to indicate an error.
    """
    build_info = get_build_info(target)
    if not build_info:
        return ('', '')

    return (build_info.product or '', build_info.version or '')


def get_free_local_port() -> int:
    """Returns an ipv4 port available locally. It does not reserve the port and
    may cause race condition. Copied from catapult
    https://crsrc.org/c/third_party/catapult/telemetry/telemetry/core/util.py;drc=e3f9ae73db5135ad998108113af7ef82a47efc51;l=61"""
    # AF_INET restricts port to IPv4 addresses.
    # SOCK_STREAM means that it is a TCP socket.
    # The context manager closes the socket even if setsockopt/bind raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tmp:
        # Setting SOL_SOCKET + SO_REUSEADDR to 1 allows the reuse of local
        # addresses, this is so sockets do not fail to bind for being in the
        # CLOSE_WAIT state.
        tmp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tmp.bind(('', 0))
        return tmp.getsockname()[1]