#!/usr/bin/env python3
#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import csv
import itertools
import json
import logging
import math
import os
import pprint
import sys
import time
import urllib.request

import serial

import acts.signals
import acts_contrib.test_utils.wifi.wifi_test_utils as wutils
from acts import asserts
from acts import utils
from acts.controllers import attenuator
from acts.controllers import iperf_server as ipf
from acts.controllers.sl4a_lib import rpc_client
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.wifi import wifi_performance_test_utils_RSSI as wperfutils
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest

WifiEnums = wutils.WifiEnums


class WifiRvrTwTest(WifiBaseTest):
    """Tests for wifi RVR performance.

    Sweeps a turn table through a list of angles and, at each angle, sweeps
    the attenuators from ``rvr_atten_mindb`` to ``rvr_atten_maxdb`` while
    measuring iperf RX/TX throughput. Every measurement is appended to
    ``Result.csv`` and dumped to a per-point JSON file.

    Test Bed Requirement:
          * One Android device
          * Wi-Fi networks visible to the device
    """

    TEST_TIMEOUT = 10
    IPERF_SETUP_TIME = 5
    TURN_TABLE_SETUP_TIME = 5

    # Attenuation values are clamped to [0, ATTEN_MAX_DB].
    ATTEN_MAX_DB = 95
    # Number of times a single attenuator is re-programmed before giving up.
    ATTEN_MAX_RETRY = 11
    # Number of extra iperf attempts after a failed throughput measurement.
    IPERF_MAX_RETRY = 2

    # Keys of the RSSI readings reported by the wperfutils helpers.
    _RSSI_KEYS = ("signal_poll_avg_rssi", "chain_0_rssi", "chain_1_rssi")

    # Column titles of Result.csv. The last two columns (Bug_ID, Comment) are
    # not populated by post_process_results().
    _CSV_HEADER = (
        "Test_date", "Project", "Device_SN", "ROM", "HW_Stage",
        "test_SSID", "Frequency", "Turn_table_orientation",
        "Attenuate_dB", "Signal_poll_avg_rssi", "Chain_0_rssi",
        "Chain_1_rssi", "Link_speed", "TX_throughput_Mbps",
        "RX_throughput_Mbps", "HE_Capable", "Country_code", "Channel",
        "WiFi_chip", "Type", "Host_name", "AP_model",
        "Incremental_build_id", "Build_type", "TCP_UDP_Protocol",
        "Security_type", "Test_tool", "Airplane_mode_status", "BT_status",
        "Bug_ID", "Comment",
    )

    def __init__(self, controllers):
        super().__init__(controllers)

    def setup_class(self):
        """Unpack user params, prepare the DUT and start the result CSV."""
        self.dut = self.android_devices[0]
        req_params = ["rvr_networks", "rvr_test_params", "attenuators"]
        opt_params = ["angle_params", "usb_port"]
        self.unpack_userparams(
            req_param_names=req_params, opt_param_names=opt_params)
        asserts.assert_true(
            len(self.rvr_networks) > 0, "Need at least one network.")

        if "rvr_test_params" in self.user_params:
            self.iperf_server = self.iperf_servers[0]
            self.maxdb = self.rvr_test_params["rvr_atten_maxdb"]
            self.mindb = self.rvr_test_params["rvr_atten_mindb"]
            self.stepdb = self.rvr_test_params["rvr_atten_step"]
            self.country_code = self.rvr_test_params["country_code"]
        if "angle_params" in self.user_params:
            self.angle_list = self.angle_params
        if "usb_port" in self.user_params:
            self.turntable_port = self.read_comport(self.usb_port["turntable"])

        # Init DUT
        wutils.wifi_test_device_init(self.dut, self.country_code)
        self.dut.droid.bluetoothToggleState(False)
        utils.set_location_service(self.dut, False)
        wutils.wifi_toggle_state(self.dut, True)
        utils.subprocess.check_output(
            "adb root", shell=True, timeout=self.TEST_TIMEOUT)
        utils.subprocess.check_output(
            "adb shell settings put system screen_off_timeout 18000000",
            shell=True,
            timeout=self.TEST_TIMEOUT)
        utils.subprocess.check_output(
            "adb shell svc power stayon true",
            shell=True,
            timeout=self.TEST_TIMEOUT)

        # create folder for rvr test result
        self.log_path = os.path.join(logging.log_path, "rvr_results")
        utils.create_dir(self.log_path)

        self.csv_write(self._CSV_HEADER)

    def setup_test(self):
        """Wake the DUT and cache build/HW/platform info for result rows."""
        self.dut.droid.wakeLockAcquireBright()
        self.dut.droid.wakeUpNow()
        rom_info = self.get_rominfo()
        self.testdate = time.strftime("%Y-%m-%d", time.localtime())
        self.rom = rom_info[0]
        self.build_id = rom_info[1]
        self.build_type = rom_info[2]
        self.project = rom_info[3]
        self.ret_country_code = self.get_country_code()
        self.ret_hw_stage = self.get_hw_stage()
        self.ret_platform = wperfutils.detect_wifi_platform(self.dut)

    def teardown_test(self):
        """Let the DUT sleep again and restore default attenuation."""
        self.dut.droid.wakeLockRelease()
        self.dut.droid.goToSleepNow()
        wutils.set_attns(self.attenuators, "default")

    def teardown_class(self):
        """Stop the iperf server and release the turn table serial port."""
        # getattr() guards against setup_class having failed before these
        # attributes were assigned.
        iperf_server = getattr(self, "iperf_server", None)
        if iperf_server is not None:
            iperf_server.stop()
        turntable_port = getattr(self, "turntable_port", None)
        if turntable_port is not None:
            turntable_port.close()

    def on_fail(self, test_name, begin_time):
        """Collect a bug report and the adb log for a failed test."""
        self.dut.take_bug_report(test_name, begin_time)
        self.dut.cat_adb_log(test_name, begin_time)

    # Helper Functions

    def csv_write(self, data):
        """Append one row to the Result.csv file of this run.

        Args:
            data: Sequence of cell values (header titles or one result row).
        """
        csv_path = "{}/Result.csv".format(self.log_path)
        with open(csv_path, "a", newline="") as csv_file:
            csv.writer(csv_file, delimiter=",").writerow(data)

    def set_atten(self, db):
        """Setup attenuator dB for current test.

        The requested value is clamped to [0, ATTEN_MAX_DB]. Each attenuator
        is read back after being programmed and re-programmed up to
        ATTEN_MAX_RETRY times if the read-back value does not match.

        Args:
            db: Attenuator setup dB.

        Raises:
            SystemExit: An attenuator still reports a different value after
                all retries.
        """
        if db < 0:
            db = 0
        elif db > self.ATTEN_MAX_DB:
            db = self.ATTEN_MAX_DB
        self.log.info("[Attenuation] Set dB = %sdB", db)
        for atten in self.attenuators:
            atten.set_atten(db)
            self.log.info("[Attenuation] Current dB = %sdB", atten.get_atten())
            # Retry only the attenuator that failed. Recursing into
            # set_atten() here would restart the retry counter on every
            # call and never reach the abort below.
            for retry in range(1, self.ATTEN_MAX_RETRY + 1):
                if atten.get_atten() == db:
                    break
                self.log.info(
                    "[Attenuation] Fail to set Attenuator to %s, "
                    "%s times try to reset", db, retry)
                atten.set_atten(db)
            else:
                # All retries were used; abort only if the last one failed too.
                if atten.get_atten() != db:
                    self.log.info(
                        "[Attenuation] Retry Attenuator fail for %s cycles, "
                        "end test!", self.ATTEN_MAX_RETRY)
                    sys.exit()

    def read_comport(self, com):
        """Read com port for current test.

        Args:
            com: Serial port.
        Returns:
            port: Serial port opened at 9600 baud with a 1 s read timeout.
        """
        port = serial.Serial(com, 9600, timeout=1)
        # Short pause after opening, before the first command is sent.
        time.sleep(1)
        return port

    def get_angle(self, port):
        """Get turn table angle for current test.

        Args:
            port: Turn table com port.
        Returns:
            angle: Angle from turn table, or -1 if the reply has no digits.
        """
        port.write("DG?;".encode())
        time.sleep(0.1)
        reply = port.readline().decode("utf-8")
        # Keep only the digits of the reply; everything else is framing.
        digits = "".join(char for char in reply if char.isdigit())
        if not digits:
            return -1
        return int(digits)

    def set_angle(self, port, angle):
        """Setup turn table angle for current test.

        The angle is clamped to [0, 359] before being sent.

        Args:
            port: Turn table com port
            angle: Turn table setup angle
        """
        if angle > 359:
            angle = 359
        elif angle < 0:
            angle = 0
        self.log.info("Set angle to " + str(angle))
        command = "DG{};".format(angle)
        port.write(command.encode())
        time.sleep(self.TURN_TABLE_SETUP_TIME)

    def check_angle(self, port, angle):
        """Check turn table angle for current test.

        Re-sends the angle command up to TEST_TIMEOUT times while the turn
        table reports a different angle.

        Args:
            port: Turn table com port
            angle: Turn table setup angle

        Raises:
            SystemExit: The turn table still reports a different angle after
                all retries.
        """
        retry_limit = self.TEST_TIMEOUT
        retry = 0
        current_angle = self.get_angle(port)
        while current_angle != angle and retry < retry_limit:
            retry += 1
            self.log.info("[Turntable] Current angle = %s", current_angle)
            self.log.info(
                "[Turntable] Fail set angle to %s, %s times try to reset",
                angle, retry)
            self.set_angle(port, angle)
            time.sleep(self.TURN_TABLE_SETUP_TIME)
            current_angle = self.get_angle(port)
        # Leaving the loop with a mismatch means every retry was used up.
        if current_angle != angle:
            self.log.info(
                "[Turntable] Retry turntable fail for %s cycles, end test!",
                retry)
            sys.exit()

    @staticmethod
    def _numeric_or_minus_one(text):
        """Return text if it only has digits, '-' or '.', otherwise "-1"."""
        if all(char.isdigit() or char in ("-", ".") for char in text):
            return text
        return str(-1)

    def get_wifiinfo(self):
        """Get WiFi RSSI/ link speed/ frequency for current test.

        Returns:
            [rssi, link_speed, frequency]: DUT WiFi RSSI (see get_rssi_func),
            link speed and frequency as strings parsed from `iw wlan0 link`.
            [-1, -1, -1] if any of the queries fails.
        """
        try:
            cmd = "adb shell iw wlan0 link"
            raw_output = utils.subprocess.check_output(
                cmd, shell=True, timeout=self.TEST_TIMEOUT)

            # Check RSSI Enhance
            rssi = self.get_rssi_func()

            link_info = raw_output.decode("utf-8")

            # Check link speed: text between "bitrate:" and the unit
            # ("MBit/s"); the "- 2" drops the separator and unit prefix
            # that precede "Bit/s".
            speed_start = link_info.find("bitrate:") + 8
            speed_end = link_info.find("Bit/s") - 2
            link_speed = self._numeric_or_minus_one(
                link_info[speed_start:speed_end].strip(" "))

            # Check frequency: the four characters following "freq: ".
            freq_start = link_info.find("freq:")
            frequency = self._numeric_or_minus_one(
                link_info[freq_start + 6:freq_start + 10].strip(" "))
        except Exception:
            # Best effort: callers treat an RSSI of -1 as "not connected".
            return [-1, -1, -1]
        return [rssi, link_speed, frequency]

    def get_rssi_func(self):
        """Get RSSI from brcm/qcom wifi chip.

        Returns:
            current_rssi: Dict with the mean "signal_poll_avg_rssi",
            "chain_0_rssi" and "chain_1_rssi" values. On qcom, NaN readings
            are replaced by -1 and the plain value -1 is returned when all
            three readings are -1. On any other platform all three values
            are NaN.
        """
        platform = self.ret_platform
        if platform == "brcm":
            rssi_future = wperfutils.get_connected_rssi_brcm(self.dut)
        elif platform == "qcom":
            rssi_future = wperfutils.get_connected_rssi_qcom(
                self.dut, interface="wlan0")
        else:
            return {key: float("nan") for key in self._RSSI_KEYS}

        current_rssi = {
            key: rssi_future[key]["mean"] for key in self._RSSI_KEYS
        }
        if platform == "qcom":
            current_rssi = {
                key: -1 if math.isnan(value) else value
                for key, value in current_rssi.items()
            }
            # Logical test on purpose: a bitwise "&" binds tighter than "=="
            # and fails on float readings.
            if all(value == -1 for value in current_rssi.values()):
                return -1
        return current_rssi

    def get_rominfo(self):
        """Get DUT ROM build info.

        Parses `getprop ro.build.display.id`, taking "<project>-<build_type>"
        from the first token and the ROM / build id from the third and
        fourth tokens. Fields that cannot be parsed stay "NA".

        Returns:
            rom, build_id, build_type, project: DUT Build info,Build ID,
            Build type, and Project name
        """
        rom = "NA"
        build_id = "NA"
        build_type = "NA"
        project = "NA"
        rominfo = self.dut.adb.shell("getprop ro.build.display.id").split()
        if len(rominfo) >= 4:
            rom = rominfo[2]
            build_id = rominfo[3]
        if rominfo:
            name_parts = rominfo[0].split("-")
            if len(name_parts) == 2:
                project, build_type = name_parts
        return rom, build_id, build_type, project

    def get_hw_stage(self):
        """Get DUT HW stage.

        Returns:
            hw_stage: DUT HW stage e.g. EVT/DVT/PVT..etc.
        """
        cmd = "adb shell getprop ro.boot.hardware.revision"
        raw_output = utils.subprocess.check_output(
            cmd, shell=True, timeout=self.TEST_TIMEOUT)
        hw_stage = raw_output.decode("utf-8").split("\n")[0]
        return hw_stage

    def get_country_code(self):
        """Get DUT country code.

        Returns:
            country_code: DUT country code e.g. US/JP/GE..etc.
        """
        cmd = "adb shell cmd wifi get-country-code"
        raw_output = utils.subprocess.check_output(
            cmd, shell=True, timeout=self.TEST_TIMEOUT)
        # The code is taken from the fifth space-separated token of the reply.
        country_code = raw_output.decode("utf-8").split(" ")[4].split("\n")[0]
        return country_code

    def get_channel(self):
        """Get DUT WiFi channel.

        Returns:
            channel: DUT channel e.g. 6/36/37..etc, or "NA" when the wifi
            platform is neither brcm nor qcom.
        """
        channel = "NA"
        if self.ret_platform == "brcm":
            cmd = 'adb shell wl assoc | grep "Primary channel:"'
            raw_output = utils.subprocess.check_output(
                cmd, shell=True, timeout=self.TEST_TIMEOUT)
            channel = raw_output.decode("utf-8").split(": ")[1].split("\n")[0]
        elif self.ret_platform == "qcom":
            cmd = "adb shell iw wlan0 info | grep channel"
            raw_output = utils.subprocess.check_output(
                cmd, shell=True, timeout=self.TEST_TIMEOUT)
            channel = raw_output.decode("utf-8").split(" ")[1].split("\n")[0]
        return channel

    def get_he_capable(self):
        """Get DUT WiFi high efficiency capable status.

        Returns:
            he_capable: DUT high efficiency capable status, or "NA" when the
            wifi platform is neither brcm nor qcom.
        """
        he_capable = "NA"
        if self.ret_platform == "brcm":
            cmd = 'adb shell wl assoc | grep "Chanspec:"'
            raw_output = utils.subprocess.check_output(
                cmd, shell=True, timeout=self.TEST_TIMEOUT)
            chanspec = raw_output.decode("utf-8").split(": ")[1].split("\n")[0]
            he_capable = chanspec.split("MHz")[0].split(" ")[3]
        elif self.ret_platform == "qcom":
            cmd = "adb shell iw wlan0 info | grep channel"
            raw_output = utils.subprocess.check_output(
                cmd, shell=True, timeout=self.TEST_TIMEOUT)
            he_capable = raw_output.decode("utf-8").split("width: ")[1].split(
                " ")[0]
        return he_capable

    def post_process_results(self, rvr_result):
        """Saves JSON formatted results and appends a row to Result.csv.

        Args:
            rvr_result: Dict containing attenuation, throughput and other
                meta data. Must already hold "throughput_TX" and
                "throughput_RX"; it is extended in place.
        Returns:
            wifiinfo[0]: To check WiFi connection by RSSI value
        """
        # Save output as text file
        wifiinfo = self.get_wifiinfo()
        rssi = wifiinfo[0]
        for key in self._RSSI_KEYS:
            # rssi is either a dict of readings or the scalar -1.
            rvr_result[key] = rssi[key] if rssi != -1 else rssi
        if rvr_result["signal_poll_avg_rssi"] == -1:
            rvr_result["channel"] = "NA"
        else:
            rvr_result["channel"] = self.ret_channel
        rvr_result["country_code"] = self.ret_country_code
        rvr_result["hw_stage"] = self.ret_hw_stage
        rvr_result["wifi_chip"] = self.ret_platform
        rvr_result["test_ssid"] = self.ssid
        rvr_result["test_angle"] = self.angle_list[self.angle]
        rvr_result["test_dB"] = self.db
        rvr_result["test_link_speed"] = wifiinfo[1]
        rvr_result["test_frequency"] = wifiinfo[2]

        data = (
            self.testdate,
            self.project,
            self.dut.serial,
            self.rom,
            rvr_result["hw_stage"],
            rvr_result["test_ssid"],
            rvr_result["test_frequency"],
            rvr_result["test_angle"],
            rvr_result["test_dB"],
            rvr_result["signal_poll_avg_rssi"],
            rvr_result["chain_0_rssi"],
            rvr_result["chain_1_rssi"],
            rvr_result["test_link_speed"],
            rvr_result["throughput_TX"][0],
            rvr_result["throughput_RX"][0],
            "HE" + self.he_capable,
            rvr_result["country_code"],
            rvr_result["channel"],
            rvr_result["wifi_chip"],
            "OTA_RvR",
            "OTA_Testbed2",
            "RAXE500",
            self.build_id,
            self.build_type,
            "TCP",
            "WPA3",
            "iperf3",
            "OFF",
            "OFF",
        )
        self.csv_write(data)

        results_file_path = "{}/{}_angle{}_{}dB.json".format(
            self.log_path, self.ssid, self.angle_list[self.angle], self.db)
        with open(results_file_path, "w") as results_file:
            json.dump(rvr_result, results_file, indent=4)
        return rssi

    def connect_to_wifi_network(self, network):
        """Connection logic for wifi networks.

        Args:
            network: Dictionary with network info.
        """
        ssid = network[WifiEnums.SSID_KEY]
        self.dut.ed.clear_all_events()
        wutils.start_wifi_connection_scan(self.dut)
        scan_results = self.dut.droid.wifiGetScanResults()
        wutils.assert_network_in_list({WifiEnums.SSID_KEY: ssid}, scan_results)
        wutils.wifi_connect(self.dut, network, num_of_tries=3)

    def run_iperf_init(self, network):
        """Run one short warm-up iperf session before the measurements.

        Failures are only logged; the iperf server is stopped in every case
        so the following measurement can start it again.

        Args:
            network: Dictionary with network info (unused).
        """
        self.iperf_server.start(tag="init")
        self.log.info("[Iperf] %s", "Starting iperf traffic init.")
        time.sleep(self.IPERF_SETUP_TIME)
        try:
            port_arg = "-p {} -J -R -t10".format(self.iperf_server.port)
            self.dut.run_iperf_client(
                self.rvr_test_params["iperf_server_address"],
                port_arg,
                timeout=self.rvr_test_params["iperf_duration"] +
                self.TEST_TIMEOUT)
            self.log.info("[Iperf] %s", "iperf traffic init Pass")
        except Exception:
            self.log.warning("ValueError: iperf init ERROR.")
        finally:
            self.iperf_server.stop()

    def _run_iperf_traffic(self, network, direction):
        """Run one iperf measurement and return the averaged throughput.

        Args:
            network: Dictionary with network info.
            direction: "TX" (DUT sends) or "RX" (iperf reverse mode, "-R").
        Returns:
            Single-element list holding the throughput in Mbps, or ["NA"]
            if anything went wrong.
        """
        reverse = direction == "RX"
        # Historical log naming: the server log of an RX run is tagged
        # "client" and the DUT-side file "server" (and vice versa for TX).
        server_role, client_role = (("client", "server") if reverse else
                                    ("server", "client"))
        try:
            run_id = "{}_angle{}_{}dB".format(
                self.ssid, self.angle_list[self.angle], self.db)
            self.iperf_server.start(
                tag="{}_{}_{}".format(direction, server_role, run_id))
            ssid = network[WifiEnums.SSID_KEY]
            self.log.info("[Iperf] Starting iperf traffic %s through %s",
                          direction, ssid)
            time.sleep(self.IPERF_SETUP_TIME)
            port_arg = "-p {} -J {}{}".format(
                self.iperf_server.port, "-R " if reverse else "",
                self.rvr_test_params["iperf_port_arg"])
            success, data = self.dut.run_iperf_client(
                self.rvr_test_params["iperf_server_address"],
                port_arg,
                timeout=self.rvr_test_params["iperf_duration"] +
                self.TEST_TIMEOUT)

            # Parse and log result
            client_output_path = os.path.join(
                self.iperf_server.log_path,
                "IperfDUT,{},{}_{}_{}".format(self.iperf_server.port,
                                              direction, client_role, run_id))
            with open(client_output_path, "w") as out_file:
                out_file.write("\n".join(data))
            self.iperf_server.stop()

            # RX results are parsed from the DUT-side output, TX results
            # from the latest server log.
            if reverse:
                iperf_file = client_output_path
            else:
                iperf_file = self.iperf_server.log_files[-1]
            iperf_result = ipf.IPerfResult(iperf_file)
            # Drop the configured warm-up intervals and the last interval.
            rates = iperf_result.instantaneous_rates[
                self.rvr_test_params["iperf_ignored_interval"]:-1]
            curr_throughput = (math.fsum(rates) / len(rates)) * 8 * (1.024**2)
            self.log.info("[Iperf] %s Throughput at %.2f dB is %.2f Mbps",
                          direction, self.db, curr_throughput)
            self.log.debug(pprint.pformat(data))
            asserts.assert_true(success, "Error occurred in iPerf traffic.")
            return [curr_throughput]
        except Exception as err:
            self.log.warning("ValueError: %s iperf ERROR.", direction)
            self.log.debug("%s iperf failure detail: %r", direction, err)
            self.iperf_server.stop()
            return ["NA"]

    def run_iperf_client(self, network):
        """Run iperf TX throughput after connection.

        Args:
            network: Dictionary with network info.
        Returns:
            rvr_result: Single-element list with the TX throughput in Mbps,
            or ["NA"] on failure.
        """
        return self._run_iperf_traffic(network, "TX")

    def run_iperf_server(self, network):
        """Run iperf RX throughput after connection.

        Args:
            network: Dictionary with network info.
        Returns:
            rvr_result: Single-element list with the RX throughput in Mbps,
            or ["NA"] on failure.
        """
        return self._run_iperf_traffic(network, "RX")

    def _run_with_retry(self, run_traffic, network):
        """Run a throughput measurement, retrying while it returns ["NA"].

        Args:
            run_traffic: run_iperf_server or run_iperf_client.
            network: Dictionary with network info.
        Returns:
            The last result returned by run_traffic.
        """
        result = run_traffic(network)
        for _ in range(self.IPERF_MAX_RETRY):
            if result != ["NA"]:
                break
            # iperf_retry() returns True when RSSI is unavailable, in which
            # case another attempt is pointless.
            if self.iperf_retry():
                break
            time.sleep(self.IPERF_SETUP_TIME)
            result = run_traffic(network)
        return result

    def iperf_test_func(self, network):
        """Main function to test iperf TX/RX.

        Args:
            network: Dictionary with network info.
        Returns:
            RSSI reading taken after the measurement (-1 if unavailable);
            also stored in self.rssi.
        """
        # Initialize
        rvr_result = {}
        # Run RvR and log result
        rvr_result["throughput_RX"] = self._run_with_retry(
            self.run_iperf_server, network)
        rvr_result["throughput_TX"] = self._run_with_retry(
            self.run_iperf_client, network)
        self.rssi = self.post_process_results(rvr_result)
        return self.rssi

    def iperf_retry(self):
        """Check iperf TX/RX status and retry.

        Kills any iperf3 process left on the DUT and re-reads the RSSI.

        Returns:
            True if the RSSI cannot be read (testing should stop),
            False if another iperf attempt can be made.
        """
        try:
            cmd = ("adb -s {0} shell pidof iperf3| "
                   "xargs adb -s {0} shell kill -9").format(self.dut.serial)
            utils.subprocess.call(cmd, shell=True, timeout=self.TEST_TIMEOUT)
            self.log.warning("ValueError: Killed DUT iperf process, keep test")
        except Exception:
            self.log.info("[Iperf] %s", "No iperf DUT process found, keep test")

        wifiinfo = self.get_wifiinfo()
        self.log.debug("--[iperf_retry]-- %s", wifiinfo[0])
        self.log.info("[WiFiinfo] Current RSSI = %sdBm", wifiinfo[0])
        if wifiinfo[0] == -1:
            self.log.warning("ValueError: Cannot get RSSI, stop throughput test")
            return True
        return False

    def rvr_test(self, network):
        """Test function to run RvR.

        The function runs an RvR test in the current device/AP configuration.
        Function is called from another wrapper function that sets up the
        testbed for the RvR test

        Args:
            network: Dictionary with network info
        """
        self.ssid = network[WifiEnums.SSID_KEY]
        self.log.info("Start rvr test")

        for index, target_angle in enumerate(self.angle_list):
            # self.angle holds the index into angle_list, not the angle.
            self.angle = index
            self.set_angle(self.turntable_port, target_angle)
            self.check_angle(self.turntable_port, target_angle)
            self.set_atten(0)
            self.connect_to_wifi_network(network)
            self.ret_channel = self.get_channel()
            self.he_capable = self.get_he_capable()
            self.run_iperf_init(network)
            for db in range(self.mindb, self.maxdb + self.stepdb, self.stepdb):
                self.db = db
                self.set_atten(self.db)
                self.iperf_test_func(network)
                if self.rssi == -1:
                    self.log.warning(
                        "ValueError: Cannot get RSSI. Run next angle")
                    break
            wutils.reset_wifi(self.dut)

    # Tests

    def test_rvr_2g(self):
        network = self.rvr_networks[0]
        self.rvr_test(network)

    def test_rvr_5g(self):
        network = self.rvr_networks[1]
        self.rvr_test(network)

    def test_rvr_6g(self):
        network = self.rvr_networks[2]
        self.rvr_test(network)