1
2
3#  Copyright (C) 2024 The Android Open Source Project
4#
5#  Licensed under the Apache License, Version 2.0 (the "License");
6#  you may not use this file except in compliance with the License.
7#  You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#  Unless required by applicable law or agreed to in writing, software
12#  distributed under the License is distributed on an "AS IS" BASIS,
13#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#  See the License for the specific language governing permissions and
15#  limitations under the License.
16"""Util for aware test."""
17import base64
18import datetime
19import json
20import logging
21import time
22from typing import Any, Callable, Dict, List, Optional
23
24from aware import constants
25
26from mobly import asserts
27from mobly.controllers import android_device
28from mobly.controllers.android_device_lib import callback_handler_v2
29from mobly.snippet import errors
30
31
# Seconds set_doze_mode sleeps after forcing the device idle.
_WAIT_DOZE_MODE_IN_SEC = 5
# Sleep interval, in seconds, between polls in set_doze_mode and
# set_airplane_mode.
_TIMEOUT_INTERVAL_IN_SEC = 1
# Default timeout of callback_no_response (converted with total_seconds()).
_WAIT_WIFI_STATE_TIME_OUT = datetime.timedelta(seconds=10)
# Time budget, in seconds, referenced by set_airplane_mode while it waits.
_WAIT_TIME_SEC = 3
# Maximum seconds control_wifi waits for the requested Wi-Fi state.
_CONTROL_WIFI_TIMEOUT_SEC = 10
37
38
def callback_no_response(
    callback: callback_handler_v2.CallbackHandlerV2,
    event_name: str,
    timeout: int = _WAIT_WIFI_STATE_TIME_OUT.total_seconds(),
    use_callbackid: bool = False,
    ):
  """Verifies that an event does not arrive before the timeout expires.

  Args:
    callback: Snippet callback handler to wait on.
    event_name: Name of the event that must not be received.
    timeout: How long to wait for the event, in seconds.
    use_callbackid: If True, the handler's callback id is appended to
      event_name before waiting. Defaults to False.

  Raises:
    CallBackError: If the event is received within the timeout.
  """
  awaited_event = (
      event_name + callback.callback_id if use_callbackid else event_name
  )
  try:
    response = callback.waitAndGet(event_name=awaited_event, timeout=timeout)
  except errors.CallbackHandlerTimeoutError:
    # Timing out is the success path: nothing was delivered.
    return
  raise CallBackError(f' Unexpected response {response}')
63
64
class CallBackError(Exception):
  """Raised when a callback check fails.

  Used in this module when an event arrives that was expected to stay absent,
  or when a forbidden callback has been observed.
  """
67
def control_wifi(
    ad: android_device.AndroidDevice,
    wifi_state: bool,
    timeout_seconds: float = _CONTROL_WIFI_TIMEOUT_SEC,
):
  """Turns Wi-Fi on or off and waits until the device reports that state.

  Nothing is sent to the device when it already reports the requested state.

  Args:
    ad: Android test device.
    wifi_state: True to turn Wi-Fi on, False to turn it off.
    timeout_seconds: Maximum time, in seconds, to wait for the state change.
      Defaults to _CONTROL_WIFI_TIMEOUT_SEC.

  Raises:
    TimeoutError: If the device does not report the requested Wi-Fi state
      within timeout_seconds.
  """
  if _check_wifi_status(ad) == wifi_state:
    return
  ad.adb.shell("svc wifi enable" if wifi_state else "svc wifi disable")
  # A monotonic clock keeps the wait immune to wall-clock adjustments.
  start_time = time.monotonic()
  while _check_wifi_status(ad) != wifi_state:
    if time.monotonic() - start_time > timeout_seconds:
      raise TimeoutError(
          f"Failed to set Wi-Fi state to {wifi_state} within"
          f" {timeout_seconds} seconds."
      )
    # Pause between polls so the device is not flooded with adb calls.
    time.sleep(1)
99
def _check_wifi_status(ad: android_device.AndroidDevice):
  """Reports whether the device describes its Wi-Fi as enabled.

  The decision is based only on the first line printed by `cmd wifi status`.

  Args:
      ad: android device object.

  Returns:
    True if that first line contains "enabled", False otherwise.
  """
  raw_output = ad.adb.shell("cmd wifi status")
  status_line, _, _ = raw_output.decode("utf-8").strip().partition("\n")
  logging.info("device wifi status: %s", status_line)
  return "enabled" in status_line
116
117
def set_doze_mode(ad: android_device.AndroidDevice, state: bool) -> bool:
  """Enables/Disables Android doze mode and reports the resulting state.

  Args:
      ad: android device object.
      state: bool, True if intent to enable Android doze mode, False otherwise.

  Returns:
    True if the device reports the deep idle state as IDLE, False if it
    reports ACTIVE. The value reflects what the device reports, which may
    differ from the requested state.

  Raises:
    errors.CallbackHandlerTimeoutError: If the device reports neither IDLE nor
      ACTIVE after all polling attempts.
  """
  if state:
    ad.log.info("Enables Android doze mode")
    _dumpsys(ad, "battery unplug")
    _dumpsys(ad, "deviceidle enable")
    _dumpsys(ad, "deviceidle force-idle")
    # Pause before polling the state after forcing idle.
    time.sleep(_WAIT_DOZE_MODE_IN_SEC)
  else:
    ad.log.info("Disables Android doze mode")
    _dumpsys(ad, "deviceidle disable")
    _dumpsys(ad, "battery reset")
  idle_prefix = constants.DeviceidleState.IDLE.value
  active_prefix = constants.DeviceidleState.ACTIVE.value
  max_polls = 10 + 1
  for _ in range(max_polls):
    deep_state = _dumpsys(ad, "deviceidle get deep")
    logging.info("dumpsys deviceidle get deep: %s", deep_state)
    if deep_state.startswith(idle_prefix):
      return True
    if deep_state.startswith(active_prefix):
      return False
    time.sleep(_TIMEOUT_INTERVAL_IN_SEC)
  # Every poll returned an intermediate state: report a timeout. The message
  # must be an f-string so the requested state is actually interpolated.
  raise errors.CallbackHandlerTimeoutError(
      ad, f"Timed out after waiting for doze_mode set to {state}"
  )
153
154
def _dumpsys(ad: android_device.AndroidDevice, command: str) -> str:
  """Runs `dumpsys <command>` on the device and returns its output.

  Args:
      ad: android device object.
      command: arguments appended after `dumpsys`.

  Returns:
    The decoded command output with surrounding whitespace removed.
  """
  raw_output = ad.adb.shell(f"dumpsys {command}")
  return raw_output.decode().strip()
166
167
def check_android_os_version(
    ad: android_device.AndroidDevice,
    operator_func: Callable[[Any, Any], bool],
    android_version: constants.AndroidVersion,
    ) -> bool:
  """Compares the device's Android release number with a given version.

  The device version is read from the `ro.build.version.release` property
  and converted to an integer.

  Args:
    ad: Android device to query.
    operator_func: Comparison to apply; only constants.Operator members are
      honoured.
    android_version: The version to compare the device against.

  Returns:
    The result of applying the operator to (device version, android_version),
    or False when operator_func is not a constants.Operator member.
  """
  release = ad.adb.shell("getprop ro.build.version.release")
  device_os_version = int(release)
  if not isinstance(operator_func, constants.Operator):
    return False
  compare = operator_func.value
  return compare(device_os_version, android_version)
188
189
def _get_airplane_mode(ad: android_device.AndroidDevice) -> bool:
  """Reads the device's airplane mode setting.

  Args:
    ad: android device object.

  Returns:
    True when the `airplane_mode_on` global setting is non-zero, False when
    it is zero.
  """
  raw_setting = ad.adb.shell("settings get global airplane_mode_on")
  return int(raw_setting) != 0
201
202
def set_airplane_mode(ad: android_device.AndroidDevice, state: bool):
  """Sets the airplane mode to the given state and waits for it to apply.

  Writes the `airplane_mode_on` global setting, broadcasts the
  AIRPLANE_MODE intent, then polls the setting until it matches.

  Args:
    ad: android device object.
    state: bool, True for Airplane mode on, False for off.

  Raises:
    mobly.signals.TestFailure: Through asserts.assert_true, when the setting
      still differs from `state` after more than _WAIT_TIME_SEC seconds.
  """
  ad.adb.shell(
      ["settings", "put", "global", "airplane_mode_on", str(int(state))]
  )
  ad.adb.shell([
      "am",
      "broadcast",
      "-a",
      "android.intent.action.AIRPLANE_MODE",
      "--ez",
      "state",
      str(state),
  ])
  start_time = time.monotonic()
  while _get_airplane_mode(ad) != state:
    # Fail once the time budget is used up. The state is re-read at the top
    # of the loop, so a change during the last sleep still counts as success.
    asserts.assert_true(
        time.monotonic() - start_time <= _WAIT_TIME_SEC,
        f"Failed to set airplane mode to: {state}",
    )
    time.sleep(_TIMEOUT_INTERVAL_IN_SEC)
229
230
def decode_list(list_of_b64_strings: List[str]) -> List[bytes]:
  """Decodes every base64 string in a list into its raw bytes.

  Args:
    list_of_b64_strings: A list of strings, each holding base64 encoded data.

  Returns:
    A list with the decoded bytes of each input string, in the same order.
  """
  return [base64.b64decode(encoded) for encoded in list_of_b64_strings]
244
245
def encode_list(
    list_of_objects: List[Any]) -> List[str]:
  """Base64 encodes each string or bytes-like object in a list.

  Strings are encoded as UTF-8 first, None counts as zero-length bytes, and
  anything else is passed through bytes().

  Args:
    list_of_objects: A list of strings or bytearray objects.
  Returns:
    A list of base64 text, one entry per input object in the same order.
  """

  def _to_b64_text(item: Any) -> str:
    # Normalise the item to raw bytes before encoding.
    if item is None:
      raw = bytes()
    elif isinstance(item, str):
      raw = bytes(item, "utf-8")
    else:
      raw = bytes(item)
    return base64.b64encode(raw).decode("utf-8")

  return [_to_b64_text(item) for item in list_of_objects]
266
267
def construct_max_match_filter(max_size: int)-> List[bytes]:
  """Constructs a maximum size match filter that fits into the 'max_size' bytes.

  Match filters are a set of LVs (Length, Value pairs) where L is 1 byte. The
  maximum size match filter will contain max_size/2 LVs with all Vs (except
  possibly the last one) of 1 byte, the last V may be 2 bytes for odd max_size.

  Args:
    max_size: Maximum size of the match filter.
  Returns:
    A list of bytes objects, one per V. The list is empty when max_size is
    below 2, because not even one LV with a 1-byte V fits.
  Raises:
    ValueError: If max_size is so large that a V would have to hold a value
      above 255 (max_size of 516 or more).
  """
  num_lvs = max_size // 2
  if num_lvs <= 0:
    # Without this guard the final V would still be appended and the filter
    # would be larger than max_size.
    return []
  # All but the last V are single bytes carrying their own index.
  mf_list = [bytes([i]) for i in range(num_lvs - 1)]
  # The last V absorbs the spare byte when max_size is odd.
  last_value = bytes([255]) if max_size % 2 == 0 else bytes([254, 255])
  mf_list.append(last_value)
  return mf_list
289
290
def validate_forbidden_callbacks(ad: android_device.AndroidDevice,
                                 limited_cb: Optional[Dict[str, int]] = None
                                ) -> None:
  """Validate the specified callbacks have not been called more than permitted.

  In addition to the input configuration also validates that forbidden callbacks
  have never been called. The caller's dictionary is left unmodified.

  Args:
    ad: Device on which to run.
    limited_cb: Dictionary of CB_EV_* ids and maximum permitted calls (0
                meaning never).
  Raises:
    CallBackError: If forbidden callbacks are triggered.
  """
  cb_data = json.loads(ad.adb.shell("cmd wifiaware native_cb get_cb_count"))
  # Copy the limits so the forbidden entry added below does not leak into the
  # dictionary owned by the caller.
  limits = dict(limited_cb) if limited_cb else {}
  # Add callbacks which should never be called.
  # NOTE(review): "5" is a callback id whose meaning is not visible in this
  # module - confirm against the CB_EV_* definitions.
  limits["5"] = 0
  for cb_event, max_calls in limits.items():
    # Callbacks missing from the device report are not checked.
    if cb_event in cb_data and cb_data[cb_event] > max_calls:
      ad.log.info(
          "Callback %s observed %d times: more than permitted %d times",
          cb_event, cb_data[cb_event], max_calls)
      raise CallBackError("Forbidden callbacks observed.")
322
323
def reset_device_parameters(ad: android_device.AndroidDevice):
  """Resets device configuration that tests may have changed.

  Intended to run before tests start (in case a previous run was killed
  without tearing down) and after they end (to leave the device in a usable
  state). Issues `cmd wifiaware reset` on the device.

  Args:
    ad: device to be reset
  """
  reset_command = "cmd wifiaware reset"
  ad.adb.shell(reset_command)
334
335
def reset_device_statistics(ad: android_device.AndroidDevice):
  """Resets device statistics.

  Issues `cmd wifiaware native_cb get_cb_count --reset` on the device.

  Args:
    ad: device to be reset
  """
  reset_command = "cmd wifiaware native_cb get_cb_count --reset"
  ad.adb.shell(reset_command)
343
def get_aware_capabilities(ad: android_device.AndroidDevice):
  """Fetches the Wi-Fi Aware capabilities of the given device.

  Args:
    ad: the Android device

  Returns:
    The capabilities parsed from the JSON printed by
    `cmd wifiaware state_mgr get_capabilities`. Described by the original
    author as a dictionary keyed by aware_const.CAP_* keys.
  """
  capabilities_json = ad.adb.shell('cmd wifiaware state_mgr get_capabilities')
  return json.loads(capabilities_json)
353
def create_discovery_config(service_name,
                            p_type=None,
                            s_type=None,
                            ssi=None,
                            match_filter=None,
                            match_filter_list=None,
                            ttl=0,
                            term_cb_enable=True,
                            instant_mode=None):
  """Builds a discovery configuration dictionary from the given parameters.

  Optional parameters left as None are omitted from the result; the service
  name, TTL and termination-callback flag are always present.

  Args:
    service_name: Service name - required.
    p_type: Publish type, stored under constants.PUBLISH_TYPE. Defaults to
      None.
    s_type: Subscribe type, stored under constants.SUBSCRIBE_TYPE. Defaults to
      None.
    ssi: Supplemental information - defaults to None.
    match_filter: The match filter. Defaults to None.
    match_filter_list: The match filter given as a list. Defaults to None.
      The original documentation states only one of match_filter and
      match_filter_list should be used; this function does not enforce it.
    ttl: Time-to-live - defaults to 0 (i.e. non-self terminating).
    term_cb_enable: True (default) to enable callback on termination, False
      means that no callback is called when session terminates.
    instant_mode: set the band to use instant communication mode, 2G or 5G.

  Returns:
    The discovery configuration dictionary.
  """
  # Listed in the order the keys must appear in the resulting dictionary.
  optional_entries = (
      (constants.PUBLISH_TYPE, p_type),
      (constants.SUBSCRIBE_TYPE, s_type),
      (constants.SERVICE_SPECIFIC_INFO, ssi),
      (constants.MATCH_FILTER, match_filter),
      (constants.MATCH_FILTER_LIST, match_filter_list),
      (constants.INSTANTMODE_ENABLE, instant_mode),
  )
  config = {constants.SERVICE_NAME: service_name}
  config.update(
      (key, value) for key, value in optional_entries if value is not None
  )
  config[constants.TTL_SEC] = ttl
  config[constants.TERMINATE_NOTIFICATION_ENABLED] = term_cb_enable
  return config
395