xref: /aosp_15_r20/external/autotest/server/cros/chaos_lib/chaos_parser.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2014 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6
7
8import argparse
9import copy
10import csv
11import logging
12import os
13import re
14import shutil
15
# Unique sentinel used by ChaosParser.parse_status_log to tag a failed test
# whose name does not contain 'Config' (treated as a connect failure).
CONNECT_FAIL = object()
# Unique sentinel used by ChaosParser.parse_status_log to tag a failed test
# whose name contains 'Config' (treated as an AP configuration failure).
CONFIG_FAIL = object()
# Directory the CSV summaries are written to. ChaosParser.parse_results_dir
# deletes this directory (if present) at the start of every run.
RESULTS_DIR = '/tmp/chaos'
19
20
21class ChaosParser(object):
22    """Defines a parser for chaos test results"""
23
24    def __init__(self, results_dir, create_file, print_config_failures):
25        """ Constructs a parser interface.
26
27        @param results_dir: complete path to restuls directory for a chaos test.
28        @param create_file: True to create csv files; False otherwise.
29        @param print_config_failures: True to print the config info to stdout;
30                                      False otherwise.
31
32        """
33        self._test_results_dir = results_dir
34        self._create_file = create_file
35        self._print_config_failures = print_config_failures
36
37
38    def convert_set_to_string(self, set_list):
39        """Converts a set to a single string.
40
41        @param set_list: a set to convert
42
43        @returns a string, which is all items separated by the word 'and'
44
45        """
46        return_string = str()
47        for i in set_list:
48            return_string += str('%s and ' % i)
49        return return_string[:-5]
50
51
52    def create_csv(self, filename, data_list):
53        """Creates a file in .csv format.
54
55        @param filename: name for the csv file
56        @param data_list: a list of all the info to write to a file
57
58        """
59        if not os.path.exists(RESULTS_DIR):
60            os.mkdir(RESULTS_DIR)
61        try:
62            path = os.path.join(RESULTS_DIR, filename + '.csv')
63            with open(path, 'wb') as f:
64                writer = csv.writer(f)
65                writer.writerow(data_list)
66                logging.info('Created CSV file %s', path)
67        except IOError as e:
68            logging.error('File operation failed with %s: %s', e.errno,
69                           e.strerror)
70            return
71
72
73    def get_ap_name(self, line):
74        """Gets the router name from the string passed.
75
76        @param line: Test ERROR string from chaos status.log
77
78        @returns the router name or brand.
79
80        """
81        router_info = re.search('Router name: ([\w\s]+)', line)
82        return router_info.group(1)
83
84
85    def get_ap_mode_chan_freq(self, ssid):
86        """Gets the AP band from ssid using channel.
87
88        @param ssid: A valid chaos test SSID as a string
89
90        @returns the AP band, mode, and channel.
91
92        """
93        channel_security_info = ssid.split('_')
94        channel_info = channel_security_info[-2]
95        mode = channel_security_info[-3]
96        channel = int(re.split('(\d+)', channel_info)[1])
97        # TODO Choose if we want to keep band, we never put it in the
98        # spreadsheet and is currently unused.
99        if channel in range(1, 15):
100            band = '2.4GHz'
101        else:
102            band = '5GHz'
103        return {'mode': mode.upper(), 'channel': channel,
104                'band': band}
105
106
107    def generate_percentage_string(self, passed_tests, total_tests):
108        """Creates a pass percentage string in the formation x/y (zz%)
109
110        @param passed_tests: int of passed tests
111        @param total_tests: int of total tests
112
113        @returns a formatted string as described above.
114
115        """
116        percent = float(passed_tests)/float(total_tests) * 100
117        percent_string = str(int(round(percent))) + '%'
118        return str('%d/%d (%s)' % (passed_tests, total_tests, percent_string))
119
120
121    def parse_keyval(self, filepath):
122        """Parses the 'keyvalue' file to get device details.
123
124        @param filepath: the complete path to the keyval file
125
126        @returns a board with device name and OS version.
127
128        """
129        # Android information does not exist in the keyfile, add temporary
130        # information into the dictionary.  crbug.com/570408
131        lsb_dict = {'board': 'unknown',
132                    'version': 'unknown'}
133        f = open(filepath, 'r')
134        for line in f:
135            line = line.split('=')
136            if 'RELEASE_BOARD' in line[0]:
137                lsb_dict = {'board':line[1].rstrip()}
138            elif 'RELEASE_VERSION' in line[0]:
139                lsb_dict['version'] = line[1].rstrip()
140            else:
141                continue
142        f.close()
143        return lsb_dict
144
145
146    def parse_status_log(self, board, os_version, security, status_log_path):
147        """Parses the entire status.log file from chaos test for test failures.
148           and creates two CSV files for connect fail and configuration fail
149           respectively.
150
151        @param board: the board the test was run against as a string
152        @param os_version: the version of ChromeOS as a string
153        @param security: the security used during the test as a string
154        @param status_log_path: complete path to the status.log file
155
156        """
157        # Items that can have multiple values
158        modes = list()
159        channels = list()
160        test_fail_aps = list()
161        static_config_failures = list()
162        dynamic_config_failures = list()
163        kernel_version = ""
164        fw_version = ""
165        f = open(status_log_path, 'r')
166        total = 0
167        for line in f:
168            line = line.strip()
169            if line.startswith('START\tnetwork_WiFi'):
170               # Do not count PDU failures in total tests run.
171               if 'PDU' in line:
172                   continue
173               total += 1
174            elif 'kernel_version' in line:
175                kernel_version = re.search('[\d.]+', line).group(0)
176            elif 'firmware_version' in line:
177               fw_version = re.search('firmware_version\': \'([\w\s:().]+)',
178                                      line).group(1)
179            elif line.startswith('ERROR') or line.startswith('FAIL'):
180                title_info = line.split()
181                if 'reboot' in title_info:
182                    continue
183                # Get the hostname for the AP that failed configuration.
184                if 'PDU' in title_info[1]:
185                    continue
186                else:
187                    # Get the router name, band for the AP that failed
188                    # connect.
189                    if 'Config' in title_info[1]:
190                        failure_type = CONFIG_FAIL
191                    else:
192                        failure_type = CONNECT_FAIL
193
194                    if (failure_type == CONFIG_FAIL and
195                        'chromeos' in title_info[1]):
196                        ssid = title_info[1].split('.')[1].split('_')[0]
197                    else:
198                        ssid_info = title_info[1].split('.')
199                        ssid = ssid_info[1]
200                        network_dict = self.get_ap_mode_chan_freq(ssid)
201                        modes.append(network_dict['mode'])
202                        channels.append(network_dict['channel'])
203
204                    # Security mismatches and Ping failures are not connect
205                    # failures.
206                    if (('Ping command' in line or 'correct security' in line)
207                        or failure_type == CONFIG_FAIL):
208                        if 'StaticAPConfigurator' in line:
209                            static_config_failures.append(ssid)
210                        else:
211                            dynamic_config_failures.append(ssid)
212                    else:
213                        test_fail_aps.append(ssid)
214            elif ('END GOOD' in line and ('ChaosConnectDisconnect' in line or
215                                          'ChaosLongConnect' in line)):
216                    test_name = line.split()[2]
217                    ssid = test_name.split('.')[1]
218                    network_dict = self.get_ap_mode_chan_freq(ssid)
219                    modes.append(network_dict['mode'])
220                    channels.append(network_dict['channel'])
221            else:
222                continue
223
224        config_pass = total - (len(dynamic_config_failures) +
225                               len(static_config_failures))
226        config_pass_string = self.generate_percentage_string(config_pass,
227                                                             total)
228        connect_pass = config_pass - len(test_fail_aps)
229        connect_pass_string = self.generate_percentage_string(connect_pass,
230                                                              config_pass)
231
232        base_csv_list = [board, os_version, fw_version, kernel_version,
233                         self.convert_set_to_string(set(modes)),
234                         self.convert_set_to_string(set(channels)),
235                         security]
236
237        static_config_csv_list = copy.deepcopy(base_csv_list)
238        static_config_csv_list.append(config_pass_string)
239        static_config_csv_list.extend(static_config_failures)
240
241        dynamic_config_csv_list = copy.deepcopy(base_csv_list)
242        dynamic_config_csv_list.append(config_pass_string)
243        dynamic_config_csv_list.extend(dynamic_config_failures)
244
245        connect_csv_list = copy.deepcopy(base_csv_list)
246        connect_csv_list.append(connect_pass_string)
247        connect_csv_list.extend(test_fail_aps)
248
249        print('Connect failure for security: %s' % security)
250        print(','.join(connect_csv_list))
251        print('\n')
252
253        if self._print_config_failures:
254            config_files = [('Static', static_config_csv_list),
255                            ('Dynamic', dynamic_config_csv_list)]
256            for config_data in config_files:
257                self.print_config_failures(config_data[0], security,
258                                           config_data[1])
259
260        if self._create_file:
261            self.create_csv('chaos_WiFi_dynamic_config_fail.' + security,
262                            dynamic_config_csv_list)
263            self.create_csv('chaos_WiFi_static_config_fail.' + security,
264                            static_config_csv_list)
265            self.create_csv('chaos_WiFi_connect_fail.' + security,
266                            connect_csv_list)
267
268
269    def print_config_failures(self, config_type, security, config_csv_list):
270        """Prints out the configuration failures.
271
272        @param config_type: string describing the configurator type
273        @param security: the security type as a string
274        @param config_csv_list: list of the configuration failures
275
276        """
277        # 8 because that is the lenth of the base list
278        if len(config_csv_list) <= 8:
279            return
280        print('%s config failures for security: %s' % (config_type, security))
281        print(','.join(config_csv_list))
282        print('\n')
283
284
285    def traverse_results_dir(self, path):
286        """Walks through the results directory and get the pathnames for the
287           status.log and the keyval files.
288
289        @param path: complete path to a specific test result directory.
290
291        @returns a dict with absolute pathnames for the 'status.log' and
292                'keyfile' files.
293
294        """
295        status = None
296        keyval = None
297
298        for root, dir_name, file_name in os.walk(path):
299            for name in file_name:
300                current_path = os.path.join(root, name)
301                if name == 'status.log' and not status:
302                       status = current_path
303                elif name == 'keyval' and ('param-debug_info' in
304                                           open(current_path).read()):
305                    # This is a keyval file for a single test and not a suite.
306                    keyval = os.path.join(root, name)
307                    break
308                else:
309                    continue
310        if not keyval:
311            raise Exception('Did Chaos tests complete successfully? Rerun tests'
312                            ' with missing results.')
313        return {'status_file': status, 'keyval_file': keyval}
314
315
316    def parse_results_dir(self):
317        """Parses each result directory.
318
319        For each results directory created by test_that, parse it and
320        create summary files.
321
322        """
323        if os.path.exists(RESULTS_DIR):
324            shutil.rmtree(RESULTS_DIR)
325        test_processed = False
326        for results_dir in os.listdir(self._test_results_dir):
327            if 'results' in results_dir:
328                path = os.path.join(self._test_results_dir, results_dir)
329                test = results_dir.split('.')[1]
330                status_key_dict = self.traverse_results_dir(path)
331                status_log_path = status_key_dict['status_file']
332                lsb_info = self.parse_keyval(status_key_dict['keyval_file'])
333                if test is not None:
334                    self.parse_status_log(lsb_info['board'],
335                                          lsb_info['version'],
336                                          test,
337                                          status_log_path)
338                    test_processed = True
339        if not test_processed:
340            raise RuntimeError('chaos_parse: Did not find any results directory'
341                               'to process')
342
343
def main():
    """Command-line entry point.

    Configures logging, reads the results directory and the two optional
    flags from the command line, validates the directory and runs the
    chaos results parser on it.

    """
    logging.basicConfig(level=logging.INFO)

    cli = argparse.ArgumentParser()
    cli.add_argument('-d', '--directory',
                     dest='dir_name',
                     required=True,
                     help='Pathname to results generated by test_that')
    cli.add_argument('--create_file',
                     dest='create_file',
                     default=False,
                     action='store_true')
    cli.add_argument('--print_config_failures',
                     dest='print_config_failures',
                     default=False,
                     action='store_true')
    options = cli.parse_args()

    results_path = options.dir_name
    if not results_path:
        raise RuntimeError('chaos_parser: No directory name supplied. Use -h'
                           ' for help')
    if not os.path.exists(results_path):
        raise RuntimeError('chaos_parser: Invalid directory name supplied.')

    chaos_parser = ChaosParser(results_path,
                               options.create_file,
                               options.print_config_failures)
    chaos_parser.parse_results_dir()
366
367
# Run the parser only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == '__main__':
    main()
370