#!/usr/bin/env python3.4 # # Copyright 2017 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import collections import csv import itertools import json import logging import math import os import re import scipy.stats import time from acts import asserts from acts import context from acts import base_test from acts import utils from acts.controllers.utils_lib import ssh from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger from acts_contrib.test_utils.wifi import ota_sniffer from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap from acts_contrib.test_utils.wifi import wifi_test_utils as wutils from functools import partial class WifiTxPowerCheckTest(base_test.BaseTestClass): """Class for ping-based Wifi performance tests. This class implements WiFi ping performance tests such as range and RTT. The class setups up the AP in the desired configurations, configures and connects the phone to the AP, and runs For an example config file to run this test class see example_connectivity_performance_ap_sta.json. """ TEST_TIMEOUT = 10 RSSI_POLL_INTERVAL = 0.2 SHORT_SLEEP = 1 MED_SLEEP = 5 MAX_CONSECUTIVE_ZEROS = 5 DISCONNECTED_PING_RESULT = { 'connected': 0, 'rtt': [], 'time_stamp': [], 'ping_interarrivals': [], 'packet_loss_percentage': 100 } BRCM_SAR_MAPPING = { 0: 'disable', 1: 'head', 2: 'grip', 16: 'bt', 32: 'hotspot' } BAND_TO_CHANNEL_MAP = { ('2g', 1): [1, 6, 11], ('5g', 1): [36, 40, 44, 48], ('5g', 2): [52, 56, 60, 64], ('5g', 3): range(100, 148, 4), ('5g', 4): [149, 153, 157, 161], ('6g', 1): ['6g{}'.format(channel) for channel in range(1, 46, 4)], ('6g', 2): ['6g{}'.format(channel) for channel in range(49, 94, 4)], ('6g', 3): ['6g{}'.format(channel) for channel in range(97, 114, 4)], ('6g', 4): ['6g{}'.format(channel) for channel in range(117, 158, 4)], ('6g', 5): ['6g{}'.format(channel) for channel in range(161, 186, 4)], ('6g', 6): ['6g{}'.format(channel) for channel in range(189, 234, 4)] } def __init__(self, controllers): base_test.BaseTestClass.__init__(self, controllers) self.testcase_metric_logger = ( BlackboxMappedMetricLogger.for_test_case()) self.testclass_metric_logger = ( BlackboxMappedMetricLogger.for_test_class()) self.publish_testcase_metrics = True self.tests = self.generate_test_cases( ap_power='standard', channels=[6, 36, 52, 100, 149, '6g37', '6g117', '6g213'], modes=['bw20', 'bw40', 'bw80', 'bw160'], test_types=[ 'test_tx_power', ], country_codes=['US', 'GB', 'JP', 'CA', 'AU'], sar_states=range(-1, 13)) def setup_class(self): self.dut = self.android_devices[-1] req_params = [ 'tx_power_test_params', 'testbed_params', 'main_network', 'RetailAccessPoints', 'RemoteServer' ] opt_params = ['OTASniffer'] self.unpack_userparams(req_params, opt_params) self.testclass_params = self.tx_power_test_params self.num_atten = self.attenuators[0].instrument.num_atten self.ping_server = ssh.connection.SshConnection( ssh.settings.from_config(self.RemoteServer[0]['ssh_config'])) self.access_point = retail_ap.create(self.RetailAccessPoints)[0] if hasattr(self, 'OTASniffer') and self.testbed_params['sniffer_enable']: try: self.sniffer = ota_sniffer.create(self.OTASniffer)[0] except: self.log.warning('Could not start sniffer. Disabling sniffs.') self.testbed_params['sniffer_enable'] = 0 self.log.info('Access Point Configuration: {}'.format( self.access_point.ap_settings)) self.log_path = os.path.join(logging.log_path, 'results') os.makedirs(self.log_path, exist_ok=True) self.atten_dut_chain_map = {} self.testclass_results = [] # Turn WiFi ON if self.testclass_params.get('airplane_mode', 1): self.log.info('Turning on airplane mode.') asserts.assert_true(utils.force_airplane_mode(self.dut, True), 'Can not turn on airplane mode.') wutils.wifi_toggle_state(self.dut, True) self.dut.droid.wifiEnableVerboseLogging(1) asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1, "Failed to enable WiFi verbose logging.") # decode nvram self.nvram_sar_data = self.read_nvram_sar_data() self.csv_sar_data = self.read_sar_csv(self.testclass_params['sar_csv']) # Configure test retries self.user_params['retry_tests'] = [self.__class__.__name__] def teardown_class(self): # Turn WiFi OFF and reset AP self.access_point.teardown() for dev in self.android_devices: wutils.wifi_toggle_state(dev, False) dev.go_to_sleep() self.process_testclass_results() def setup_test(self): self.retry_flag = False def teardown_test(self): self.retry_flag = False def on_retry(self): """Function to control test logic on retried tests. This function is automatically executed on tests that are being retried. In this case the function resets wifi, toggles it off and on and sets a retry_flag to enable further tweaking the test logic on second attempts. """ self.retry_flag = True for dev in self.android_devices: wutils.reset_wifi(dev) wutils.toggle_wifi_off_and_on(dev) def read_sar_csv(self, sar_csv): """Reads SAR powers from CSV. This function reads SAR powers from a CSV and generate a dictionary with all programmed TX powers on a per band and regulatory domain basis. Args: sar_csv: path to SAR data file. Returns: sar_powers: dict containing all SAR data """ sar_powers = {} sar_csv_data = [] with open(sar_csv, mode='r') as f: reader = csv.DictReader(f) for row in reader: row['Sub-band Powers'] = [ float(val) for key, val in row.items() if 'Sub-band' in key and val != '' ] sar_csv_data.append(row) for row in sar_csv_data: sar_powers.setdefault(int(row['Scenario Index']), {}) sar_powers[int(row['Scenario Index'])].setdefault('SAR Powers', {}) sar_row_key = (row['Regulatory Domain'], row['Mode'], row['Band']) sar_powers[int(row['Scenario Index'])]['SAR Powers'].setdefault( sar_row_key, {}) sar_powers[int( row['Scenario Index'])]['SAR Powers'][sar_row_key][int( row['Chain'])] = row['Sub-band Powers'] return sar_powers def read_nvram_sar_data(self): """Reads SAR powers from NVRAM. This function reads SAR powers from the NVRAM found on the DUT and generates a dictionary with all programmed TX powers on a per band and regulatory domain basis. NThe NVRAM file is chosen based on the build, but if no NVRAM file is found matching the expected name, the default NVRAM will be loaded. The choice of NVRAM is not guaranteed to be correct. Returns: nvram_sar_data: dict containing all SAR data """ self._read_sar_config_info() try: hardware_version = self.dut.adb.shell( 'getprop ro.boot.hardware.revision') nvram_path = '/vendor/firmware/bcmdhd.cal_{}'.format( hardware_version) nvram = self.dut.adb.shell('cat {}'.format(nvram_path)) except: nvram = self.dut.adb.shell('cat /vendor/firmware/bcmdhd.cal') current_context = context.get_current_context().get_full_output_path() file_path = os.path.join(current_context, 'nvram_file') with open(file_path, 'w') as file: file.write(nvram) nvram_sar_data = {} for line in nvram.splitlines(): if 'dynsar' in line: sar_config, sar_powers = self._parse_nvram_sar_line(line) nvram_sar_data[sar_config] = sar_powers file_path = os.path.join(current_context, 'nvram_sar_data') with open(file_path, 'w') as file: json.dump(wputils.serialize_dict(nvram_sar_data), file, indent=4) return nvram_sar_data def _read_sar_config_info(self): """Function to read SAR scenario mapping, This function reads sar_config.info file which contains the mapping of SAR scenarios to NVRAM data tables. """ self.sar_state_mapping = collections.OrderedDict([(-2, { "google_name": 'WIFI_POWER_SCENARIO_INVALID' }), (-1, { "google_name": 'WIFI_POWER_SCENARIO_DISABLE' }), (0, { "google_name": 'WIFI_POWER_SCENARIO_VOICE_CALL' }), (1, { "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_OFF' }), (2, { "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_ON' }), (3, { "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_OFF' }), (4, { "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON' }), (5, { "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_BT' }), (6, { "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT' }), (7, { "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT_MMW' }), (8, { "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON_BT' }), (9, { "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT' }), (10, { "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT' }), (11, { "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_MMW' }), (12, { "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT_MMW' })]) sar_config_path = '/vendor/firmware/sarconfig.info' sar_config = self.dut.adb.shell( 'cat {}'.format(sar_config_path)).splitlines() sar_config = [line.split(',') for line in sar_config] sar_config = [[int(x) for x in line] for line in sar_config] for sar_state in sar_config: self.sar_state_mapping[sar_state[0]]['brcm_index'] = ( self.BRCM_SAR_MAPPING[sar_state[1]], bool(sar_state[2])) current_context = context.get_current_context().get_full_output_path() file_path = os.path.join(current_context, 'sarconfig') with open(file_path, 'w') as file: json.dump(wputils.serialize_dict(self.sar_state_mapping), file, indent=4) def _parse_nvram_sar_line(self, sar_line): """Helper function to decode SAR NVRAM data lines. Args: sar_line: single line of text from NVRAM file containing SAR data. Returns: sar_config: sar config referenced in this line decoded_values: tx powers configured in this line """ sar_config = collections.OrderedDict() list_of_countries = ['fcc', 'jp', 'ca'] try: sar_config['country'] = next(country for country in list_of_countries if country in sar_line.split('=')[0]) except: sar_config['country'] = 'row' list_of_sar_states = ['grip', 'bt', 'hotspot'] try: sar_config['state'] = next(state for state in list_of_sar_states if state in sar_line.split('=')[0]) except: sar_config['state'] = 'head' list_of_bands = ['2g', '5g', '6g'] sar_config['band'] = next(band for band in list_of_bands if band in sar_line.split('=')[0]) sar_config['rsdb'] = 'rsdb' if 'rsdb' in sar_line else 'mimo' sar_config['airplane_mode'] = '_2=' in sar_line sar_powers = sar_line.split('=')[1].split(',') decoded_powers = [] for sar_power in sar_powers: # Note that core 0 and 1 are flipped in the NVRAM entries decoded_powers.append([ (int(sar_power[4:], 16) & int('7f', 16)) / 4, (int(sar_power[2:4], 16) & int('7f', 16)) / 4 ]) return tuple(sar_config.values()), decoded_powers def get_sar_power_from_nvram(self, testcase_params): """Function to get current expected SAR power from nvram This functions gets the expected SAR TX power from the DUT NVRAM data. The SAR power is looked up based on the current channel and regulatory domain, Args: testcase_params: dict containing channel, sar state, country code Returns: sar_config: current expected sar config sar_powers: current expected sar powers """ if testcase_params['country_code'] == 'US': reg_domain = 'fcc' elif testcase_params['country_code'] == 'JP': reg_domain = 'jp' elif testcase_params['country_code'] == 'CA': reg_domain = 'ca' else: reg_domain = 'row' for band, channels in self.BAND_TO_CHANNEL_MAP.items(): if testcase_params['channel'] in channels: current_band = band[0] sub_band_idx = band[1] break sar_config = (reg_domain, self.sar_state_mapping[ testcase_params['sar_state']]['brcm_index'][0], current_band, 'mimo', self.sar_state_mapping[ testcase_params['sar_state']]['brcm_index'][1]) sar_powers = self.nvram_sar_data[sar_config][sub_band_idx - 1] return sar_config, sar_powers def get_sar_power_from_csv(self, testcase_params): """Function to get current expected SAR power from CSV. This functions gets the expected SAR TX power from the DUT NVRAM data. The SAR power is looked up based on the current channel and regulatory domain, Args: testcase_params: dict containing channel, sar state, country code Returns: sar_config: current expected sar config sar_powers: current expected sar powers """ if testcase_params['country_code'] == 'US': reg_domain = 'fcc' elif testcase_params['country_code'] == 'JP': reg_domain = 'jp' elif testcase_params['country_code'] == 'CA': reg_domain = 'ca' else: reg_domain = 'row' for band, channels in self.BAND_TO_CHANNEL_MAP.items(): if testcase_params['channel'] in channels: current_band = band[0] sub_band_idx = band[1] break sar_config = (reg_domain, 'mimo', current_band) sar_powers = [ self.csv_sar_data[testcase_params['sar_state']]['SAR Powers'] [sar_config][0][sub_band_idx - 1], self.csv_sar_data[testcase_params['sar_state']]['SAR Powers'] [sar_config][1][sub_band_idx - 1] ] return sar_config, sar_powers def process_wl_curpower(self, wl_curpower_file, testcase_params): """Function to parse wl_curpower output. Args: wl_curpower_file: path to curpower output file. testcase_params: dict containing channel, sar state, country code Returns: wl_curpower_dict: dict formatted version of curpower data. """ with open(wl_curpower_file, 'r') as file: wl_curpower_out = file.read() channel_regex = re.compile(r'Current Channel:\s+(?P[0-9]+)') bandwidth_regex = re.compile( r'Channel Width:\s+(?P\S+)MHz\n') channel = int( re.search(channel_regex, wl_curpower_out).group('channel')) bandwidth = int( re.search(bandwidth_regex, wl_curpower_out).group('bandwidth')) regulatory_limits = self.generate_regulatory_table( wl_curpower_out, channel, bandwidth) board_limits = self.generate_board_limit_table(wl_curpower_out, channel, bandwidth) wl_curpower_dict = { 'channel': channel, 'bandwidth': bandwidth, 'country': testcase_params['country_code'], 'regulatory_limits': regulatory_limits, 'board_limits': board_limits } return wl_curpower_dict def generate_regulatory_table(self, wl_curpower_out, channel, bw): """"Helper function to generate regulatory limit table from curpower. Args: wl_curpower_out: curpower output channel: current channel bw: current bandwidth Returns: regulatory_table: dict with regulatory limits for current config """ regulatory_group_map = { 'DSSS': [('CCK', rate, 1) for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]], 'OFDM_CDD1': [('LEGACY', rate, 1) for rate in [ '{}Mbps'.format(mbps) for mbps in [6, 9, 12, 18, 24, 36, 48, 54] ]], 'MCS0_7_CDD1': [(mode, rate, 1) for (mode, rate) in itertools.product(['HT' + str(bw), 'VHT' + str(bw)], range(0, 8))], 'VHT8_9SS1_CDD1': [('VHT' + str(bw), 8, 1), ('VHT' + str(bw), 9, 1)], 'VHT10_11SS1_CDD1': [('VHT' + str(bw), 10, 1), ('VHT' + str(bw), 11, 1)], 'MCS8_15': [(mode, rate - 8 * ('VHT' in mode), 2) for (mode, rate) in itertools.product(['HT' + str(bw), 'VHT' + str(bw)], range(8, 16))], 'VHT8_9SS2': [('VHT' + str(bw), 8, 2), ('VHT' + str(bw), 9, 2)], 'VHT10_11SS2': [('VHT' + str(bw), 10, 2), ('VHT' + str(bw), 11, 2)], 'HE_MCS0-11_CDD1': [('HE' + str(bw), rate, 1) for rate in range(0, 12)], 'HE_MCS0_11SS2': [('HE' + str(bw), rate, 2) for rate in range(0, 12)], } tx_power_regex = re.compile( '(?P\S+)\s+(?P[2])\s+(?P[0-9.-]+)\s*(?P[0-9.-]*)\s*(?P[0-9.-]*)\s*(?P[0-9.-]*)' ) regulatory_section_regex = re.compile( r'Regulatory Limits:(?P[\S\s]+)Board Limits:') regulatory_list = re.search(regulatory_section_regex, wl_curpower_out).group('regulatory_limits') regulatory_list = re.findall(tx_power_regex, regulatory_list) regulatory_dict = {entry[0]: entry[2:] for entry in regulatory_list} bw_index = int(math.log(bw / 10, 2)) - 1 regulatory_table = collections.OrderedDict() for regulatory_group, rates in regulatory_group_map.items(): for rate in rates: reg_power = regulatory_dict.get(regulatory_group, ['0', '0', '0', '0'])[bw_index] regulatory_table[rate] = float( reg_power) if reg_power != '-' else 0 return regulatory_table def generate_board_limit_table(self, wl_curpower_out, channel, bw): """"Helper function to generate board limit table from curpower. Args: wl_curpower_out: curpower output channel: current channel bw: current bandwidth Returns: board_limit_table: dict with board limits for current config """ tx_power_regex = re.compile( '(?P\S+)\s+(?P[2])\s+(?P[0-9.-]+)\s*(?P[0-9.-]*)\s*(?P[0-9.-]*)\s*(?P[0-9.-]*)' ) board_section_regex = re.compile( r'Board Limits:(?P[\S\s]+)Power Targets:') board_limits_list = re.search(board_section_regex, wl_curpower_out).group('board_limits') board_limits_list = re.findall(tx_power_regex, board_limits_list) board_limits_dict = { entry[0]: entry[2:] for entry in board_limits_list } mcs_regex_list = [[ re.compile('DSSS'), [('CCK', rate, 1) for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]] ], [re.compile('OFDM(?P[0-9]+)_CDD1'), [('LEGACY', '{}Mbps', 1)]], [ re.compile('MCS(?P[0-7])_CDD1'), [('HT{}'.format(bw), '{}', 1), ('VHT{}'.format(bw), '{}', 1)] ], [ re.compile('VHT(?P[8-9])SS1_CDD1'), [('VHT{}'.format(bw), '{}', 1)] ], [ re.compile('VHT10_11SS1_CDD1'), [('VHT{}'.format(bw), '10', 1), ('VHT{}'.format(bw), '11', 1)] ], [ re.compile('MCS(?P[0-9]{2})'), [('HT{}'.format(bw), '{}', 2)] ], [ re.compile('VHT(?P[0-9])SS2'), [('VHT{}'.format(bw), '{}', 2)] ], [ re.compile('VHT10_11SS2'), [('VHT{}'.format(bw), '10', 2), ('VHT{}'.format(bw), '11', 2)] ], [ re.compile('HE_MCS(?P[0-9]+)_CDD1'), [('HE{}'.format(bw), '{}', 1)] ], [ re.compile('HE_MCS(?P[0-9]+)SS2'), [('HE{}'.format(bw), '{}', 2)] ]] bw_index = int(math.log(bw / 10, 2)) - 1 board_limit_table = collections.OrderedDict() for mcs, board_limit in board_limits_dict.items(): for mcs_regex_tuple in mcs_regex_list: mcs_match = re.match(mcs_regex_tuple[0], mcs) if mcs_match: for possible_mcs in mcs_regex_tuple[1]: try: curr_mcs = (possible_mcs[0], possible_mcs[1].format( mcs_match.group('mcs')), possible_mcs[2]) except: curr_mcs = (possible_mcs[0], possible_mcs[1], possible_mcs[2]) board_limit_table[curr_mcs] = float( board_limit[bw_index] ) if board_limit[bw_index] != '-' else 0 break return board_limit_table def pass_fail_check(self, result): """Function to evaluate if current TX powqe matches CSV/NVRAM settings. This function assesses whether the current TX power reported by the DUT matches the powers programmed in NVRAM and CSV after applying the correct TX power backoff used to account for CLPC errors. """ if isinstance(result['testcase_params']['channel'], str) and '6g' in result['testcase_params']['channel']: mode = 'HE' + str(result['testcase_params']['bandwidth']) else: mode = 'HE' + str(result['testcase_params']['bandwidth']) regulatory_power = result['wl_curpower']['regulatory_limits'][(mode, 0, 2)] board_power = result['wl_curpower']['board_limits'][(mode, str(0), 2)] # try: sar_config, nvram_powers = self.get_sar_power_from_nvram( result['testcase_params']) # except: # nvram_powers = [99, 99] # sar_config = 'SAR DISABLED' try: csv_config, csv_powers = self.get_sar_power_from_csv( result['testcase_params']) except: #get from wl_curpower csv_powers = [99, 99] self.log.info("SAR state: {} ({})".format( result['testcase_params']['sar_state'], self.sar_state_mapping[result['testcase_params']['sar_state']], )) self.log.info("Country Code: {}".format( result['testcase_params']['country_code'])) self.log.info('BRCM SAR Table: {}'.format(sar_config)) expected_power = [ min([csv_powers[0], regulatory_power, board_power]) - 1.5, min([csv_powers[1], regulatory_power, board_power]) - 1.5 ] power_str = "NVRAM Powers: {}, CSV Powers: {}, Reg Powers: {}, Board Power: {}, Expected Powers: {}, Reported Powers: {}".format( nvram_powers, csv_powers, [regulatory_power] * 2, [board_power] * 2, expected_power, result['tx_powers']) max_error = max([ abs(expected_power[idx] - result['tx_powers'][idx]) for idx in [0, 1] ]) if max_error > 1: asserts.fail(power_str) else: asserts.explicit_pass(power_str) def process_testclass_results(self): pass def run_tx_power_test(self, testcase_params): """Main function to test tx power. The function sets up the AP & DUT in the correct channel and mode configuration, starts ping traffic and queries the current TX power. Args: testcase_params: dict containing all test parameters Returns: test_result: dict containing ping results and other meta data """ # Prepare results dict llstats_obj = wputils.LinkLayerStats( self.dut, self.testclass_params.get('llstats_enabled', True)) test_result = collections.OrderedDict() test_result['testcase_params'] = testcase_params.copy() test_result['test_name'] = self.current_test_name test_result['ap_config'] = self.access_point.ap_settings.copy() test_result['attenuation'] = testcase_params['atten_range'] test_result['fixed_attenuation'] = self.testbed_params[ 'fixed_attenuation'][str(testcase_params['channel'])] test_result['rssi_results'] = [] test_result['ping_results'] = [] test_result['llstats'] = [] # Setup sniffer if self.testbed_params['sniffer_enable']: self.sniffer.start_capture( testcase_params['test_network'], chan=testcase_params['channel'], bw=testcase_params['bandwidth'], duration=testcase_params['ping_duration'] * len(testcase_params['atten_range']) + self.TEST_TIMEOUT) # Set sar state if testcase_params['sar_state'] == -1: self.dut.adb.shell('halutil -sar disable') else: self.dut.adb.shell('halutil -sar enable {}'.format( testcase_params['sar_state'])) # Run ping and sweep attenuation as needed self.log.info('Starting ping.') thread_future = wputils.get_ping_stats_nb(self.ping_server, self.dut_ip, 10, 0.02, 64) for atten in testcase_params['atten_range']: for attenuator in self.attenuators: attenuator.set_atten(atten, strict=False, retry=True) # Set mcs if isinstance(testcase_params['channel'], int) and testcase_params['channel'] < 13: self.dut.adb.shell('wl 2g_rate -e 0 -s 2 -b {}'.format( testcase_params['bandwidth'])) elif isinstance(testcase_params['channel'], int) and testcase_params['channel'] > 13: self.dut.adb.shell('wl 5g_rate -e 0 -s 2 -b {}'.format( testcase_params['bandwidth'])) else: self.dut.adb.shell('wl 6g_rate -e 0 -s 2 -b {}'.format( testcase_params['bandwidth'])) # Refresh link layer stats llstats_obj.update_stats() # Check sar state self.log.info('Current Country: {}'.format( self.dut.adb.shell('wl country'))) # Dump last est power multiple times chain_0_power = [] chain_1_power = [] for idx in range(30): last_est_out = self.dut.adb.shell( "wl curpower | grep 'Last est. power'", ignore_status=True) if "Last est. power" in last_est_out: try: per_chain_powers = last_est_out.split( ':')[1].strip().split(' ') per_chain_powers = [ float(power) for power in per_chain_powers ] except: per_chain_powers = [0, 0] self.log.warning( 'Could not parse output: {}'.format(last_est_out)) self.log.info( 'Current Tx Powers = {}'.format(per_chain_powers)) if per_chain_powers[0] > 0: chain_0_power.append(per_chain_powers[0]) if per_chain_powers[1] > 0: chain_1_power.append(per_chain_powers[1]) time.sleep(0.25) # Check if empty if len(chain_0_power) == 0 or len(chain_1_power) == 0: test_result['tx_powers'] = [0, 0] tx_power_frequency = [100, 100] else: test_result['tx_powers'] = [ scipy.stats.mode(chain_0_power).mode[0], scipy.stats.mode(chain_1_power).mode[0] ] tx_power_frequency = [ 100 * scipy.stats.mode(chain_0_power).count[0] / len(chain_0_power), 100 * scipy.stats.mode(chain_1_power).count[0] / len(chain_0_power) ] self.log.info( 'Filtered Tx Powers = {}. Frequency = [{:.0f}%, {:.0f}%]'. format(test_result['tx_powers'], tx_power_frequency[0], tx_power_frequency[1])) llstats_obj.update_stats() curr_llstats = llstats_obj.llstats_incremental.copy() test_result['llstats'].append(curr_llstats) # DUMP wl curpower one try: wl_curpower = self.dut.adb.shell('wl curpower') except: time.sleep(0.25) wl_curpower = self.dut.adb.shell('wl curpower', ignore_status=True) current_context = context.get_current_context( ).get_full_output_path() wl_curpower_path = os.path.join(current_context, 'wl_curpower_output') with open(wl_curpower_path, 'w') as file: file.write(wl_curpower) wl_curpower_dict = self.process_wl_curpower( wl_curpower_path, testcase_params) wl_curpower_path = os.path.join(current_context, 'wl_curpower_dict') with open(wl_curpower_path, 'w') as file: json.dump(wputils.serialize_dict(wl_curpower_dict), file, indent=4) test_result['wl_curpower'] = wl_curpower_dict thread_future.result() if self.testbed_params['sniffer_enable']: self.sniffer.stop_capture() return test_result def setup_ap(self, testcase_params): """Sets up the access point in the configuration required by the test. Args: testcase_params: dict containing AP and other test params """ band = self.access_point.band_lookup_by_channel( testcase_params['channel']) if '6G' in band: frequency = wutils.WifiEnums.channel_6G_to_freq[int( testcase_params['channel'].strip('6g'))] else: if testcase_params['channel'] < 13: frequency = wutils.WifiEnums.channel_2G_to_freq[ testcase_params['channel']] else: frequency = wutils.WifiEnums.channel_5G_to_freq[ testcase_params['channel']] if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES: self.access_point.set_region(self.testbed_params['DFS_region']) else: self.access_point.set_region(self.testbed_params['default_region']) self.access_point.set_channel_and_bandwidth(band, testcase_params['channel'], testcase_params['mode']) #self.access_point.set_channel(band, testcase_params['channel']) #self.access_point.set_bandwidth(band, testcase_params['mode']) if 'low' in testcase_params['ap_power']: self.log.info('Setting low AP power.') self.access_point.set_power( band, self.testclass_params['low_ap_tx_power']) self.log.info('Access Point Configuration: {}'.format( self.access_point.ap_settings)) def setup_dut(self, testcase_params): """Sets up the DUT in the configuration required by the test. Args: testcase_params: dict containing AP and other test params """ # Turn screen off to preserve battery if self.testbed_params.get('screen_on', False) or self.testclass_params.get( 'screen_on', False): self.dut.droid.wakeLockAcquireDim() else: self.dut.go_to_sleep() if wputils.validate_network(self.dut, testcase_params['test_network']['SSID']): current_country = self.dut.adb.shell('wl country') self.log.info('Current country code: {}'.format(current_country)) if testcase_params['country_code'] in current_country: self.log.info('Already connected to desired network') self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses( 'wlan0')[0] return testcase_params['test_network']['channel'] = testcase_params['channel'] wutils.wifi_toggle_state(self.dut, False) wutils.set_wifi_country_code(self.dut, testcase_params['country_code']) wutils.wifi_toggle_state(self.dut, True) wutils.reset_wifi(self.dut) if self.testbed_params.get('txbf_off', False): wputils.disable_beamforming(self.dut) wutils.set_wifi_country_code(self.dut, testcase_params['country_code']) current_country = self.dut.adb.shell('wl country') self.log.info('Current country code: {}'.format(current_country)) if testcase_params['country_code'] not in current_country: asserts.fail('Country code not correct.') chan_list = self.dut.adb.shell('wl chan_info_list') if str(testcase_params['channel']) not in chan_list: asserts.skip('Channel {} not supported in {}'.format( testcase_params['channel'], testcase_params['country_code'])) wutils.wifi_connect(self.dut, testcase_params['test_network'], num_of_tries=5, check_connectivity=True) self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0] def setup_tx_power_test(self, testcase_params): """Function that gets devices ready for the test. Args: testcase_params: dict containing test-specific parameters """ # Configure AP self.setup_ap(testcase_params) # Set attenuator to 0 dB for attenuator in self.attenuators: attenuator.set_atten(0, strict=False, retry=True) # Reset, configure, and connect DUT self.setup_dut(testcase_params) def check_skip_conditions(self, testcase_params): """Checks if test should be skipped.""" # Check battery level before test if not wputils.health_check(self.dut, 10): asserts.skip('DUT battery level too low.') if testcase_params[ 'channel'] in wputils.CHANNELS_6GHz and not self.dut.droid.is6GhzBandSupported( ): asserts.skip('DUT does not support 6 GHz band.') if not self.access_point.band_lookup_by_channel( testcase_params['channel']): asserts.skip('AP does not support requested channel.') def compile_test_params(self, testcase_params): """Function to compile all testcase parameters.""" self.check_skip_conditions(testcase_params) band = self.access_point.band_lookup_by_channel( testcase_params['channel']) testcase_params['test_network'] = self.main_network[band] testcase_params['attenuated_chain'] = -1 testcase_params.update( ping_interval=self.testclass_params['ping_interval'], ping_duration=self.testclass_params['ping_duration'], ping_size=self.testclass_params['ping_size'], ) testcase_params['atten_range'] = [0] return testcase_params def _test_ping(self, testcase_params): """ Function that gets called for each range test case The function gets called in each range test case. It customizes the range test based on the test name of the test that called it Args: testcase_params: dict containing preliminary set of parameters """ # Compile test parameters from config and test name testcase_params = self.compile_test_params(testcase_params) # Run ping test self.setup_tx_power_test(testcase_params) result = self.run_tx_power_test(testcase_params) self.pass_fail_check(result) def generate_test_cases(self, ap_power, channels, modes, test_types, country_codes, sar_states): """Function that auto-generates test cases for a test class.""" test_cases = [] allowed_configs = { 20: [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 64, 100, 116, 132, 140, 149, 153, 157, 161 ], 40: [36, 44, 100, 149, 157], 80: [36, 100, 149], 160: [36, '6g37', '6g117', '6g213'] } for channel, mode, test_type, country_code, sar_state in itertools.product( channels, modes, test_types, country_codes, sar_states): bandwidth = int(''.join([x for x in mode if x.isdigit()])) if channel not in allowed_configs[bandwidth]: continue testcase_name = '{}_ch{}_{}_{}_sar_{}'.format( test_type, channel, mode, country_code, sar_state) testcase_params = collections.OrderedDict( test_type=test_type, ap_power=ap_power, channel=channel, mode=mode, bandwidth=bandwidth, country_code=country_code, sar_state=sar_state) setattr(self, testcase_name, partial(self._test_ping, testcase_params)) test_cases.append(testcase_name) return test_cases class WifiTxPowerCheck_BasicSAR_Test(WifiTxPowerCheckTest): def __init__(self, controllers): base_test.BaseTestClass.__init__(self, controllers) self.testcase_metric_logger = ( BlackboxMappedMetricLogger.for_test_case()) self.testclass_metric_logger = ( BlackboxMappedMetricLogger.for_test_class()) self.publish_testcase_metrics = True self.tests = self.generate_test_cases( ap_power='standard', channels=[6, 36, 52, 100, 149, '6g37'], modes=['bw20', 'bw160'], test_types=[ 'test_tx_power', ], country_codes=['US', 'GB', 'JP', 'CA'], sar_states=[-1, 0, 1, 2, 3, 4])