#!/usr/bin/env python3.5
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import time
import re
import statistics
from datetime import datetime
from acts import utils
from acts import signals
from acts.base_test import BaseTestClass
from acts_contrib.test_utils.gnss.testtracker_util import log_testtracker_uuid
from acts_contrib.test_utils.tel.tel_logging_utils import start_adb_tcpdump
from acts_contrib.test_utils.tel.tel_logging_utils import stop_adb_tcpdump
from acts_contrib.test_utils.tel.tel_logging_utils import get_tcpdump_log
from acts_contrib.test_utils.tel.tel_test_utils import toggle_airplane_mode
from acts_contrib.test_utils.gnss import gnss_test_utils as gutils

# Logcat keyword searched for each location request type.
CONCURRENCY_TYPE = {
    "gnss": "GNSS location received",
    "gnss_meas": "GNSS measurement received",
    "ap_location": "reportLocation"
}

# gps.xml attributes added to (and later removed from) the "gll" tag for each
# MCU TTFF mode key used by the test_mcu_*_ttff cases.
GPS_XML_CONFIG = {
    "CS": {
        'IgnorePosition': 'true',
        'IgnoreEph': 'true',
        'IgnoreTime': 'true',
        'AsstIgnoreLto': 'true',
        'IgnoreJniTime': 'true',
    },
    "WS": {
        'IgnorePosition': 'true',
        'AsstIgnoreLto': 'true',
        'IgnoreJniTime': 'true',
    },
    "HS": {}
}

# gps.xml attribute values applied before ("enable") and after ("disable")
# the MCU TTFF tests.
ONCHIP_CONFIG = {
    "enable": {"EnableOnChipStopNotification": "1"},
    "disable": {"EnableOnChipStopNotification": "2"},
}


class GnssConcurrencyTest(BaseTestClass):
    """ GNSS Concurrency TTFF Tests. """

    def setup_class(self):
        """ Unpack the required user params and initialize the device. """
        super().setup_class()
        self.ad = self.android_devices[0]
        req_params = [
            "standalone_cs_criteria", "chre_tolerate_rate", "qdsp6m_path",
            "outlier_criteria", "max_outliers", "pixel_lab_location",
            "max_interval", "onchip_interval", "ttff_test_cycle"
        ]
        self.unpack_userparams(req_param_names=req_params)
        gutils._init_device(self.ad)
        self.ad.adb.shell("setprop persist.vendor.radio.adb_log_on 0")
        self.ad.adb.shell("sync")

    def setup_test(self):
        """ Start logging and load the CHRE nanoapp before each test. """
        gutils.log_current_epoch_time(self.ad, "test_start_time")
        log_testtracker_uuid(self.ad, self.current_test_name)
        gutils.clear_logd_gnss_qxdm_log(self.ad)
        gutils.start_pixel_logger(self.ad)
        start_adb_tcpdump(self.ad)
        # related properties
        gutils.check_location_service(self.ad)
        gutils.get_baseband_and_gms_version(self.ad)
        self.load_chre_nanoapp()

    def teardown_test(self):
        """ Stop the loggers started in setup_test. """
        gutils.stop_pixel_logger(self.ad)
        stop_adb_tcpdump(self.ad)
        gutils.log_current_epoch_time(self.ad, "test_end_time")

    def on_fail(self, test_name, begin_time):
        """ Collect bug report, QXDM log and tcpdump log for a failed test.

        Args:
            test_name: name of the failed test.
            begin_time: begin time of the failed test.
        """
        self.ad.take_bug_report(test_name, begin_time)
        gutils.get_gnss_qxdm_log(self.ad, self.qdsp6m_path)
        get_tcpdump_log(self.ad, test_name, begin_time)

    def is_brcm_test(self):
        """ Check the test is for BRCM and skip if not.

        Raises:
            signals.TestSkip: if the chipset vendor check reports Qualcomm.
        """
        if gutils.check_chipset_vendor_by_qualcomm(self.ad):
            raise signals.TestSkip("Not BRCM chipset. Skip the test.")

    def load_chre_nanoapp(self):
        """ Load CHRE nanoapp to target Android Device.

        Tries up to three times. The device is rebooted only when the load
        command raises; a reply without "result 1" is simply retried.

        Raises:
            signals.TestError: if the nanoapp is not loaded after all tries.
        """
        for _ in range(3):
            try:
                self.ad.log.info("Start to load the nanoapp")
                cmd = "chre_power_test_client load"
                if gutils.is_device_wearable(self.ad):
                    extra_cmd = "tcm /vendor/etc/chre/power_test_tcm.so"
                    cmd = " ".join([cmd, extra_cmd])
                res = self.ad.adb.shell(cmd)
                if "result 1" in res:
                    self.ad.log.info("Nano app loaded successfully")
                    break
            except Exception as e:
                # Deliberately broad: any failure here triggers a reboot and
                # another attempt.
                self.ad.log.warning("Nano app loaded fail: %s" % e)
                gutils.reboot(self.ad)
        else:
            raise signals.TestError("Failed to load CHRE nanoapp")

    def enable_chre(self, interval_sec):
        """ Enable or disable gnss concurrency via nanoapp.

        The command is sent for every CONCURRENCY_TYPE key that does not
        contain "ap".

        Args:
            interval_sec: a number for the request interval in seconds,
                set 0 as disable.
        """
        if interval_sec == 0:
            self.ad.log.info("Stop CHRE request")
        else:
            self.ad.log.info(
                f"Initiate CHRE with {interval_sec} seconds interval")
        interval_msec = interval_sec * 1000
        cmd = "chre_power_test_client"
        option = "enable %d" % interval_msec if interval_msec != 0 else "disable"

        for request_type in CONCURRENCY_TYPE:
            if "ap" not in request_type:
                self.ad.adb.shell(" ".join([cmd, request_type, option]))

    def parse_concurrency_result(self,
                                 begin_time,
                                 request_type,
                                 criteria,
                                 exam_lower=True):
        """ Parse the test result with given time and criteria.

        The interval between each pair of consecutive log entries is
        classified as:
            Failure: above criteria * (1 + chre_tolerate_rate) +
                outlier_criteria, or (only when exam_lower is set) below
                criteria * (1 - chre_tolerate_rate) - outlier_criteria.
            Outlier: above criteria * (1 + chre_tolerate_rate) but not a
                failure.

        Args:
            begin_time: test begin time.
            request_type: str for location request type, a key of
                CONCURRENCY_TYPE.
            criteria: a number for the expected report interval in seconds.
            exam_lower: a boolean to identify the lower bond or not.

        Return:
            Tuple of lists (outliers, failures, results), all in seconds.
            results is empty when only one log entry was found.

        Raises:
            signals.TestFailure: if no log entry matches the keyword.
        """
        results = []
        failures = []
        outliers = []
        tolerance_limit = criteria * (1 + self.chre_tolerate_rate)
        upper_bound = tolerance_limit + self.outlier_criteria
        lower_bound = criteria * (
            1 - self.chre_tolerate_rate) - self.outlier_criteria
        search_results = self.ad.search_logcat(CONCURRENCY_TYPE[request_type],
                                               begin_time)
        if not search_results:
            raise signals.TestFailure(f"No log entry found for keyword:"
                                      f"{CONCURRENCY_TYPE[request_type]}")

        for previous, target in zip(search_results, search_results[1:]):
            timedelt = target["datetime_obj"] - previous["datetime_obj"]
            timedelt_sec = timedelt.total_seconds()
            results.append(timedelt_sec)
            res_tag = ""
            if timedelt_sec > upper_bound:
                failures.append(timedelt_sec)
                res_tag = "Failure"
            elif timedelt_sec < lower_bound and exam_lower:
                failures.append(timedelt_sec)
                res_tag = "Failure"
            elif timedelt_sec > tolerance_limit:
                outliers.append(timedelt_sec)
                res_tag = "Outlier"
            if res_tag:
                self.ad.log.error(
                    f"[{res_tag}][{target['time_stamp']}]:{timedelt_sec:.2f} sec"
                )

        # The first interval is left out of the summary line (but is still
        # part of the returned results).
        res_summary = " ".join([str(res) for res in results[1:]])
        self.ad.log.info(f"[{request_type}]Overall Result: {res_summary}")
        log_prefix = f"TestResult {request_type}"
        self.ad.log.info(f"{log_prefix}_samples {len(search_results)}")
        self.ad.log.info(f"{log_prefix}_outliers {len(outliers)}")
        self.ad.log.info(f"{log_prefix}_failures {len(failures)}")
        # A single log entry yields no interval; skip max() instead of
        # crashing so the caller can report the empty result itself.
        if results:
            self.ad.log.info(f"{log_prefix}_max_time {max(results):.2f}")

        return outliers, failures, results

    def run_gnss_concurrency_test(self, criteria, test_duration):
        """ Execute GNSS concurrency test steps.

        Args:
            criteria: dictionary mapping request type ("ap_location", "gnss",
                "gnss_meas") to its interval in seconds.
            test_duration: a number for test duration in seconds.
        """
        self.enable_chre(criteria["gnss"])
        ttff_criteria = criteria["ap_location"] + self.standalone_cs_criteria
        gutils.process_gnss_by_gtw_gpstool(
            self.ad, ttff_criteria, freq=criteria["ap_location"])
        self.ad.log.info("Tracking 10 sec to prevent flakiness.")
        time.sleep(10)
        begin_time = datetime.now()
        self.ad.log.info(f"Test Start at {begin_time}")
        time.sleep(test_duration)
        self.enable_chre(0)
        gutils.start_gnss_by_gtw_gpstool(self.ad, False)
        self.validate_location_test_result(begin_time, criteria)

    def run_chre_only_test(self, criteria, test_duration):
        """ Execute CHRE only test steps.

        Args:
            criteria: dictionary mapping request type ("gnss", "gnss_meas")
                to its interval in seconds.
            test_duration: a number for test duration in seconds.
        """
        begin_time = datetime.now()
        self.ad.log.info(f"Test Start at {begin_time}")
        self.enable_chre(criteria["gnss"])
        time.sleep(test_duration)
        self.enable_chre(0)
        self.validate_location_test_result(begin_time, criteria)

    def validate_location_test_result(self, begin_time, request):
        """ Validate GNSS concurrency/CHRE test results.

        Args:
            begin_time: test begin time, used as the logcat search start.
            request: dictionary mapping request type to its interval
                criteria in seconds. Criteria below 1 are treated as 1.

        Raises:
            signals.TestFailure: if any request type has no interval result,
                has a failure, or has more outliers than max_outliers.
        """
        results = {}
        outliers = {}
        failures = {}
        failure_log = ""
        for request_type, criteria in request.items():
            criteria = criteria if criteria > 1 else 1
            self.ad.log.info("Starting process %s result" % request_type)
            outliers[request_type], failures[request_type], results[
                request_type] = self.parse_concurrency_result(
                    begin_time, request_type, criteria, exam_lower=False)
            if not results[request_type]:
                failure_log += "[%s] Fail to find location report.\n" % request_type
            if len(failures[request_type]) > 0:
                failure_log += "[%s] Test exceeds criteria(%.2f): %.2f\n" % (
                    request_type, criteria, max(failures[request_type]))
            if len(outliers[request_type]) > self.max_outliers:
                failure_log += "[%s] Outliers excceds max amount: %d\n" % (
                    request_type, len(outliers[request_type]))

        if failure_log:
            failure_log += f"The test begins at {begin_time}\n"
            raise signals.TestFailure(failure_log)

    def run_engine_switching_test(self, freq):
        """ Conduct engine switching test with given frequency.

        Args:
            freq: a list identify source1/2 frequency [freq1, freq2]
        """
        request = {"ap_location": self.max_interval}
        begin_time = datetime.now()
        self.ad.droid.startLocating(freq[0] * 1000, 0)
        time.sleep(10)
        for _ in range(5):
            gutils.start_gnss_by_gtw_gpstool(self.ad, True, freq=freq[1])
            time.sleep(10)
            gutils.start_gnss_by_gtw_gpstool(self.ad, False)
        self.ad.droid.stopLocating()
        self.calculate_position_error(begin_time)
        self.validate_location_test_result(begin_time, request)

    def calculate_position_error(self, begin_time):
        """ Calculate the position error for the logcat search results.

        Logs the maximum position error against pixel_lab_location. When no
        "reportLocation" entry is found only a warning is logged.

        Args:
            begin_time: test begin time

        Raises:
            ValueError: if a matched log entry contains no lat/lon pair.
        """
        # search for location like 25.000717,121.455163
        regex = r"(-?\d{1,5}\.\d{1,10}),\s*(-?\d{1,5}\.\d{1,10})"
        position_errors = []
        search_results = self.ad.search_logcat("reportLocation", begin_time)
        for entry in search_results:
            # Keep the log entry and the regex match in separate names so
            # the error message can still quote the original text.
            match = re.search(regex, entry["log_message"])
            if not match:
                raise ValueError("lat/lon does not found. "
                                 f"original text: {entry['log_message']}")
            lat = float(match.group(1))
            lon = float(match.group(2))
            pe = gutils.calculate_position_error(lat, lon,
                                                 self.pixel_lab_location)
            position_errors.append(pe)
        if not position_errors:
            # max() would raise on an empty list.
            self.ad.log.warning(
                "No reportLocation entry found, skip position error.")
            return
        self.ad.log.info("TestResult max_position_error %.2f" %
                         max(position_errors))

    def get_chre_ttff(self, interval_sec, duration):
        """ Get the TTFF for the first CHRE report.

        Args:
            interval_sec: test interval in seconds for CHRE.
            duration: test duration.

        Raises:
            signals.TestFailure: if a CHRE request type has no report in
                logcat since the test began.
        """
        begin_time = datetime.now()
        self.ad.log.info(f"Test start at {begin_time}")
        self.enable_chre(interval_sec)
        time.sleep(duration)
        self.enable_chre(0)
        for request_type, pattern in CONCURRENCY_TYPE.items():
            if request_type == "ap_location":
                continue
            search_results = self.ad.search_logcat(pattern, begin_time)
            if not search_results:
                raise signals.TestFailure(
                    f"Unable to receive {request_type} report in {duration} seconds"
                )
            ttff_stamp = search_results[0]["datetime_obj"]
            self.ad.log.info(search_results[0]["time_stamp"])
            ttff = (ttff_stamp - begin_time).total_seconds()
            self.ad.log.info(f"CHRE {request_type} TTFF = {ttff}")

    def add_ttff_conf(self, conf_type):
        """ Add mcu ttff config to gps.xml

        Args:
            conf_type: a string identify the config type
        """
        gutils.bcm_gps_xml_update_option(
            self.ad, child_tag="gll", items_to_update=GPS_XML_CONFIG[conf_type])

    def update_gps_conf(self, update_attrib):
        """ Update gps.xml content

        Args:
            update_attrib: dictionary of attribute name/value pairs to update
                under the "gll" tag.
        """
        gutils.bcm_gps_xml_update_option(
            self.ad, child_tag="gll", items_to_update=update_attrib)

    def delete_gps_conf(self, conf_type):
        """ Delete gps.xml content

        Args:
            conf_type: a string identify the config type
        """
        gutils.bcm_gps_xml_update_option(
            self.ad,
            child_tag="gll",
            items_to_delete=GPS_XML_CONFIG[conf_type].keys())

    def preset_mcu_test(self, mode):
        """ Preseting mcu test with config and device state

        Args:
            mode: a string identify the test type
        """
        self.add_ttff_conf(mode)
        gutils.push_lhd_overlay(self.ad)
        toggle_airplane_mode(self.ad.log, self.ad, new_state=True)
        self.update_gps_conf(ONCHIP_CONFIG["enable"])
        gutils.clear_aiding_data_by_gtw_gpstool(self.ad)
        # NOTE(review): the device object is passed as the first positional
        # argument of its own reboot(); load_chre_nanoapp uses
        # gutils.reboot(self.ad) instead - confirm which one is intended.
        self.ad.reboot(self.ad)
        self.load_chre_nanoapp()

    def reset_mcu_test(self, mode):
        """ Resetting mcu test with config and device state

        Args:
            mode: a string identify the test type
        """
        self.delete_gps_conf(mode)
        self.update_gps_conf(ONCHIP_CONFIG["disable"])

    def get_mcu_ttff(self):
        """ Get mcu ttff seconds

        Polls logcat up to six times, ten seconds apart, for the FIX
        sentence after enabling the CHRE gnss request.

        Return:
            ttff: a float identify ttff seconds, or 60 when no FIX sentence
                shows up within the polling window.

        Raises:
            signals.TestError: if the FIX sentence cannot be parsed.
        """
        search_res = ""
        search_pattern = "$PGLOR,0,FIX"
        ttff_regex = r"FIX,(.*)\*"
        cmd_base = "chre_power_test_client gnss tcm"
        cmd_start = " ".join([cmd_base, "enable 1000"])
        cmd_stop = " ".join([cmd_base, "disable"])
        begin_time = datetime.now()

        self.ad.log.info("Send CHRE enable to DUT")
        self.ad.adb.shell(cmd_start)
        try:
            for _ in range(6):
                search_res = self.ad.search_logcat(search_pattern, begin_time)
                if search_res:
                    break
                time.sleep(10)
        finally:
            # Always stop the CHRE request, even if the logcat search raises.
            self.ad.adb.shell(cmd_stop)

        if not search_res:
            self.ad.log.error("Unable to get mcu ttff in 60 seconds")
            return 60

        res = re.search(ttff_regex, search_res[0]["log_message"])
        if not res:
            raise signals.TestError(
                "Unable to parse mcu ttff from: "
                f"{search_res[0]['log_message']}")
        ttff = res.group(1)
        self.ad.log.info(f"TTFF = {ttff}")
        return float(ttff)

    def run_mcu_ttff_loops(self, mode, loops):
        """ Run mcu ttff with given mode and loops

        Args:
            mode: a string identify mode cs/ws/hs.
            loops: a int to identify the number of loops
        """
        ttff_res = []
        for i in range(loops):
            ttff = self.get_mcu_ttff()
            self.ad.log.info(f"{mode} TTFF LOOP{i+1} = {ttff}")
            ttff_res.append(ttff)
            time.sleep(10)
        if not ttff_res:
            # max()/mean() would raise on an empty list (loops <= 0).
            self.ad.log.warning(f"No {mode} TTFF loop executed")
            return
        self.ad.log.info(f"TestResult {mode}_MAX_TTFF {max(ttff_res)}")
        self.ad.log.info(
            f"TestResult {mode}_AVG_TTFF {statistics.mean(ttff_res)}")

    def _run_mcu_ttff_test(self, mode):
        """ Run one MCU TTFF test and always restore the gps.xml config.

        Args:
            mode: a string identify the test type, a key of GPS_XML_CONFIG.
        """
        self.preset_mcu_test(mode)
        try:
            self.run_mcu_ttff_loops(mode, self.ttff_test_cycle)
        finally:
            # Restore the config even when a loop fails so the test config
            # does not stay on the device for the following tests.
            self.reset_mcu_test(mode)

    # Concurrency Test Cases
    def test_gnss_concurrency_location_1_chre_1(self):
        test_duration = 15
        criteria = {"ap_location": 1, "gnss": 1, "gnss_meas": 1}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_1_chre_8(self):
        test_duration = 30
        criteria = {"ap_location": 1, "gnss": 8, "gnss_meas": 8}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_15_chre_8(self):
        test_duration = 60
        criteria = {"ap_location": 15, "gnss": 8, "gnss_meas": 8}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_61_chre_1(self):
        test_duration = 120
        criteria = {"ap_location": 61, "gnss": 1, "gnss_meas": 1}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_61_chre_10(self):
        test_duration = 120
        criteria = {"ap_location": 61, "gnss": 10, "gnss_meas": 10}
        self.run_gnss_concurrency_test(criteria, test_duration)

    # CHRE Only Test Cases
    def test_gnss_chre_1(self):
        test_duration = 15
        criteria = {"gnss": 1, "gnss_meas": 1}
        self.run_chre_only_test(criteria, test_duration)

    def test_gnss_chre_8(self):
        test_duration = 30
        criteria = {"gnss": 8, "gnss_meas": 8}
        self.run_chre_only_test(criteria, test_duration)

    # Interval tests
    def test_variable_interval_via_chre(self):
        test_duration = 10
        intervals = [0.1, 0.5, 1.5]
        for interval in intervals:
            self.get_chre_ttff(interval, test_duration)

    def test_variable_interval_via_framework(self):
        test_duration = 10
        intervals = [0, 0.5, 1.5]
        for interval in intervals:
            begin_time = datetime.now()
            self.ad.droid.startLocating(interval * 1000, 0)
            time.sleep(test_duration)
            self.ad.droid.stopLocating()
            criteria = interval if interval > 1 else 1
            self.parse_concurrency_result(begin_time, "ap_location", criteria)

    # Engine switching test
    def test_gps_engine_switching_host_to_onchip(self):
        self.is_brcm_test()
        freq = [1, self.onchip_interval]
        self.run_engine_switching_test(freq)

    def test_gps_engine_switching_onchip_to_host(self):
        self.is_brcm_test()
        freq = [self.onchip_interval, 1]
        self.run_engine_switching_test(freq)

    # MCU TTFF test cases
    def test_mcu_cs_ttff(self):
        self._run_mcu_ttff_test("CS")

    def test_mcu_ws_ttff(self):
        self._run_mcu_ttff_test("WS")

    def test_mcu_hs_ttff(self):
        self._run_mcu_ttff_test("HS")