xref: /aosp_15_r20/external/autotest/client/common_lib/cros/network/interface.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import collections
11import logging
12import os
13import re
14from six.moves import map
15from six.moves import range
16
17from autotest_lib.client.bin import local_host
18from autotest_lib.client.bin import utils
19from autotest_lib.client.common_lib import error
20from autotest_lib.client.common_lib.cros.network import netblock
21
22# A tuple consisting of a readable part number (one of NAME_* below)
23# and a kernel module that provides the driver for this part (e.g. ath9k).
24DeviceDescription = collections.namedtuple('DeviceDescription',
25                                           ['name', 'kernel_module'])
26
27
28# A tuple describing a default route, consisting of an interface name,
29# gateway IP address, and the metric value visible in the routing table.
30DefaultRoute = collections.namedtuple('DefaultRoute', ['interface_name',
31                                                       'gateway',
32                                                       'metric'])
33
34NAME_MARVELL_88W8797_SDIO = 'Marvell 88W8797 SDIO'
35NAME_MARVELL_88W8887_SDIO = 'Marvell 88W8887 SDIO'
36NAME_MARVELL_88W8897_SDIO = 'Marvell 88W8897 SDIO'
37NAME_MARVELL_88W8897_PCIE = 'Marvell 88W8897 PCIE'
38NAME_MARVELL_88W8997_PCIE = 'Marvell 88W8997 PCIE'
39NAME_ATHEROS_AR9280 = 'Atheros AR9280'
40NAME_ATHEROS_AR9382 = 'Atheros AR9382'
41NAME_ATHEROS_AR9462 = 'Atheros AR9462'
42NAME_QUALCOMM_ATHEROS_QCA6174 = 'Qualcomm Atheros QCA6174'
43NAME_QUALCOMM_ATHEROS_QCA6174_SDIO = 'Qualcomm Atheros QCA6174 SDIO'
44NAME_QUALCOMM_WCN3990 = 'Qualcomm WCN3990'
45NAME_INTEL_7260 = 'Intel 7260'
46NAME_INTEL_7265 = 'Intel 7265'
47NAME_INTEL_9000 = 'Intel 9000'
48NAME_INTEL_9260 = 'Intel 9260'
49NAME_INTEL_22260 = 'Intel 22260'
50NAME_INTEL_22560 = 'Intel 22560'
51NAME_BROADCOM_BCM4354_SDIO = 'Broadcom BCM4354 SDIO'
52NAME_BROADCOM_BCM4356_PCIE = 'Broadcom BCM4356 PCIE'
53NAME_BROADCOM_BCM4371_PCIE = 'Broadcom BCM4371 PCIE'
54NAME_REALTEK_8822C_PCIE = 'Realtek 8822C PCIE'
55NAME_UNKNOWN = 'Unknown WiFi Device'
56
57DEVICE_INFO_ROOT = '/sys/class/net'
58
59DeviceInfo = collections.namedtuple('DeviceInfo', ['vendor', 'device',
60                                                   'subsystem',
61                                                   'compatible'])
62# Provide default values for parameters.
63DeviceInfo.__new__.__defaults__ = (None, None, None, None)
64
65DEVICE_NAME_LOOKUP = {
66    DeviceInfo('0x02df', '0x9129'): NAME_MARVELL_88W8797_SDIO,
67    DeviceInfo('0x02df', '0x912d'): NAME_MARVELL_88W8897_SDIO,
68    DeviceInfo('0x02df', '0x9135'): NAME_MARVELL_88W8887_SDIO,
69    DeviceInfo('0x11ab', '0x2b38'): NAME_MARVELL_88W8897_PCIE,
70    DeviceInfo('0x1b4b', '0x2b42'): NAME_MARVELL_88W8997_PCIE,
71    DeviceInfo('0x168c', '0x002a'): NAME_ATHEROS_AR9280,
72    DeviceInfo('0x168c', '0x0030'): NAME_ATHEROS_AR9382,
73    DeviceInfo('0x168c', '0x0034'): NAME_ATHEROS_AR9462,
74    DeviceInfo('0x168c', '0x003e'): NAME_QUALCOMM_ATHEROS_QCA6174,
75    DeviceInfo('0x105b', '0xe09d'): NAME_QUALCOMM_ATHEROS_QCA6174,
76    DeviceInfo('0x0271', '0x050a'): NAME_QUALCOMM_ATHEROS_QCA6174_SDIO,
77    DeviceInfo('0x8086', '0x08b1'): NAME_INTEL_7260,
78    DeviceInfo('0x8086', '0x08b2'): NAME_INTEL_7260,
79    DeviceInfo('0x8086', '0x095a'): NAME_INTEL_7265,
80    DeviceInfo('0x8086', '0x095b'): NAME_INTEL_7265,
81    # Note that Intel 9000 is also Intel 9560 aka Jefferson Peak 2
82    DeviceInfo('0x8086', '0x9df0'): NAME_INTEL_9000,
83    DeviceInfo('0x8086', '0x31dc'): NAME_INTEL_9000,
84    DeviceInfo('0x8086', '0x2526'): NAME_INTEL_9260,
85    DeviceInfo('0x8086', '0x2723'): NAME_INTEL_22260,
86    # For integrated wifi chips, use device_id and subsystem_id together
87    # as an identifier.
88    # 0x02f0 is for Quasar on CML, 0x4070 and 0x0074 is for HrP2
89    DeviceInfo('0x8086', '0x02f0', subsystem='0x4070'): NAME_INTEL_22560,
90    DeviceInfo('0x8086', '0x02f0', subsystem='0x0074'): NAME_INTEL_22560,
91    DeviceInfo('0x8086', '0x4df0', subsystem='0x0074'): NAME_INTEL_22560,
92    # With the same Quasar, subsystem_id 0x0034 is JfP2
93    DeviceInfo('0x8086', '0x02f0', subsystem='0x0034'): NAME_INTEL_9000,
94    DeviceInfo('0x02d0', '0x4354'): NAME_BROADCOM_BCM4354_SDIO,
95    DeviceInfo('0x14e4', '0x43ec'): NAME_BROADCOM_BCM4356_PCIE,
96    DeviceInfo('0x14e4', '0x440d'): NAME_BROADCOM_BCM4371_PCIE,
97    DeviceInfo('0x10ec', '0xc822'): NAME_REALTEK_8822C_PCIE,
98
99    DeviceInfo(compatible='qcom,wcn3990-wifi'): NAME_QUALCOMM_WCN3990,
100}
101
102class Interface:
103    """Interace is a class that contains the queriable address properties
104    of an network device.
105    """
106    ADDRESS_TYPE_MAC = 'link/ether'
107    ADDRESS_TYPE_IPV4 = 'inet'
108    ADDRESS_TYPE_IPV6 = 'inet6'
109    ADDRESS_TYPES = [ ADDRESS_TYPE_MAC, ADDRESS_TYPE_IPV4, ADDRESS_TYPE_IPV6 ]
110
111
112    @staticmethod
113    def get_connected_ethernet_interface(ignore_failures=False):
114        """Get an interface object representing a connected ethernet device.
115
116        Raises an exception if no such interface exists.
117
118        @param ignore_failures bool function will return None instead of raising
119                an exception on failures.
120        @return an Interface object except under the conditions described above.
121
122        """
123        # Assume that ethernet devices are called ethX until proven otherwise.
124        for device_name in ['eth%d' % i for i in range(5)]:
125            ethernet_if = Interface(device_name)
126            if ethernet_if.exists and ethernet_if.ipv4_address:
127                return ethernet_if
128
129        else:
130            if ignore_failures:
131                return None
132
133            raise error.TestFail('Failed to find ethernet interface.')
134
135
136    def __init__(self, name, host=None, netns=None):
137        self._name = name
138        if host is None:
139            self.host = local_host.LocalHost()
140        else:
141            self.host = host
142        self._run = self.host.run
143        self._namespace = netns
144        self._ns_exec = 'ip netns exec %s ' % netns if netns else ''
145
146
147    @property
148    def name(self):
149        """@return name of the interface (e.g. 'wlan0')."""
150        return self._name
151
152
153    @property
154    def addresses(self):
155        """@return the addresses (MAC, IP) associated with interface."""
156        # "ip addr show %s 2> /dev/null" returns something that looks like:
157        #
158        # 2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast
159        #    link/ether ac:16:2d:07:51:0f brd ff:ff:ff:ff:ff:ff
160        #    inet 172.22.73.124/22 brd 172.22.75.255 scope global eth0
161        #    inet6 2620:0:1000:1b02:ae16:2dff:fe07:510f/64 scope global dynamic
162        #       valid_lft 2591982sec preferred_lft 604782sec
163        #    inet6 fe80::ae16:2dff:fe07:510f/64 scope link
164        #       valid_lft forever preferred_lft forever
165        #
166        # We extract the second column from any entry for which the first
167        # column is an address type we are interested in.  For example,
168        # for "inet 172.22.73.124/22 ...", we will capture "172.22.73.124/22".
169        result = self._run(self._ns_exec +
170                           'ip addr show %s 2> /dev/null' % self._name,
171                           ignore_status=True)
172        address_info = result.stdout
173        if result.exit_status != 0:
174            # The "ip" command will return non-zero if the interface does
175            # not exist.
176            return {}
177
178        addresses = {}
179        for address_line in address_info.splitlines():
180            address_parts = address_line.lstrip().split()
181            if len(address_parts) < 2:
182                continue
183            address_type, address_value = address_parts[:2]
184            if address_type in self.ADDRESS_TYPES:
185                if address_type not in addresses:
186                    addresses[address_type] = []
187                addresses[address_type].append(address_value)
188        return addresses
189
190
191    @property
192    def device_path(self):
193        """@return the sysfs path of the interface device"""
194        # This assumes that our path separator is the same as the remote host.
195        device_path = os.path.join(DEVICE_INFO_ROOT, self._name, 'device')
196        if not self.host.path_exists(device_path):
197            logging.error('No device information found at %s', device_path)
198            return None
199
200        return device_path
201
202
203    @property
204    def wiphy_name(self):
205        """
206        @return name of the wiphy (e.g., 'phy0'), if available.
207        Otherwise None.
208        """
209        readlink_result = self._run('readlink "%s"' %
210                os.path.join(DEVICE_INFO_ROOT, self._name, 'phy80211'),
211                ignore_status=True)
212        if readlink_result.exit_status != 0:
213            return None
214
215        return os.path.basename(readlink_result.stdout.strip())
216
217
218    @property
219    def module_name(self):
220        """@return Name of kernel module in use by this interface."""
221        module_readlink_result = self._run('readlink "%s"' %
222                os.path.join(self.device_path, 'driver', 'module'),
223                ignore_status=True)
224        if module_readlink_result.exit_status != 0:
225            return None
226
227        return os.path.basename(module_readlink_result.stdout.strip())
228
229    @property
230    def parent_device_name(self):
231        """
232        @return Name of device at which wiphy device is present. For example,
233        for a wifi NIC present on a PCI bus, this would be the same as
234        PCI_SLOT_PATH. """
235        path_readlink_result = self._run('readlink "%s"' % self.device_path)
236        if path_readlink_result.exit_status != 0:
237            return None
238
239        return os.path.basename(path_readlink_result.stdout.strip())
240
241    def _get_wifi_device_name(self):
242        """Helper for device_description()."""
243        device_path = self.device_path
244        if not device_path:
245            return None
246
247        read_file = (lambda path: self._run('cat "%s"' % path).stdout.rstrip()
248                     if self.host.path_exists(path) else None)
249
250        # Try to identify using either vendor/product ID, or using device tree
251        # "OF_COMPATIBLE_x".
252        vendor_id = read_file(os.path.join(device_path, 'vendor'))
253        product_id = read_file(os.path.join(device_path, 'device'))
254        subsystem_id = read_file(os.path.join(device_path, 'subsystem_device'))
255        uevent = read_file(os.path.join(device_path, 'uevent'))
256
257        # Device tree "compatible".
258        for line in uevent.splitlines():
259            key, _, value = line.partition('=')
260            if re.match('^OF_COMPATIBLE_[0-9]+$', key):
261                info = DeviceInfo(compatible=value)
262                if info in DEVICE_NAME_LOOKUP:
263                    return DEVICE_NAME_LOOKUP[info]
264
265        # {Vendor, Product, Subsystem} ID.
266        if subsystem_id is not None:
267            info = DeviceInfo(vendor_id, product_id, subsystem=subsystem_id)
268            if info in DEVICE_NAME_LOOKUP:
269                return DEVICE_NAME_LOOKUP[info]
270
271
272        # {Vendor, Product} ID.
273        info = DeviceInfo(vendor_id, product_id)
274        if info in DEVICE_NAME_LOOKUP:
275            return DEVICE_NAME_LOOKUP[info]
276
277        return None
278
279    @property
280    def device_description(self):
281        """@return DeviceDescription object for a WiFi interface, or None."""
282        if not self.is_wifi_device():
283            logging.error('Device description not supported on non-wifi '
284                          'interface: %s.', self._name)
285            return None
286
287        device_name = self._get_wifi_device_name()
288        if not device_name:
289            device_name = NAME_UNKNOWN
290            logging.error('Device is unknown.')
291        else:
292            logging.debug('Device is %s',  device_name)
293
294        module_name = self.module_name
295        kernel_release = self._run('uname -r').stdout.strip()
296        net_drivers_path = '/lib/modules/%s/kernel/drivers/net' % kernel_release
297        if module_name is not None and self.host.path_exists(net_drivers_path):
298            module_path = self._run('find %s -name %s.ko -printf %%P' % (
299                net_drivers_path, module_name)).stdout
300        else:
301            module_path = 'Unknown (kernel might have modules disabled)'
302        return DeviceDescription(device_name, module_path)
303
304
305    @property
306    def exists(self):
307        """@return True if this interface exists, False otherwise."""
308        # No valid interface has no addresses at all.
309        return bool(self.addresses)
310
311
312
313    def get_ip_flags(self):
314        """@return List of flags from 'ip addr show'."""
315        # "ip addr show %s 2> /dev/null" returns something that looks like:
316        #
317        # 2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast
318        #    link/ether ac:16:2d:07:51:0f brd ff:ff:ff:ff:ff:ff
319        #    inet 172.22.73.124/22 brd 172.22.75.255 scope global eth0
320        #    inet6 2620:0:1000:1b02:ae16:2dff:fe07:510f/64 scope global dynamic
321        #       valid_lft 2591982sec preferred_lft 604782sec
322        #    inet6 fe80::ae16:2dff:fe07:510f/64 scope link
323        #       valid_lft forever preferred_lft forever
324        #
325        # We only cares about the flags in the first line.
326        result = self._run(self._ns_exec +
327                           'ip addr show %s 2> /dev/null' % self._name,
328                           ignore_status=True)
329        address_info = result.stdout
330        if result.exit_status != 0:
331            # The "ip" command will return non-zero if the interface does
332            # not exist.
333            return []
334        status_line = address_info.splitlines()[0]
335        flags_str = status_line[status_line.find('<')+1:status_line.find('>')]
336        return flags_str.split(',')
337
338
339    @property
340    def is_up(self):
341        """@return True if this interface is UP, False otherwise."""
342        return 'UP' in self.get_ip_flags()
343
344
345    @property
346    def is_lower_up(self):
347        """
348        Check if the interface is in LOWER_UP state. This usually means (e.g.,
349        for ethernet) a link is detected.
350
351        @return True if this interface is LOWER_UP, False otherwise."""
352        return 'LOWER_UP' in self.get_ip_flags()
353
354
355    def is_link_operational(self):
356        """@return True if RFC 2683 IfOperStatus is UP (i.e., is able to pass
357        packets).
358        """
359        command = self._ns_exec + 'ip link show %s' % self._name
360        result = self._run(command, ignore_status=True)
361        if result.exit_status:
362            return False
363        return result.stdout.find('state UP') >= 0
364
365
366    @property
367    def mac_address(self):
368        """@return the (first) MAC address, e.g., "00:11:22:33:44:55"."""
369        return self.addresses.get(self.ADDRESS_TYPE_MAC, [None])[0]
370
371
372    @property
373    def ipv4_address_and_prefix(self):
374        """@return the IPv4 address/prefix, e.g., "192.186.0.1/24"."""
375        return self.addresses.get(self.ADDRESS_TYPE_IPV4, [None])[0]
376
377
378    @property
379    def ipv4_address(self):
380        """@return the (first) IPv4 address, e.g., "192.168.0.1"."""
381        netblock_addr = self.netblock
382        return netblock_addr.addr if netblock_addr else None
383
384
385    @property
386    def ipv4_prefix(self):
387        """@return the IPv4 address prefix e.g., 24."""
388        addr = self.netblock
389        return addr.prefix_len if addr else None
390
391
392    @property
393    def ipv4_subnet(self):
394        """@return string subnet of IPv4 address (e.g. '192.168.0.0')"""
395        addr = self.netblock
396        return addr.subnet if addr else None
397
398
399    @property
400    def ipv4_subnet_mask(self):
401        """@return the IPv4 subnet mask e.g., "255.255.255.0"."""
402        addr = self.netblock
403        return addr.netmask if addr else None
404
405
406    def is_wifi_device(self):
407        """@return True if iw thinks this is a wifi device."""
408        if self._run('iw dev %s info' % self._name,
409                     ignore_status=True).exit_status:
410            logging.debug('%s does not seem to be a wireless device.',
411                          self._name)
412            return False
413        return True
414
415
416    @property
417    def netblock(self):
418        """Return Netblock object for this interface's IPv4 address.
419
420        @return Netblock object (or None if no IPv4 address found).
421
422        """
423        netblock_str = self.ipv4_address_and_prefix
424        return netblock.from_addr(netblock_str) if netblock_str else None
425
426
427    @property
428    def signal_level(self):
429        """Get the signal level for an interface.
430
431        This is currently only defined for WiFi interfaces.
432
433        localhost test # iw dev mlan0 link
434        Connected to 04:f0:21:03:7d:b2 (on mlan0)
435                SSID: Perf_slvf0_ch36
436                freq: 5180
437                RX: 699407596 bytes (8165441 packets)
438                TX: 58632580 bytes (9923989 packets)
439                signal: -54 dBm
440                tx bitrate: 130.0 MBit/s MCS 15
441
442                bss flags:
443                dtim period:    2
444                beacon int:     100
445
446        @return signal level in dBm (a negative, integral number).
447
448        """
449        if not self.is_wifi_device():
450            return None
451
452        result_lines = self._run('iw dev %s link' %
453                                 self._name).stdout.splitlines()
454        signal_pattern = re.compile('signal:\s+([-0-9]+)\s+dbm')
455        for line in result_lines:
456            cleaned = line.strip().lower()
457            match = re.search(signal_pattern, cleaned)
458            if match is not None:
459                return int(match.group(1))
460
461        logging.error('Failed to find signal level for %s.', self._name)
462        return None
463
464
465    @property
466    def signal_level_all_chains(self):
467        """Get the signal level for each chain of an interface.
468
469        This is only defined for WiFi interfaces.
470
471        localhost test # iw wlan0 station dump
472        Station 44:48:c1:af:d7:31 (on wlan0)
473            inactive time:  13180 ms
474            rx bytes:   46886
475            rx packets: 459
476            tx bytes:   103159
477            tx packets: 745
478            tx retries: 17
479            tx failed:  0
480            beacon loss:    0
481            beacon rx:  128
482            rx drop misc:   2
483            signal:     -52 [-52, -53] dBm
484            signal avg: 56 dBm
485            beacon signal avg:  -49 dBm
486            tx bitrate: 400.0 MBit/s VHT-MCS 9 40MHz short GI VHT-NSS 2
487            rx bitrate: 400.0 MBit/s VHT-MCS 9 40MHz short GI VHT-NSS 2
488            authorized: yes
489            authenticated:  yes
490            associated: yes
491            preamble:   long
492            WMM/WME:    yes
493            MFP:        no
494            TDLS peer:  no
495            DTIM period:    1
496            beacon interval:100
497            short slot time:yes
498            connected time: 6874 seconds
499
500        @return array of signal level information for each antenna in dBm
501            (an array of negative, integral numbers e.g. [-67, -60]) or None if
502            chain specific data is not provided by the device.
503
504        """
505        if not self.is_wifi_device():
506            return None
507
508        result_lines = self._run('iw %s station dump' %
509                                 self._name).stdout.splitlines()
510        signal_pattern = re.compile('signal:\s+([-0-9]+)\[')
511        for line in result_lines:
512            cleaned = line.strip().replace(' ', '').lower()
513            match = re.search(signal_pattern, cleaned)
514            if match is not None:
515                signal_levels = cleaned[cleaned.find('[') + 1 :
516                                    cleaned.find(']')].split(',')
517                return list(map(int, signal_levels))
518        return None
519
520
521    @property
522    def mtu(self):
523        """@return the interface configured maximum transmission unit (MTU)."""
524        # "ip addr show %s 2> /dev/null" returns something that looks like:
525        #
526        # 2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast
527        #    link/ether ac:16:2d:07:51:0f brd ff:ff:ff:ff:ff:ff
528        #    inet 172.22.73.124/22 brd 172.22.75.255 scope global eth0
529        #    inet6 2620:0:1000:1b02:ae16:2dff:fe07:510f/64 scope global dynamic
530        #       valid_lft 2591982sec preferred_lft 604782sec
531        #    inet6 fe80::ae16:2dff:fe07:510f/64 scope link
532        #       valid_lft forever preferred_lft forever
533        #
534        # We extract the 'mtu' value (in this example "1500")
535        try:
536            result = self._run(self._ns_exec +
537                               'ip addr show %s 2> /dev/null' % self._name)
538            address_info = result.stdout
539        except error.CmdError as e:
540            # The "ip" command will return non-zero if the interface does
541            # not exist.
542            return None
543
544        match = re.search('mtu\s+(\d+)', address_info)
545        if not match:
546            raise error.TestFail('MTU information is not available.')
547        return int(match.group(1))
548
549
550    def noise_level(self, frequency_mhz):
551        """Get the noise level for an interface at a given frequency.
552
553        This is currently only defined for WiFi interfaces.
554
555        This only works on some devices because 'iw survey dump' (the method
556        used to get the noise) only works on some devices.  On other devices,
557        this method returns None.
558
559        @param frequency_mhz: frequency at which the noise level should be
560               measured and reported.
561        @return noise level in dBm (a negative, integral number) or None.
562
563        """
564        if not self.is_wifi_device():
565            return None
566
567        # This code has to find the frequency and then find the noise
568        # associated with that frequency because 'iw survey dump' output looks
569        # like this:
570        #
571        # localhost test # iw dev mlan0 survey dump
572        # ...
573        # Survey data from mlan0
574        #     frequency:              5805 MHz
575        #     noise:                  -91 dBm
576        #     channel active time:    124 ms
577        #     channel busy time:      1 ms
578        #     channel receive time:   1 ms
579        #     channel transmit time:  0 ms
580        # Survey data from mlan0
581        #     frequency:              5825 MHz
582        # ...
583
584        result_lines = self._run('iw dev %s survey dump' %
585                                 self._name).stdout.splitlines()
586        my_frequency_pattern = re.compile('frequency:\s*%d mhz' %
587                                          frequency_mhz)
588        any_frequency_pattern = re.compile('frequency:\s*\d{4} mhz')
589        inside_desired_frequency_block = False
590        noise_pattern = re.compile('noise:\s*([-0-9]+)\s+dbm')
591        for line in result_lines:
592            cleaned = line.strip().lower()
593            if my_frequency_pattern.match(cleaned):
594                inside_desired_frequency_block = True
595            elif inside_desired_frequency_block:
596                match = noise_pattern.match(cleaned)
597                if match is not None:
598                    return int(match.group(1))
599                if any_frequency_pattern.match(cleaned):
600                    inside_desired_frequency_block = False
601
602        logging.error('Failed to find noise level for %s at %d MHz.',
603                      self._name, frequency_mhz)
604        return None
605
606
607def get_interfaces():
608    """
609    Retrieve the list of network interfaces found on the system.
610
611    @return List of interfaces.
612
613    """
614    return [Interface(nic.strip()) for nic in os.listdir(DEVICE_INFO_ROOT)]
615
616
617def get_prioritized_default_route(host=None,
618                                  interface_name_regex=None,
619                                  namespace=None):
620    """
621    Query a local or remote host for its prioritized default interface
622    and route.
623
624    @param interface_name_regex string regex to filter routes by interface.
625    @return DefaultRoute tuple, or None if no default routes are found.
626
627    """
628    # Build a list of default routes, filtered by interface if requested.
629    # Example command output: 'default via 172.23.188.254 dev eth0  metric 2'
630    run = host.run if host is not None else utils.run
631    command = 'ip route show'
632    if namespace:
633        command = 'ip netns exec %s ' % namespace + command
634    output = run(command).stdout
635    output_regex_str = 'default\s+via\s+(\S+)\s+dev\s+(\S+)\s+metric\s+(\d+)'
636    output_regex = re.compile(output_regex_str)
637    defaults = []
638    for item in output.splitlines():
639        if 'default' not in item:
640            continue
641        match = output_regex.match(item.strip())
642        if match is None:
643            raise error.TestFail('Unexpected route output: %s' % item)
644        gateway = match.group(1)
645        interface_name = match.group(2)
646        metric = int(match.group(3))
647        if interface_name_regex is not None:
648            if re.match(interface_name_regex, interface_name) is None:
649                continue
650        defaults.append(DefaultRoute(interface_name=interface_name,
651                                     gateway=gateway, metric=metric))
652    if not defaults:
653        return None
654
655    # Sort and return the route with the lowest metric value.
656    defaults.sort(key=lambda x: x.metric)
657    return defaults[0]
658