#  Copyright (C) 2024 The Android Open Source Project
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from dataclasses import dataclass
import re
from mobly import asserts
from mobly.controllers import android_device
from mobly.controllers.android_device_lib.adb import AdbError
from net_tests_utils.host.python import adb_utils, assert_utils


class PatternNotFoundException(Exception):
  """Raised when the given pattern cannot be found."""


class UnsupportedOperationException(Exception):
  """Raised when the device reports the shell command as "Unknown command"."""


def get_apf_counter(
    ad: android_device.AndroidDevice, iface: str, counter_name: str
) -> int:
  """Returns the value of a single APF counter for the given interface.

  Args:
    ad: The Android device object.
    iface: The name of the network interface (e.g., "wlan0").
    counter_name: The counter name as printed in the dumpsys output.

  Returns:
    The counter value, or 0 if the counter is not present in the dump.

  Raises:
    PatternNotFoundException: If the IpClient or APF counters section cannot
      be found in the dumpsys output.
  """
  counters = get_apf_counters_from_dumpsys(ad, iface)
  return counters.get(counter_name, 0)


def get_apf_counters_from_dumpsys(
    ad: android_device.AndroidDevice, iface_name: str
) -> dict[str, int]:
  """Parses the APF packet counters of an interface out of dumpsys.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").

  Returns:
    A dict mapping each counter name to its integer value.

  Raises:
    PatternNotFoundException: If the IpClient section for the interface, or
      the APF counters section inside it, cannot be found.
  """
  dumpsys = adb_utils.get_dumpsys_for_service(ad, "network_stack")

  # Extract IpClient section of the specified interface.
  # This takes inputs like:
  # IpClient.wlan0
  #   ...
  # IpClient.wlan1
  #   ...
  # The interface name is escaped so that regex metacharacters in it are
  # matched literally instead of being interpreted as part of the pattern.
  iface_pattern = re.compile(
      r"^IpClient\." + re.escape(iface_name) + r"\n" + r"((^\s.*\n)+)",
      re.MULTILINE,
  )
  iface_result = iface_pattern.search(dumpsys)
  if iface_result is None:
    raise PatternNotFoundException("Cannot find IpClient for " + iface_name)

  # Extract APF counters section from IpClient section, which looks like:
  #     APF packet counters:
  #       COUNTER_NAME: VALUE
  #       ....
  apf_pattern = re.compile(
      r"APF packet counters:.*\n.(\s+[A-Z_0-9]+: \d+\n)+", re.MULTILINE
  )
  apf_result = apf_pattern.search(iface_result.group(0))
  if apf_result is None:
    raise PatternNotFoundException(
        "Cannot find APF counters in text: " + iface_result.group(0)
    )

  # Extract key-value pairs from APF counters section into a list of tuples,
  # e.g. [('COUNTER1', '1'), ('COUNTER2', '2')].
  counter_pattern = re.compile(r"(?P<name>[A-Z_0-9]+): (?P<value>\d+)")
  counter_result = counter_pattern.findall(apf_result.group(0))
  # findall() returns an empty list (never None) when nothing matches.
  if not counter_result:
    raise PatternNotFoundException(
        "Cannot extract APF counters in text: " + apf_result.group(0)
    )

  # Convert into a dict.
  result = {key: int(value_str) for key, value_str in counter_result}

  ad.log.debug("Getting apf counters: " + str(result))
  return result


def get_ipv4_addresses(
    ad: android_device.AndroidDevice, iface_name: str
) -> list[str]:
  """Retrieves the IPv4 addresses of a given interface on an Android device.

  This function executes an ADB shell command (`ip -4 address show`) to get the
  network interface information and extracts the IPv4 addresses from the
  output.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").

  Returns:
    The IPv4 addresses of the interface as a list of strings, without the
    prefix length. Returns an empty list if the interface has no IPv4 address.
  """
  # output format:
  # 54: wlan2: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
  #     inet 192.168.195.162/24 brd 192.168.195.255 scope global wlan2
  #        valid_lft forever preferred_lft forever
  #     inet 192.168.1.1/24 brd 192.168.1.255 scope global wlan2
  #        valid_lft forever preferred_lft forever
  output = adb_utils.adb_shell(ad, f"ip -4 address show {iface_name}")
  pattern = r"inet\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\/\d+"
  # findall() already yields [] when there is no match.
  return re.findall(pattern, output)


def get_ipv6_addresses(
    ad: android_device.AndroidDevice, iface_name: str
) -> list[str]:
  """Retrieves the IPv6 addresses of a given interface on an Android device.

  This function executes an ADB shell command (`ip -6 address show`) to get the
  network interface information and extracts the IPv6 addresses from the
  output.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").

  Returns:
    The IPv6 addresses of the interface as a list of strings, without the
    prefix length. Returns an empty list if the interface has no IPv6 address.
  """
  # output format
  # 54: wlan2: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 state UP qlen 1000
  #     inet6 fe80::10a3:5dff:fe52:de32/64 scope link
  #         valid_lft forever preferred_lft forever
  #     inet6 fe80::1233:aadb:3d32:1234/64 scope link
  #         valid_lft forever preferred_lft forever
  output = adb_utils.adb_shell(ad, f"ip -6 address show {iface_name}")
  pattern = r"inet6\s+([0-9a-fA-F:]+)\/\d+"
  # findall() already yields [] when there is no match.
  return re.findall(pattern, output)


def get_hardware_address(
    ad: android_device.AndroidDevice, iface_name: str
) -> str:
  """Retrieves the hardware (MAC) address for a given network interface.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").

  Returns:
    The hex representative of the MAC address in uppercase.
    E.g. 12:34:56:78:90:AB

  Raises:
    PatternNotFoundException: If the MAC address is not found in the command
      output.
  """

  # Run the "ip link" command and get its output.
  ip_link_output = adb_utils.adb_shell(ad, f"ip link show {iface_name}")

  # Regular expression to extract the MAC address.
  # Parse hardware address from ip link output like below:
  # 46: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq ...
  #    link/ether 72:05:77:82:21:e0 brd ff:ff:ff:ff:ff:ff
  pattern = r"link/ether (([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2})"
  match = re.search(pattern, ip_link_output)
  if match is None:
    raise PatternNotFoundException(
        "Cannot get hardware address for " + iface_name
    )

  # Group 1 is the full MAC address; group 2 is only the last "xx:" repeat.
  return match.group(1).upper()


def is_packet_capture_supported(
    ad: android_device.AndroidDevice,
) -> bool:
  """Probes whether the packet capture shell commands are available.

  Each capture command is invoked with empty arguments and is expected to
  fail with assert_utils.UnexpectedBehaviorError.

  Args:
    ad: The Android device object.

  Returns:
    False if any probe raises assert_utils.UnexpectedExceptionError, True
    otherwise.
  """

  try:
    # Invoke the shell command with empty argument and see how NetworkStack
    # responds. If supported, an IllegalArgumentException with help page will
    # be printed.
    # NOTE(review): presumably expect_throws raises UnexpectedExceptionError
    # when the callable raises a different exception type (e.g.
    # UnsupportedOperationException) - confirm against assert_utils.
    assert_utils.expect_throws(
        lambda: start_capture_packets(ad, ""),
        assert_utils.UnexpectedBehaviorError
    )
    assert_utils.expect_throws(
        lambda: stop_capture_packets(ad, ""),
        assert_utils.UnexpectedBehaviorError
    )
    assert_utils.expect_throws(
        lambda: get_matched_packet_counts(ad, "", ""),
        assert_utils.UnexpectedBehaviorError
    )
  except assert_utils.UnexpectedExceptionError:
    return False

  # If no UnsupportedOperationException is thrown, regard it as supported.
  return True


def is_send_raw_packet_downstream_supported(
    ad: android_device.AndroidDevice,
) -> bool:
  """Probes whether `send-raw-packet-downstream` is available.

  Args:
    ad: The Android device object.

  Returns:
    True if the probe fails with assert_utils.UnexpectedBehaviorError (the
    command exists and produced output), False if it raises
    UnsupportedOperationException or produces no output at all.
  """
  try:
    # Invoke the shell command with empty argument and see how NetworkStack
    # responds. If supported, an IllegalArgumentException with help page will
    # be printed.
    send_raw_packet_downstream(ad, "", "")
  except assert_utils.UnexpectedBehaviorError:
    return True
  except UnsupportedOperationException:
    return False
  # The probe produced no output, so support could not be confirmed. This path
  # used to fall through and return None implicitly; return an explicit bool
  # that keeps the same (falsy) result for existing callers.
  return False


def send_raw_packet_downstream(
    ad: android_device.AndroidDevice,
    iface_name: str,
    packet_in_hex: str,
) -> None:
  """Sends a raw packet over the specified downstream interface.

  This function constructs and sends a raw packet using the
  `send-raw-packet-downstream`
  command provided by NetworkStack process. It's primarily intended for testing
  purposes.

  Args:
      ad: The AndroidDevice object representing the connected device.
      iface_name: The name of the network interface to use (e.g., "wlan0",
        "eth0").
      packet_in_hex: The raw packet data starting from L2 header encoded in
        hexadecimal string format.

  Raises:
      UnsupportedOperationException: If the NetworkStack doesn't support
        the `send-raw-packet-downstream` command.
      assert_utils.UnexpectedBehaviorError: If the command execution produces
        any non-empty output other than "Unknown command".

  Important Considerations:
      Security: This method only works on tethering downstream interfaces due
        to security restrictions.
      Packet Format: The `packet_in_hex` must be a valid hexadecimal
        representation of a packet starting from L2 header.
  """

  cmd = f"cmd network_stack send-raw-packet-downstream {iface_name} {packet_in_hex}"

  # Expect no output or Unknown command if NetworkStack is too old. Throw otherwise.
  adb_output = AdbOutputHandler(ad, cmd).get_output()
  if adb_output:
    raise assert_utils.UnexpectedBehaviorError(
        f"Got unexpected output: {adb_output} for command: {cmd}."
    )


def start_capture_packets(
    ad: android_device.AndroidDevice,
    iface_name: str
) -> None:
  """Starts packet capturing on a specified network interface.

  This function initiates packet capture on the given network interface of an
  Android device using an ADB shell command. It handles potential errors
  related to unsupported commands or unexpected output.
  This command only supports downstream tethering interface.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").

  Raises:
    UnsupportedOperationException: If the device reports "Unknown command".
    assert_utils.UnexpectedBehaviorError: If the command output is anything
      other than "success".
  """
  cmd = f"cmd network_stack capture start {iface_name}"

  # Expect exactly "success". "Unknown command" (NetworkStack too old) is
  # raised as UnsupportedOperationException by AdbOutputHandler. Throw
  # otherwise.
  adb_output = AdbOutputHandler(ad, cmd).get_output()
  if adb_output != "success":
    raise assert_utils.UnexpectedBehaviorError(
        f"Got unexpected output: {adb_output} for command: {cmd}."
    )


def stop_capture_packets(
    ad: android_device.AndroidDevice,
    iface_name: str
) -> None:
  """Stops packet capturing on a specified network interface.

  This function terminates packet capture on the given network interface of an
  Android device using an ADB shell command. It handles potential errors
  related to unsupported commands or unexpected output.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").

  Raises:
    UnsupportedOperationException: If the device reports "Unknown command".
    assert_utils.UnexpectedBehaviorError: If the command output is anything
      other than "success".
  """
  cmd = f"cmd network_stack capture stop {iface_name}"

  # Expect exactly "success". "Unknown command" (NetworkStack too old) is
  # raised as UnsupportedOperationException by AdbOutputHandler. Throw
  # otherwise.
  adb_output = AdbOutputHandler(ad, cmd).get_output()
  if adb_output != "success":
    raise assert_utils.UnexpectedBehaviorError(
        f"Got unexpected output: {adb_output} for command: {cmd}."
    )


def get_matched_packet_counts(
    ad: android_device.AndroidDevice,
    iface_name: str,
    packet_in_hex: str
) -> int:
  """Gets the number of captured packets matching a specific hexadecimal pattern.

  This function retrieves the count of captured packets on the specified
  network interface that match a given hexadecimal pattern. It uses an ADB
  shell command and handles potential errors related to unsupported commands,
  unexpected output, or invalid output format.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").
    packet_in_hex: The hexadecimal string representing the packet pattern.

  Returns:
    The number of matched packets as an integer.

  Raises:
    UnsupportedOperationException: If the device reports "Unknown command".
    assert_utils.UnexpectedBehaviorError: If the command output cannot be
      parsed as an integer.
  """
  cmd = f"cmd network_stack capture matched-packet-counts {iface_name} {packet_in_hex}"

  # Expect an integer count. "Unknown command" (NetworkStack too old) is
  # raised as UnsupportedOperationException by AdbOutputHandler. Throw
  # otherwise.
  adb_output = AdbOutputHandler(ad, cmd).get_output()
  try:
    return int(adb_output)
  except ValueError as e:
    # Chain the original error so the failing conversion stays visible.
    raise assert_utils.UnexpectedBehaviorError(
        f"Got unexpected exception: {e} for command: {cmd}."
    ) from e


@dataclass
class ApfCapabilities:
  """APF program support capabilities.

  See android.net.apf.ApfCapabilities.

  The constructor is the one generated by @dataclass and takes the three
  attributes below, in this order, positionally or by keyword.

  Attributes:
      apf_version_supported (int): Version of APF instruction set supported for
        packet filtering. 0 indicates no support for packet filtering using APF
        programs.
      apf_ram_size (int): Size of APF ram.
      apf_packet_format (int): Format of packets passed to APF filter. Should be
        one of ARPHRD_*
  """

  apf_version_supported: int
  apf_ram_size: int
  apf_packet_format: int

  def __str__(self):
    """Returns a user-friendly string representation of the APF capabilities."""
    return (
        f"APF Version: {self.apf_version_supported}\n"
        f"Ram Size: {self.apf_ram_size} bytes\n"
        f"Packet Format: {self.apf_packet_format}"
    )


def get_apf_capabilities(
    ad: android_device.AndroidDevice, iface_name: str
) -> ApfCapabilities:
  """Queries the APF capabilities of an interface from NetworkStack.

  Runs `cmd network_stack apf <iface> capabilities` and parses the output as
  comma-separated integers: version, ram size, packet format.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").

  Returns:
    The parsed ApfCapabilities, or ApfCapabilities(0, 0, 0) if the output is
    not at least three comma-separated integers.
  """
  output = adb_utils.adb_shell(
      ad, f"cmd network_stack apf {iface_name} capabilities"
  )
  try:
    values = [int(value_str) for value_str in output.split(",")]
  except ValueError:
    return ApfCapabilities(0, 0, 0)  # Conversion to integer failed
  if len(values) < 3:
    # Too few fields: treat like unparsable output instead of raising
    # IndexError below.
    return ApfCapabilities(0, 0, 0)
  return ApfCapabilities(values[0], values[1], values[2])


def assume_apf_version_support_at_least(
    ad: android_device.AndroidDevice, iface_name: str, expected_version: int
) -> None:
  """Aborts the test class if the supported APF version is too low.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").
    expected_version: The minimum APF version required.
  """
  caps = get_apf_capabilities(ad, iface_name)
  asserts.abort_class_if(
      caps.apf_version_supported < expected_version,
      f"Supported apf version {caps.apf_version_supported} < expected version"
      f" {expected_version}",
  )


class AdbOutputHandler:
  """Runs an adb shell command and normalizes how its failure is reported."""

  def __init__(self, ad, cmd):
    """Stores the device and the shell command to run.

    Args:
      ad: The Android device object.
      cmd: The shell command line to execute.
    """
    self._ad = ad
    self._cmd = cmd

  def get_output(self) -> str:
    """Executes the command and returns its output.

    Returns:
      The result of adb_utils.adb_shell on success. If the command fails with
      AdbError, returns str() of the error's stdout instead.

    Raises:
      UnsupportedOperationException: If the command fails with AdbError and
        its stdout contains "Unknown command".
    """
    try:
      return adb_utils.adb_shell(self._ad, self._cmd)
    except AdbError as e:
      # NOTE(review): if e.stdout is bytes, str() yields its repr (e.g.
      # "b'...'"), which is non-empty even for empty output. Callers test the
      # truthiness of this value, so it is left unchanged - confirm intent.
      output = str(e.stdout)
      if "Unknown command" in output:
        raise UnsupportedOperationException(
            f"{self._cmd} is not supported."
        ) from e
      return output