1# Lint as: python2, python3 2# Copyright 2015 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6""" 7Base class for DHCPv6 tests. This class just sets up a little bit of plumbing, 8like a virtual ethernet device with one end that looks like a real ethernet 9device to shill and a DHCPv6 test server on the end that doesn't look like a 10real ethernet interface to shill. Child classes should override test_body() 11with the logic of their test. 12""" 13 14from __future__ import absolute_import 15from __future__ import division 16from __future__ import print_function 17 18import logging 19import six 20from six.moves import filter 21from six.moves import range 22import time 23import traceback 24 25from autotest_lib.client.bin import test 26from autotest_lib.client.common_lib import error 27from autotest_lib.client.common_lib.cros import virtual_ethernet_pair 28from autotest_lib.client.cros import dhcpv6_test_server 29from autotest_lib.client.cros.networking import shill_proxy 30 31# These are keys that may be used with the DBus dictionary returned from 32# Dhcpv6TestBase.get_interface_ipconfig(). 33DHCPV6_KEY_ADDRESS = 'Address' 34DHCPV6_KEY_DELEGATED_PREFIX = 'DelegatedPrefix' 35DHCPV6_KEY_DELEGATED_PREFIX_LENGTH = 'DelegatedPrefixLength' 36DHCPV6_KEY_NAMESERVERS = 'NameServers' 37DHCPV6_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains' 38 39# After DHCPv6 completes, an ipconfig should appear shortly after 40IPCONFIG_POLL_COUNT = 5 41IPCONFIG_POLL_PERIOD_SECONDS = 1 42 43class Dhcpv6TestBase(test.test): 44 """Parent class for tests that work verify DHCPv6 behavior.""" 45 version = 1 46 47 def get_device(self, interface_name): 48 """Finds the corresponding Device object for an interface with 49 the name |interface_name|. 50 51 @param interface_name string The name of the interface to check. 52 53 @return DBus interface object representing the associated device. 54 55 """ 56 return self.shill_proxy.find_object('Device', 57 {'Name': interface_name}) 58 59 60 def find_ethernet_service(self, interface_name): 61 """Finds the corresponding service object for an Ethernet interface. 62 63 @param interface_name string The name of the associated interface 64 65 @return Service object representing the associated service. 66 67 """ 68 device = self.get_device(interface_name) 69 device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path) 70 return self.shill_proxy.find_object('Service', {'Device': device_path}) 71 72 73 def get_interface_ipconfig_objects(self, interface_name): 74 """ 75 Returns a list of dbus object proxies for |interface_name|. 76 Returns an empty list if no such interface exists. 77 78 @param interface_name string name of the device to query (e.g., "eth0"). 79 80 @return list of objects representing DBus IPConfig RPC endpoints. 81 82 """ 83 device = self.get_device(interface_name) 84 if device is None: 85 return [] 86 87 if six.PY2: 88 device_properties = device.GetProperties(utf8_strings=True) 89 else: 90 device_properties = device.GetProperties() 91 proxy = self.shill_proxy 92 93 ipconfig_object = proxy.DBUS_TYPE_IPCONFIG 94 return list(filter(bool, 95 [ proxy.get_dbus_object(ipconfig_object, property_path) 96 for property_path in device_properties['IPConfigs'] ])) 97 98 99 def get_interface_ipconfig(self, interface_name): 100 """ 101 Returns a dictionary containing settings for an |interface_name| set 102 via DHCPv6. Returns None if no such interface or setting bundle on 103 that interface can be found in shill. 104 105 @param interface_name string name of the device to query (e.g., "eth0"). 106 107 @return dict containing the the properties of the IPConfig stripped 108 of DBus meta-data or None. 109 110 """ 111 dhcp_properties = None 112 for ipconfig in self.get_interface_ipconfig_objects(interface_name): 113 logging.info('Looking at ipconfig %r', ipconfig) 114 if six.PY2: 115 ipconfig_properties = ipconfig.GetProperties(utf8_strings=True) 116 else: 117 ipconfig_properties = ipconfig.GetProperties() 118 if 'Method' not in ipconfig_properties: 119 logging.info('Found ipconfig object with no method field') 120 continue 121 if ipconfig_properties['Method'] != 'dhcp6': 122 logging.info('Found ipconfig object with method != dhcp6') 123 continue 124 if dhcp_properties != None: 125 raise error.TestFail('Found multiple ipconfig objects ' 126 'with method == dhcp6') 127 dhcp_properties = ipconfig_properties 128 if dhcp_properties is None: 129 logging.info('Did not find IPConfig object with method == dhcp6') 130 return None 131 logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties) 132 return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties) 133 134 135 def run_once(self): 136 self._server = None 137 self._server_ip = None 138 self._ethernet_pair = None 139 self._shill_proxy = shill_proxy.ShillProxy() 140 try: 141 # TODO(zqiu): enable DHCPv6 for peer interface, either by restarting 142 # shill with appropriate command line options or via a new DBUS 143 # command. 144 self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair( 145 interface_ip=None, 146 peer_interface_name='pseudoethernet0', 147 peer_interface_ip=None, 148 interface_ipv6=dhcpv6_test_server.DHCPV6_SERVER_ADDRESS) 149 self._ethernet_pair.setup() 150 if not self._ethernet_pair.is_healthy: 151 raise error.TestFail('Could not create virtual ethernet pair.') 152 self._server_ip = self._ethernet_pair.interface_ip 153 self._server = dhcpv6_test_server.Dhcpv6TestServer( 154 self._ethernet_pair.interface_name) 155 self._server.start() 156 self.test_body() 157 except (error.TestFail, error.TestNAError): 158 # Pass these through without modification. 159 raise 160 except Exception as e: 161 logging.error('Caught exception: %s.', str(e)) 162 logging.error('Trace: %s', traceback.format_exc()) 163 raise error.TestFail('Caught exception: %s.' % str(e)) 164 finally: 165 if self._server is not None: 166 self._server.stop() 167 if self._ethernet_pair is not None: 168 self._ethernet_pair.teardown() 169 170 def test_body(self): 171 """ 172 Override this method with the body of your test. You may safely assume 173 that the the properties exposed by DhcpTestBase correctly return 174 references to the test apparatus. 175 """ 176 raise error.TestFail('No test body implemented') 177 178 @property 179 def server_ip(self): 180 """ 181 Return the IP address of the side of the interface that the DHCPv6 test 182 server is bound to. The server itself is bound the the broadcast 183 address on the interface. 184 """ 185 return self._server_ip 186 187 @property 188 def server(self): 189 """ 190 Returns a reference to the DHCP test server. Use this to add handlers 191 and run tests. 192 """ 193 return self._server 194 195 @property 196 def ethernet_pair(self): 197 """ 198 Returns a reference to the virtual ethernet pair created to run DHCP 199 tests on. 200 """ 201 return self._ethernet_pair 202 203 @property 204 def shill_proxy(self): 205 """ 206 Returns a the shill proxy instance. 207 """ 208 return self._shill_proxy 209 210 211 def check_dhcpv6_config(self): 212 """ 213 Compare the DHCPv6 ipconfig with DHCP lease parameters to ensure 214 that the DUT attained the correct values. 215 216 """ 217 # Retrieve DHCPv6 configuration. 218 for attempt in range(IPCONFIG_POLL_COUNT): 219 dhcpv6_config = self.get_interface_ipconfig( 220 self.ethernet_pair.peer_interface_name) 221 # Wait until both IP address and delegated prefix are obtained. 222 if (dhcpv6_config is not None and 223 dhcpv6_config.get(DHCPV6_KEY_ADDRESS) and 224 dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)): 225 break; 226 time.sleep(IPCONFIG_POLL_PERIOD_SECONDS) 227 else: 228 raise error.TestFail('Failed to retrieve DHCPv6 ipconfig object ' 229 'from shill.') 230 231 # Verify Non-temporary Address prefix. 232 address = dhcpv6_config.get(DHCPV6_KEY_ADDRESS) 233 actual_prefix = address[:address.index('::')] 234 expected_prefix = dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX[: 235 dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX.index('::')] 236 if actual_prefix != expected_prefix: 237 raise error.TestFail('Address prefix mismatch: ' 238 'actual %s expected %s.' % 239 (actual_prefix, expected_prefix)) 240 # Verify Non-temporary Address suffix. 241 actual_suffix = int(address[address.index('::')+2:], 16) 242 if (actual_suffix < dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW or 243 actual_suffix > dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH): 244 raise error.TestFail('Invalid address suffix: ' 245 'actual %x expected (%x-%x)' % 246 (actual_suffix, 247 dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW, 248 dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH)) 249 250 # Verify delegated prefix. 251 delegated_prefix = dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX) 252 for x in range( 253 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_LOW, 254 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_HIGH+1): 255 valid_prefix = \ 256 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_RANGE_FORMAT % x 257 if delegated_prefix == valid_prefix: 258 break; 259 else: 260 raise error.TestFail('Invalid delegated prefix: %s' % 261 (delegated_prefix)) 262 # Verify delegated prefix length. 263 delegated_prefix_length = \ 264 int(dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX_LENGTH)) 265 expected_delegated_prefix_length = \ 266 dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_PREFIX_LENGTH 267 if delegated_prefix_length != expected_delegated_prefix_length: 268 raise error.TestFail('Delegated prefix length mismatch: ' 269 'actual %d expected %d' % 270 (delegated_prefix_length, 271 expected_delegated_prefix_length)) 272 273 # Verify name servers. 274 actual_name_servers = dhcpv6_config.get(DHCPV6_KEY_NAMESERVERS) 275 expected_name_servers = \ 276 dhcpv6_test_server.DHCPV6_NAME_SERVERS.split(',') 277 if actual_name_servers != expected_name_servers: 278 raise error.TestFail('Name servers mismatch: actual %r expected %r' 279 % (actual_name_servers, expected_name_servers)) 280 # Verify domain search. 281 actual_domain_search = dhcpv6_config.get(DHCPV6_KEY_SEARCH_DOMAIN_LIST) 282 expected_domain_search = \ 283 dhcpv6_test_server.DHCPV6_DOMAIN_SEARCH.split(',') 284 if actual_domain_search != expected_domain_search: 285 raise error.TestFail('Domain search list mismatch: ' 286 'actual %r expected %r' % 287 (actual_domain_search, expected_domain_search)) 288