#!/usr/bin/env python3.4
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import csv
import itertools
import json
import logging
import math
import os
import re
import scipy.stats
import time
from acts import asserts
from acts import context
from acts import base_test
from acts import utils
from acts.controllers.utils_lib import ssh
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts_contrib.test_utils.wifi import ota_sniffer
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from functools import partial


class WifiTxPowerCheckTest(base_test.BaseTestClass):
    """Class for WiFi TX power (SAR) verification tests.

    For every generated combination of channel, bandwidth, country code and
    SAR state, the class sets up the AP in the desired configuration,
    configures and connects the phone to the AP, applies the SAR state, and
    compares the TX power reported by `wl curpower` against the power
    expected from the SAR CSV, the regulatory limits and the board limits.
    For an example config file to run this test class see
    example_connectivity_performance_ap_sta.json.
    """

    TEST_TIMEOUT = 10
    RSSI_POLL_INTERVAL = 0.2
    SHORT_SLEEP = 1
    MED_SLEEP = 5
    MAX_CONSECUTIVE_ZEROS = 5
    DISCONNECTED_PING_RESULT = {
        'connected': 0,
        'rtt': [],
        'time_stamp': [],
        'ping_interarrivals': [],
        'packet_loss_percentage': 100
    }

    # Back-off subtracted from the lowest applicable power limit to obtain
    # the expected TX power (same units as the powers reported by the DUT).
    TX_POWER_BACKOFF = 1.5
    # Largest tolerated absolute difference between expected and reported
    # TX power before the test is failed.
    TX_POWER_TOLERANCE = 1

    BRCM_SAR_MAPPING = {
        0: 'disable',
        1: 'head',
        2: 'grip',
        16: 'bt',
        32: 'hotspot'
    }

    # Country codes with a dedicated regulatory domain in the SAR tables;
    # every other country code falls back to 'row'.
    COUNTRY_TO_REG_DOMAIN = {'US': 'fcc', 'JP': 'jp', 'CA': 'ca'}

    # Maps (band, 1-based sub-band index) to the channels in that sub-band.
    BAND_TO_CHANNEL_MAP = {
        ('2g', 1): [1, 6, 11],
        ('5g', 1): [36, 40, 44, 48],
        ('5g', 2): [52, 56, 60, 64],
        ('5g', 3): range(100, 148, 4),
        ('5g', 4): [149, 153, 157, 161],
        ('6g', 1): ['6g{}'.format(channel) for channel in range(1, 46, 4)],
        ('6g', 2): ['6g{}'.format(channel) for channel in range(49, 94, 4)],
        ('6g', 3): ['6g{}'.format(channel) for channel in range(97, 114, 4)],
        ('6g', 4): ['6g{}'.format(channel) for channel in range(117, 158, 4)],
        ('6g', 5): ['6g{}'.format(channel) for channel in range(161, 186, 4)],
        ('6g', 6): ['6g{}'.format(channel) for channel in range(189, 234, 4)]
    }

    # Matches one two-chain row of a `wl curpower` limits table: the rate
    # group label, the chain count (only "2" is accepted) and up to four
    # power columns.
    _TX_POWER_REGEX = re.compile(
        r'(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)'
        r'\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)'
        r'\s*(?P<power_4>[0-9.-]*)')

    def __init__(self, controllers):
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 52, 100, 149, '6g37', '6g117', '6g213'],
            modes=['bw20', 'bw40', 'bw80', 'bw160'],
            test_types=[
                'test_tx_power',
            ],
            country_codes=['US', 'GB', 'JP', 'CA', 'AU'],
            sar_states=range(-1, 13))

    def setup_class(self):
        """Unpacks user params, creates controllers and loads SAR tables."""
        self.dut = self.android_devices[-1]
        req_params = [
            'tx_power_test_params', 'testbed_params', 'main_network',
            'RetailAccessPoints', 'RemoteServer'
        ]
        opt_params = ['OTASniffer']
        self.unpack_userparams(req_params, opt_params)
        self.testclass_params = self.tx_power_test_params
        self.num_atten = self.attenuators[0].instrument.num_atten
        self.ping_server = ssh.connection.SshConnection(
            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
        if hasattr(self,
                   'OTASniffer') and self.testbed_params['sniffer_enable']:
            try:
                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
            except Exception:
                # Sniffing is best-effort: keep running without captures.
                self.log.warning('Could not start sniffer. Disabling sniffs.')
                self.testbed_params['sniffer_enable'] = 0
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))
        self.log_path = os.path.join(logging.log_path, 'results')
        os.makedirs(self.log_path, exist_ok=True)
        self.atten_dut_chain_map = {}
        self.testclass_results = []

        # Turn WiFi ON
        if self.testclass_params.get('airplane_mode', 1):
            self.log.info('Turning on airplane mode.')
            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
                                'Can not turn on airplane mode.')
        wutils.wifi_toggle_state(self.dut, True)
        self.dut.droid.wifiEnableVerboseLogging(1)
        asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
                             "Failed to enable WiFi verbose logging.")

        # decode nvram
        self.nvram_sar_data = self.read_nvram_sar_data()
        self.csv_sar_data = self.read_sar_csv(self.testclass_params['sar_csv'])

        # Configure test retries
        self.user_params['retry_tests'] = [self.__class__.__name__]

    def teardown_class(self):
        # Turn WiFi OFF and reset AP
        self.access_point.teardown()
        for dev in self.android_devices:
            wutils.wifi_toggle_state(dev, False)
            dev.go_to_sleep()
        self.process_testclass_results()

    def setup_test(self):
        self.retry_flag = False

    def teardown_test(self):
        self.retry_flag = False

    def on_retry(self):
        """Function to control test logic on retried tests.

        This function is automatically executed on tests that are being
        retried. In this case the function resets wifi, toggles it off and on
        and sets a retry_flag to enable further tweaking the test logic on
        second attempts.
        """
        self.retry_flag = True
        for dev in self.android_devices:
            wutils.reset_wifi(dev)
            wutils.toggle_wifi_off_and_on(dev)

    def read_sar_csv(self, sar_csv):
        """Reads SAR powers from CSV.

        This function reads SAR powers from a CSV and generates a dictionary
        with all programmed TX powers on a per band and regulatory domain
        basis.

        Args:
            sar_csv: path to SAR data file.
        Returns:
            sar_powers: dict keyed by scenario index. Each value holds a
                'SAR Powers' dict keyed by (regulatory domain, mode, band),
                which maps chain index to the list of sub-band powers.
        """
        sar_powers = {}
        # newline='' lets the csv module handle line endings itself.
        with open(sar_csv, mode='r', newline='') as f:
            for row in csv.DictReader(f):
                sub_band_powers = [
                    float(val) for key, val in row.items()
                    if 'Sub-band' in key and val != ''
                ]
                scenario = sar_powers.setdefault(int(row['Scenario Index']),
                                                 {})
                scenario_powers = scenario.setdefault('SAR Powers', {})
                sar_row_key = (row['Regulatory Domain'], row['Mode'],
                               row['Band'])
                chain_powers = scenario_powers.setdefault(sar_row_key, {})
                chain_powers[int(row['Chain'])] = sub_band_powers
        return sar_powers

    def read_nvram_sar_data(self):
        """Reads SAR powers from NVRAM.

        This function reads SAR powers from the NVRAM found on the DUT and
        generates a dictionary with all programmed TX powers on a per band and
        regulatory domain basis. The NVRAM file is chosen based on the
        hardware revision, but if reading the revision-specific file fails,
        the default NVRAM will be loaded. The choice of NVRAM is not
        guaranteed to be correct.

        Returns:
            nvram_sar_data: dict containing all SAR data
        """
        self._read_sar_config_info()
        try:
            hardware_version = self.dut.adb.shell(
                'getprop ro.boot.hardware.revision')
            nvram_path = '/vendor/firmware/bcmdhd.cal_{}'.format(
                hardware_version)
            nvram = self.dut.adb.shell('cat {}'.format(nvram_path))
        except Exception:
            self.log.warning(
                'Could not read revision-specific NVRAM. Using default.')
            nvram = self.dut.adb.shell('cat /vendor/firmware/bcmdhd.cal')
        current_context = context.get_current_context().get_full_output_path()
        file_path = os.path.join(current_context, 'nvram_file')
        with open(file_path, 'w') as file:
            file.write(nvram)
        nvram_sar_data = {}
        for line in nvram.splitlines():
            if 'dynsar' in line:
                sar_config, sar_powers = self._parse_nvram_sar_line(line)
                nvram_sar_data[sar_config] = sar_powers
        file_path = os.path.join(current_context, 'nvram_sar_data')
        with open(file_path, 'w') as file:
            json.dump(wputils.serialize_dict(nvram_sar_data), file, indent=4)

        return nvram_sar_data

    def _read_sar_config_info(self):
        """Function to read SAR scenario mapping.

        This function reads the sarconfig.info file which contains the
        mapping of SAR scenarios to NVRAM data tables, stores the result in
        self.sar_state_mapping and dumps it to the test output directory.
        """
        # Scenario names in index order; the first entry has index -2.
        google_names = [
            'WIFI_POWER_SCENARIO_INVALID',
            'WIFI_POWER_SCENARIO_DISABLE',
            'WIFI_POWER_SCENARIO_VOICE_CALL',
            'WIFI_POWER_SCENARIO_ON_HEAD_CELL_OFF',
            'WIFI_POWER_SCENARIO_ON_HEAD_CELL_ON',
            'WIFI_POWER_SCENARIO_ON_BODY_CELL_OFF',
            'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON',
            'WIFI_POWER_SCENARIO_ON_BODY_BT',
            'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT',
            'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT_MMW',
            'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON_BT',
            'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT',
            'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT',
            'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_MMW',
            'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT_MMW',
        ]
        self.sar_state_mapping = collections.OrderedDict(
            (index, {"google_name": name})
            for index, name in enumerate(google_names, start=-2))

        sar_config_path = '/vendor/firmware/sarconfig.info'
        sar_config_lines = self.dut.adb.shell(
            'cat {}'.format(sar_config_path)).splitlines()
        # Each line is a comma separated list of integers:
        # scenario index, BRCM SAR index, airplane mode flag.
        # Blank lines are skipped so they do not break the int conversion.
        sar_config = [[int(x) for x in line.split(',')]
                      for line in sar_config_lines if line.strip()]

        for sar_state in sar_config:
            self.sar_state_mapping[sar_state[0]]['brcm_index'] = (
                self.BRCM_SAR_MAPPING[sar_state[1]], bool(sar_state[2]))
        current_context = context.get_current_context().get_full_output_path()
        file_path = os.path.join(current_context, 'sarconfig')
        with open(file_path, 'w') as file:
            json.dump(wputils.serialize_dict(self.sar_state_mapping),
                      file,
                      indent=4)

    def _parse_nvram_sar_line(self, sar_line):
        """Helper function to decode SAR NVRAM data lines.

        Args:
            sar_line: single line of text from NVRAM file containing SAR data.
        Returns:
            sar_config: tuple (country, state, band, rsdb/mimo, airplane
                mode) describing the config referenced in this line
            decoded_values: tx powers configured in this line, one
                two-element list per comma separated entry
        Raises:
            ValueError: if the line does not name a known band.
        """
        sar_key = sar_line.split('=')[0]
        # Lines that name no country/state use the defaults 'row'/'head'.
        country = next(
            (country for country in ['fcc', 'jp', 'ca']
             if country in sar_key), 'row')
        state = next(
            (state for state in ['grip', 'bt', 'hotspot']
             if state in sar_key), 'head')
        band = next((band for band in ['2g', '5g', '6g'] if band in sar_key),
                    None)
        if band is None:
            raise ValueError(
                'No band found in NVRAM SAR line: {}'.format(sar_line))
        rsdb = 'rsdb' if 'rsdb' in sar_line else 'mimo'
        airplane_mode = '_2=' in sar_line
        sar_config = (country, state, band, rsdb, airplane_mode)

        decoded_powers = []
        for sar_power in sar_line.split('=')[1].split(','):
            # Note that core 0 and 1 are flipped in the NVRAM entries
            decoded_powers.append([
                (int(sar_power[4:], 16) & 0x7f) / 4,
                (int(sar_power[2:4], 16) & 0x7f) / 4
            ])

        return sar_config, decoded_powers

    def _get_reg_domain(self, country_code):
        """Returns the SAR regulatory domain for a country code."""
        return self.COUNTRY_TO_REG_DOMAIN.get(country_code, 'row')

    def _get_band_and_sub_band(self, channel):
        """Returns (band, 1-based sub-band index) for a channel.

        Raises:
            ValueError: if the channel is not in BAND_TO_CHANNEL_MAP.
        """
        for (band, sub_band_idx), channels in self.BAND_TO_CHANNEL_MAP.items():
            if channel in channels:
                return band, sub_band_idx
        raise ValueError(
            'Channel {} not found in BAND_TO_CHANNEL_MAP.'.format(channel))

    def get_sar_power_from_nvram(self, testcase_params):
        """Function to get current expected SAR power from nvram

        This functions gets the expected SAR TX power from the DUT NVRAM data.
        The SAR power is looked up based on the current channel and regulatory
        domain.

        Args:
            testcase_params: dict containing channel, sar state, country code
        Returns:
            sar_config: current expected sar config
            sar_powers: current expected sar powers
        """
        reg_domain = self._get_reg_domain(testcase_params['country_code'])
        current_band, sub_band_idx = self._get_band_and_sub_band(
            testcase_params['channel'])
        brcm_index = self.sar_state_mapping[
            testcase_params['sar_state']]['brcm_index']
        sar_config = (reg_domain, brcm_index[0], current_band, 'mimo',
                      brcm_index[1])
        sar_powers = self.nvram_sar_data[sar_config][sub_band_idx - 1]
        return sar_config, sar_powers

    def get_sar_power_from_csv(self, testcase_params):
        """Function to get current expected SAR power from CSV.

        This functions gets the expected SAR TX power from the SAR CSV data.
        The SAR power is looked up based on the current channel and regulatory
        domain.

        Args:
            testcase_params: dict containing channel, sar state, country code
        Returns:
            sar_config: current expected sar config
            sar_powers: current expected sar powers for chains 0 and 1
        """
        reg_domain = self._get_reg_domain(testcase_params['country_code'])
        current_band, sub_band_idx = self._get_band_and_sub_band(
            testcase_params['channel'])
        sar_config = (reg_domain, 'mimo', current_band)
        chain_powers = self.csv_sar_data[
            testcase_params['sar_state']]['SAR Powers'][sar_config]
        sar_powers = [
            chain_powers[0][sub_band_idx - 1],
            chain_powers[1][sub_band_idx - 1]
        ]
        return sar_config, sar_powers

    def process_wl_curpower(self, wl_curpower_file, testcase_params):
        """Function to parse wl_curpower output.

        Args:
            wl_curpower_file: path to curpower output file.
            testcase_params: dict containing channel, sar state, country code
        Returns:
            wl_curpower_dict: dict formatted version of curpower data.
        """
        with open(wl_curpower_file, 'r') as file:
            wl_curpower_out = file.read()

        channel_regex = re.compile(r'Current Channel:\s+(?P<channel>[0-9]+)')
        bandwidth_regex = re.compile(
            r'Channel Width:\s+(?P<bandwidth>\S+)MHz\n')

        channel = int(channel_regex.search(wl_curpower_out).group('channel'))
        bandwidth = int(
            bandwidth_regex.search(wl_curpower_out).group('bandwidth'))

        regulatory_limits = self.generate_regulatory_table(
            wl_curpower_out, channel, bandwidth)
        board_limits = self.generate_board_limit_table(wl_curpower_out,
                                                       channel, bandwidth)
        wl_curpower_dict = {
            'channel': channel,
            'bandwidth': bandwidth,
            'country': testcase_params['country_code'],
            'regulatory_limits': regulatory_limits,
            'board_limits': board_limits
        }
        return wl_curpower_dict

    @staticmethod
    def _get_bw_index(bw):
        """Returns the limits table column index for a bandwidth.

        20 maps to 0, 40 to 1, 80 to 2 and 160 to 3. math.log2 is exact for
        powers of two, so truncating with int() cannot round down by one.
        """
        return int(math.log2(bw / 10)) - 1

    @staticmethod
    def _to_power(value):
        """Converts a limits table cell to a number ('-' means 0)."""
        return float(value) if value != '-' else 0

    def _parse_limit_section(self, section_text):
        """Maps each rate group label in a limits section to its powers."""
        return {
            entry[0]: entry[2:]
            for entry in self._TX_POWER_REGEX.findall(section_text)
        }

    def generate_regulatory_table(self, wl_curpower_out, channel, bw):
        """Helper function to generate regulatory limit table from curpower.

        Args:
            wl_curpower_out: curpower output
            channel: current channel
            bw: current bandwidth
        Returns:
            regulatory_table: dict with regulatory limits for current config,
                keyed by (mode, rate, number of streams)
        """
        ht_mode = 'HT' + str(bw)
        vht_mode = 'VHT' + str(bw)
        he_mode = 'HE' + str(bw)
        # Insertion order is preserved in the returned table.
        regulatory_group_map = {
            'DSSS': [('CCK', '{}Mbps'.format(mbps), 1)
                     for mbps in [1, 2, 5.5, 11]],
            'OFDM_CDD1': [('LEGACY', '{}Mbps'.format(mbps), 1)
                          for mbps in [6, 9, 12, 18, 24, 36, 48, 54]],
            'MCS0_7_CDD1': [(mode, rate, 1)
                            for (mode, rate) in itertools.product(
                                [ht_mode, vht_mode], range(0, 8))],
            'VHT8_9SS1_CDD1': [(vht_mode, 8, 1), (vht_mode, 9, 1)],
            'VHT10_11SS1_CDD1': [(vht_mode, 10, 1), (vht_mode, 11, 1)],
            # HT numbers two-stream rates 8-15; VHT restarts them at 0-7.
            'MCS8_15': [(mode, rate - 8 * ('VHT' in mode), 2)
                        for (mode, rate) in itertools.product(
                            [ht_mode, vht_mode], range(8, 16))],
            'VHT8_9SS2': [(vht_mode, 8, 2), (vht_mode, 9, 2)],
            'VHT10_11SS2': [(vht_mode, 10, 2), (vht_mode, 11, 2)],
            'HE_MCS0-11_CDD1': [(he_mode, rate, 1) for rate in range(0, 12)],
            'HE_MCS0_11SS2': [(he_mode, rate, 2) for rate in range(0, 12)],
        }

        regulatory_section_regex = re.compile(
            r'Regulatory Limits:(?P<regulatory_limits>[\S\s]+)Board Limits:')
        regulatory_section = regulatory_section_regex.search(
            wl_curpower_out).group('regulatory_limits')
        regulatory_dict = self._parse_limit_section(regulatory_section)

        bw_index = self._get_bw_index(bw)
        regulatory_table = collections.OrderedDict()
        for regulatory_group, rates in regulatory_group_map.items():
            # Groups missing from the output get a limit of 0.
            reg_power = regulatory_dict.get(regulatory_group,
                                            ['0', '0', '0', '0'])[bw_index]
            for rate in rates:
                regulatory_table[rate] = self._to_power(reg_power)
        return regulatory_table

    def generate_board_limit_table(self, wl_curpower_out, channel, bw):
        """Helper function to generate board limit table from curpower.

        Args:
            wl_curpower_out: curpower output
            channel: current channel
            bw: current bandwidth
        Returns:
            board_limit_table: dict with board limits for current config,
                keyed by (mode, rate string, number of streams)
        """
        board_section_regex = re.compile(
            r'Board Limits:(?P<board_limits>[\S\s]+)Power Targets:')
        board_section = board_section_regex.search(wl_curpower_out).group(
            'board_limits')
        board_limits_dict = self._parse_limit_section(board_section)

        ht_mode = 'HT{}'.format(bw)
        vht_mode = 'VHT{}'.format(bw)
        he_mode = 'HE{}'.format(bw)
        # Each entry pairs a label regex with the table keys it produces.
        # A '{}' in the rate field is filled with the regex's 'mcs' group.
        # Order matters: only the first matching regex is used per label.
        mcs_regex_list = [
            (re.compile(r'DSSS'), [('CCK', '{}Mbps'.format(mbps), 1)
                                   for mbps in [1, 2, 5.5, 11]]),
            (re.compile(r'OFDM(?P<mcs>[0-9]+)_CDD1'),
             [('LEGACY', '{}Mbps', 1)]),
            (re.compile(r'MCS(?P<mcs>[0-7])_CDD1'), [(ht_mode, '{}', 1),
                                                    (vht_mode, '{}', 1)]),
            (re.compile(r'VHT(?P<mcs>[8-9])SS1_CDD1'), [(vht_mode, '{}', 1)]),
            (re.compile(r'VHT10_11SS1_CDD1'), [(vht_mode, '10', 1),
                                               (vht_mode, '11', 1)]),
            (re.compile(r'MCS(?P<mcs>[0-9]{2})'), [(ht_mode, '{}', 2)]),
            (re.compile(r'VHT(?P<mcs>[0-9])SS2'), [(vht_mode, '{}', 2)]),
            (re.compile(r'VHT10_11SS2'), [(vht_mode, '10', 2),
                                          (vht_mode, '11', 2)]),
            (re.compile(r'HE_MCS(?P<mcs>[0-9]+)_CDD1'), [(he_mode, '{}', 1)]),
            (re.compile(r'HE_MCS(?P<mcs>[0-9]+)SS2'), [(he_mode, '{}', 2)]),
        ]

        bw_index = self._get_bw_index(bw)
        board_limit_table = collections.OrderedDict()
        for mcs, board_limit in board_limits_dict.items():
            for mcs_regex, possible_mcs_list in mcs_regex_list:
                mcs_match = mcs_regex.match(mcs)
                if not mcs_match:
                    continue
                # Regexes without an 'mcs' group carry literal rates.
                matched_rate = mcs_match.groupdict().get('mcs')
                for mode, rate_template, streams in possible_mcs_list:
                    if matched_rate is None:
                        rate = rate_template
                    else:
                        rate = rate_template.format(matched_rate)
                    board_limit_table[(mode, rate, streams)] = self._to_power(
                        board_limit[bw_index])
                break
        return board_limit_table

    def pass_fail_check(self, result):
        """Function to evaluate if current TX power matches CSV/NVRAM settings.

        This function assesses whether the current TX power reported by the
        DUT matches the expected power, i.e. the lowest of the CSV SAR power,
        the regulatory limit and the board limit, after applying the TX power
        backoff used to account for CLPC errors. The NVRAM powers are looked
        up and reported in the result message.

        Args:
            result: dict returned by run_tx_power_test
        """
        testcase_params = result['testcase_params']
        # The two-stream HE MCS0 entry is used as reference on all bands.
        mode = 'HE' + str(testcase_params['bandwidth'])
        regulatory_power = result['wl_curpower']['regulatory_limits'][(mode,
                                                                        0, 2)]
        board_power = result['wl_curpower']['board_limits'][(mode, str(0), 2)]
        sar_config, nvram_powers = self.get_sar_power_from_nvram(
            testcase_params)
        try:
            _, csv_powers = self.get_sar_power_from_csv(testcase_params)
        except Exception:
            # No usable CSV entry: use a value high enough that the
            # regulatory and board limits determine the expected power.
            self.log.info('No CSV SAR powers found for this configuration.')
            csv_powers = [99, 99]
        self.log.info("SAR state: {} ({})".format(
            testcase_params['sar_state'],
            self.sar_state_mapping[testcase_params['sar_state']],
        ))
        self.log.info("Country Code: {}".format(
            testcase_params['country_code']))
        self.log.info('BRCM SAR Table: {}'.format(sar_config))
        expected_power = [
            min([csv_powers[0], regulatory_power, board_power]) -
            self.TX_POWER_BACKOFF,
            min([csv_powers[1], regulatory_power, board_power]) -
            self.TX_POWER_BACKOFF
        ]
        power_str = "NVRAM Powers: {}, CSV Powers: {}, Reg Powers: {}, Board Power: {}, Expected Powers: {}, Reported Powers: {}".format(
            nvram_powers, csv_powers, [regulatory_power] * 2,
            [board_power] * 2, expected_power, result['tx_powers'])
        max_error = max(
            abs(expected_power[idx] - result['tx_powers'][idx])
            for idx in [0, 1])
        if max_error > self.TX_POWER_TOLERANCE:
            asserts.fail(power_str)
        else:
            asserts.explicit_pass(power_str)

    def process_testclass_results(self):
        pass

    @staticmethod
    def _get_mode_and_count(values):
        """Returns the most frequent value in values and its count.

        Ties are resolved towards the smallest value, matching
        scipy.stats.mode. A Counter is used instead of scipy.stats.mode
        because indexing its result with [0] fails on SciPy releases that
        return scalars for 1-D input.

        Args:
            values: non-empty list of numbers
        """
        counts = collections.Counter(values)
        max_count = max(counts.values())
        mode = min(value for value, count in counts.items()
                   if count == max_count)
        return mode, max_count

    def _sample_last_est_power(self, num_samples=30, poll_interval=0.25):
        """Polls the last estimated TX power reported by `wl curpower`.

        Args:
            num_samples: number of times the DUT is queried
            poll_interval: time to sleep after each query, in seconds
        Returns:
            chain_0_power, chain_1_power: lists with the positive power
                readings collected on each chain
        """
        chain_0_power = []
        chain_1_power = []
        for _ in range(num_samples):
            last_est_out = self.dut.adb.shell(
                "wl curpower | grep 'Last est. power'", ignore_status=True)
            if "Last est. power" in last_est_out:
                try:
                    per_chain_powers = [
                        float(power) for power in last_est_out.split(':')
                        [1].strip().split(' ')
                    ]
                    if len(per_chain_powers) < 2:
                        raise ValueError('Missing per-chain power.')
                except (IndexError, ValueError):
                    per_chain_powers = [0, 0]
                    self.log.warning(
                        'Could not parse output: {}'.format(last_est_out))
                self.log.info(
                    'Current Tx Powers = {}'.format(per_chain_powers))
                # Non-positive readings are discarded.
                if per_chain_powers[0] > 0:
                    chain_0_power.append(per_chain_powers[0])
                if per_chain_powers[1] > 0:
                    chain_1_power.append(per_chain_powers[1])
            time.sleep(poll_interval)
        return chain_0_power, chain_1_power

    def run_tx_power_test(self, testcase_params):
        """Main function to test tx power.

        The function sets up the AP & DUT in the correct channel and mode
        configuration, starts ping traffic and queries the current TX power.

        Args:
            testcase_params: dict containing all test parameters
        Returns:
            test_result: dict containing the reported tx powers, the parsed
                wl curpower data and other meta data
        """
        # Prepare results dict
        llstats_obj = wputils.LinkLayerStats(
            self.dut, self.testclass_params.get('llstats_enabled', True))
        test_result = collections.OrderedDict()
        test_result['testcase_params'] = testcase_params.copy()
        test_result['test_name'] = self.current_test_name
        test_result['ap_config'] = self.access_point.ap_settings.copy()
        test_result['attenuation'] = testcase_params['atten_range']
        test_result['fixed_attenuation'] = self.testbed_params[
            'fixed_attenuation'][str(testcase_params['channel'])]
        test_result['rssi_results'] = []
        test_result['ping_results'] = []
        test_result['llstats'] = []
        # Setup sniffer
        if self.testbed_params['sniffer_enable']:
            self.sniffer.start_capture(
                testcase_params['test_network'],
                chan=testcase_params['channel'],
                bw=testcase_params['bandwidth'],
                duration=testcase_params['ping_duration'] *
                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
        # Set sar state
        if testcase_params['sar_state'] == -1:
            self.dut.adb.shell('halutil -sar disable')
        else:
            self.dut.adb.shell('halutil -sar enable {}'.format(
                testcase_params['sar_state']))
        # Run ping and sweep attenuation as needed
        self.log.info('Starting ping.')
        thread_future = wputils.get_ping_stats_nb(self.ping_server,
                                                  self.dut_ip, 10, 0.02, 64)

        for atten in testcase_params['atten_range']:
            for attenuator in self.attenuators:
                attenuator.set_atten(atten, strict=False, retry=True)
            # Set mcs
            if isinstance(testcase_params['channel'],
                          int) and testcase_params['channel'] < 13:
                self.dut.adb.shell('wl 2g_rate -e 0 -s 2 -b {}'.format(
                    testcase_params['bandwidth']))
            elif isinstance(testcase_params['channel'],
                            int) and testcase_params['channel'] > 13:
                self.dut.adb.shell('wl 5g_rate -e 0 -s 2 -b {}'.format(
                    testcase_params['bandwidth']))
            else:
                self.dut.adb.shell('wl 6g_rate -e 0 -s 2 -b {}'.format(
                    testcase_params['bandwidth']))
            # Refresh link layer stats
            llstats_obj.update_stats()
            # Check sar state
            self.log.info('Current Country: {}'.format(
                self.dut.adb.shell('wl country')))
            # Dump last est power multiple times
            chain_0_power, chain_1_power = self._sample_last_est_power()
            # Check if empty
            if not chain_0_power or not chain_1_power:
                test_result['tx_powers'] = [0, 0]
                tx_power_frequency = [100, 100]
            else:
                chain_0_mode, chain_0_count = self._get_mode_and_count(
                    chain_0_power)
                chain_1_mode, chain_1_count = self._get_mode_and_count(
                    chain_1_power)
                test_result['tx_powers'] = [chain_0_mode, chain_1_mode]
                # Each chain is normalized by its own number of samples.
                tx_power_frequency = [
                    100 * chain_0_count / len(chain_0_power),
                    100 * chain_1_count / len(chain_1_power)
                ]
            self.log.info(
                'Filtered Tx Powers = {}. Frequency = [{:.0f}%, {:.0f}%]'.
                format(test_result['tx_powers'], tx_power_frequency[0],
                       tx_power_frequency[1]))
            llstats_obj.update_stats()
            curr_llstats = llstats_obj.llstats_incremental.copy()
            test_result['llstats'].append(curr_llstats)
            # Dump the full wl curpower output once
            try:
                wl_curpower = self.dut.adb.shell('wl curpower')
            except Exception:
                # Retry once after a short pause, tolerating a bad status.
                time.sleep(0.25)
                wl_curpower = self.dut.adb.shell('wl curpower',
                                                 ignore_status=True)
            current_context = context.get_current_context(
            ).get_full_output_path()
            wl_curpower_path = os.path.join(current_context,
                                            'wl_curpower_output')
            with open(wl_curpower_path, 'w') as file:
                file.write(wl_curpower)
            wl_curpower_dict = self.process_wl_curpower(
                wl_curpower_path, testcase_params)
            wl_curpower_path = os.path.join(current_context,
                                            'wl_curpower_dict')
            with open(wl_curpower_path, 'w') as file:
                json.dump(wputils.serialize_dict(wl_curpower_dict),
                          file,
                          indent=4)
            test_result['wl_curpower'] = wl_curpower_dict
        thread_future.result()
        if self.testbed_params['sniffer_enable']:
            self.sniffer.stop_capture()
        return test_result

    def setup_ap(self, testcase_params):
        """Sets up the access point in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        channel = testcase_params['channel']
        band = self.access_point.band_lookup_by_channel(channel)
        if '6G' in band:
            # Remove only the '6g' prefix. str.strip('6g') strips a set of
            # characters and would also eat digits, e.g. '6g65' -> '5'.
            channel_number = channel[2:] if channel.startswith(
                '6g') else channel
            frequency = wutils.WifiEnums.channel_6G_to_freq[int(
                channel_number)]
        elif channel < 13:
            frequency = wutils.WifiEnums.channel_2G_to_freq[channel]
        else:
            frequency = wutils.WifiEnums.channel_5G_to_freq[channel]
        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
            self.access_point.set_region(self.testbed_params['DFS_region'])
        else:
            self.access_point.set_region(self.testbed_params['default_region'])
        self.access_point.set_channel_and_bandwidth(band, channel,
                                                    testcase_params['mode'])
        if 'low' in testcase_params['ap_power']:
            self.log.info('Setting low AP power.')
            self.access_point.set_power(
                band, self.testclass_params['low_ap_tx_power'])
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Turn screen off to preserve battery
        if self.testbed_params.get('screen_on',
                                   False) or self.testclass_params.get(
                                       'screen_on', False):
            self.dut.droid.wakeLockAcquireDim()
        else:
            self.dut.go_to_sleep()
        if wputils.validate_network(self.dut,
                                    testcase_params['test_network']['SSID']):
            current_country = self.dut.adb.shell('wl country')
            self.log.info('Current country code: {}'.format(current_country))
            if testcase_params['country_code'] in current_country:
                self.log.info('Already connected to desired network')
                self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses(
                    'wlan0')[0]
                return
        testcase_params['test_network']['channel'] = testcase_params['channel']
        wutils.wifi_toggle_state(self.dut, False)
        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
        wutils.wifi_toggle_state(self.dut, True)
        wutils.reset_wifi(self.dut)
        if self.testbed_params.get('txbf_off', False):
            wputils.disable_beamforming(self.dut)
        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
        current_country = self.dut.adb.shell('wl country')
        self.log.info('Current country code: {}'.format(current_country))
        if testcase_params['country_code'] not in current_country:
            asserts.fail('Country code not correct.')
        chan_list = self.dut.adb.shell('wl chan_info_list')
        # NOTE(review): this is a substring check on the raw command output,
        # so short channel numbers can match unrelated text - confirm intent.
        if str(testcase_params['channel']) not in chan_list:
            asserts.skip('Channel {} not supported in {}'.format(
                testcase_params['channel'], testcase_params['country_code']))
        wutils.wifi_connect(self.dut,
                            testcase_params['test_network'],
                            num_of_tries=5,
                            check_connectivity=True)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]

    def setup_tx_power_test(self, testcase_params):
        """Function that gets devices ready for the test.

        Args:
            testcase_params: dict containing test-specific parameters
        """
        # Configure AP
        self.setup_ap(testcase_params)
        # Set attenuator to 0 dB
        for attenuator in self.attenuators:
            attenuator.set_atten(0, strict=False, retry=True)
        # Reset, configure, and connect DUT
        self.setup_dut(testcase_params)

    def check_skip_conditions(self, testcase_params):
        """Checks if test should be skipped."""
        # Check battery level before test
        if not wputils.health_check(self.dut, 10):
            asserts.skip('DUT battery level too low.')
        if (testcase_params['channel'] in wputils.CHANNELS_6GHz
                and not self.dut.droid.is6GhzBandSupported()):
            asserts.skip('DUT does not support 6 GHz band.')
        if not self.access_point.band_lookup_by_channel(
                testcase_params['channel']):
            asserts.skip('AP does not support requested channel.')

    def compile_test_params(self, testcase_params):
        """Function to compile all testcase parameters.

        Args:
            testcase_params: dict containing preliminary set of parameters.
                The dict is updated in place.
        Returns:
            testcase_params: the same dict, with the test network, ping
                settings and attenuation range added
        """
        self.check_skip_conditions(testcase_params)

        band = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        testcase_params['test_network'] = self.main_network[band]
        testcase_params['attenuated_chain'] = -1
        testcase_params.update(
            ping_interval=self.testclass_params['ping_interval'],
            ping_duration=self.testclass_params['ping_duration'],
            ping_size=self.testclass_params['ping_size'],
        )

        testcase_params['atten_range'] = [0]
        return testcase_params

    def _test_ping(self, testcase_params):
        """Function that gets called for each TX power test case.

        The function compiles the test parameters, sets up the AP and DUT,
        runs the TX power measurement and evaluates the result.

        Args:
            testcase_params: dict containing preliminary set of parameters
        """
        # Compile test parameters from config and test name
        testcase_params = self.compile_test_params(testcase_params)
        # Run TX power test
        self.setup_tx_power_test(testcase_params)
        result = self.run_tx_power_test(testcase_params)
        self.pass_fail_check(result)

    def generate_test_cases(self, ap_power, channels, modes, test_types,
                            country_codes, sar_states):
        """Function that auto-generates test cases for a test class.

        A test method is attached to the instance for every combination of
        the arguments whose channel is allowed at the requested bandwidth.

        Returns:
            test_cases: list with the names of the generated test cases
        """
        test_cases = []
        # Channels that can be tested at each bandwidth.
        allowed_configs = {
            20: [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 64, 100,
                116, 132, 140, 149, 153, 157, 161
            ],
            40: [36, 44, 100, 149, 157],
            80: [36, 100, 149],
            160: [36, '6g37', '6g117', '6g213']
        }

        for channel, mode, test_type, country_code, sar_state in itertools.product(
                channels, modes, test_types, country_codes, sar_states):
            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
            if channel not in allowed_configs[bandwidth]:
                continue
            testcase_name = '{}_ch{}_{}_{}_sar_{}'.format(
                test_type, channel, mode, country_code, sar_state)
            testcase_params = collections.OrderedDict(
                test_type=test_type,
                ap_power=ap_power,
                channel=channel,
                mode=mode,
                bandwidth=bandwidth,
                country_code=country_code,
                sar_state=sar_state)
            setattr(self, testcase_name,
                    partial(self._test_ping, testcase_params))
            test_cases.append(testcase_name)
        return test_cases


class WifiTxPowerCheck_BasicSAR_Test(WifiTxPowerCheckTest):
    """Reduced TX power check covering fewer configurations."""

    def __init__(self, controllers):
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 52, 100, 149, '6g37'],
            modes=['bw20', 'bw160'],
            test_types=[
                'test_tx_power',
            ],
            country_codes=['US', 'GB', 'JP', 'CA'],
            sar_states=[-1, 0, 1, 2, 3, 4])