# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests the connection of a target."""

import logging
import time

from typing import Optional

from boot_device import boot_device, BootMode
from common import run_ffx_command


def test_connection(target_id: Optional[str], wait_sec: int = 60) -> None:
    """Runs echo tests to verify that the device can be connected to.

    Devices may not be connectable right after being discovered by ffx, e.g.
    after a `ffx target wait`, so this function retries up to |wait_sec| before
    throwing an exception.

    Args:
        target_id: Identifier forwarded to run_ffx_command; may be None.
        wait_sec: Time budget, in seconds, for the retry loop. A new attempt
            is only started while the elapsed time is below this value.

    Raises:
        Whatever the final, checked run_ffx_command call raises when the
        echo still fails (presumably a subprocess error; confirm against
        common.run_ffx_command).
    """
    start_sec = time.time()
    while time.time() - start_sec < wait_sec:
        # check=False: a failing echo is expected while the device settles,
        # so only the return code is inspected here.
        if run_ffx_command(cmd=('target', 'echo'),
                           target_id=target_id,
                           check=False).returncode == 0:
            return
        time.sleep(10)

    # One last attempt without check=False so that a persistent failure is
    # reported to the caller instead of being swallowed.
    run_ffx_command(cmd=('target', 'echo'), target_id=target_id)


def test_device_connection(target_id: Optional[str]) -> None:
    """Runs test_connection against the target_id and restarts the device if
    it cannot be connected.

    Each round gives test_connection 600 seconds; on failure the device is
    rebooted (best effort) and `ffx target wait` is run before the next
    round. New rounds are started for up to 1800 seconds. If the device is
    still unreachable after that, the function returns without raising.

    Args:
        target_id: Identifier forwarded to test_connection, boot_device and
            run_ffx_command; may be None.
    """
    start_sec = time.time()
    while time.time() - start_sec < 1800:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt / SystemExit still stop the script instead of
        # triggering a reboot.
        # pylint: disable=broad-except
        # First, test_connection with ffx target echo.
        try:
            test_connection(target_id=target_id, wait_sec=600)
            return
        except Exception as connect_error:
            logging.warning('Unable to connect to target %s: %s', target_id,
                            connect_error)

        # If anything wrong, reboot the device and try again.
        try:
            boot_device(target_id, BootMode.REGULAR, must_boot=True)
        except Exception as reboot_error:
            # If unfortunately, the reboot failed, it's still worth
            # continuing the test rather than failing here.
            logging.warning('Unable to reboot target %s: %s', target_id,
                            reboot_error)

        # Wait for the target to show up again and record what ffx printed;
        # check=False keeps a failed wait from aborting the retry loop.
        logging.warning(
            run_ffx_command(cmd=('target', 'wait'),
                            target_id=target_id,
                            check=False,
                            capture_output=True).stdout)