xref: /aosp_15_r20/external/autotest/client/cros/dhcpv6_test_base.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2015 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""
7Base class for DHCPv6 tests.  This class just sets up a little bit of plumbing,
8like a virtual ethernet device with one end that looks like a real ethernet
9device to shill and a DHCPv6 test server on the end that doesn't look like a
10real ethernet interface to shill.  Child classes should override test_body()
11with the logic of their test.
12"""
13
14from __future__ import absolute_import
15from __future__ import division
16from __future__ import print_function
17
18import logging
19import six
20from six.moves import filter
21from six.moves import range
22import time
23import traceback
24
25from autotest_lib.client.bin import test
26from autotest_lib.client.common_lib import error
27from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
28from autotest_lib.client.cros import dhcpv6_test_server
29from autotest_lib.client.cros.networking import shill_proxy
30
# Property names used to look up values in the dictionary returned by
# Dhcpv6TestBase.get_interface_ipconfig().
DHCPV6_KEY_ADDRESS = 'Address'
DHCPV6_KEY_DELEGATED_PREFIX = 'DelegatedPrefix'
DHCPV6_KEY_DELEGATED_PREFIX_LENGTH = 'DelegatedPrefixLength'
DHCPV6_KEY_NAMESERVERS = 'NameServers'
DHCPV6_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains'

# The ipconfig is expected to show up shortly after DHCPv6 completes, so
# Dhcpv6TestBase.check_dhcpv6_config() polls for it at most
# IPCONFIG_POLL_COUNT times, sleeping IPCONFIG_POLL_PERIOD_SECONDS between
# attempts.
IPCONFIG_POLL_COUNT = 5
IPCONFIG_POLL_PERIOD_SECONDS = 1
42
class Dhcpv6TestBase(test.test):
    """Parent class for tests that verify DHCPv6 behavior.

    run_once() creates a virtual ethernet pair, starts a DHCPv6 test server
    on one end of it, calls test_body() and finally stops the server and
    tears the pair down again.  Child classes override test_body().
    """
    version = 1


    @staticmethod
    def _get_dbus_properties(dbus_object):
        """Calls GetProperties() on |dbus_object|.

        Under Python 2 the call is made with utf8_strings=True; under
        Python 3 that keyword argument is not passed.

        @param dbus_object DBus interface object exposing GetProperties().

        @return whatever GetProperties() returns for |dbus_object|.

        """
        if six.PY2:
            return dbus_object.GetProperties(utf8_strings=True)
        return dbus_object.GetProperties()


    def get_device(self, interface_name):
        """Finds the corresponding Device object for an interface with
        the name |interface_name|.

        @param interface_name string The name of the interface to check.

        @return DBus interface object representing the associated device.

        """
        return self.shill_proxy.find_object('Device',
                                            {'Name': interface_name})


    def find_ethernet_service(self, interface_name):
        """Finds the corresponding service object for an Ethernet interface.

        @param interface_name string The name of the associated interface

        @return Service object representing the associated service, or None
            if no device named |interface_name| was found.

        """
        device = self.get_device(interface_name)
        # Same guard as get_interface_ipconfig_objects(): without a device
        # there is no object path to look a service up by.
        if device is None:
            return None
        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
        return self.shill_proxy.find_object('Service', {'Device': device_path})


    def get_interface_ipconfig_objects(self, interface_name):
        """
        Returns a list of dbus object proxies for the IPConfigs of
        |interface_name|.  Returns an empty list if no such interface exists.

        @param interface_name string name of the device to query (e.g., "eth0").

        @return list of objects representing DBus IPConfig RPC endpoints.

        """
        device = self.get_device(interface_name)
        if device is None:
            return []

        device_properties = self._get_dbus_properties(device)
        proxy = self.shill_proxy
        ipconfig_type = proxy.DBUS_TYPE_IPCONFIG
        candidates = [proxy.get_dbus_object(ipconfig_type, property_path)
                      for property_path in device_properties['IPConfigs']]
        # Drop falsy entries, i.e. paths for which no object was returned.
        return [candidate for candidate in candidates if candidate]


    def get_interface_ipconfig(self, interface_name):
        """
        Returns a dictionary containing settings for an |interface_name| set
        via DHCPv6.  Returns None if no such interface or setting bundle on
        that interface can be found in shill.

        @param interface_name string name of the device to query (e.g., "eth0").

        @return dict containing the properties of the IPConfig stripped
            of DBus meta-data or None.

        @raises error.TestFail if more than one IPConfig object with
            method == dhcp6 is found.

        """
        dhcp_properties = None
        for ipconfig in self.get_interface_ipconfig_objects(interface_name):
            logging.info('Looking at ipconfig %r', ipconfig)
            ipconfig_properties = self._get_dbus_properties(ipconfig)
            if 'Method' not in ipconfig_properties:
                logging.info('Found ipconfig object with no method field')
                continue
            if ipconfig_properties['Method'] != 'dhcp6':
                logging.info('Found ipconfig object with method != dhcp6')
                continue
            if dhcp_properties is not None:
                raise error.TestFail('Found multiple ipconfig objects '
                                     'with method == dhcp6')
            dhcp_properties = ipconfig_properties
        if dhcp_properties is None:
            logging.info('Did not find IPConfig object with method == dhcp6')
            return None
        logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties)
        return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties)


    def run_once(self):
        """
        Sets up the test apparatus, runs test_body() and cleans up.

        error.TestFail and error.TestNAError pass through unchanged; any
        other exception is logged together with its traceback and re-raised
        as error.TestFail.

        @raises error.TestFail if the virtual ethernet pair is not healthy,
            or if setup or test_body() raises an unexpected exception.

        """
        self._server = None
        self._server_ip = None
        self._ethernet_pair = None
        self._shill_proxy = shill_proxy.ShillProxy()
        try:
            # TODO(zqiu): enable DHCPv6 for peer interface, either by restarting
            # shill with appropriate command line options or via a new DBUS
            # command.
            self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
                    interface_ip=None,
                    peer_interface_name='pseudoethernet0',
                    peer_interface_ip=None,
                    interface_ipv6=dhcpv6_test_server.DHCPV6_SERVER_ADDRESS)
            self._ethernet_pair.setup()
            if not self._ethernet_pair.is_healthy:
                raise error.TestFail('Could not create virtual ethernet pair.')
            self._server_ip = self._ethernet_pair.interface_ip
            self._server = dhcpv6_test_server.Dhcpv6TestServer(
                    self._ethernet_pair.interface_name)
            self._server.start()
            self.test_body()
        except (error.TestFail, error.TestNAError):
            # Pass these through without modification.
            raise
        except Exception as e:
            logging.error('Caught exception: %s.', str(e))
            logging.error('Trace: %s', traceback.format_exc())
            raise error.TestFail('Caught exception: %s.' % str(e))
        finally:
            # Nested so that a failure while stopping the server cannot
            # prevent the virtual ethernet pair from being torn down.
            try:
                if self._server is not None:
                    self._server.stop()
            finally:
                if self._ethernet_pair is not None:
                    self._ethernet_pair.teardown()


    def test_body(self):
        """
        Override this method with the body of your test.  You may safely assume
        that the properties exposed by Dhcpv6TestBase correctly return
        references to the test apparatus.
        """
        raise error.TestFail('No test body implemented')


    @property
    def server_ip(self):
        """
        Return the IP address of the side of the interface that the DHCPv6 test
        server is bound to, as reported by the virtual ethernet pair's
        interface_ip when run_once() set it up.
        """
        return self._server_ip


    @property
    def server(self):
        """
        Returns a reference to the DHCPv6 test server.  Use this to add
        handlers and run tests.
        """
        return self._server


    @property
    def ethernet_pair(self):
        """
        Returns a reference to the virtual ethernet pair created to run DHCPv6
        tests on.
        """
        return self._ethernet_pair


    @property
    def shill_proxy(self):
        """
        Returns the shill proxy instance.
        """
        return self._shill_proxy


    def _wait_for_dhcpv6_config(self):
        """Polls shill until the DHCPv6 ipconfig has an address and a prefix.

        Makes up to IPCONFIG_POLL_COUNT attempts, sleeping
        IPCONFIG_POLL_PERIOD_SECONDS after each unsuccessful one.

        @return dict of ipconfig properties for the peer interface.

        @raises error.TestFail if no complete ipconfig shows up in time.

        """
        for _ in range(IPCONFIG_POLL_COUNT):
            dhcpv6_config = self.get_interface_ipconfig(
                    self.ethernet_pair.peer_interface_name)
            # Wait until both IP address and delegated prefix are obtained.
            if (dhcpv6_config is not None and
                dhcpv6_config.get(DHCPV6_KEY_ADDRESS) and
                dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)):
                return dhcpv6_config
            time.sleep(IPCONFIG_POLL_PERIOD_SECONDS)
        raise error.TestFail('Failed to retrieve DHCPv6 ipconfig object '
                             'from shill.')


    def _verify_address(self, address):
        """Checks the non-temporary address against the server's subnet/range.

        The address is split at its first '::'; the part before it must equal
        the corresponding part of DHCPV6_SERVER_SUBNET_PREFIX and the part
        after it, parsed as hexadecimal, must lie within the server's address
        range (inclusive).

        @param address string the address reported in the ipconfig.

        @raises error.TestFail if the address cannot be parsed or does not
            match the expected prefix or range.

        """
        actual_prefix, separator, suffix_text = address.partition('::')
        if not separator:
            raise error.TestFail('Address %s does not contain "::".' % address)

        # Verify Non-temporary Address prefix.
        subnet_prefix = dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX
        expected_prefix = subnet_prefix[:subnet_prefix.index('::')]
        if actual_prefix != expected_prefix:
            raise error.TestFail('Address prefix mismatch: '
                                 'actual %s expected %s.' %
                                 (actual_prefix, expected_prefix))

        # Verify Non-temporary Address suffix.
        try:
            actual_suffix = int(suffix_text, 16)
        except ValueError:
            raise error.TestFail('Address suffix %r of %s is not a '
                                 'hexadecimal number.' %
                                 (suffix_text, address))
        range_low = dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW
        range_high = dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH
        if actual_suffix < range_low or actual_suffix > range_high:
            raise error.TestFail('Invalid address suffix: '
                                 'actual %x expected (%x-%x)' %
                                 (actual_suffix, range_low, range_high))


    def _verify_delegated_prefix(self, dhcpv6_config):
        """Checks the delegated prefix and its length in |dhcpv6_config|.

        The prefix must equal DHCPV6_PREFIX_DELEGATION_RANGE_FORMAT formatted
        with one of the indices between DHCPV6_PREFIX_DELEGATION_INDEX_LOW
        and DHCPV6_PREFIX_DELEGATION_INDEX_HIGH (inclusive).

        @param dhcpv6_config dict of ipconfig properties.

        @raises error.TestFail on an unexpected prefix or prefix length.

        """
        # Verify delegated prefix.
        delegated_prefix = dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)
        prefix_format = dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_RANGE_FORMAT
        valid_indices = range(
                dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_LOW,
                dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_HIGH + 1)
        if not any(delegated_prefix == prefix_format % index
                   for index in valid_indices):
            raise error.TestFail('Invalid delegated prefix: %s' %
                                 (delegated_prefix))

        # Verify delegated prefix length.
        raw_prefix_length = dhcpv6_config.get(
                DHCPV6_KEY_DELEGATED_PREFIX_LENGTH)
        if raw_prefix_length is None:
            # int(None) would raise an unhelpful TypeError below.
            raise error.TestFail('DHCPv6 ipconfig has no %s property.' %
                                 DHCPV6_KEY_DELEGATED_PREFIX_LENGTH)
        delegated_prefix_length = int(raw_prefix_length)
        expected_delegated_prefix_length = \
                dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_PREFIX_LENGTH
        if delegated_prefix_length != expected_delegated_prefix_length:
            raise error.TestFail('Delegated prefix length mismatch: '
                                 'actual %d expected %d' %
                                 (delegated_prefix_length,
                                  expected_delegated_prefix_length))


    def _verify_dns_settings(self, dhcpv6_config):
        """Checks the name servers and search domains in |dhcpv6_config|.

        The expected lists are the comma-separated DHCPV6_NAME_SERVERS and
        DHCPV6_DOMAIN_SEARCH values of the test server module.

        @param dhcpv6_config dict of ipconfig properties.

        @raises error.TestFail if either list differs from the expected one.

        """
        # Verify name servers.
        actual_name_servers = dhcpv6_config.get(DHCPV6_KEY_NAMESERVERS)
        expected_name_servers = \
                dhcpv6_test_server.DHCPV6_NAME_SERVERS.split(',')
        if actual_name_servers != expected_name_servers:
            raise error.TestFail('Name servers mismatch: actual %r expected %r'
                                 % (actual_name_servers, expected_name_servers))

        # Verify domain search.
        actual_domain_search = dhcpv6_config.get(DHCPV6_KEY_SEARCH_DOMAIN_LIST)
        expected_domain_search = \
                dhcpv6_test_server.DHCPV6_DOMAIN_SEARCH.split(',')
        if actual_domain_search != expected_domain_search:
            raise error.TestFail('Domain search list mismatch: '
                                 'actual %r expected %r' %
                                 (actual_domain_search, expected_domain_search))


    def check_dhcpv6_config(self):
        """
        Compare the DHCPv6 ipconfig with DHCP lease parameters to ensure
        that the DUT attained the correct values.

        Checks, in order: the address prefix and suffix, the delegated
        prefix and its length, the name servers and the search domains.

        @raises error.TestFail if the ipconfig does not appear in time or
            any of the values differs from what the test server hands out.

        """
        # Retrieve DHCPv6 configuration.
        dhcpv6_config = self._wait_for_dhcpv6_config()

        self._verify_address(dhcpv6_config.get(DHCPV6_KEY_ADDRESS))
        self._verify_delegated_prefix(dhcpv6_config)
        self._verify_dns_settings(dhcpv6_config)
288