1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import json
21import logging
22import math
23import os
24import re
25import scipy.stats
26import time
27from acts import asserts
28from acts import context
29from acts import base_test
30from acts import utils
31from acts.controllers.utils_lib import ssh
32from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
33from acts_contrib.test_utils.wifi import ota_sniffer
34from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
35from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
36from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
37from functools import partial
38
39
class WifiTxPowerCheckTest(base_test.BaseTestClass):
    """Class for WiFi TX power check tests.

    This class implements tests that compare the TX power reported by the
    DUT against the power expected from the SAR CSV, regulatory and board
    limits. The tests are swept over channels, bandwidths, country codes and
    SAR states. For an example config file to run this test class see
    example_connectivity_performance_ap_sta.json.
    """

    # Added to the requested sniffer capture duration in run_tx_power_test
    # (presumably seconds -- TODO confirm).
    TEST_TIMEOUT = 10
    # NOTE(review): the next four constants are not referenced in the visible
    # part of this file; presumably timing/threshold values kept for parity
    # with the other performance test classes -- confirm before removing.
    RSSI_POLL_INTERVAL = 0.2
    SHORT_SLEEP = 1
    MED_SLEEP = 5
    MAX_CONSECUTIVE_ZEROS = 5
    # NOTE(review): not referenced in the visible part of this file; looks
    # like the ping result template used when the DUT is disconnected.
    DISCONNECTED_PING_RESULT = {
        'connected': 0,
        'rtt': [],
        'time_stamp': [],
        'ping_interarrivals': [],
        'packet_loss_percentage': 100
    }

    # Maps the integer found in the second column of sarconfig.info to the
    # SAR state name used when building NVRAM SAR table keys.
    BRCM_SAR_MAPPING = {
        0: 'disable',
        1: 'head',
        2: 'grip',
        16: 'bt',
        32: 'hotspot'
    }

    # Maps (band, sub-band index) to the channels in that sub-band. The
    # sub-band index is 1-based; lookups subtract 1 before indexing SAR data.
    # 6 GHz channels are strings prefixed with '6g'.
    BAND_TO_CHANNEL_MAP = {
        ('2g', 1): [1, 6, 11],
        ('5g', 1): [36, 40, 44, 48],
        ('5g', 2): [52, 56, 60, 64],
        ('5g', 3): range(100, 148, 4),
        ('5g', 4): [149, 153, 157, 161],
        ('6g', 1): ['6g{}'.format(channel) for channel in range(1, 46, 4)],
        ('6g', 2): ['6g{}'.format(channel) for channel in range(49, 94, 4)],
        ('6g', 3): ['6g{}'.format(channel) for channel in range(97, 114, 4)],
        ('6g', 4): ['6g{}'.format(channel) for channel in range(117, 158, 4)],
        ('6g', 5): ['6g{}'.format(channel) for channel in range(161, 186, 4)],
        ('6g', 6): ['6g{}'.format(channel) for channel in range(189, 234, 4)]
    }
83
84    def __init__(self, controllers):
85        base_test.BaseTestClass.__init__(self, controllers)
86        self.testcase_metric_logger = (
87            BlackboxMappedMetricLogger.for_test_case())
88        self.testclass_metric_logger = (
89            BlackboxMappedMetricLogger.for_test_class())
90        self.publish_testcase_metrics = True
91        self.tests = self.generate_test_cases(
92            ap_power='standard',
93            channels=[6, 36, 52, 100, 149, '6g37', '6g117', '6g213'],
94            modes=['bw20', 'bw40', 'bw80', 'bw160'],
95            test_types=[
96                'test_tx_power',
97            ],
98            country_codes=['US', 'GB', 'JP', 'CA', 'AU'],
99            sar_states=range(-1, 13))
100
101    def setup_class(self):
102        self.dut = self.android_devices[-1]
103        req_params = [
104            'tx_power_test_params', 'testbed_params', 'main_network',
105            'RetailAccessPoints', 'RemoteServer'
106        ]
107        opt_params = ['OTASniffer']
108        self.unpack_userparams(req_params, opt_params)
109        self.testclass_params = self.tx_power_test_params
110        self.num_atten = self.attenuators[0].instrument.num_atten
111        self.ping_server = ssh.connection.SshConnection(
112            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
113        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
114        if hasattr(self,
115                   'OTASniffer') and self.testbed_params['sniffer_enable']:
116            try:
117                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
118            except:
119                self.log.warning('Could not start sniffer. Disabling sniffs.')
120                self.testbed_params['sniffer_enable'] = 0
121        self.log.info('Access Point Configuration: {}'.format(
122            self.access_point.ap_settings))
123        self.log_path = os.path.join(logging.log_path, 'results')
124        os.makedirs(self.log_path, exist_ok=True)
125        self.atten_dut_chain_map = {}
126        self.testclass_results = []
127
128        # Turn WiFi ON
129        if self.testclass_params.get('airplane_mode', 1):
130            self.log.info('Turning on airplane mode.')
131            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
132                                'Can not turn on airplane mode.')
133        wutils.wifi_toggle_state(self.dut, True)
134        self.dut.droid.wifiEnableVerboseLogging(1)
135        asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
136                             "Failed to enable WiFi verbose logging.")
137
138        # decode nvram
139        self.nvram_sar_data = self.read_nvram_sar_data()
140        self.csv_sar_data = self.read_sar_csv(self.testclass_params['sar_csv'])
141
142        # Configure test retries
143        self.user_params['retry_tests'] = [self.__class__.__name__]
144
145    def teardown_class(self):
146        # Turn WiFi OFF and reset AP
147        self.access_point.teardown()
148        for dev in self.android_devices:
149            wutils.wifi_toggle_state(dev, False)
150            dev.go_to_sleep()
151        self.process_testclass_results()
152
153    def setup_test(self):
154        self.retry_flag = False
155
156    def teardown_test(self):
157        self.retry_flag = False
158
159    def on_retry(self):
160        """Function to control test logic on retried tests.
161
162        This function is automatically executed on tests that are being
163        retried. In this case the function resets wifi, toggles it off and on
164        and sets a retry_flag to enable further tweaking the test logic on
165        second attempts.
166        """
167        self.retry_flag = True
168        for dev in self.android_devices:
169            wutils.reset_wifi(dev)
170            wutils.toggle_wifi_off_and_on(dev)
171
172    def read_sar_csv(self, sar_csv):
173        """Reads SAR powers from CSV.
174
175        This function reads SAR powers from a CSV and generate a dictionary
176        with all programmed TX powers on a per band and regulatory domain
177        basis.
178
179        Args:
180            sar_csv: path to SAR data file.
181        Returns:
182            sar_powers: dict containing all SAR data
183        """
184
185        sar_powers = {}
186        sar_csv_data = []
187        with open(sar_csv, mode='r') as f:
188            reader = csv.DictReader(f)
189            for row in reader:
190                row['Sub-band Powers'] = [
191                    float(val) for key, val in row.items()
192                    if 'Sub-band' in key and val != ''
193                ]
194                sar_csv_data.append(row)
195
196        for row in sar_csv_data:
197            sar_powers.setdefault(int(row['Scenario Index']), {})
198            sar_powers[int(row['Scenario Index'])].setdefault('SAR Powers', {})
199            sar_row_key = (row['Regulatory Domain'], row['Mode'], row['Band'])
200            sar_powers[int(row['Scenario Index'])]['SAR Powers'].setdefault(
201                sar_row_key, {})
202            sar_powers[int(
203                row['Scenario Index'])]['SAR Powers'][sar_row_key][int(
204                    row['Chain'])] = row['Sub-band Powers']
205        return sar_powers
206
207    def read_nvram_sar_data(self):
208        """Reads SAR powers from NVRAM.
209
210        This function reads SAR powers from the NVRAM found on the DUT and
211        generates a dictionary with all programmed TX powers on a per band and
212        regulatory domain basis. NThe NVRAM file is chosen based on the build,
213        but if no NVRAM file is found matching the expected name, the default
214        NVRAM will be loaded. The choice of NVRAM is not guaranteed to be
215        correct.
216
217        Returns:
218            nvram_sar_data: dict containing all SAR data
219        """
220
221        self._read_sar_config_info()
222        try:
223            hardware_version = self.dut.adb.shell(
224                'getprop ro.boot.hardware.revision')
225            nvram_path = '/vendor/firmware/bcmdhd.cal_{}'.format(
226                hardware_version)
227            nvram = self.dut.adb.shell('cat {}'.format(nvram_path))
228        except:
229            nvram = self.dut.adb.shell('cat /vendor/firmware/bcmdhd.cal')
230        current_context = context.get_current_context().get_full_output_path()
231        file_path = os.path.join(current_context, 'nvram_file')
232        with open(file_path, 'w') as file:
233            file.write(nvram)
234        nvram_sar_data = {}
235        for line in nvram.splitlines():
236            if 'dynsar' in line:
237                sar_config, sar_powers = self._parse_nvram_sar_line(line)
238                nvram_sar_data[sar_config] = sar_powers
239        file_path = os.path.join(current_context, 'nvram_sar_data')
240        with open(file_path, 'w') as file:
241            json.dump(wputils.serialize_dict(nvram_sar_data), file, indent=4)
242
243        return nvram_sar_data
244
245    def _read_sar_config_info(self):
246        """Function to read SAR scenario mapping,
247
248        This function reads sar_config.info file which contains the mapping
249        of SAR scenarios to NVRAM data tables.
250        """
251
252        self.sar_state_mapping = collections.OrderedDict([(-2, {
253            "google_name":
254            'WIFI_POWER_SCENARIO_INVALID'
255        }), (-1, {
256            "google_name": 'WIFI_POWER_SCENARIO_DISABLE'
257        }), (0, {
258            "google_name": 'WIFI_POWER_SCENARIO_VOICE_CALL'
259        }), (1, {
260            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_OFF'
261        }), (2, {
262            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_ON'
263        }), (3, {
264            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_OFF'
265        }), (4, {
266            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON'
267        }), (5, {
268            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_BT'
269        }), (6, {
270            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT'
271        }), (7, {
272            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT_MMW'
273        }), (8, {
274            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON_BT'
275        }), (9, {
276            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT'
277        }), (10, {
278            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT'
279        }), (11, {
280            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_MMW'
281        }), (12, {
282            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT_MMW'
283        })])
284        sar_config_path = '/vendor/firmware/sarconfig.info'
285        sar_config = self.dut.adb.shell(
286            'cat {}'.format(sar_config_path)).splitlines()
287        sar_config = [line.split(',') for line in sar_config]
288        sar_config = [[int(x) for x in line] for line in sar_config]
289
290        for sar_state in sar_config:
291            self.sar_state_mapping[sar_state[0]]['brcm_index'] = (
292                self.BRCM_SAR_MAPPING[sar_state[1]], bool(sar_state[2]))
293        current_context = context.get_current_context().get_full_output_path()
294        file_path = os.path.join(current_context, 'sarconfig')
295        with open(file_path, 'w') as file:
296            json.dump(wputils.serialize_dict(self.sar_state_mapping),
297                      file,
298                      indent=4)
299
300    def _parse_nvram_sar_line(self, sar_line):
301        """Helper function to decode SAR NVRAM data lines.
302
303        Args:
304            sar_line: single line of text from NVRAM file containing SAR data.
305        Returns:
306            sar_config: sar config referenced in this line
307            decoded_values: tx powers configured in this line
308        """
309
310        sar_config = collections.OrderedDict()
311        list_of_countries = ['fcc', 'jp', 'ca']
312        try:
313            sar_config['country'] = next(country
314                                         for country in list_of_countries
315                                         if country in sar_line.split('=')[0])
316        except:
317            sar_config['country'] = 'row'
318
319        list_of_sar_states = ['grip', 'bt', 'hotspot']
320        try:
321            sar_config['state'] = next(state for state in list_of_sar_states
322                                       if state in sar_line.split('=')[0])
323        except:
324            sar_config['state'] = 'head'
325
326        list_of_bands = ['2g', '5g', '6g']
327        sar_config['band'] = next(band for band in list_of_bands
328                                  if band in sar_line.split('=')[0])
329
330        sar_config['rsdb'] = 'rsdb' if 'rsdb' in sar_line else 'mimo'
331        sar_config['airplane_mode'] = '_2=' in sar_line
332
333        sar_powers = sar_line.split('=')[1].split(',')
334        decoded_powers = []
335        for sar_power in sar_powers:
336            # Note that core 0 and 1 are flipped in the NVRAM entries
337            decoded_powers.append([
338                (int(sar_power[4:], 16) & int('7f', 16)) / 4,
339                (int(sar_power[2:4], 16) & int('7f', 16)) / 4
340            ])
341
342        return tuple(sar_config.values()), decoded_powers
343
344    def get_sar_power_from_nvram(self, testcase_params):
345        """Function to get current expected SAR power from nvram
346
347        This functions gets the expected SAR TX power from the DUT NVRAM data.
348        The SAR power is looked up based on the current channel and regulatory
349        domain,
350
351        Args:
352            testcase_params: dict containing channel, sar state, country code
353        Returns:
354            sar_config: current expected sar config
355            sar_powers: current expected sar powers
356        """
357
358        if testcase_params['country_code'] == 'US':
359            reg_domain = 'fcc'
360        elif testcase_params['country_code'] == 'JP':
361            reg_domain = 'jp'
362        elif testcase_params['country_code'] == 'CA':
363            reg_domain = 'ca'
364        else:
365            reg_domain = 'row'
366        for band, channels in self.BAND_TO_CHANNEL_MAP.items():
367            if testcase_params['channel'] in channels:
368                current_band = band[0]
369                sub_band_idx = band[1]
370                break
371        sar_config = (reg_domain, self.sar_state_mapping[
372            testcase_params['sar_state']]['brcm_index'][0], current_band,
373                      'mimo', self.sar_state_mapping[
374                          testcase_params['sar_state']]['brcm_index'][1])
375        sar_powers = self.nvram_sar_data[sar_config][sub_band_idx - 1]
376        return sar_config, sar_powers
377
378    def get_sar_power_from_csv(self, testcase_params):
379        """Function to get current expected SAR power from CSV.
380
381        This functions gets the expected SAR TX power from the DUT NVRAM data.
382        The SAR power is looked up based on the current channel and regulatory
383        domain,
384
385        Args:
386            testcase_params: dict containing channel, sar state, country code
387        Returns:
388            sar_config: current expected sar config
389            sar_powers: current expected sar powers
390        """
391
392        if testcase_params['country_code'] == 'US':
393            reg_domain = 'fcc'
394        elif testcase_params['country_code'] == 'JP':
395            reg_domain = 'jp'
396        elif testcase_params['country_code'] == 'CA':
397            reg_domain = 'ca'
398        else:
399            reg_domain = 'row'
400        for band, channels in self.BAND_TO_CHANNEL_MAP.items():
401            if testcase_params['channel'] in channels:
402                current_band = band[0]
403                sub_band_idx = band[1]
404                break
405        sar_config = (reg_domain, 'mimo', current_band)
406        sar_powers = [
407            self.csv_sar_data[testcase_params['sar_state']]['SAR Powers']
408            [sar_config][0][sub_band_idx - 1],
409            self.csv_sar_data[testcase_params['sar_state']]['SAR Powers']
410            [sar_config][1][sub_band_idx - 1]
411        ]
412        return sar_config, sar_powers
413
414    def process_wl_curpower(self, wl_curpower_file, testcase_params):
415        """Function to parse wl_curpower output.
416
417        Args:
418            wl_curpower_file: path to curpower output file.
419            testcase_params: dict containing channel, sar state, country code
420        Returns:
421            wl_curpower_dict: dict formatted version of curpower data.
422        """
423
424        with open(wl_curpower_file, 'r') as file:
425            wl_curpower_out = file.read()
426
427        channel_regex = re.compile(r'Current Channel:\s+(?P<channel>[0-9]+)')
428        bandwidth_regex = re.compile(
429            r'Channel Width:\s+(?P<bandwidth>\S+)MHz\n')
430
431        channel = int(
432            re.search(channel_regex, wl_curpower_out).group('channel'))
433        bandwidth = int(
434            re.search(bandwidth_regex, wl_curpower_out).group('bandwidth'))
435
436        regulatory_limits = self.generate_regulatory_table(
437            wl_curpower_out, channel, bandwidth)
438        board_limits = self.generate_board_limit_table(wl_curpower_out,
439                                                       channel, bandwidth)
440        wl_curpower_dict = {
441            'channel': channel,
442            'bandwidth': bandwidth,
443            'country': testcase_params['country_code'],
444            'regulatory_limits': regulatory_limits,
445            'board_limits': board_limits
446        }
447        return wl_curpower_dict
448
449    def generate_regulatory_table(self, wl_curpower_out, channel, bw):
450        """"Helper function to generate regulatory limit table from curpower.
451
452        Args:
453            wl_curpower_out: curpower output
454            channel: current channel
455            bw: current bandwidth
456        Returns:
457            regulatory_table: dict with regulatory limits for current config
458        """
459
460        regulatory_group_map = {
461            'DSSS':
462            [('CCK', rate, 1)
463             for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]],
464            'OFDM_CDD1': [('LEGACY', rate, 1) for rate in [
465                '{}Mbps'.format(mbps)
466                for mbps in [6, 9, 12, 18, 24, 36, 48, 54]
467            ]],
468            'MCS0_7_CDD1':
469            [(mode, rate, 1)
470             for (mode,
471                  rate) in itertools.product(['HT' + str(bw), 'VHT' +
472                                              str(bw)], range(0, 8))],
473            'VHT8_9SS1_CDD1': [('VHT' + str(bw), 8, 1),
474                               ('VHT' + str(bw), 9, 1)],
475            'VHT10_11SS1_CDD1': [('VHT' + str(bw), 10, 1),
476                                 ('VHT' + str(bw), 11, 1)],
477            'MCS8_15':
478            [(mode, rate - 8 * ('VHT' in mode), 2)
479             for (mode,
480                  rate) in itertools.product(['HT' + str(bw), 'VHT' +
481                                              str(bw)], range(8, 16))],
482            'VHT8_9SS2': [('VHT' + str(bw), 8, 2), ('VHT' + str(bw), 9, 2)],
483            'VHT10_11SS2': [('VHT' + str(bw), 10, 2),
484                            ('VHT' + str(bw), 11, 2)],
485            'HE_MCS0-11_CDD1': [('HE' + str(bw), rate, 1)
486                                for rate in range(0, 12)],
487            'HE_MCS0_11SS2': [('HE' + str(bw), rate, 2)
488                              for rate in range(0, 12)],
489        }
490        tx_power_regex = re.compile(
491            '(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
492        )
493
494        regulatory_section_regex = re.compile(
495            r'Regulatory Limits:(?P<regulatory_limits>[\S\s]+)Board Limits:')
496        regulatory_list = re.search(regulatory_section_regex,
497                                    wl_curpower_out).group('regulatory_limits')
498        regulatory_list = re.findall(tx_power_regex, regulatory_list)
499        regulatory_dict = {entry[0]: entry[2:] for entry in regulatory_list}
500
501        bw_index = int(math.log(bw / 10, 2)) - 1
502        regulatory_table = collections.OrderedDict()
503        for regulatory_group, rates in regulatory_group_map.items():
504            for rate in rates:
505                reg_power = regulatory_dict.get(regulatory_group,
506                                                ['0', '0', '0', '0'])[bw_index]
507                regulatory_table[rate] = float(
508                    reg_power) if reg_power != '-' else 0
509        return regulatory_table
510
511    def generate_board_limit_table(self, wl_curpower_out, channel, bw):
512        """"Helper function to generate board limit table from curpower.
513
514        Args:
515            wl_curpower_out: curpower output
516            channel: current channel
517            bw: current bandwidth
518        Returns:
519            board_limit_table: dict with board limits for current config
520        """
521
522        tx_power_regex = re.compile(
523            '(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
524        )
525
526        board_section_regex = re.compile(
527            r'Board Limits:(?P<board_limits>[\S\s]+)Power Targets:')
528        board_limits_list = re.search(board_section_regex,
529                                      wl_curpower_out).group('board_limits')
530        board_limits_list = re.findall(tx_power_regex, board_limits_list)
531        board_limits_dict = {
532            entry[0]: entry[2:]
533            for entry in board_limits_list
534        }
535
536        mcs_regex_list = [[
537            re.compile('DSSS'),
538            [('CCK', rate, 1)
539             for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]]
540        ], [re.compile('OFDM(?P<mcs>[0-9]+)_CDD1'), [('LEGACY', '{}Mbps', 1)]],
541                          [
542                              re.compile('MCS(?P<mcs>[0-7])_CDD1'),
543                              [('HT{}'.format(bw), '{}', 1),
544                               ('VHT{}'.format(bw), '{}', 1)]
545                          ],
546                          [
547                              re.compile('VHT(?P<mcs>[8-9])SS1_CDD1'),
548                              [('VHT{}'.format(bw), '{}', 1)]
549                          ],
550                          [
551                              re.compile('VHT10_11SS1_CDD1'),
552                              [('VHT{}'.format(bw), '10', 1),
553                               ('VHT{}'.format(bw), '11', 1)]
554                          ],
555                          [
556                              re.compile('MCS(?P<mcs>[0-9]{2})'),
557                              [('HT{}'.format(bw), '{}', 2)]
558                          ],
559                          [
560                              re.compile('VHT(?P<mcs>[0-9])SS2'),
561                              [('VHT{}'.format(bw), '{}', 2)]
562                          ],
563                          [
564                              re.compile('VHT10_11SS2'),
565                              [('VHT{}'.format(bw), '10', 2),
566                               ('VHT{}'.format(bw), '11', 2)]
567                          ],
568                          [
569                              re.compile('HE_MCS(?P<mcs>[0-9]+)_CDD1'),
570                              [('HE{}'.format(bw), '{}', 1)]
571                          ],
572                          [
573                              re.compile('HE_MCS(?P<mcs>[0-9]+)SS2'),
574                              [('HE{}'.format(bw), '{}', 2)]
575                          ]]
576
577        bw_index = int(math.log(bw / 10, 2)) - 1
578        board_limit_table = collections.OrderedDict()
579        for mcs, board_limit in board_limits_dict.items():
580            for mcs_regex_tuple in mcs_regex_list:
581                mcs_match = re.match(mcs_regex_tuple[0], mcs)
582                if mcs_match:
583                    for possible_mcs in mcs_regex_tuple[1]:
584                        try:
585                            curr_mcs = (possible_mcs[0],
586                                        possible_mcs[1].format(
587                                            mcs_match.group('mcs')),
588                                        possible_mcs[2])
589                        except:
590                            curr_mcs = (possible_mcs[0], possible_mcs[1],
591                                        possible_mcs[2])
592                        board_limit_table[curr_mcs] = float(
593                            board_limit[bw_index]
594                        ) if board_limit[bw_index] != '-' else 0
595                    break
596        return board_limit_table
597
598    def pass_fail_check(self, result):
599        """Function to evaluate if current TX powqe matches CSV/NVRAM settings.
600
601        This function assesses whether the current TX power reported by the
602        DUT matches the powers programmed in NVRAM and CSV after applying the
603        correct TX power backoff used to account for CLPC errors.
604        """
605
606        if isinstance(result['testcase_params']['channel'],
607                      str) and '6g' in result['testcase_params']['channel']:
608            mode = 'HE' + str(result['testcase_params']['bandwidth'])
609        else:
610            mode = 'HE' + str(result['testcase_params']['bandwidth'])
611        regulatory_power = result['wl_curpower']['regulatory_limits'][(mode, 0,
612                                                                       2)]
613        board_power = result['wl_curpower']['board_limits'][(mode, str(0), 2)]
614        # try:
615        sar_config, nvram_powers = self.get_sar_power_from_nvram(
616            result['testcase_params'])
617        # except:
618        #     nvram_powers = [99, 99]
619        #     sar_config = 'SAR DISABLED'
620        try:
621            csv_config, csv_powers = self.get_sar_power_from_csv(
622                result['testcase_params'])
623        except:
624            #get from wl_curpower
625            csv_powers = [99, 99]
626        self.log.info("SAR state: {} ({})".format(
627            result['testcase_params']['sar_state'],
628            self.sar_state_mapping[result['testcase_params']['sar_state']],
629        ))
630        self.log.info("Country Code: {}".format(
631            result['testcase_params']['country_code']))
632        self.log.info('BRCM SAR Table: {}'.format(sar_config))
633        expected_power = [
634            min([csv_powers[0], regulatory_power, board_power]) - 1.5,
635            min([csv_powers[1], regulatory_power, board_power]) - 1.5
636        ]
637        power_str = "NVRAM Powers: {}, CSV Powers: {}, Reg Powers: {}, Board Power: {}, Expected Powers: {}, Reported Powers: {}".format(
638            nvram_powers, csv_powers, [regulatory_power] * 2,
639            [board_power] * 2, expected_power, result['tx_powers'])
640        max_error = max([
641            abs(expected_power[idx] - result['tx_powers'][idx])
642            for idx in [0, 1]
643        ])
644        if max_error > 1:
645            asserts.fail(power_str)
646        else:
647            asserts.explicit_pass(power_str)
648
649    def process_testclass_results(self):
650        pass
651
652    def run_tx_power_test(self, testcase_params):
653        """Main function to test tx power.
654
655        The function sets up the AP & DUT in the correct channel and mode
656        configuration, starts ping traffic and queries the current TX power.
657
658        Args:
659            testcase_params: dict containing all test parameters
660        Returns:
661            test_result: dict containing ping results and other meta data
662        """
663        # Prepare results dict
664        llstats_obj = wputils.LinkLayerStats(
665            self.dut, self.testclass_params.get('llstats_enabled', True))
666        test_result = collections.OrderedDict()
667        test_result['testcase_params'] = testcase_params.copy()
668        test_result['test_name'] = self.current_test_name
669        test_result['ap_config'] = self.access_point.ap_settings.copy()
670        test_result['attenuation'] = testcase_params['atten_range']
671        test_result['fixed_attenuation'] = self.testbed_params[
672            'fixed_attenuation'][str(testcase_params['channel'])]
673        test_result['rssi_results'] = []
674        test_result['ping_results'] = []
675        test_result['llstats'] = []
676        # Setup sniffer
677        if self.testbed_params['sniffer_enable']:
678            self.sniffer.start_capture(
679                testcase_params['test_network'],
680                chan=testcase_params['channel'],
681                bw=testcase_params['bandwidth'],
682                duration=testcase_params['ping_duration'] *
683                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
684        # Set sar state
685        if testcase_params['sar_state'] == -1:
686            self.dut.adb.shell('halutil -sar disable')
687        else:
688            self.dut.adb.shell('halutil -sar enable {}'.format(
689                testcase_params['sar_state']))
690        # Run ping and sweep attenuation as needed
691        self.log.info('Starting ping.')
692        thread_future = wputils.get_ping_stats_nb(self.ping_server,
693                                                  self.dut_ip, 10, 0.02, 64)
694
695        for atten in testcase_params['atten_range']:
696            for attenuator in self.attenuators:
697                attenuator.set_atten(atten, strict=False, retry=True)
698            # Set mcs
699            if isinstance(testcase_params['channel'],
700                          int) and testcase_params['channel'] < 13:
701                self.dut.adb.shell('wl 2g_rate -e 0 -s 2 -b {}'.format(
702                    testcase_params['bandwidth']))
703            elif isinstance(testcase_params['channel'],
704                            int) and testcase_params['channel'] > 13:
705                self.dut.adb.shell('wl 5g_rate -e 0 -s 2 -b {}'.format(
706                    testcase_params['bandwidth']))
707            else:
708                self.dut.adb.shell('wl 6g_rate -e 0 -s 2 -b {}'.format(
709                    testcase_params['bandwidth']))
710            # Refresh link layer stats
711            llstats_obj.update_stats()
712            # Check sar state
713            self.log.info('Current Country: {}'.format(
714                self.dut.adb.shell('wl country')))
715            # Dump last est power multiple times
716            chain_0_power = []
717            chain_1_power = []
718            for idx in range(30):
719                last_est_out = self.dut.adb.shell(
720                    "wl curpower | grep 'Last est. power'", ignore_status=True)
721                if "Last est. power" in last_est_out:
722                    try:
723                        per_chain_powers = last_est_out.split(
724                            ':')[1].strip().split('  ')
725                        per_chain_powers = [
726                            float(power) for power in per_chain_powers
727                        ]
728                    except:
729                        per_chain_powers = [0, 0]
730                        self.log.warning(
731                            'Could not parse output: {}'.format(last_est_out))
732                    self.log.info(
733                        'Current Tx Powers = {}'.format(per_chain_powers))
734                    if per_chain_powers[0] > 0:
735                        chain_0_power.append(per_chain_powers[0])
736                    if per_chain_powers[1] > 0:
737                        chain_1_power.append(per_chain_powers[1])
738                time.sleep(0.25)
739            # Check if empty
740            if len(chain_0_power) == 0 or len(chain_1_power) == 0:
741                test_result['tx_powers'] = [0, 0]
742                tx_power_frequency = [100, 100]
743            else:
744                test_result['tx_powers'] = [
745                    scipy.stats.mode(chain_0_power).mode[0],
746                    scipy.stats.mode(chain_1_power).mode[0]
747                ]
748                tx_power_frequency = [
749                    100 * scipy.stats.mode(chain_0_power).count[0] /
750                    len(chain_0_power),
751                    100 * scipy.stats.mode(chain_1_power).count[0] /
752                    len(chain_0_power)
753                ]
754            self.log.info(
755                'Filtered Tx Powers = {}. Frequency = [{:.0f}%, {:.0f}%]'.
756                format(test_result['tx_powers'], tx_power_frequency[0],
757                       tx_power_frequency[1]))
758            llstats_obj.update_stats()
759            curr_llstats = llstats_obj.llstats_incremental.copy()
760            test_result['llstats'].append(curr_llstats)
761            # DUMP wl curpower one
762            try:
763                wl_curpower = self.dut.adb.shell('wl curpower')
764            except:
765                time.sleep(0.25)
766                wl_curpower = self.dut.adb.shell('wl curpower',
767                                                 ignore_status=True)
768            current_context = context.get_current_context(
769            ).get_full_output_path()
770            wl_curpower_path = os.path.join(current_context,
771                                            'wl_curpower_output')
772            with open(wl_curpower_path, 'w') as file:
773                file.write(wl_curpower)
774            wl_curpower_dict = self.process_wl_curpower(
775                wl_curpower_path, testcase_params)
776            wl_curpower_path = os.path.join(current_context,
777                                            'wl_curpower_dict')
778            with open(wl_curpower_path, 'w') as file:
779                json.dump(wputils.serialize_dict(wl_curpower_dict),
780                          file,
781                          indent=4)
782            test_result['wl_curpower'] = wl_curpower_dict
783        thread_future.result()
784        if self.testbed_params['sniffer_enable']:
785            self.sniffer.stop_capture()
786        return test_result
787
788    def setup_ap(self, testcase_params):
789        """Sets up the access point in the configuration required by the test.
790
791        Args:
792            testcase_params: dict containing AP and other test params
793        """
794        band = self.access_point.band_lookup_by_channel(
795            testcase_params['channel'])
796        if '6G' in band:
797            frequency = wutils.WifiEnums.channel_6G_to_freq[int(
798                testcase_params['channel'].strip('6g'))]
799        else:
800            if testcase_params['channel'] < 13:
801                frequency = wutils.WifiEnums.channel_2G_to_freq[
802                    testcase_params['channel']]
803            else:
804                frequency = wutils.WifiEnums.channel_5G_to_freq[
805                    testcase_params['channel']]
806        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
807            self.access_point.set_region(self.testbed_params['DFS_region'])
808        else:
809            self.access_point.set_region(self.testbed_params['default_region'])
810        self.access_point.set_channel_and_bandwidth(band,
811                                                    testcase_params['channel'],
812                                                    testcase_params['mode'])
813        #self.access_point.set_channel(band, testcase_params['channel'])
814        #self.access_point.set_bandwidth(band, testcase_params['mode'])
815        if 'low' in testcase_params['ap_power']:
816            self.log.info('Setting low AP power.')
817            self.access_point.set_power(
818                band, self.testclass_params['low_ap_tx_power'])
819        self.log.info('Access Point Configuration: {}'.format(
820            self.access_point.ap_settings))
821
822    def setup_dut(self, testcase_params):
823        """Sets up the DUT in the configuration required by the test.
824
825        Args:
826            testcase_params: dict containing AP and other test params
827        """
828        # Turn screen off to preserve battery
829        if self.testbed_params.get('screen_on',
830                                   False) or self.testclass_params.get(
831                                       'screen_on', False):
832            self.dut.droid.wakeLockAcquireDim()
833        else:
834            self.dut.go_to_sleep()
835        if wputils.validate_network(self.dut,
836                                    testcase_params['test_network']['SSID']):
837            current_country = self.dut.adb.shell('wl country')
838            self.log.info('Current country code: {}'.format(current_country))
839            if testcase_params['country_code'] in current_country:
840                self.log.info('Already connected to desired network')
841                self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses(
842                    'wlan0')[0]
843                return
844        testcase_params['test_network']['channel'] = testcase_params['channel']
845        wutils.wifi_toggle_state(self.dut, False)
846        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
847        wutils.wifi_toggle_state(self.dut, True)
848        wutils.reset_wifi(self.dut)
849        if self.testbed_params.get('txbf_off', False):
850            wputils.disable_beamforming(self.dut)
851        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
852        current_country = self.dut.adb.shell('wl country')
853        self.log.info('Current country code: {}'.format(current_country))
854        if testcase_params['country_code'] not in current_country:
855            asserts.fail('Country code not correct.')
856        chan_list = self.dut.adb.shell('wl chan_info_list')
857        if str(testcase_params['channel']) not in chan_list:
858            asserts.skip('Channel {} not supported in {}'.format(
859                testcase_params['channel'], testcase_params['country_code']))
860        wutils.wifi_connect(self.dut,
861                            testcase_params['test_network'],
862                            num_of_tries=5,
863                            check_connectivity=True)
864        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
865
866    def setup_tx_power_test(self, testcase_params):
867        """Function that gets devices ready for the test.
868
869        Args:
870            testcase_params: dict containing test-specific parameters
871        """
872        # Configure AP
873        self.setup_ap(testcase_params)
874        # Set attenuator to 0 dB
875        for attenuator in self.attenuators:
876            attenuator.set_atten(0, strict=False, retry=True)
877        # Reset, configure, and connect DUT
878        self.setup_dut(testcase_params)
879
880    def check_skip_conditions(self, testcase_params):
881        """Checks if test should be skipped."""
882        # Check battery level before test
883        if not wputils.health_check(self.dut, 10):
884            asserts.skip('DUT battery level too low.')
885        if testcase_params[
886                'channel'] in wputils.CHANNELS_6GHz and not self.dut.droid.is6GhzBandSupported(
887                ):
888            asserts.skip('DUT does not support 6 GHz band.')
889        if not self.access_point.band_lookup_by_channel(
890                testcase_params['channel']):
891            asserts.skip('AP does not support requested channel.')
892
893    def compile_test_params(self, testcase_params):
894        """Function to compile all testcase parameters."""
895
896        self.check_skip_conditions(testcase_params)
897
898        band = self.access_point.band_lookup_by_channel(
899            testcase_params['channel'])
900        testcase_params['test_network'] = self.main_network[band]
901        testcase_params['attenuated_chain'] = -1
902        testcase_params.update(
903            ping_interval=self.testclass_params['ping_interval'],
904            ping_duration=self.testclass_params['ping_duration'],
905            ping_size=self.testclass_params['ping_size'],
906        )
907
908        testcase_params['atten_range'] = [0]
909        return testcase_params
910
911    def _test_ping(self, testcase_params):
912        """ Function that gets called for each range test case
913
914        The function gets called in each range test case. It customizes the
915        range test based on the test name of the test that called it
916
917        Args:
918            testcase_params: dict containing preliminary set of parameters
919        """
920        # Compile test parameters from config and test name
921        testcase_params = self.compile_test_params(testcase_params)
922        # Run ping test
923        self.setup_tx_power_test(testcase_params)
924        result = self.run_tx_power_test(testcase_params)
925        self.pass_fail_check(result)
926
927    def generate_test_cases(self, ap_power, channels, modes, test_types,
928                            country_codes, sar_states):
929        """Function that auto-generates test cases for a test class."""
930        test_cases = []
931        allowed_configs = {
932            20: [
933                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 64, 100,
934                116, 132, 140, 149, 153, 157, 161
935            ],
936            40: [36, 44, 100, 149, 157],
937            80: [36, 100, 149],
938            160: [36, '6g37', '6g117', '6g213']
939        }
940
941        for channel, mode, test_type, country_code, sar_state in itertools.product(
942                channels, modes, test_types, country_codes, sar_states):
943            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
944            if channel not in allowed_configs[bandwidth]:
945                continue
946            testcase_name = '{}_ch{}_{}_{}_sar_{}'.format(
947                test_type, channel, mode, country_code, sar_state)
948            testcase_params = collections.OrderedDict(
949                test_type=test_type,
950                ap_power=ap_power,
951                channel=channel,
952                mode=mode,
953                bandwidth=bandwidth,
954                country_code=country_code,
955                sar_state=sar_state)
956            setattr(self, testcase_name,
957                    partial(self._test_ping, testcase_params))
958            test_cases.append(testcase_name)
959        return test_cases
960
961
class WifiTxPowerCheck_BasicSAR_Test(WifiTxPowerCheckTest):
    """Tx power check test cases swept over country codes and SAR states."""

    def __init__(self, controllers):
        """Creates the metric loggers and generates the test case list.

        Args:
            controllers: passed through to base_test.BaseTestClass.__init__
        """
        # BaseTestClass.__init__ is called directly rather than through
        # super(). NOTE(review): presumably this bypasses the parent class'
        # own __init__ on purpose - confirm before changing.
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True
        # Parameter sweep from which the test cases are generated.
        sweep = dict(
            ap_power='standard',
            channels=[6, 36, 52, 100, 149, '6g37'],
            modes=['bw20', 'bw160'],
            test_types=['test_tx_power'],
            country_codes=['US', 'GB', 'JP', 'CA'],
            sar_states=[-1, 0, 1, 2, 3, 4],
        )
        self.tests = self.generate_test_cases(**sweep)
980