#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
15import time
16
17from acts import asserts
18from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT
19from acts.test_decorators import test_tracker_info
20from acts_contrib.test_utils.net.net_test_utils import start_tcpdump
21from acts_contrib.test_utils.net.net_test_utils import stop_tcpdump
22from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
23from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
24from scapy.all import rdpcap, DHCP, IPv6
25from scapy.layers.inet6 import ICMPv6ND_NA as NA
26
# Name of the DUT wireless interface whose addresses the tests query.
WLAN = "wlan0"
# Default destination used by the ping connectivity checks.
PING_ADDR = "google.com"
# Option tuple searched for in captured DHCP ACKs to detect Rapid Commit
# (option code 80 with an empty payload).
RAPID_COMMIT_OPTION = (80, b'')
# Destination address the DUT's gratuitous neighbor advertisements must carry
# to be counted by the NA verification.
DEFAULT_IPV6_ALLROUTERS = "ff02::2"
31
32
class DhcpTest(WifiBaseTest):
    """DHCP related test for Android.

    The tests connect the first Android device to an OpenWrt access point,
    then check address assignment, ping reachability and the packets
    captured on the access point (DHCP exchange, gratuitous NA).
    """

    def setup_class(self):
        """Initialise the DUT and bring up the OpenWrt access point.

        Unless the optional ``configure_OpenWrt`` user parameter equals
        ``"skip"``, the AP Wi-Fi interface is (re)configured with a WPA
        network and ``self.wifi_network`` is taken from the AP. When it is
        skipped, ``self.wifi_network`` must come from the ``wifi_network``
        user parameter.
        """
        self.dut = self.android_devices[0]

        wutils.wifi_test_device_init(self.dut)
        req_params = []
        opt_param = ["wifi_network", "configure_OpenWrt"]
        self.unpack_userparams(
            req_param_names=req_params, opt_param_names=opt_param)
        asserts.assert_true(OPENWRT in self.user_params,
                            "OpenWrtAP is not in testbed.")

        self.openwrt = self.access_points[0]
        if getattr(self, "configure_OpenWrt", None) == "skip":
            self.dut.log.info("Skip configure Wifi interface due to config setup.")
        else:
            self.configure_openwrt_ap_and_start(wpa_network=True)
            self.wifi_network = self.openwrt.get_wifi_network()
        self.openwrt.network_setting.setup_ipv6_bridge()
        asserts.assert_true(self.openwrt.verify_wifi_status(),
                            "OpenWrt Wifi interface is not ready.")

    def teardown_class(self):
        """Reset wifi and stop tcpdump cleanly."""
        wutils.reset_wifi(self.dut)
        self.openwrt.network_setting.clear_tcpdump()

    def teardown_test(self):
        """Reset wifi to make sure DUT tears down cleanly."""
        wutils.reset_wifi(self.dut)

    def _verify_ping(self, option="", dest=PING_ADDR):
        """Ping ``dest`` once from the DUT and report whether it succeeded.

        Args:
            option: Suffix appended directly to the ``ping`` command name
                (the tests pass ``"6"`` to run ``ping6``).
            dest: Host name or address to ping.

        Returns:
            True if the command output does not contain ``"100%"``
            (presumably "100% packet loss"); False otherwise or if the adb
            command raised.
        """
        try:
            out = self.dut.adb.shell("ping%s -c1 %s" % (option, dest))
            return "100%" not in out
        except Exception as e:
            # A failing ping surfaces as an exception from adb; treat it as
            # "not reachable" rather than aborting the test.
            self.dut.log.debug(e)
            return False

    def _verify_device_address(self, ipv4=True, ipv6=True, timeout=15):
        """Verify device get assign address on wireless interface.

        Polls once per second until every requested address family has at
        least one address on WLAN, or until ``timeout`` seconds elapsed.

        Args:
            ipv4: Require an IPv4 address.
            ipv6: Require an IPv6 address.
            timeout: Maximum time to wait, in seconds.

        Returns:
            True once all requested addresses are present, False on timeout.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                if ipv4:
                    ipv4_addr = self.dut.droid.connectivityGetIPv4Addresses(WLAN)[0]
                    self.dut.log.info("ipv4_address is %s" % ipv4_addr)
                if ipv6:
                    ipv6_addr = self.dut.droid.connectivityGetIPv6Addresses(WLAN)[0]
                    self.dut.log.info("ipv6_address is %s" % ipv6_addr)
                return True
            except Exception as e:
                # An empty address list (IndexError) or an RPC failure means
                # the address is not there yet; retry. KeyboardInterrupt and
                # SystemExit are deliberately not swallowed.
                self.dut.log.debug("Address not available yet: %s" % e)
                time.sleep(1)
        return False

    def _capture_packets_during_connect(self):
        """Connect the DUT to the test network while capturing on the AP.

        Starts tcpdump on the OpenWrt AP, connects the DUT to
        ``self.wifi_network``, stops the capture and pulls the pcap into the
        DUT log directory.

        Returns:
            The packets read from the pulled pcap file by scapy ``rdpcap``.
        """
        remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name)
        wutils.connect_to_wifi_network(self.dut, self.wifi_network)
        local_pcap_path = self.openwrt.network_setting.stop_tcpdump(
            remote_pcap_path, self.dut.device_log_path)
        self.dut.log.info("pcap file path : %s" % local_pcap_path)
        return rdpcap(local_pcap_path)

    def verify_dhcp_packet(self, packets, support_rapid_commit):
        """Check captured DHCP packets against the expected handshake type.

        With rapid commit no OFFER or REQUEST may appear and every ACK must
        carry RAPID_COMMIT_OPTION; without it no ACK may carry the option.

        Args:
            packets: Iterable of scapy packets from the capture.
            support_rapid_commit: True if rapid commit is expected to be used.
        """
        for pkt in packets:
            if not pkt.haslayer(DHCP):
                continue
            # The value of the first DHCP option is used as the message type.
            message_type = pkt[DHCP].options[0][1]
            # Remove dhcp discover checking since rapid commit enable by default(aosp/2943087).
            if message_type == 2:
                asserts.assert_true(not support_rapid_commit,
                                    "Should not find DHCP OFFER when RAPID_COMMIT_OPTION supported.")
            elif message_type == 3:
                asserts.assert_true(not support_rapid_commit,
                                    "Should not find DHCP REQUEST when RAPID_COMMIT_OPTION supported.")
            elif message_type == 5:
                send_option = RAPID_COMMIT_OPTION in pkt[DHCP].options
                asserts.assert_true(send_option == support_rapid_commit,
                                    "Unexpected result in DHCP ACK.")

    def verify_gratuitous_na(self, packets):
        """Check that the DUT advertised each of its IPv6 addresses via NA.

        Collects the target address of every neighbor advertisement sent
        from the DUT link-local address to DEFAULT_IPV6_ALLROUTERS and
        asserts that the sorted targets equal the sorted addresses returned
        by ``connectivityGetIPv6Addresses``.

        Args:
            packets: Iterable of scapy packets from the capture.
        """
        scoped_address = self.dut.droid.connectivityGetLinkLocalIpv6Address(WLAN)
        # Drop the "%<interface>" zone suffix. str.strip() must not be used
        # here: it removes a *set of characters*, so an address ending in
        # "a" or "0" (e.g. "fe80::12a0%wlan0") would lose real digits.
        ipv6localaddress = scoped_address.split("%", 1)[0]
        self.dut.log.info("Device local address : %s" % ipv6localaddress)
        ipv6globaladdress = sorted(self.dut.droid.connectivityGetIPv6Addresses(WLAN))
        self.dut.log.info("Device global address : %s" % ipv6globaladdress)
        # Targets of the NAs the DUT sent to the all-routers address.
        target_address = [
            pkt[NA].tgt for pkt in packets
            if pkt.haslayer(NA) and pkt.haslayer(IPv6)
            and pkt[IPv6].src == ipv6localaddress
            and pkt[IPv6].dst == DEFAULT_IPV6_ALLROUTERS
        ]
        self.dut.log.info("Broadcast target address : %s" % target_address)
        asserts.assert_equal(ipv6globaladdress, sorted(target_address),
                             "Target address from NA is not match to device ipv6 address.")

    @test_tracker_info(uuid="01148659-6a3d-4a74-88b6-04b19c4acaaa")
    def test_ipv4_ipv6_network(self):
        """Verify device can get both ipv4 ipv6 address."""
        wutils.connect_to_wifi_network(self.dut, self.wifi_network)

        asserts.assert_true(self._verify_device_address(),
                            "Fail to get ipv4/ipv6 address.")
        asserts.assert_true(self._verify_ping(), "Fail to ping on ipv4.")
        asserts.assert_true(self._verify_ping("6"), "Fail to ping on ipv6.")

    @test_tracker_info(uuid="d3f37ba7-504e-48fc-95be-6eca9a148e4a")
    def test_ipv6_only_prefer_option(self):
        """Verify DUT can only get ipv6 address and ping out."""
        self.openwrt.network_setting.add_ipv6_prefer_option()
        try:
            wutils.connect_to_wifi_network(self.dut, self.wifi_network)

            asserts.assert_true(self._verify_device_address(ipv4=False),
                                "Fail to get ipv6 address.")
            asserts.assert_false(self._verify_ping(),
                                 "Should not ping on success on ipv4.")
            asserts.assert_true(self._verify_ping("6"),
                                "Fail to ping on ipv6.")
        finally:
            # Always restore the AP, otherwise a failure here would leave the
            # option set for the tests that run afterwards.
            self.openwrt.network_setting.remove_ipv6_prefer_option()

    @test_tracker_info(uuid="a16f2a3c-e3ca-4fca-b3ee-bccb5cf34bab")
    def test_dhcp_rapid_commit(self):
        """Verify DUT can run with rapid commit on IPv4."""
        self.dut.adb.shell("device_config put connectivity dhcp_rapid_commit_version 1")
        self.openwrt.network_setting.add_dhcp_rapid_commit()
        try:
            packets = self._capture_packets_during_connect()
            self.verify_dhcp_packet(packets, True)
        finally:
            # Always restore the AP so a failure does not leave rapid commit
            # enabled for test_dhcp_4_way_handshake.
            self.openwrt.network_setting.remove_dhcp_rapid_commit()

    @test_tracker_info(uuid="cddb3d33-e5ef-4efd-8ae5-1325010a05c8")
    def test_dhcp_4_way_handshake(self):
        """Verify DUT runs the regular DHCP handshake without rapid commit."""
        self.dut.adb.shell("device_config put connectivity dhcp_rapid_commit_version 0")
        packets = self._capture_packets_during_connect()
        self.verify_dhcp_packet(packets, False)

    @test_tracker_info(uuid="69fd9619-db35-406a-96e2-8425f8f5e8bd")
    def test_gratuitous_na(self):
        """Verify DUT will send NA after ipv6 address set."""
        self.dut.adb.shell("device_config put connectivity ipclient_gratuitous_na_version 1")
        self.tcpdump_pid = start_tcpdump(self.dut, self.test_name)
        try:
            packets = self._capture_packets_during_connect()
        finally:
            # Stop the DUT-side capture even when connecting fails.
            stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name)
        self.verify_gratuitous_na(packets)
183