# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Util for aware test."""
import base64
import datetime
import json
import logging
import time
from typing import Any, Callable, Dict, List, Optional

from aware import constants

from mobly import asserts
from mobly.controllers import android_device
from mobly.controllers.android_device_lib import callback_handler_v2
from mobly.snippet import errors


_WAIT_DOZE_MODE_IN_SEC = 5
_TIMEOUT_INTERVAL_IN_SEC = 1
_WAIT_WIFI_STATE_TIME_OUT = datetime.timedelta(seconds=10)
_WAIT_TIME_SEC = 3
_CONTROL_WIFI_TIMEOUT_SEC = 10
# Number of times set_doze_mode polls "deviceidle get deep" before giving up.
_DOZE_MODE_POLL_ATTEMPTS = 11
# Callback id that validate_forbidden_callbacks always limits to zero calls.
# NOTE(review): the meaning of id "5" is not visible in this file - confirm
# against the native callback id definitions.
_ALWAYS_FORBIDDEN_CALLBACK_ID = "5"


def callback_no_response(
    callback: callback_handler_v2.CallbackHandlerV2,
    event_name: str,
    timeout: float = _WAIT_WIFI_STATE_TIME_OUT.total_seconds(),
    use_callbackid: bool = False,
) -> None:
    """Waits for an event and expects that it does NOT arrive in time.

    Args:
        callback: Snippet callback object.
        event_name: Name of the event to wait for.
        timeout: Timeout in seconds.
        use_callbackid: If True, the callback id is appended to the event
            name before waiting. Defaults to False.

    Raises:
        CallBackError: If the event is received before the timeout.
    """
    if use_callbackid:
        event_name += callback.callback_id
    try:
        data = callback.waitAndGet(event_name=event_name, timeout=timeout)
    except errors.CallbackHandlerTimeoutError:
        # Timing out is the expected outcome: nothing was received.
        return
    raise CallBackError(f' Unexpected response {data}')


class CallBackError(Exception):
    """Error raised when there is a problem to get callback response."""


def control_wifi(
    ad: android_device.AndroidDevice,
    wifi_state: bool,
    timeout_seconds: float = _CONTROL_WIFI_TIMEOUT_SEC,
) -> None:
    """Controls the Android Wi-Fi state and waits until it takes effect.

    Args:
        ad: Android test device.
        wifi_state: True to turn Wi-Fi on, False to turn Wi-Fi off.
        timeout_seconds: Maximum wait time in seconds, default is 10 seconds.

    Raises:
        TimeoutError: If the Wi-Fi state cannot be set within the timeout.
    """
    if _check_wifi_status(ad) == wifi_state:
        return
    ad.adb.shell("svc wifi enable" if wifi_state else "svc wifi disable")
    deadline = time.monotonic() + timeout_seconds
    while _check_wifi_status(ad) != wifi_state:
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"Failed to set Wi-Fi state to {wifi_state} within"
                f" {timeout_seconds} seconds."
            )
        time.sleep(1)  # Wait for a second before checking again.


def _check_wifi_status(ad: android_device.AndroidDevice) -> bool:
    """Checks the Android Wi-Fi status.

    Args:
        ad: android device object.

    Returns:
        True if the first line of "cmd wifi status" contains "enabled",
        False otherwise.
    """
    output = ad.adb.shell("cmd wifi status").decode("utf-8").strip()
    first_line = output.split("\n")[0]
    logging.info("device wifi status: %s", first_line)
    return "enabled" in first_line


def set_doze_mode(ad: android_device.AndroidDevice, state: bool) -> bool:
    """Enables/Disables Android doze mode.

    Args:
        ad: android device object.
        state: bool, True if intent to enable Android doze mode, False
            otherwise.

    Returns:
        True if the device reports the idle state, False if it reports the
        active state. This is the observed state, which is not necessarily
        the requested one.

    Raises:
        errors.CallbackHandlerTimeoutError: If the device reports neither
            state after all polling attempts.
    """
    if state:
        ad.log.info("Enables Android doze mode")
        _dumpsys(ad, "battery unplug")
        _dumpsys(ad, "deviceidle enable")
        _dumpsys(ad, "deviceidle force-idle")
        time.sleep(_WAIT_DOZE_MODE_IN_SEC)
    else:
        ad.log.info("Disables Android doze mode")
        _dumpsys(ad, "deviceidle disable")
        _dumpsys(ad, "battery reset")
    for _ in range(_DOZE_MODE_POLL_ATTEMPTS):
        adb_shell_result = _dumpsys(ad, "deviceidle get deep")
        logging.info("dumpsys deviceidle get deep: %s", adb_shell_result)
        if adb_shell_result.startswith(constants.DeviceidleState.IDLE.value):
            return True
        if adb_shell_result.startswith(
            constants.DeviceidleState.ACTIVE.value
        ):
            return False
        time.sleep(_TIMEOUT_INTERVAL_IN_SEC)
    # At this point, timeout must have occurred.
    raise errors.CallbackHandlerTimeoutError(
        ad, f"Timed out after waiting for doze_mode set to {state}"
    )


def _dumpsys(ad: android_device.AndroidDevice, command: str) -> str:
    """Runs "dumpsys <command>" on the device.

    Args:
        ad: android device object.
        command: arguments passed to dumpsys.

    Returns:
        The decoded dumpsys output with surrounding whitespace stripped.
    """
    return ad.adb.shell(f"dumpsys {command}").decode().strip()


def check_android_os_version(
    ad: android_device.AndroidDevice,
    operator_func: Callable[[Any, Any], bool],
    android_version: constants.AndroidVersion,
) -> bool:
    """Compares device's Android OS version with the given one.

    Args:
        ad: Android devices.
        operator_func: Operator used in the comparison. Only instances of
            constants.Operator are evaluated.
        android_version: The given Android OS version.

    Returns:
        bool: The comparison result, or False when operator_func is not a
        constants.Operator instance.
    """
    device_os_version = int(ad.adb.shell("getprop ro.build.version.release"))
    if isinstance(operator_func, constants.Operator):
        return operator_func.value(device_os_version, android_version)
    return False


def _get_airplane_mode(ad: android_device.AndroidDevice) -> bool:
    """Gets the airplane mode.

    Args:
        ad: android device object.

    Returns:
        True if airplane mode On, False for Off.
    """
    state = ad.adb.shell("settings get global airplane_mode_on")
    return bool(int(state))


def set_airplane_mode(ad: android_device.AndroidDevice, state: bool) -> None:
    """Sets the airplane mode to the given state and waits for it to apply.

    Args:
        ad: android device object.
        state: bool, True for Airplane mode on, False for off.

    Raises:
        mobly test failure (via asserts): If the setting does not reach the
            requested state within _WAIT_TIME_SEC seconds.
    """
    ad.adb.shell(
        ["settings", "put", "global", "airplane_mode_on", str(int(state))]
    )
    ad.adb.shell([
        "am",
        "broadcast",
        "-a",
        "android.intent.action.AIRPLANE_MODE",
        "--ez",
        "state",
        str(state),
    ])
    deadline = time.monotonic() + _WAIT_TIME_SEC
    while _get_airplane_mode(ad) != state:
        # Fail the test once the deadline has passed; otherwise poll again.
        asserts.assert_true(
            time.monotonic() < deadline,
            f"Failed to set airplane mode to: {state}",
        )
        time.sleep(_TIMEOUT_INTERVAL_IN_SEC)


def decode_list(list_of_b64_strings: List[str]) -> List[bytes]:
    """Converts the list of b64 encoded strings to a list of bytes objects.

    Args:
        list_of_b64_strings: A list of strings, each of which is b64 encoded
            array.

    Returns:
        A list of decoded bytes objects, in the same order as the input.
    """
    return [base64.b64decode(item) for item in list_of_b64_strings]


def encode_list(list_of_objects: List[Any]) -> List[str]:
    """Converts a list of strings/bytearrays to a list of b64 encoded strings.

    A None object is treated as a zero-length bytearray.

    Args:
        list_of_objects: A list of strings or bytearray objects.

    Returns:
        A list of the same objects, converted to bytes and b64 encoded.
    """
    encoded_list = []
    for obj in list_of_objects:
        if obj is None:
            raw = b""
        elif isinstance(obj, str):
            raw = obj.encode("utf-8")
        else:
            raw = bytes(obj)
        encoded_list.append(base64.b64encode(raw).decode("utf-8"))
    return encoded_list


def construct_max_match_filter(max_size: int) -> List[bytes]:
    """Constructs a maximum size match filter that fits into 'max_size' bytes.

    Match filters are a set of LVs (Length, Value pairs) where L is 1 byte.
    The maximum size match filter will contain max_size/2 LVs with all Vs
    (except possibly the last one) of 1 byte, the last V may be 2 bytes for
    odd max_size.

    Args:
        max_size: Maximum size of the match filter.

    Returns:
        A list of bytes objects. The list is empty when max_size is smaller
        than 2, since not even one 1-byte LV would fit.
    """
    num_lvs = max_size // 2
    if num_lvs < 1:
        return []
    # Wrap the value at 256 so that very large filters do not overflow a
    # single byte.
    mf_list = [bytes([i % 256]) for i in range(num_lvs - 1)]
    if max_size % 2 == 0:
        mf_list.append(bytes([255]))
    else:
        # Use the spare odd byte by making the last value 2 bytes long.
        mf_list.append(bytes([254, 255]))
    return mf_list


def validate_forbidden_callbacks(
    ad: android_device.AndroidDevice,
    limited_cb: Optional[Dict[str, int]] = None,
) -> None:
    """Validates the specified callbacks were not called more than permitted.

    In addition to the input configuration also validates that forbidden
    callbacks have never been called.

    Args:
        ad: Device on which to run.
        limited_cb: Dictionary of CB_EV_* ids and maximum permitted calls (0
            meaning never). The dictionary is not modified.

    Raises:
        CallBackError: If forbidden callbacks are triggered.
    """
    cb_data = json.loads(ad.adb.shell("cmd wifiaware native_cb get_cb_count"))
    # Work on a copy so the caller's dictionary is left untouched.
    limits = dict(limited_cb) if limited_cb else {}
    # Add callbacks which should never be called.
    limits[_ALWAYS_FORBIDDEN_CALLBACK_ID] = 0
    for cb_event, max_calls in limits.items():
        if cb_event in cb_data and cb_data[cb_event] > max_calls:
            ad.log.info(
                "Callback %s observed %d times: more than permitted %d times",
                cb_event, cb_data[cb_event], max_calls)
            raise CallBackError("Forbidden callbacks observed.")


def reset_device_parameters(ad: android_device.AndroidDevice) -> None:
    """Resets device configurations which may have been set by tests.

    Should be done before tests start (in case previous one was killed
    without tearing down) and after they end (to leave device in usable
    state).

    Args:
        ad: device to be reset
    """
    ad.adb.shell("cmd wifiaware reset")


def reset_device_statistics(ad: android_device.AndroidDevice) -> None:
    """Resets device statistics.

    Args:
        ad: device to be reset
    """
    ad.adb.shell("cmd wifiaware native_cb get_cb_count --reset")


def get_aware_capabilities(ad: android_device.AndroidDevice):
    """Gets the Wi-Fi Aware capabilities from the specified device.

    The capabilities are a dictionary keyed by aware_const.CAP_* keys.

    Args:
        ad: the Android device

    Returns:
        The capability dictionary, parsed from the JSON output of
        "cmd wifiaware state_mgr get_capabilities".
    """
    return json.loads(ad.adb.shell('cmd wifiaware state_mgr get_capabilities'))


def create_discovery_config(service_name,
                            p_type=None,
                            s_type=None,
                            ssi=None,
                            match_filter=None,
                            match_filter_list=None,
                            ttl=0,
                            term_cb_enable=True,
                            instant_mode=None) -> Dict[Any, Any]:
    """Creates a discovery configuration based on input parameters.

    Optional entries are only added to the configuration when the
    corresponding argument is not None.

    Args:
        service_name: Service name - required
        p_type: Publish type - defaults to None
        s_type: Subscribe type - defaults to None
        ssi: Supplemental information - defaults to None
        match_filter: The match filter. Only one of match_filter and
            match_filter_list should be used. Defaults to None.
        match_filter_list: The match filter given as a list. Defaults to
            None.
        ttl: Time-to-live - defaults to 0 (i.e. non-self terminating)
        term_cb_enable: True (default) to enable callback on termination,
            False means that no callback is called when session terminates.
        instant_mode: set the band to use instant communication mode, 2G or
            5G

    Returns:
        The discovery configuration dictionary.
    """
    config = {constants.SERVICE_NAME: service_name}
    if p_type is not None:
        config[constants.PUBLISH_TYPE] = p_type
    if s_type is not None:
        config[constants.SUBSCRIBE_TYPE] = s_type
    if ssi is not None:
        config[constants.SERVICE_SPECIFIC_INFO] = ssi
    if match_filter is not None:
        config[constants.MATCH_FILTER] = match_filter
    if match_filter_list is not None:
        config[constants.MATCH_FILTER_LIST] = match_filter_list
    if instant_mode is not None:
        config[constants.INSTANTMODE_ENABLE] = instant_mode
    config[constants.TTL_SEC] = ttl
    config[constants.TERMINATE_NOTIFICATION_ENABLED] = term_cb_enable
    return config