# Lint as: python2, python3
# Copyright (c) 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Summarizes chaos test results found in a test_that results directory."""

import argparse
import copy
import csv
import logging
import os
import re
import shutil
import sys

CONNECT_FAIL = object()
CONFIG_FAIL = object()
RESULTS_DIR = '/tmp/chaos'

# Marker that distinguishes the keyval file of a single test from the keyval
# file of a whole suite.
_SINGLE_TEST_KEYVAL_MARKER = 'param-debug_info'


def _open_csv_for_write(path):
    """Opens a file for csv.writer in the mode the running Python expects.

    The Python 3 csv module writes text and wants newline=''; the Python 2
    csv module writes bytes and wants binary mode.

    @param path: complete path of the file to create.

    @returns an open file object; the caller is responsible for closing it.

    """
    if sys.version_info[0] >= 3:
        return open(path, 'w', newline='')
    return open(path, 'wb')


def _file_contains(path, text):
    """Checks whether a file contains the given text.

    @param path: complete path of the file to read.
    @param text: the string to look for.

    @returns True if text occurs in the file; False otherwise.

    """
    with open(path, 'r') as f:
        return text in f.read()


class ChaosParser(object):
    """Defines a parser for chaos test results"""

    def __init__(self, results_dir, create_file, print_config_failures):
        """ Constructs a parser interface.

        @param results_dir: complete path to results directory for a chaos
                            test.
        @param create_file: True to create csv files; False otherwise.
        @param print_config_failures: True to print the config info to
                                      stdout; False otherwise.

        """
        self._test_results_dir = results_dir
        self._create_file = create_file
        self._print_config_failures = print_config_failures


    def convert_set_to_string(self, set_list):
        """Converts a set to a single string.

        @param set_list: a set (or any iterable) to convert

        @returns a string, which is all items separated by the word 'and';
                 an empty string for an empty input.

        """
        return ' and '.join(str(item) for item in set_list)


    def create_csv(self, filename, data_list):
        """Creates a file in .csv format under RESULTS_DIR.

        The file holds a single row made of data_list. File errors are
        logged rather than raised.

        @param filename: name for the csv file, without the '.csv' suffix
        @param data_list: a list of all the info to write to a file

        """
        path = os.path.join(RESULTS_DIR, filename + '.csv')
        try:
            if not os.path.exists(RESULTS_DIR):
                os.mkdir(RESULTS_DIR)
            csv_file = _open_csv_for_write(path)
            try:
                csv.writer(csv_file).writerow(data_list)
            finally:
                csv_file.close()
            logging.info('Created CSV file %s', path)
        except (IOError, OSError) as e:
            logging.error('File operation failed with %s: %s', e.errno,
                          e.strerror)


    def get_ap_name(self, line):
        """Gets the router name from the string passed.

        @param line: Test ERROR string from chaos status.log

        @returns the router name or brand, or None if the line carries no
                 'Router name: ' field.

        """
        router_info = re.search(r'Router name: ([\w\s]+)', line)
        if router_info is None:
            return None
        return router_info.group(1)


    def get_ap_mode_chan_freq(self, ssid):
        """Gets the AP band, mode and channel from an ssid.

        The ssid is split on '_'; the second to last field holds the channel
        number and the third to last field holds the mode.

        @param ssid: A valid chaos test SSID as a string

        @returns a dict with the keys 'mode', 'channel' and 'band'.

        """
        ssid_fields = ssid.split('_')
        channel_info = ssid_fields[-2]
        mode = ssid_fields[-3]
        # The capturing group makes re.split keep the digits at index 1.
        channel = int(re.split(r'(\d+)', channel_info)[1])
        # TODO Choose if we want to keep band, we never put it in the
        # spreadsheet and is currently unused.
        band = '2.4GHz' if 1 <= channel <= 14 else '5GHz'
        return {'mode': mode.upper(), 'channel': channel, 'band': band}


    def generate_percentage_string(self, passed_tests, total_tests):
        """Creates a pass percentage string in the format x/y (zz%)

        @param passed_tests: int of passed tests
        @param total_tests: int of total tests

        @returns a formatted string as described above; the percentage is
                 reported as 0% when total_tests is 0.

        """
        if not total_tests:
            # Nothing ran, so there is no ratio to compute.
            return '%d/%d (0%%)' % (passed_tests, total_tests)
        percent = float(passed_tests) / float(total_tests) * 100
        return '%d/%d (%d%%)' % (passed_tests, total_tests,
                                 int(round(percent)))


    def parse_keyval(self, filepath):
        """Parses the 'keyvalue' file to get device details.

        @param filepath: the complete path to the keyval file

        @returns a dict with the keys 'board' and 'version'; a value is
                 'unknown' when the file does not provide it.

        """
        # Android information does not exist in the keyfile, add temporary
        # information into the dictionary.  crbug.com/570408
        lsb_dict = {'board': 'unknown',
                    'version': 'unknown'}
        with open(filepath, 'r') as f:
            for line in f:
                key, separator, value = line.partition('=')
                if not separator:
                    # Not a key=value line.
                    continue
                if 'RELEASE_BOARD' in key:
                    lsb_dict['board'] = value.rstrip()
                elif 'RELEASE_VERSION' in key:
                    lsb_dict['version'] = value.rstrip()
        return lsb_dict


    def parse_status_log(self, board, os_version, security, status_log_path):
        """Parses the entire status.log file from chaos test for failures.

        Prints the connect failures, optionally prints the configuration
        failures and optionally creates CSV files for dynamic configuration,
        static configuration and connect failures.

        @param board: the board the test was run against as a string
        @param os_version: the version of ChromeOS as a string
        @param security: the security used during the test as a string
        @param status_log_path: complete path to the status.log file

        """
        # Items that can have multiple values
        modes = []
        channels = []
        test_fail_aps = []
        static_config_failures = []
        dynamic_config_failures = []
        kernel_version = ''
        fw_version = ''
        total = 0

        with open(status_log_path, 'r') as f:
            for line in f:
                line = line.strip()
                if line.startswith('START\tnetwork_WiFi'):
                    # Do not count PDU failures in total tests run.
                    if 'PDU' not in line:
                        total += 1
                elif 'kernel_version' in line:
                    match = re.search(r'[\d.]+', line)
                    if match:
                        kernel_version = match.group(0)
                elif 'firmware_version' in line:
                    match = re.search(
                            r"firmware_version': '([\w\s:().]+)", line)
                    if match:
                        fw_version = match.group(1)
                elif line.startswith(('ERROR', 'FAIL')):
                    title_info = line.split()
                    if 'reboot' in title_info or len(title_info) < 2:
                        continue
                    test_name = title_info[1]
                    if 'PDU' in test_name:
                        continue

                    if 'Config' in test_name:
                        failure_type = CONFIG_FAIL
                    else:
                        failure_type = CONNECT_FAIL

                    if (failure_type is CONFIG_FAIL and
                        'chromeos' in test_name):
                        # Get the hostname for the AP that failed
                        # configuration.
                        ssid = test_name.split('.')[1].split('_')[0]
                    else:
                        # Get the router name, band for the AP that failed
                        # connect.
                        ssid = test_name.split('.')[1]
                        network_dict = self.get_ap_mode_chan_freq(ssid)
                        modes.append(network_dict['mode'])
                        channels.append(network_dict['channel'])

                    # Security mismatches and Ping failures are not connect
                    # failures.
                    if ('Ping command' in line or
                        'correct security' in line or
                        failure_type is CONFIG_FAIL):
                        if 'StaticAPConfigurator' in line:
                            static_config_failures.append(ssid)
                        else:
                            dynamic_config_failures.append(ssid)
                    else:
                        test_fail_aps.append(ssid)
                elif ('END GOOD' in line and
                      ('ChaosConnectDisconnect' in line or
                       'ChaosLongConnect' in line)):
                    ssid = line.split()[2].split('.')[1]
                    network_dict = self.get_ap_mode_chan_freq(ssid)
                    modes.append(network_dict['mode'])
                    channels.append(network_dict['channel'])

        config_pass = total - (len(dynamic_config_failures) +
                               len(static_config_failures))
        config_pass_string = self.generate_percentage_string(config_pass,
                                                             total)
        connect_pass = config_pass - len(test_fail_aps)
        connect_pass_string = self.generate_percentage_string(connect_pass,
                                                              config_pass)

        # Sorted so that repeated runs over the same log print the same row.
        base_csv_list = [board, os_version, fw_version, kernel_version,
                         self.convert_set_to_string(sorted(set(modes))),
                         self.convert_set_to_string(sorted(set(channels))),
                         security]

        static_config_csv_list = copy.deepcopy(base_csv_list)
        static_config_csv_list.append(config_pass_string)
        static_config_csv_list.extend(static_config_failures)

        dynamic_config_csv_list = copy.deepcopy(base_csv_list)
        dynamic_config_csv_list.append(config_pass_string)
        dynamic_config_csv_list.extend(dynamic_config_failures)

        connect_csv_list = copy.deepcopy(base_csv_list)
        connect_csv_list.append(connect_pass_string)
        connect_csv_list.extend(test_fail_aps)

        print('Connect failure for security: %s' % security)
        print(','.join(connect_csv_list))
        print('\n')

        if self._print_config_failures:
            self.print_config_failures('Static', security,
                                       static_config_csv_list)
            self.print_config_failures('Dynamic', security,
                                       dynamic_config_csv_list)

        if self._create_file:
            self.create_csv('chaos_WiFi_dynamic_config_fail.' + security,
                            dynamic_config_csv_list)
            self.create_csv('chaos_WiFi_static_config_fail.' + security,
                            static_config_csv_list)
            self.create_csv('chaos_WiFi_connect_fail.' + security,
                            connect_csv_list)


    def print_config_failures(self, config_type, security, config_csv_list):
        """Prints out the configuration failures.

        Nothing is printed when the list holds no failure entries.

        @param config_type: string describing the configurator type
        @param security: the security type as a string
        @param config_csv_list: list of the configuration failures

        """
        # 8 because that is the length of the base list plus the pass
        # percentage string; anything beyond that is a failure entry.
        if len(config_csv_list) <= 8:
            return
        print('%s config failures for security: %s' % (config_type, security))
        print(','.join(config_csv_list))
        print('\n')


    def traverse_results_dir(self, path):
        """Walks through the results directory and get the pathnames for the
        status.log and the keyval files.

        @param path: complete path to a specific test result directory.

        @returns a dict with absolute pathnames for the 'status.log' and
                 'keyfile' files.

        @raises Exception if either file could not be found.

        """
        status = None
        keyval = None

        for root, _, file_names in os.walk(path):
            for name in file_names:
                current_path = os.path.join(root, name)
                if name == 'status.log' and not status:
                    status = current_path
                elif name == 'keyval' and _file_contains(
                        current_path, _SINGLE_TEST_KEYVAL_MARKER):
                    # This is a keyval file for a single test and not a suite.
                    keyval = current_path
                    # Stop scanning this directory; the walk continues with
                    # the remaining directories.
                    break
        if not keyval or not status:
            raise Exception('Did Chaos tests complete successfully? Rerun tests'
                            ' with missing results.')
        return {'status_file': status, 'keyval_file': keyval}


    def parse_results_dir(self):
        """Parses each result directory.

        For each results directory created by test_that, parse it and
        create summary files. Any previous RESULTS_DIR is removed first.

        @raises RuntimeError if no results directory was processed.

        """
        if os.path.exists(RESULTS_DIR):
            shutil.rmtree(RESULTS_DIR)
        test_processed = False
        for results_dir in sorted(os.listdir(self._test_results_dir)):
            if 'results' not in results_dir:
                continue
            name_parts = results_dir.split('.')
            if len(name_parts) < 2:
                # The text after the first '.' is used as the security name;
                # without it the directory cannot be summarized.
                logging.warning('Skipping %s: no "." in directory name',
                                results_dir)
                continue
            test = name_parts[1]
            path = os.path.join(self._test_results_dir, results_dir)
            status_key_dict = self.traverse_results_dir(path)
            lsb_info = self.parse_keyval(status_key_dict['keyval_file'])
            self.parse_status_log(lsb_info['board'],
                                  lsb_info['version'],
                                  test,
                                  status_key_dict['status_file'])
            test_processed = True
        if not test_processed:
            raise RuntimeError('chaos_parse: Did not find any results directory'
                               ' to process')


def main():
    """Main function to call the parser."""
    logging.basicConfig(level=logging.INFO)
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-d', '--directory', dest='dir_name',
                            help='Pathname to results generated by test_that',
                            required=True)
    arg_parser.add_argument('--create_file', dest='create_file',
                            action='store_true', default=False)
    arg_parser.add_argument('--print_config_failures',
                            dest='print_config_failures',
                            action='store_true',
                            default=False)
    arguments = arg_parser.parse_args()
    if not arguments.dir_name:
        raise RuntimeError('chaos_parser: No directory name supplied. Use -h'
                           ' for help')
    if not os.path.exists(arguments.dir_name):
        raise RuntimeError('chaos_parser: Invalid directory name supplied.')
    parser = ChaosParser(arguments.dir_name, arguments.create_file,
                         arguments.print_config_failures)
    parser.parse_results_dir()


if __name__ == '__main__':
    main()