# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module for working with BigQuery results."""

import collections
import datetime
import os
from collections import defaultdict
from typing import List, Tuple

from flake_suppressor_common import common_typing as ct
from flake_suppressor_common import data_types
from flake_suppressor_common import expectations
from flake_suppressor_common import tag_utils

from typ import expectations_parser


class ResultProcessor():
  """Converts, filters and aggregates BigQuery test results.

  GetTestSuiteAndNameFromResultDbName() raises NotImplementedError here, so a
  subclass has to provide it before any of the aggregation methods can be used.
  """

  def __init__(self, expectations_processor: expectations.ExpectationProcessor):
    # Used to read the local expectation files and to map a suite/tag
    # combination to the expectation file that covers it.
    self._expectations_processor = expectations_processor

  def AggregateResults(self,
                       results: ct.QueryJsonType) -> ct.AggregatedResultsType:
    """Groups build URLs of unsuppressed results by suite, test and tags.

    Results that already have a matching expectation in the local checkout
    are dropped before aggregation.

    Args:
      results: Parsed JSON results from a BigQuery query.

    Returns:
      A map in the following format:
      {
        'test_suite': {
          'test_name': {
            'typ_tags_as_tuple': [ 'list', 'of', 'urls' ],
          },
        },
      }
    """
    converted = self._ConvertJsonResultsToResultObjects(results)
    unsuppressed = self._FilterOutSuppressedResults(converted)

    aggregated_results = {}
    for result in unsuppressed:
      build_url = 'http://ci.chromium.org/b/%s' % result.build_id
      # Walk down suite -> test -> tags, creating each level on first use.
      tests_for_suite = aggregated_results.setdefault(result.suite, {})
      tags_for_test = tests_for_suite.setdefault(result.test, {})
      build_urls = tags_for_test.setdefault(result.tags, [])
      build_urls.append(build_url)
    return aggregated_results

  def AggregateTestStatusResults(
      self, results: ct.QueryJsonType) -> ct.AggregatedStatusResultsType:
    """Groups per-build status tuples of unsuppressed results.

    Results that already have a matching expectation in the local checkout
    are dropped before aggregation.

    Args:
      results: Parsed JSON results from a BigQuery query.

    Returns:
      A map in the following format:
      {
        'test_suite': {
          'test_name': {
            ('typ', 'tags', 'as', 'tuple'):
            [ (status, url, date, is_slow, typ_expectations),
              (status, url, date, is_slow, typ_expectations) ],
          },
        },
      }
    """

    def _NewTagMap():
      # Innermost level: typ tags -> list of result tuples.
      return defaultdict(list)

    def _NewTestMap():
      # Middle level: test name -> tag map.
      return defaultdict(_NewTagMap)

    converted = self._ConvertJsonResultsToResultObjects(results)
    unsuppressed = self._FilterOutSuppressedResults(converted)

    # Outermost level: suite -> test map.
    aggregated_results = defaultdict(_NewTestMap)
    for result in unsuppressed:
      build_url = 'http://ci.chromium.org/b/%s' % result.build_id
      status_entry = ct.ResultTupleType(result.status, build_url, result.date,
                                        result.is_slow,
                                        result.typ_expectations)
      per_tags = aggregated_results[result.suite][result.test][result.tags]
      per_tags.append(status_entry)
    return aggregated_results

  def _ConvertJsonResultsToResultObjects(self, results: ct.QueryJsonType
                                         ) -> List[data_types.Result]:
    """Turns JSON BigQuery rows into data_types.Result objects.

    The 'name', 'id' and 'typ_tags' keys are read unconditionally. 'status',
    'date', 'is_slow' and 'typ_expectations' are optional and become None on
    the Result when a row does not carry them.

    Args:
      results: Parsed JSON results from a BigQuery query

    Returns:
      The contents of |results| as a list of data_types.Result objects.
    """
    object_results = []
    for row in results:
      suite, test_name = self.GetTestSuiteAndNameFromResultDbName(row['name'])
      # The build ID is whatever follows the last '-' in the row's 'id'.
      build_id = row['id'].split('-')[-1]
      typ_tags = tuple(tag_utils.TagUtils.RemoveIgnoredTags(row['typ_tags']))

      status = row.get('status')
      # Only parse the date when the key exists; a present value is always
      # handed to fromisoformat() as-is.
      date = (datetime.date.fromisoformat(row['date'])
              if 'date' in row else None)
      is_slow = row.get('is_slow')
      typ_expectations = row.get('typ_expectations')

      result = data_types.Result(suite, test_name, typ_tags, build_id, status,
                                 date, is_slow, typ_expectations)
      object_results.append(result)
    return object_results

  def _FilterOutSuppressedResults(self, results: List[data_types.Result]
                                  ) -> List[data_types.Result]:
    """Drops results that an expectation in the local checkout already covers.

    Args:
      results: A list of data_types.Result objects.

    Returns:
      |results| with any already-suppressed failures removed.
    """
    # Parse every local expectation file into Expectation objects, keyed by
    # the filename reported by the expectations processor.
    file_contents = (
        self._expectations_processor.GetLocalCheckoutExpectationFileContents())
    origin_expectations = collections.defaultdict(list)
    for filename, contents in file_contents.items():
      parser = expectations_parser.TaggedTestListParser(contents)
      origin_expectations[filename].extend(
          data_types.Expectation(parsed.test, parsed.tags, parsed.raw_results,
                                 parsed.reason)
          for parsed in parser.expectations)

    # Keep only the results that no expectation in the relevant file applies
    # to.
    kept_results = []
    for result in results:
      expectation_path = self._expectations_processor.GetExpectationFileForSuite(
          result.suite, result.tags)
      # Expectations are looked up by the file's base name.
      candidates = origin_expectations[os.path.basename(expectation_path)]
      if any(e.AppliesToResult(result) for e in candidates):
        continue
      kept_results.append(result)

    return kept_results

  def GetTestSuiteAndNameFromResultDbName(self, result_db_name: str
                                          ) -> Tuple[str, str]:
    """Returns a (suite, test name) pair for a result's 'name' field.

    Not implemented in this class; always raises.

    Args:
      result_db_name: The 'name' value taken from a BigQuery result row.

    Raises:
      NotImplementedError: Always, in this class.
    """
    raise NotImplementedError