1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import json
18import os
19import time
20from collections import defaultdict
21
22from acts.metrics.loggers.blackbox import BlackboxMetricLogger
23from acts_contrib.test_utils.bt.bt_test_utils import disable_bluetooth
24from acts_contrib.test_utils.coex.audio_test_utils import AudioCaptureResult
25from acts_contrib.test_utils.coex.audio_test_utils import get_audio_capture_device
26from acts_contrib.test_utils.coex.CoexBaseTest import CoexBaseTest
27from acts_contrib.test_utils.coex.coex_test_utils import bokeh_chart_plot
28from acts_contrib.test_utils.coex.coex_test_utils import collect_bluetooth_manager_dumpsys_logs
29from acts_contrib.test_utils.coex.coex_test_utils import multithread_func
30from acts_contrib.test_utils.coex.coex_test_utils import wifi_connection_check
31from acts_contrib.test_utils.wifi.wifi_test_utils import wifi_connect
32from acts_contrib.test_utils.wifi.wifi_test_utils import wifi_test_device_init
33from acts.utils import get_current_epoch_time
34
# Substrings passed to search_logcat() to locate RSSI monitoring lines in the
# device log. RSSI_RESULTS matches every monitoring line for the handle;
# RSSI_POLL_RESULTS narrows that down to the 'POLL' entries.
# NOTE(review): handle 0x0003 is hard-coded -- presumably the connection handle
# of the device under test; confirm it is stable across setups.
RSSI_POLL_RESULTS = 'Monitoring , Handle: 0x0003, POLL'
RSSI_RESULTS = 'Monitoring , Handle: 0x0003, '
37
38
def get_atten_range(start, stop, step):
    """Generate the attenuation values to sweep during a test.

    Yields start, start + step, start + 2 * step, ... for as long as the
    value stays strictly below stop (stop itself is never produced).

    Args:
        start: Start attenuation value.
        stop: Stop attenuation value (exclusive).
        step: Step attenuation value.

    Yields:
        Each attenuation value in the half-open range [start, stop).

    Raises:
        ValueError: If step is not positive while start is below stop. Such
            a sweep would never terminate. Because this is a generator, the
            error surfaces when it is first iterated.
    """
    # Only reject the combination that would loop forever; an already empty
    # range (start >= stop) stays a silent no-op regardless of step.
    if start < stop and step <= 0:
        raise ValueError(
            'Attenuation step must be positive, got {}'.format(step))
    value = start
    while value < stop:
        yield value
        value += step
51
52
53class CoexPerformanceBaseTest(CoexBaseTest):
54    """Base test class for performance tests.
55
56    Attributes:
57        rvr : Dict to save attenuation, throughput, fixed_attenuation.
58        a2dp_streaming : Used to denote a2dp test cases.
59    """
60
61    def __init__(self, controllers):
62        super().__init__(controllers)
63        self.a2dp_streaming = False
64        self.rvr = {}
65        self.bt_range_metric = BlackboxMetricLogger.for_test_case(
66            metric_name='bt_range')
67        self.wifi_max_atten_metric = BlackboxMetricLogger.for_test_case(
68            metric_name='wifi_max_atten')
69        self.wifi_min_atten_metric = BlackboxMetricLogger.for_test_case(
70            metric_name='wifi_min_atten')
71        self.wifi_range_metric = BlackboxMetricLogger.for_test_case(
72            metric_name='wifi_range_metric')
73
74    def setup_class(self):
75        req_params = ['test_params', 'Attenuator']
76        opt_params = ['audio_params']
77        self.unpack_userparams(req_params, opt_params)
78        if hasattr(self, 'Attenuator'):
79            self.num_atten = self.attenuators[0].instrument.num_atten
80        else:
81            self.log.error('Attenuator should be connected to run tests.')
82            return False
83        for i in range(self.num_atten):
84            self.attenuators[i].set_atten(0)
85        super().setup_class()
86        self.performance_files_list = []
87        if "performance_result_path" in self.user_params["test_params"]:
88            self.performance_files_list = [
89                os.path.join(self.test_params["performance_result_path"],
90                             files) for files in os.listdir(
91                                 self.test_params["performance_result_path"])
92            ]
93        self.bt_atten_range = list(get_atten_range(
94                            self.test_params["bt_atten_start"],
95                            self.test_params["bt_atten_stop"],
96                            self.test_params["bt_atten_step"]))
97        self.wifi_atten_range = list(get_atten_range(
98                            self.test_params["attenuation_start"],
99                            self.test_params["attenuation_stop"],
100                            self.test_params["attenuation_step"]))
101
102    def setup_test(self):
103        if ('a2dp_streaming' in self.current_test_name and
104                hasattr(self, 'audio_params')):
105            self.audio = get_audio_capture_device(self.sec_ad, self.audio_params)
106            self.a2dp_streaming = True
107        for i in range(self.num_atten):
108            self.attenuators[i].set_atten(0)
109        if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
110            wifi_connect(self.pri_ad, self.network, num_of_tries=5)
111        super().setup_test()
112
113    def teardown_test(self):
114        self.performance_baseline_check()
115        for i in range(self.num_atten):
116            self.attenuators[i].set_atten(0)
117            current_atten = int(self.attenuators[i].get_atten())
118            self.log.debug(
119                "Setting attenuation to zero : Current atten {} : {}".format(
120                    self.attenuators[i], current_atten))
121        self.a2dp_streaming = False
122        if not disable_bluetooth(self.pri_ad.droid):
123            self.log.info("Failed to disable bluetooth")
124            return False
125        self.destroy_android_and_relay_object()
126        self.rvr = {}
127
    def teardown_class(self):
        """Delegates class-level cleanup to reset_wifi_and_store_results()."""
        self.reset_wifi_and_store_results()
130
131    def set_attenuation_and_run_iperf(self, called_func=None):
132        """Sets attenuation and runs iperf for Attenuation max value.
133
134        Args:
135            called_func : Function object to run.
136
137        Returns:
138            True if Pass
139            False if Fail
140        """
141        self.attenuators[self.num_atten - 1].set_atten(0)
142        self.rvr["bt_attenuation"] = []
143        self.rvr["test_name"] = self.current_test_name
144        self.rvr["bt_gap_analysis"] = {}
145        self.rvr["bt_range"] = []
146        status_flag = True
147        for bt_atten in self.bt_atten_range:
148            self.rvr[bt_atten] = {}
149            self.rvr[bt_atten]["fixed_attenuation"] = (
150                self.test_params["fixed_attenuation"][str(
151                    self.network["channel"])])
152            self.log.info('Setting bt attenuation to: {} dB'.format(bt_atten))
153            self.attenuators[self.num_atten - 1].set_atten(bt_atten)
154            for i in range(self.num_atten - 1):
155                self.attenuators[i].set_atten(0)
156            if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
157                wifi_test_device_init(self.pri_ad)
158                wifi_connect(self.pri_ad, self.network, num_of_tries=5)
159            adb_rssi_results = self.pri_ad.search_logcat(RSSI_RESULTS)
160            if adb_rssi_results:
161                self.log.debug(adb_rssi_results[-1])
162                self.log.info('Android device: {}'.format(
163                    (adb_rssi_results[-1]['log_message']).split(',')[5]))
164            (self.rvr[bt_atten]["throughput_received"],
165             self.rvr[bt_atten]["a2dp_packet_drop"],
166             status_flag) = self.rvr_throughput(bt_atten, called_func)
167            self.wifi_max_atten_metric.metric_value = max(self.rvr[bt_atten]
168                                                          ["attenuation"])
169            self.wifi_min_atten_metric.metric_value = min(self.rvr[bt_atten]
170                                                          ["attenuation"])
171
172            if self.rvr[bt_atten]["throughput_received"]:
173                for i, atten in enumerate(self.rvr[bt_atten]["attenuation"]):
174                    if self.rvr[bt_atten]["throughput_received"][i] < 1.0:
175                        self.wifi_range_metric.metric_value = (
176                            self.rvr[bt_atten]["attenuation"][i-1])
177                        break
178                else:
179                    self.wifi_range_metric.metric_value = max(
180                        self.rvr[bt_atten]["attenuation"])
181            else:
182                self.wifi_range_metric.metric_value = max(
183                    self.rvr[bt_atten]["attenuation"])
184            if self.a2dp_streaming:
185                if not any(x > 0 for x in self.a2dp_dropped_list):
186                    self.rvr[bt_atten]["a2dp_packet_drop"] = []
187        if not self.rvr["bt_range"]:
188            self.rvr["bt_range"].append(0)
189        return status_flag
190
    def rvr_throughput(self, bt_atten, called_func=None):
        """Runs the wifi attenuation sweep for one Bluetooth attenuation.

        For each value in self.wifi_atten_range, every attenuator except the
        last one is set to that value and either called_func (through
        multithread_func) or run_iperf_and_get_result() is executed. The
        throughput is appended to self.iperf_received. When a2dp streaming is
        active, audio is captured around the run and the audio-gap analysis
        and packet drops are stored in self.rvr.

        Args:
            bt_atten: Bluetooth attenuation.
            called_func: Functions object to run parallely.

        Returns:
            Tuple of (throughput list, a2dp drop list, status). Status is
            False when the wifi connection check fails or multithread_func
            reports failure, in which case the sweep stops early; True when
            the whole sweep completed.
        """
        # Reset the per-sweep accumulators.
        self.iperf_received = []
        self.iperf_variables.received = []
        self.a2dp_dropped_list = []
        self.rvr["bt_attenuation"].append(bt_atten)
        self.rvr[bt_atten]["audio_artifacts"] = {}
        self.rvr[bt_atten]["attenuation"] = []
        self.rvr["bt_gap_analysis"][bt_atten] = {}
        for atten in self.wifi_atten_range:
            # NOTE(review): tag is never used within this method.
            tag = '{}_{}'.format(bt_atten, atten)
            # Stored attenuation includes the fixed attenuation of the setup.
            self.rvr[bt_atten]["attenuation"].append(
                atten + self.rvr[bt_atten]["fixed_attenuation"])
            self.log.info('Setting wifi attenuation to: {} dB'.format(atten))
            for i in range(self.num_atten - 1):
                self.attenuators[i].set_atten(atten)
            if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
                # Record zero throughput for this point and abort the sweep.
                # NOTE(review): the attenuators are not reset to 0 on this
                # early return -- confirm callers handle that.
                self.iperf_received.append(0)
                return self.iperf_received, self.a2dp_dropped_list, False
            time.sleep(5)  # Time for attenuation to set.
            begin_time = get_current_epoch_time()
            if self.a2dp_streaming:
                self.audio.start()
            if called_func:
                if not multithread_func(self.log, called_func):
                    # NOTE(review): this path reads iperf_variables.received
                    # while the success path below reads
                    # iperf_variables.throughput; it also returns without
                    # calling self.audio.stop() -- confirm both are intended.
                    self.iperf_received.append(float(str(
                        self.iperf_variables.received[-1]).strip("Mb/s")))
                    return self.iperf_received, self.a2dp_dropped_list, False
            else:
                self.run_iperf_and_get_result()

            # Only log lines emitted since begin_time are searched.
            adb_rssi_poll_results = self.pri_ad.search_logcat(
                RSSI_POLL_RESULTS, begin_time)
            adb_rssi_results = self.pri_ad.search_logcat(
                RSSI_RESULTS, begin_time)
            if adb_rssi_results:
                self.log.debug(adb_rssi_poll_results)
                self.log.debug(adb_rssi_results[-1])
                # The sixth comma-separated field of the log message is
                # logged as the device value.
                self.log.info('Android device: {}'.format((
                    adb_rssi_results[-1]['log_message']).split(',')[5]))
            if self.a2dp_streaming:
                self.path = self.audio.stop()
                analysis_path = AudioCaptureResult(
                    self.path).audio_quality_analysis(self.audio_params)
                # Only the first line of the analysis file is kept and then
                # parsed as JSON.
                with open(analysis_path) as f:
                    self.rvr[bt_atten]["audio_artifacts"][atten] = f.readline()
                content = json.loads(self.rvr[bt_atten]["audio_artifacts"][atten])
                self.rvr["bt_gap_analysis"][bt_atten][atten] = {}
                for idx, data in enumerate(content["quality_result"]):
                    if data['artifacts']['delay_during_playback']:
                        self.rvr["bt_gap_analysis"][bt_atten][atten][idx] = (
                                data['artifacts']['delay_during_playback'])
                        # bt_range gains one entry per result that reported a
                        # playback delay, so bt_atten may appear repeatedly.
                        self.rvr["bt_range"].append(bt_atten)
                    else:
                        self.rvr["bt_gap_analysis"][bt_atten][atten][idx] = 0
                file_path = collect_bluetooth_manager_dumpsys_logs(
                    self.pri_ad, self.current_test_name)
                self.a2dp_dropped_list.append(
                    self.a2dp_dumpsys.parse(file_path))
            # strip("Mb/s") removes any of the characters M, b, / and s from
            # both ends of the string before the float conversion.
            self.iperf_received.append(
                    float(str(self.iperf_variables.throughput[-1]).strip("Mb/s")))
        # Sweep finished: release the wifi attenuators.
        for i in range(self.num_atten - 1):
            self.attenuators[i].set_atten(0)
        return self.iperf_received, self.a2dp_dropped_list, True
263
264    def performance_baseline_check(self):
265        """Checks for performance_result_path in config. If present, plots
266        comparision chart else plot chart for that particular test run.
267
268        Returns:
269            True if success, False otherwise.
270        """
271        if self.rvr:
272            with open(self.json_file, 'a') as results_file:
273                json.dump({str(k): v for k, v in self.rvr.items()},
274                          results_file, indent=4, sort_keys=True)
275            self.bt_range_metric.metric_value = self.rvr["bt_range"][0]
276            self.log.info('First occurrence of audio gap in bt '
277                          'range: {}'.format(self.bt_range_metric.metric_value))
278            self.log.info('Bluetooth min range: '
279                          '{} dB'.format(min(self.rvr['bt_attenuation'])))
280            self.log.info('Bluetooth max range: '
281                          '{} dB'.format(max(self.rvr['bt_attenuation'])))
282            self.plot_graph_for_attenuation()
283            if not self.performance_files_list:
284                self.log.warning('Performance file list is empty. Could not '
285                                 'calculate throughput limits')
286                return
287            self.throughput_pass_fail_check()
288        else:
289            self.log.error("Throughput dict empty!")
290            return False
291        return True
292
293    def plot_graph_for_attenuation(self):
294        """Plots graph and add as JSON formatted results for attenuation with
295        respect to its iperf values.
296        """
297        data_sets = defaultdict(dict)
298        legends = defaultdict(list)
299
300        x_label = 'WIFI Attenuation (dB)'
301        y_label = []
302
303        fig_property = {
304            "title": self.current_test_name,
305            "x_label": x_label,
306            "linewidth": 3,
307            "markersize": 10
308        }
309
310        for bt_atten in self.rvr["bt_attenuation"]:
311            y_label.insert(0, 'Throughput (Mbps)')
312            legends[bt_atten].insert(
313                0, str("BT Attenuation @ %sdB" % bt_atten))
314            data_sets[bt_atten]["attenuation"] = (
315                self.rvr[bt_atten]["attenuation"])
316            data_sets[bt_atten]["throughput_received"] = (
317                self.rvr[bt_atten]["throughput_received"])
318
319        if self.a2dp_streaming:
320            for bt_atten in self.bt_atten_range:
321                legends[bt_atten].insert(
322                    0, ('Packet drops(in %) @ {}dB'.format(bt_atten)))
323                data_sets[bt_atten]["a2dp_attenuation"] = (
324                    self.rvr[bt_atten]["attenuation"])
325                data_sets[bt_atten]["a2dp_packet_drops"] = (
326                    self.rvr[bt_atten]["a2dp_packet_drop"])
327            y_label.insert(0, "Packets Dropped")
328        fig_property["y_label"] = y_label
329        shaded_region = None
330
331        if "performance_result_path" in self.user_params["test_params"]:
332            shaded_region = self.comparision_results_calculation(data_sets, legends)
333
334        output_file_path = os.path.join(self.pri_ad.log_path,
335                                        self.current_test_name,
336                                        "attenuation_plot.html")
337        bokeh_chart_plot(list(self.rvr["bt_attenuation"]),
338                         data_sets,
339                         legends,
340                         fig_property,
341                         shaded_region=shaded_region,
342                         output_file_path=output_file_path)
343
344    def comparision_results_calculation(self, data_sets, legends):
345        """Compares rvr results with baseline values by calculating throughput
346        limits.
347
348        Args:
349            data_sets: including lists of x_data and lists of y_data.
350                ex: [[[x_data1], [x_data2]], [[y_data1],[y_data2]]]
351            legends: list of legend for each curve.
352
353        Returns:
354            None if test_file is not found, otherwise shaded_region
355            will be returned.
356        """
357        try:
358            attenuation_path = next(
359                file_name for file_name in self.performance_files_list
360                if self.current_test_name in file_name
361            )
362        except StopIteration:
363            self.log.warning("Test_file not found. "
364                             "No comparision values to calculate")
365            return
366        with open(attenuation_path, 'r') as throughput_file:
367            throughput_results = json.load(throughput_file)
368        for bt_atten in self.bt_atten_range:
369            throughput_received = []
370            user_attenuation = []
371            legends[bt_atten].insert(
372                0, ('Performance Results @ {}dB'.format(bt_atten)))
373            for att in self.rvr[bt_atten]["attenuation"]:
374                attenuation = att - self.rvr[bt_atten]["fixed_attenuation"]
375                throughput_received.append(throughput_results[str(bt_atten)]
376                    ["throughput_received"][attenuation])
377                user_attenuation.append(att)
378            data_sets[bt_atten][
379                "user_attenuation"] = user_attenuation
380            data_sets[bt_atten]["user_throughput"] = throughput_received
381        throughput_limits = self.get_throughput_limits(attenuation_path)
382        shaded_region = defaultdict(dict)
383        for bt_atten in self.bt_atten_range:
384            shaded_region[bt_atten] = {
385                "x_vector": throughput_limits[bt_atten]["attenuation"],
386                "lower_limit":
387                throughput_limits[bt_atten]["lower_limit"],
388                "upper_limit":
389                throughput_limits[bt_atten]["upper_limit"]
390            }
391        return shaded_region
392
393    def total_attenuation(self, performance_dict):
394        """Calculates attenuation with adding fixed attenuation.
395
396        Args:
397            performance_dict: dict containing attenuation and fixed attenuation.
398
399        Returns:
400            Total attenuation is returned.
401        """
402        if "fixed_attenuation" in self.test_params:
403            total_atten = [
404                att + performance_dict["fixed_attenuation"]
405                for att in performance_dict["attenuation"]
406            ]
407            return total_atten
408
409    def throughput_pass_fail_check(self):
410        """Check the test result and decide if it passed or failed
411        by comparing with throughput limits.The pass/fail tolerances are
412        provided in the config file.
413
414        Returns:
415            None if test_file is not found, True if successful,
416            False otherwise.
417        """
418        try:
419            performance_path = next(
420                file_name for file_name in self.performance_files_list
421                if self.current_test_name in file_name
422            )
423        except StopIteration:
424            self.log.warning("Test_file not found. Couldn't "
425                             "calculate throughput limits")
426            return
427        throughput_limits = self.get_throughput_limits(performance_path)
428
429        failure_count = 0
430        for bt_atten in self.bt_atten_range:
431            for idx, current_throughput in enumerate(
432                    self.rvr[bt_atten]["throughput_received"]):
433                current_att = self.rvr[bt_atten]["attenuation"][idx]
434                if (current_throughput <
435                        (throughput_limits[bt_atten]["lower_limit"][idx]) or
436                        current_throughput >
437                        (throughput_limits[bt_atten]["upper_limit"][idx])):
438                    failure_count = failure_count + 1
439                    self.log.info(
440                        "Throughput at {} dB attenuation is beyond limits. "
441                        "Throughput is {} Mbps. Expected within [{}, {}] Mbps.".
442                        format(
443                            current_att, current_throughput,
444                            throughput_limits[bt_atten]["lower_limit"][idx],
445                            throughput_limits[bt_atten]["upper_limit"][
446                                idx]))
447            if failure_count >= self.test_params["failure_count_tolerance"]:
448                self.log.error(
449                    "Test failed. Found {} points outside throughput limits.".
450                    format(failure_count))
451                return False
452            self.log.info(
453                "Test passed. Found {} points outside throughput limits.".
454                format(failure_count))
455            return True
456
457    def get_throughput_limits(self, performance_path):
458        """Compute throughput limits for current test.
459
460        Checks the RvR test result and compares to a throughput limits for
461        the same configuration. The pass/fail tolerances are provided in the
462        config file.
463
464        Args:
465            performance_path: path to baseline file used to generate limits
466
467        Returns:
468            throughput_limits: dict containing attenuation and throughput
469            limit data
470        """
471        with open(performance_path, 'r') as performance_file:
472            performance_results = json.load(performance_file)
473        throughput_limits = defaultdict(dict)
474        for bt_atten in self.bt_atten_range:
475            performance_attenuation = (self.total_attenuation(
476                performance_results[str(bt_atten)]))
477            attenuation = []
478            lower_limit = []
479            upper_limit = []
480            for idx, current_throughput in enumerate(
481                    self.rvr[bt_atten]["throughput_received"]):
482                current_att = self.rvr[bt_atten]["attenuation"][idx]
483                att_distances = [
484                    abs(current_att - performance_att)
485                    for performance_att in performance_attenuation
486                ]
487                sorted_distances = sorted(
488                    enumerate(att_distances), key=lambda x: x[1])
489                closest_indeces = [dist[0] for dist in sorted_distances[0:3]]
490                closest_throughputs = [
491                    performance_results[str(bt_atten)]["throughput_received"][
492                        index] for index in closest_indeces
493                ]
494                closest_throughputs.sort()
495                attenuation.append(current_att)
496                lower_limit.append(
497                    max(closest_throughputs[0] -
498                        max(self.test_params["abs_tolerance"],
499                            closest_throughputs[0] *
500                            self.test_params["pct_tolerance"] / 100), 0))
501                upper_limit.append(closest_throughputs[-1] + max(
502                    self.test_params["abs_tolerance"], closest_throughputs[-1] *
503                    self.test_params["pct_tolerance"] / 100))
504            throughput_limits[bt_atten]["attenuation"] = attenuation
505            throughput_limits[bt_atten]["lower_limit"] = lower_limit
506            throughput_limits[bt_atten]["upper_limit"] = upper_limit
507        return throughput_limits
508
509