1# Copyright 2020 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4from __future__ import absolute_import 5import base64 6import json 7import logging 8import os 9 10import requests # pylint: disable=import-error 11from lib.results import result_types 12 13HTML_SUMMARY_MAX = 4096 14 15_HTML_SUMMARY_ARTIFACT = '<text-artifact artifact-id="HTML Summary" />' 16_TEST_LOG_ARTIFACT = '<text-artifact artifact-id="Test Log" />' 17 18# Maps result_types to the luci test-result.proto. 19# https://godoc.org/go.chromium.org/luci/resultdb/proto/v1#TestStatus 20RESULT_MAP = { 21 result_types.UNKNOWN: 'ABORT', 22 result_types.PASS: 'PASS', 23 result_types.FAIL: 'FAIL', 24 result_types.CRASH: 'CRASH', 25 result_types.TIMEOUT: 'ABORT', 26 result_types.SKIP: 'SKIP', 27 result_types.NOTRUN: 'SKIP', 28} 29 30 31def TryInitClient(): 32 """Tries to initialize a result_sink_client object. 33 34 Assumes that rdb stream is already running. 35 36 Returns: 37 A ResultSinkClient for the result_sink server else returns None. 38 """ 39 try: 40 with open(os.environ['LUCI_CONTEXT']) as f: 41 sink = json.load(f)['result_sink'] 42 return ResultSinkClient(sink) 43 except KeyError: 44 return None 45 46 47class ResultSinkClient(object): 48 """A class to store the sink's post configurations and make post requests. 49 50 This assumes that the rdb stream has been called already and that the 51 server is listening. 52 """ 53 54 def __init__(self, context): 55 base_url = 'http://%s/prpc/luci.resultsink.v1.Sink' % context['address'] 56 self.test_results_url = base_url + '/ReportTestResults' 57 self.report_artifacts_url = base_url + '/ReportInvocationLevelArtifacts' 58 59 headers = { 60 'Content-Type': 'application/json', 61 'Accept': 'application/json', 62 'Authorization': 'ResultSink %s' % context['auth_token'], 63 } 64 self.session = requests.Session() 65 self.session.headers.update(headers) 66 67 def __enter__(self): 68 return self 69 70 def __exit__(self, exc_type, exc_value, traceback): 71 self.close() 72 73 def close(self): 74 """Closes the session backing the sink.""" 75 self.session.close() 76 77 def Post(self, 78 test_id, 79 status, 80 duration, 81 test_log, 82 test_file, 83 variant=None, 84 artifacts=None, 85 failure_reason=None, 86 html_artifact=None, 87 tags=None): 88 """Uploads the test result to the ResultSink server. 89 90 This assumes that the rdb stream has been called already and that 91 server is ready listening. 92 93 Args: 94 test_id: A string representing the test's name. 95 status: A string representing if the test passed, failed, etc... 96 duration: An int representing time in ms. 97 test_log: A string representing the test's output. 98 test_file: A string representing the file location of the test. 99 variant: An optional dict of variant key value pairs as the 100 additional variant sent from test runners, which can override 101 or add to the variants passed to `rdb stream` command. 102 artifacts: An optional dict of artifacts to attach to the test. 103 failure_reason: An optional string with the reason why the test failed. 104 Should be None if the test did not fail. 105 html_artifact: An optional html-formatted string to prepend to the test's 106 log. Useful to encode click-able URL links in the test log, since that 107 won't be formatted in the test_log. 108 tags: An optional list of tuple of key name and value to prepend to the 109 test's tags. 110 111 Returns: 112 N/A 113 """ 114 assert status in RESULT_MAP 115 expected = status in (result_types.PASS, result_types.SKIP) 116 result_db_status = RESULT_MAP[status] 117 118 tr = { 119 'expected': 120 expected, 121 'status': 122 result_db_status, 123 'tags': [ 124 { 125 'key': 'test_name', 126 'value': test_id, 127 }, 128 { 129 # Status before getting mapped to result_db statuses. 130 'key': 'raw_status', 131 'value': status, 132 } 133 ], 134 'testId': 135 test_id, 136 'testMetadata': { 137 'name': test_id, 138 } 139 } 140 141 if tags: 142 tr['tags'].extend({ 143 'key': key_name, 144 'value': value 145 } for (key_name, value) in tags) 146 147 if variant: 148 tr['variant'] = {'def': variant} 149 150 artifacts = artifacts or {} 151 tr['summaryHtml'] = html_artifact if html_artifact else '' 152 153 # If over max supported length of html summary, replace with artifact 154 # upload. 155 if (test_log 156 and len(tr['summaryHtml']) + len(_TEST_LOG_ARTIFACT) > HTML_SUMMARY_MAX 157 or len(tr['summaryHtml']) > HTML_SUMMARY_MAX): 158 b64_summary = base64.b64encode(tr['summaryHtml'].encode()).decode() 159 artifacts.update({'HTML Summary': {'contents': b64_summary}}) 160 tr['summaryHtml'] = _HTML_SUMMARY_ARTIFACT 161 162 if test_log: 163 # Upload the original log without any modifications. 164 b64_log = base64.b64encode(test_log.encode()).decode() 165 artifacts.update({'Test Log': {'contents': b64_log}}) 166 tr['summaryHtml'] += _TEST_LOG_ARTIFACT 167 168 if artifacts: 169 tr['artifacts'] = artifacts 170 if failure_reason: 171 tr['failureReason'] = { 172 'primaryErrorMessage': _TruncateToUTF8Bytes(failure_reason, 1024) 173 } 174 175 if duration is not None: 176 # Duration must be formatted to avoid scientific notation in case 177 # number is too small or too large. Result_db takes seconds, not ms. 178 # Need to use float() otherwise it does substitution first then divides. 179 tr['duration'] = '%.9fs' % float(duration / 1000.0) 180 181 if test_file and str(test_file).startswith('//'): 182 tr['testMetadata']['location'] = { 183 'file_name': test_file, 184 'repo': 'https://chromium.googlesource.com/chromium/src', 185 } 186 187 res = self.session.post(url=self.test_results_url, 188 data=json.dumps({'testResults': [tr]})) 189 res.raise_for_status() 190 191 def ReportInvocationLevelArtifacts(self, artifacts): 192 """Uploads invocation-level artifacts to the ResultSink server. 193 194 This is for artifacts that don't apply to a single test but to the test 195 invocation as a whole (eg: system logs). 196 197 Args: 198 artifacts: A dict of artifacts to attach to the invocation. 199 """ 200 req = {'artifacts': artifacts} 201 res = self.session.post(url=self.report_artifacts_url, data=json.dumps(req)) 202 res.raise_for_status() 203 204 205def _TruncateToUTF8Bytes(s, length): 206 """ Truncates a string to a given number of bytes when encoded as UTF-8. 207 208 Ensures the given string does not take more than length bytes when encoded 209 as UTF-8. Adds trailing ellipsis (...) if truncation occurred. A truncated 210 string may end up encoding to a length slightly shorter than length because 211 only whole Unicode codepoints are dropped. 212 213 Args: 214 s: The string to truncate. 215 length: the length (in bytes) to truncate to. 216 """ 217 try: 218 encoded = s.encode('utf-8') 219 # When encode throws UnicodeDecodeError in py2, it usually means the str is 220 # already encoded and has non-ascii chars. So skip re-encoding it. 221 except UnicodeDecodeError: 222 encoded = s 223 if len(encoded) > length: 224 # Truncate, leaving space for trailing ellipsis (...). 225 encoded = encoded[:length - 3] 226 # Truncating the string encoded as UTF-8 may have left the final codepoint 227 # only partially present. Pass 'ignore' to acknowledge and ensure this is 228 # dropped. 229 return encoded.decode('utf-8', 'ignore') + "..." 230 return s 231