1#!/usr/bin/env python3
2# Copyright 2016 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# Uploads performance benchmark result file to bigquery.
17
18from __future__ import print_function
19
20import argparse
21import calendar
22import json
23import os
24import sys
25import time
26import uuid
27
28gcp_utils_dir = os.path.abspath(
29    os.path.join(os.path.dirname(__file__), '../../gcp/utils'))
30sys.path.append(gcp_utils_dir)
31import big_query_utils
32
33_PROJECT_ID = 'grpc-testing'
34
35
36def _upload_netperf_latency_csv_to_bigquery(dataset_id, table_id, result_file):
37    with open(result_file, 'r') as f:
38        (col1, col2, col3) = f.read().split(',')
39        latency50 = float(col1.strip()) * 1000
40        latency90 = float(col2.strip()) * 1000
41        latency99 = float(col3.strip()) * 1000
42
43        scenario_result = {
44            'scenario': {
45                'name': 'netperf_tcp_rr'
46            },
47            'summary': {
48                'latency50': latency50,
49                'latency90': latency90,
50                'latency99': latency99
51            }
52        }
53
54    bq = big_query_utils.create_big_query()
55    _create_results_table(bq, dataset_id, table_id)
56
57    if not _insert_result(
58            bq, dataset_id, table_id, scenario_result, flatten=False):
59        print('Error uploading result to bigquery.')
60        sys.exit(1)
61
62
63def _upload_scenario_result_to_bigquery(dataset_id, table_id, result_file,
64                                        metadata_file, node_info_file,
65                                        prometheus_query_results_file):
66    with open(result_file, 'r') as f:
67        scenario_result = json.loads(f.read())
68
69    bq = big_query_utils.create_big_query()
70    _create_results_table(bq, dataset_id, table_id)
71
72    if not _insert_scenario_result(bq, dataset_id, table_id, scenario_result,
73                                   metadata_file, node_info_file,
74                                   prometheus_query_results_file):
75        print('Error uploading result to bigquery.')
76        sys.exit(1)
77
78
79def _insert_result(bq, dataset_id, table_id, scenario_result, flatten=True):
80    if flatten:
81        _flatten_result_inplace(scenario_result)
82    _populate_metadata_inplace(scenario_result)
83    row = big_query_utils.make_row(str(uuid.uuid4()), scenario_result)
84    return big_query_utils.insert_rows(bq, _PROJECT_ID, dataset_id, table_id,
85                                       [row])
86
87
88def _insert_scenario_result(bq,
89                            dataset_id,
90                            table_id,
91                            scenario_result,
92                            test_metadata_file,
93                            node_info_file,
94                            prometheus_query_results_file,
95                            flatten=True):
96    if flatten:
97        _flatten_result_inplace(scenario_result)
98    _populate_metadata_from_file(scenario_result, test_metadata_file)
99    _populate_node_metadata_from_file(scenario_result, node_info_file)
100    _populate_prometheus_query_results_from_file(scenario_result,
101                                                 prometheus_query_results_file)
102    row = big_query_utils.make_row(str(uuid.uuid4()), scenario_result)
103    return big_query_utils.insert_rows(bq, _PROJECT_ID, dataset_id, table_id,
104                                       [row])
105
106
107def _create_results_table(bq, dataset_id, table_id):
108    with open(os.path.dirname(__file__) + '/scenario_result_schema.json',
109              'r') as f:
110        table_schema = json.loads(f.read())
111    desc = 'Results of performance benchmarks.'
112    return big_query_utils.create_table2(bq, _PROJECT_ID, dataset_id, table_id,
113                                         table_schema, desc)
114
115
116def _flatten_result_inplace(scenario_result):
117    """Bigquery is not really great for handling deeply nested data
118  and repeated fields. To maintain values of some fields while keeping
119  the schema relatively simple, we artificially leave some of the fields
120  as JSON strings.
121  """
122    scenario_result['scenario']['clientConfig'] = json.dumps(
123        scenario_result['scenario']['clientConfig'])
124    scenario_result['scenario']['serverConfig'] = json.dumps(
125        scenario_result['scenario']['serverConfig'])
126    scenario_result['latencies'] = json.dumps(scenario_result['latencies'])
127    scenario_result['serverCpuStats'] = []
128    for stats in scenario_result['serverStats']:
129        scenario_result['serverCpuStats'].append(dict())
130        scenario_result['serverCpuStats'][-1]['totalCpuTime'] = stats.pop(
131            'totalCpuTime', None)
132        scenario_result['serverCpuStats'][-1]['idleCpuTime'] = stats.pop(
133            'idleCpuTime', None)
134    for stats in scenario_result['clientStats']:
135        stats['latencies'] = json.dumps(stats['latencies'])
136        stats.pop('requestResults', None)
137    scenario_result['serverCores'] = json.dumps(scenario_result['serverCores'])
138    scenario_result['clientSuccess'] = json.dumps(
139        scenario_result['clientSuccess'])
140    scenario_result['serverSuccess'] = json.dumps(
141        scenario_result['serverSuccess'])
142    scenario_result['requestResults'] = json.dumps(
143        scenario_result.get('requestResults', []))
144    scenario_result['serverCpuUsage'] = scenario_result['summary'].pop(
145        'serverCpuUsage', None)
146    scenario_result['summary'].pop('successfulRequestsPerSecond', None)
147    scenario_result['summary'].pop('failedRequestsPerSecond', None)
148
149
150def _populate_metadata_inplace(scenario_result):
151    """Populates metadata based on environment variables set by Jenkins."""
152    # NOTE: Grabbing the Kokoro environment variables will only work if the
153    # driver is running locally on the same machine where Kokoro has started
154    # the job. For our setup, this is currently the case, so just assume that.
155    build_number = os.getenv('KOKORO_BUILD_NUMBER')
156    build_url = 'https://source.cloud.google.com/results/invocations/%s' % os.getenv(
157        'KOKORO_BUILD_ID')
158    job_name = os.getenv('KOKORO_JOB_NAME')
159    git_commit = os.getenv('KOKORO_GIT_COMMIT')
160    # actual commit is the actual head of PR that is getting tested
161    # TODO(jtattermusch): unclear how to obtain on Kokoro
162    git_actual_commit = os.getenv('ghprbActualCommit')
163
164    utc_timestamp = str(calendar.timegm(time.gmtime()))
165    metadata = {'created': utc_timestamp}
166
167    if build_number:
168        metadata['buildNumber'] = build_number
169    if build_url:
170        metadata['buildUrl'] = build_url
171    if job_name:
172        metadata['jobName'] = job_name
173    if git_commit:
174        metadata['gitCommit'] = git_commit
175    if git_actual_commit:
176        metadata['gitActualCommit'] = git_actual_commit
177
178    scenario_result['metadata'] = metadata
179
180
181def _populate_metadata_from_file(scenario_result, test_metadata_file):
182    utc_timestamp = str(calendar.timegm(time.gmtime()))
183    metadata = {'created': utc_timestamp}
184
185    _annotation_to_bq_metadata_key_map = {
186        'ci_' + key: key for key in (
187            'buildNumber',
188            'buildUrl',
189            'jobName',
190            'gitCommit',
191            'gitActualCommit',
192        )
193    }
194
195    if os.access(test_metadata_file, os.R_OK):
196        with open(test_metadata_file, 'r') as f:
197            test_metadata = json.loads(f.read())
198
199        # eliminate managedFields from metadata set
200        if 'managedFields' in test_metadata:
201            del test_metadata['managedFields']
202
203        annotations = test_metadata.get('annotations', {})
204
205        # if use kubectl apply ..., kubectl will append current configuration to
206        # annotation, the field is deleted since it includes a lot of irrelevant
207        # information
208        if 'kubectl.kubernetes.io/last-applied-configuration' in annotations:
209            del annotations['kubectl.kubernetes.io/last-applied-configuration']
210
211        # dump all metadata as JSON to testMetadata field
212        scenario_result['testMetadata'] = json.dumps(test_metadata)
213        for key, value in _annotation_to_bq_metadata_key_map.items():
214            if key in annotations:
215                metadata[value] = annotations[key]
216
217    scenario_result['metadata'] = metadata
218
219
220def _populate_node_metadata_from_file(scenario_result, node_info_file):
221    node_metadata = {'driver': {}, 'servers': [], 'clients': []}
222    _node_info_to_bq_node_metadata_key_map = {
223        'Name': 'name',
224        'PodIP': 'podIP',
225        'NodeName': 'nodeName',
226    }
227
228    if os.access(node_info_file, os.R_OK):
229        with open(node_info_file, 'r') as f:
230            file_metadata = json.loads(f.read())
231        for key, value in _node_info_to_bq_node_metadata_key_map.items():
232            node_metadata['driver'][value] = file_metadata['Driver'][key]
233        for clientNodeInfo in file_metadata['Clients']:
234            node_metadata['clients'].append({
235                value: clientNodeInfo[key] for key, value in
236                _node_info_to_bq_node_metadata_key_map.items()
237            })
238        for serverNodeInfo in file_metadata['Servers']:
239            node_metadata['servers'].append({
240                value: serverNodeInfo[key] for key, value in
241                _node_info_to_bq_node_metadata_key_map.items()
242            })
243
244    scenario_result['nodeMetadata'] = node_metadata
245
246
247def _populate_prometheus_query_results_from_file(scenario_result,
248                                                 prometheus_query_result_file):
249    """Populate the results from Prometheus query to Bigquery table """
250    if os.access(prometheus_query_result_file, os.R_OK):
251        with open(prometheus_query_result_file, 'r', encoding='utf8') as f:
252            file_query_results = json.loads(f.read())
253
254            scenario_result['testDurationSeconds'] = file_query_results[
255                'testDurationSeconds']
256            clientsPrometheusData = []
257            if 'clients' in file_query_results:
258                for client_name, client_data in file_query_results[
259                        'clients'].items():
260                    clientPrometheusData = {'name': client_name}
261                    containersPrometheusData = []
262                    for container_name, container_data in client_data.items():
263                        containerPrometheusData = {
264                            'name': container_name,
265                            'cpuSeconds': container_data['cpuSeconds'],
266                            'memoryMean': container_data['memoryMean'],
267                        }
268                        containersPrometheusData.append(containerPrometheusData)
269                    clientPrometheusData[
270                        'containers'] = containersPrometheusData
271                    clientsPrometheusData.append(clientPrometheusData)
272                scenario_result['clientsPrometheusData'] = clientsPrometheusData
273
274            serversPrometheusData = []
275            if 'servers' in file_query_results:
276                for server_name, server_data in file_query_results[
277                        'servers'].items():
278                    serverPrometheusData = {'name': server_name}
279                    containersPrometheusData = []
280                    for container_name, container_data in server_data.items():
281                        containerPrometheusData = {
282                            'name': container_name,
283                            'cpuSeconds': container_data['cpuSeconds'],
284                            'memoryMean': container_data['memoryMean'],
285                        }
286                        containersPrometheusData.append(containerPrometheusData)
287                    serverPrometheusData[
288                        'containers'] = containersPrometheusData
289                    serversPrometheusData.append(serverPrometheusData)
290            scenario_result['serversPrometheusData'] = serversPrometheusData
291
292
293argp = argparse.ArgumentParser(description='Upload result to big query.')
294argp.add_argument('--bq_result_table',
295                  required=True,
296                  default=None,
297                  type=str,
298                  help='Bigquery "dataset.table" to upload results to.')
299argp.add_argument('--file_to_upload',
300                  default='scenario_result.json',
301                  type=str,
302                  help='Report file to upload.')
303argp.add_argument('--metadata_file_to_upload',
304                  default='metadata.json',
305                  type=str,
306                  help='Metadata file to upload.')
307argp.add_argument('--node_info_file_to_upload',
308                  default='node_info.json',
309                  type=str,
310                  help='Node information file to upload.')
311argp.add_argument('--prometheus_query_results_to_upload',
312                  default='prometheus_query_result.json',
313                  type=str,
314                  help='Prometheus query result file to upload.')
315argp.add_argument('--file_format',
316                  choices=['scenario_result', 'netperf_latency_csv'],
317                  default='scenario_result',
318                  help='Format of the file to upload.')
319
320args = argp.parse_args()
321
322dataset_id, table_id = args.bq_result_table.split('.', 2)
323
324if args.file_format == 'netperf_latency_csv':
325    _upload_netperf_latency_csv_to_bigquery(dataset_id, table_id,
326                                            args.file_to_upload)
327else:
328    _upload_scenario_result_to_bigquery(dataset_id, table_id,
329                                        args.file_to_upload,
330                                        args.metadata_file_to_upload,
331                                        args.node_info_file_to_upload,
332                                        args.prometheus_query_results_to_upload)
333print('Successfully uploaded %s, %s, %s and %s to BigQuery.\n' %
334      (args.file_to_upload, args.metadata_file_to_upload,
335       args.node_info_file_to_upload, args.prometheus_query_results_to_upload))
336