1#!/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import json 18import os 19import time 20from collections import defaultdict 21 22from acts.metrics.loggers.blackbox import BlackboxMetricLogger 23from acts_contrib.test_utils.bt.bt_test_utils import disable_bluetooth 24from acts_contrib.test_utils.coex.audio_test_utils import AudioCaptureResult 25from acts_contrib.test_utils.coex.audio_test_utils import get_audio_capture_device 26from acts_contrib.test_utils.coex.CoexBaseTest import CoexBaseTest 27from acts_contrib.test_utils.coex.coex_test_utils import bokeh_chart_plot 28from acts_contrib.test_utils.coex.coex_test_utils import collect_bluetooth_manager_dumpsys_logs 29from acts_contrib.test_utils.coex.coex_test_utils import multithread_func 30from acts_contrib.test_utils.coex.coex_test_utils import wifi_connection_check 31from acts_contrib.test_utils.wifi.wifi_test_utils import wifi_connect 32from acts_contrib.test_utils.wifi.wifi_test_utils import wifi_test_device_init 33from acts.utils import get_current_epoch_time 34 35RSSI_POLL_RESULTS = 'Monitoring , Handle: 0x0003, POLL' 36RSSI_RESULTS = 'Monitoring , Handle: 0x0003, ' 37 38 39def get_atten_range(start, stop, step): 40 """Function to derive attenuation range for tests. 41 42 Args: 43 start: Start attenuation value. 44 stop: Stop attenuation value. 45 step: Step attenuation value. 46 """ 47 temp = start 48 while temp < stop: 49 yield temp 50 temp += step 51 52 53class CoexPerformanceBaseTest(CoexBaseTest): 54 """Base test class for performance tests. 55 56 Attributes: 57 rvr : Dict to save attenuation, throughput, fixed_attenuation. 58 a2dp_streaming : Used to denote a2dp test cases. 59 """ 60 61 def __init__(self, controllers): 62 super().__init__(controllers) 63 self.a2dp_streaming = False 64 self.rvr = {} 65 self.bt_range_metric = BlackboxMetricLogger.for_test_case( 66 metric_name='bt_range') 67 self.wifi_max_atten_metric = BlackboxMetricLogger.for_test_case( 68 metric_name='wifi_max_atten') 69 self.wifi_min_atten_metric = BlackboxMetricLogger.for_test_case( 70 metric_name='wifi_min_atten') 71 self.wifi_range_metric = BlackboxMetricLogger.for_test_case( 72 metric_name='wifi_range_metric') 73 74 def setup_class(self): 75 req_params = ['test_params', 'Attenuator'] 76 opt_params = ['audio_params'] 77 self.unpack_userparams(req_params, opt_params) 78 if hasattr(self, 'Attenuator'): 79 self.num_atten = self.attenuators[0].instrument.num_atten 80 else: 81 self.log.error('Attenuator should be connected to run tests.') 82 return False 83 for i in range(self.num_atten): 84 self.attenuators[i].set_atten(0) 85 super().setup_class() 86 self.performance_files_list = [] 87 if "performance_result_path" in self.user_params["test_params"]: 88 self.performance_files_list = [ 89 os.path.join(self.test_params["performance_result_path"], 90 files) for files in os.listdir( 91 self.test_params["performance_result_path"]) 92 ] 93 self.bt_atten_range = list(get_atten_range( 94 self.test_params["bt_atten_start"], 95 self.test_params["bt_atten_stop"], 96 self.test_params["bt_atten_step"])) 97 self.wifi_atten_range = list(get_atten_range( 98 self.test_params["attenuation_start"], 99 self.test_params["attenuation_stop"], 100 self.test_params["attenuation_step"])) 101 102 def setup_test(self): 103 if ('a2dp_streaming' in self.current_test_name and 104 hasattr(self, 'audio_params')): 105 self.audio = get_audio_capture_device(self.sec_ad, self.audio_params) 106 self.a2dp_streaming = True 107 for i in range(self.num_atten): 108 self.attenuators[i].set_atten(0) 109 if not wifi_connection_check(self.pri_ad, self.network["SSID"]): 110 wifi_connect(self.pri_ad, self.network, num_of_tries=5) 111 super().setup_test() 112 113 def teardown_test(self): 114 self.performance_baseline_check() 115 for i in range(self.num_atten): 116 self.attenuators[i].set_atten(0) 117 current_atten = int(self.attenuators[i].get_atten()) 118 self.log.debug( 119 "Setting attenuation to zero : Current atten {} : {}".format( 120 self.attenuators[i], current_atten)) 121 self.a2dp_streaming = False 122 if not disable_bluetooth(self.pri_ad.droid): 123 self.log.info("Failed to disable bluetooth") 124 return False 125 self.destroy_android_and_relay_object() 126 self.rvr = {} 127 128 def teardown_class(self): 129 self.reset_wifi_and_store_results() 130 131 def set_attenuation_and_run_iperf(self, called_func=None): 132 """Sets attenuation and runs iperf for Attenuation max value. 133 134 Args: 135 called_func : Function object to run. 136 137 Returns: 138 True if Pass 139 False if Fail 140 """ 141 self.attenuators[self.num_atten - 1].set_atten(0) 142 self.rvr["bt_attenuation"] = [] 143 self.rvr["test_name"] = self.current_test_name 144 self.rvr["bt_gap_analysis"] = {} 145 self.rvr["bt_range"] = [] 146 status_flag = True 147 for bt_atten in self.bt_atten_range: 148 self.rvr[bt_atten] = {} 149 self.rvr[bt_atten]["fixed_attenuation"] = ( 150 self.test_params["fixed_attenuation"][str( 151 self.network["channel"])]) 152 self.log.info('Setting bt attenuation to: {} dB'.format(bt_atten)) 153 self.attenuators[self.num_atten - 1].set_atten(bt_atten) 154 for i in range(self.num_atten - 1): 155 self.attenuators[i].set_atten(0) 156 if not wifi_connection_check(self.pri_ad, self.network["SSID"]): 157 wifi_test_device_init(self.pri_ad) 158 wifi_connect(self.pri_ad, self.network, num_of_tries=5) 159 adb_rssi_results = self.pri_ad.search_logcat(RSSI_RESULTS) 160 if adb_rssi_results: 161 self.log.debug(adb_rssi_results[-1]) 162 self.log.info('Android device: {}'.format( 163 (adb_rssi_results[-1]['log_message']).split(',')[5])) 164 (self.rvr[bt_atten]["throughput_received"], 165 self.rvr[bt_atten]["a2dp_packet_drop"], 166 status_flag) = self.rvr_throughput(bt_atten, called_func) 167 self.wifi_max_atten_metric.metric_value = max(self.rvr[bt_atten] 168 ["attenuation"]) 169 self.wifi_min_atten_metric.metric_value = min(self.rvr[bt_atten] 170 ["attenuation"]) 171 172 if self.rvr[bt_atten]["throughput_received"]: 173 for i, atten in enumerate(self.rvr[bt_atten]["attenuation"]): 174 if self.rvr[bt_atten]["throughput_received"][i] < 1.0: 175 self.wifi_range_metric.metric_value = ( 176 self.rvr[bt_atten]["attenuation"][i-1]) 177 break 178 else: 179 self.wifi_range_metric.metric_value = max( 180 self.rvr[bt_atten]["attenuation"]) 181 else: 182 self.wifi_range_metric.metric_value = max( 183 self.rvr[bt_atten]["attenuation"]) 184 if self.a2dp_streaming: 185 if not any(x > 0 for x in self.a2dp_dropped_list): 186 self.rvr[bt_atten]["a2dp_packet_drop"] = [] 187 if not self.rvr["bt_range"]: 188 self.rvr["bt_range"].append(0) 189 return status_flag 190 191 def rvr_throughput(self, bt_atten, called_func=None): 192 """Sets attenuation and runs the function passed. 193 194 Args: 195 bt_atten: Bluetooth attenuation. 196 called_func: Functions object to run parallely. 197 198 Returns: 199 Throughput, a2dp_drops and True/False. 200 """ 201 self.iperf_received = [] 202 self.iperf_variables.received = [] 203 self.a2dp_dropped_list = [] 204 self.rvr["bt_attenuation"].append(bt_atten) 205 self.rvr[bt_atten]["audio_artifacts"] = {} 206 self.rvr[bt_atten]["attenuation"] = [] 207 self.rvr["bt_gap_analysis"][bt_atten] = {} 208 for atten in self.wifi_atten_range: 209 tag = '{}_{}'.format(bt_atten, atten) 210 self.rvr[bt_atten]["attenuation"].append( 211 atten + self.rvr[bt_atten]["fixed_attenuation"]) 212 self.log.info('Setting wifi attenuation to: {} dB'.format(atten)) 213 for i in range(self.num_atten - 1): 214 self.attenuators[i].set_atten(atten) 215 if not wifi_connection_check(self.pri_ad, self.network["SSID"]): 216 self.iperf_received.append(0) 217 return self.iperf_received, self.a2dp_dropped_list, False 218 time.sleep(5) # Time for attenuation to set. 219 begin_time = get_current_epoch_time() 220 if self.a2dp_streaming: 221 self.audio.start() 222 if called_func: 223 if not multithread_func(self.log, called_func): 224 self.iperf_received.append(float(str( 225 self.iperf_variables.received[-1]).strip("Mb/s"))) 226 return self.iperf_received, self.a2dp_dropped_list, False 227 else: 228 self.run_iperf_and_get_result() 229 230 adb_rssi_poll_results = self.pri_ad.search_logcat( 231 RSSI_POLL_RESULTS, begin_time) 232 adb_rssi_results = self.pri_ad.search_logcat( 233 RSSI_RESULTS, begin_time) 234 if adb_rssi_results: 235 self.log.debug(adb_rssi_poll_results) 236 self.log.debug(adb_rssi_results[-1]) 237 self.log.info('Android device: {}'.format(( 238 adb_rssi_results[-1]['log_message']).split(',')[5])) 239 if self.a2dp_streaming: 240 self.path = self.audio.stop() 241 analysis_path = AudioCaptureResult( 242 self.path).audio_quality_analysis(self.audio_params) 243 with open(analysis_path) as f: 244 self.rvr[bt_atten]["audio_artifacts"][atten] = f.readline() 245 content = json.loads(self.rvr[bt_atten]["audio_artifacts"][atten]) 246 self.rvr["bt_gap_analysis"][bt_atten][atten] = {} 247 for idx, data in enumerate(content["quality_result"]): 248 if data['artifacts']['delay_during_playback']: 249 self.rvr["bt_gap_analysis"][bt_atten][atten][idx] = ( 250 data['artifacts']['delay_during_playback']) 251 self.rvr["bt_range"].append(bt_atten) 252 else: 253 self.rvr["bt_gap_analysis"][bt_atten][atten][idx] = 0 254 file_path = collect_bluetooth_manager_dumpsys_logs( 255 self.pri_ad, self.current_test_name) 256 self.a2dp_dropped_list.append( 257 self.a2dp_dumpsys.parse(file_path)) 258 self.iperf_received.append( 259 float(str(self.iperf_variables.throughput[-1]).strip("Mb/s"))) 260 for i in range(self.num_atten - 1): 261 self.attenuators[i].set_atten(0) 262 return self.iperf_received, self.a2dp_dropped_list, True 263 264 def performance_baseline_check(self): 265 """Checks for performance_result_path in config. If present, plots 266 comparision chart else plot chart for that particular test run. 267 268 Returns: 269 True if success, False otherwise. 270 """ 271 if self.rvr: 272 with open(self.json_file, 'a') as results_file: 273 json.dump({str(k): v for k, v in self.rvr.items()}, 274 results_file, indent=4, sort_keys=True) 275 self.bt_range_metric.metric_value = self.rvr["bt_range"][0] 276 self.log.info('First occurrence of audio gap in bt ' 277 'range: {}'.format(self.bt_range_metric.metric_value)) 278 self.log.info('Bluetooth min range: ' 279 '{} dB'.format(min(self.rvr['bt_attenuation']))) 280 self.log.info('Bluetooth max range: ' 281 '{} dB'.format(max(self.rvr['bt_attenuation']))) 282 self.plot_graph_for_attenuation() 283 if not self.performance_files_list: 284 self.log.warning('Performance file list is empty. Could not ' 285 'calculate throughput limits') 286 return 287 self.throughput_pass_fail_check() 288 else: 289 self.log.error("Throughput dict empty!") 290 return False 291 return True 292 293 def plot_graph_for_attenuation(self): 294 """Plots graph and add as JSON formatted results for attenuation with 295 respect to its iperf values. 296 """ 297 data_sets = defaultdict(dict) 298 legends = defaultdict(list) 299 300 x_label = 'WIFI Attenuation (dB)' 301 y_label = [] 302 303 fig_property = { 304 "title": self.current_test_name, 305 "x_label": x_label, 306 "linewidth": 3, 307 "markersize": 10 308 } 309 310 for bt_atten in self.rvr["bt_attenuation"]: 311 y_label.insert(0, 'Throughput (Mbps)') 312 legends[bt_atten].insert( 313 0, str("BT Attenuation @ %sdB" % bt_atten)) 314 data_sets[bt_atten]["attenuation"] = ( 315 self.rvr[bt_atten]["attenuation"]) 316 data_sets[bt_atten]["throughput_received"] = ( 317 self.rvr[bt_atten]["throughput_received"]) 318 319 if self.a2dp_streaming: 320 for bt_atten in self.bt_atten_range: 321 legends[bt_atten].insert( 322 0, ('Packet drops(in %) @ {}dB'.format(bt_atten))) 323 data_sets[bt_atten]["a2dp_attenuation"] = ( 324 self.rvr[bt_atten]["attenuation"]) 325 data_sets[bt_atten]["a2dp_packet_drops"] = ( 326 self.rvr[bt_atten]["a2dp_packet_drop"]) 327 y_label.insert(0, "Packets Dropped") 328 fig_property["y_label"] = y_label 329 shaded_region = None 330 331 if "performance_result_path" in self.user_params["test_params"]: 332 shaded_region = self.comparision_results_calculation(data_sets, legends) 333 334 output_file_path = os.path.join(self.pri_ad.log_path, 335 self.current_test_name, 336 "attenuation_plot.html") 337 bokeh_chart_plot(list(self.rvr["bt_attenuation"]), 338 data_sets, 339 legends, 340 fig_property, 341 shaded_region=shaded_region, 342 output_file_path=output_file_path) 343 344 def comparision_results_calculation(self, data_sets, legends): 345 """Compares rvr results with baseline values by calculating throughput 346 limits. 347 348 Args: 349 data_sets: including lists of x_data and lists of y_data. 350 ex: [[[x_data1], [x_data2]], [[y_data1],[y_data2]]] 351 legends: list of legend for each curve. 352 353 Returns: 354 None if test_file is not found, otherwise shaded_region 355 will be returned. 356 """ 357 try: 358 attenuation_path = next( 359 file_name for file_name in self.performance_files_list 360 if self.current_test_name in file_name 361 ) 362 except StopIteration: 363 self.log.warning("Test_file not found. " 364 "No comparision values to calculate") 365 return 366 with open(attenuation_path, 'r') as throughput_file: 367 throughput_results = json.load(throughput_file) 368 for bt_atten in self.bt_atten_range: 369 throughput_received = [] 370 user_attenuation = [] 371 legends[bt_atten].insert( 372 0, ('Performance Results @ {}dB'.format(bt_atten))) 373 for att in self.rvr[bt_atten]["attenuation"]: 374 attenuation = att - self.rvr[bt_atten]["fixed_attenuation"] 375 throughput_received.append(throughput_results[str(bt_atten)] 376 ["throughput_received"][attenuation]) 377 user_attenuation.append(att) 378 data_sets[bt_atten][ 379 "user_attenuation"] = user_attenuation 380 data_sets[bt_atten]["user_throughput"] = throughput_received 381 throughput_limits = self.get_throughput_limits(attenuation_path) 382 shaded_region = defaultdict(dict) 383 for bt_atten in self.bt_atten_range: 384 shaded_region[bt_atten] = { 385 "x_vector": throughput_limits[bt_atten]["attenuation"], 386 "lower_limit": 387 throughput_limits[bt_atten]["lower_limit"], 388 "upper_limit": 389 throughput_limits[bt_atten]["upper_limit"] 390 } 391 return shaded_region 392 393 def total_attenuation(self, performance_dict): 394 """Calculates attenuation with adding fixed attenuation. 395 396 Args: 397 performance_dict: dict containing attenuation and fixed attenuation. 398 399 Returns: 400 Total attenuation is returned. 401 """ 402 if "fixed_attenuation" in self.test_params: 403 total_atten = [ 404 att + performance_dict["fixed_attenuation"] 405 for att in performance_dict["attenuation"] 406 ] 407 return total_atten 408 409 def throughput_pass_fail_check(self): 410 """Check the test result and decide if it passed or failed 411 by comparing with throughput limits.The pass/fail tolerances are 412 provided in the config file. 413 414 Returns: 415 None if test_file is not found, True if successful, 416 False otherwise. 417 """ 418 try: 419 performance_path = next( 420 file_name for file_name in self.performance_files_list 421 if self.current_test_name in file_name 422 ) 423 except StopIteration: 424 self.log.warning("Test_file not found. Couldn't " 425 "calculate throughput limits") 426 return 427 throughput_limits = self.get_throughput_limits(performance_path) 428 429 failure_count = 0 430 for bt_atten in self.bt_atten_range: 431 for idx, current_throughput in enumerate( 432 self.rvr[bt_atten]["throughput_received"]): 433 current_att = self.rvr[bt_atten]["attenuation"][idx] 434 if (current_throughput < 435 (throughput_limits[bt_atten]["lower_limit"][idx]) or 436 current_throughput > 437 (throughput_limits[bt_atten]["upper_limit"][idx])): 438 failure_count = failure_count + 1 439 self.log.info( 440 "Throughput at {} dB attenuation is beyond limits. " 441 "Throughput is {} Mbps. Expected within [{}, {}] Mbps.". 442 format( 443 current_att, current_throughput, 444 throughput_limits[bt_atten]["lower_limit"][idx], 445 throughput_limits[bt_atten]["upper_limit"][ 446 idx])) 447 if failure_count >= self.test_params["failure_count_tolerance"]: 448 self.log.error( 449 "Test failed. Found {} points outside throughput limits.". 450 format(failure_count)) 451 return False 452 self.log.info( 453 "Test passed. Found {} points outside throughput limits.". 454 format(failure_count)) 455 return True 456 457 def get_throughput_limits(self, performance_path): 458 """Compute throughput limits for current test. 459 460 Checks the RvR test result and compares to a throughput limits for 461 the same configuration. The pass/fail tolerances are provided in the 462 config file. 463 464 Args: 465 performance_path: path to baseline file used to generate limits 466 467 Returns: 468 throughput_limits: dict containing attenuation and throughput 469 limit data 470 """ 471 with open(performance_path, 'r') as performance_file: 472 performance_results = json.load(performance_file) 473 throughput_limits = defaultdict(dict) 474 for bt_atten in self.bt_atten_range: 475 performance_attenuation = (self.total_attenuation( 476 performance_results[str(bt_atten)])) 477 attenuation = [] 478 lower_limit = [] 479 upper_limit = [] 480 for idx, current_throughput in enumerate( 481 self.rvr[bt_atten]["throughput_received"]): 482 current_att = self.rvr[bt_atten]["attenuation"][idx] 483 att_distances = [ 484 abs(current_att - performance_att) 485 for performance_att in performance_attenuation 486 ] 487 sorted_distances = sorted( 488 enumerate(att_distances), key=lambda x: x[1]) 489 closest_indeces = [dist[0] for dist in sorted_distances[0:3]] 490 closest_throughputs = [ 491 performance_results[str(bt_atten)]["throughput_received"][ 492 index] for index in closest_indeces 493 ] 494 closest_throughputs.sort() 495 attenuation.append(current_att) 496 lower_limit.append( 497 max(closest_throughputs[0] - 498 max(self.test_params["abs_tolerance"], 499 closest_throughputs[0] * 500 self.test_params["pct_tolerance"] / 100), 0)) 501 upper_limit.append(closest_throughputs[-1] + max( 502 self.test_params["abs_tolerance"], closest_throughputs[-1] * 503 self.test_params["pct_tolerance"] / 100)) 504 throughput_limits[bt_atten]["attenuation"] = attenuation 505 throughput_limits[bt_atten]["lower_limit"] = lower_limit 506 throughput_limits[bt_atten]["upper_limit"] = upper_limit 507 return throughput_limits 508 509