1#!/usr/bin/env python3
2
3# Copyright 2022 The gRPC Authors
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# A script to fetch total cpu seconds and memory data from prometheus.
18# example usage: python3 prometheus.py
19# --url=http://prometheus.prometheus.svc.cluster.local:9090
20# --pod_type=driver --pod_type=clients --container_name=main
21# --container_name=sidecar
22"""Perform Prometheus range queries to obtain cpu and memory data.
23
24This module performs range queries through Prometheus API to obtain
25total cpu seconds and memory during a test run for given container
in given pods. The cpu data obtained is the total cpu seconds used within
the given period of time. The memory data is the instantaneous memory
usage at the query time.
29"""
30import argparse
31import json
32import logging
33import statistics
34import time
35from typing import Any, Dict, List
36
37from dateutil import parser
38import requests
39
40
class Prometheus:
    """Prometheus range-query client bound to a fixed [start, end] window."""

    def __init__(
        self,
        url: str,
        start: str,
        end: str,
    ):
        self.url = url
        self.start = start
        self.end = end

    def _fetch_by_query(self, query: str) -> Dict[str, Any]:
        """Runs one range query and returns the decoded JSON response.

        The query is evaluated at a 5s step over [start, end], so the
        raw result is a time series per matching metric.
        """
        query_params = {
            'query': query,
            'start': self.start,
            'end': self.end,
            'step': 5,
        }
        response = requests.get(self.url + '/api/v1/query_range', query_params)
        response.raise_for_status()
        return response.json()

    def _fetch_cpu_for_pod(self, container_matcher: str,
                           pod_name: str) -> Dict[str, List[float]]:
        """Fetches cumulative cpu-seconds samples for one pod.

        The raw time series is flattened into a plain list of floats per
        container name via get_data_list_from_timeseries.

        Args:
            container_matcher: Label matcher for one or more container
                names (see construct_container_matcher).
            pod_name: Name of the pod to query.
        """
        query = ('container_cpu_usage_seconds_total'
                 f'{{job="kubernetes-cadvisor",pod="{pod_name}"'
                 f',container={container_matcher}}}')
        logging.debug('running prometheus query for cpu: %s', query)
        cpu_data = self._fetch_by_query(query)
        logging.debug('raw cpu data: %s', str(cpu_data))
        return get_data_list_from_timeseries(cpu_data)

    def _fetch_memory_for_pod(self, container_matcher: str,
                              pod_name: str) -> Dict[str, List[float]]:
        """Fetches memory-usage samples for one pod.

        The raw time series is flattened into a plain list of floats per
        container name via get_data_list_from_timeseries.

        Args:
            container_matcher: Label matcher for one or more container
                names (see construct_container_matcher).
            pod_name: Name of the pod to query.
        """
        query = ('container_memory_usage_bytes'
                 f'{{job="kubernetes-cadvisor",pod="{pod_name}"'
                 f',container={container_matcher}}}')
        logging.debug('running prometheus query for memory: %s', query)
        memory_data = self._fetch_by_query(query)
        logging.debug('raw memory data: %s', str(memory_data))
        return get_data_list_from_timeseries(memory_data)

    def fetch_cpu_and_memory_data(
            self, container_list: List[str],
            pod_dict: Dict[str, List[str]]) -> Dict[str, Any]:
        """Fetches total cpu seconds and mean memory usage for many pods.

        Args:
            container_list: Container names to fetch the data for.
            pod_dict: Pods to fetch data for, keyed by pod role (clients,
                driver, servers); each value is the list of pod names
                holding that role.

        Returns:
            A nested dict: role -> pod name -> container name ->
            {'cpuSeconds': ..., 'memoryMean': ...}.
        """
        matcher = construct_container_matcher(container_list)
        results = {}
        for role, pod_names in pod_dict.items():
            per_pod = {}
            for pod in pod_names:
                # cpu first: it establishes the per-container entries that
                # the memory pass fills in below.
                per_container = {
                    name: {
                        'cpuSeconds': compute_total_cpu_seconds(samples)
                    } for name, samples in self._fetch_cpu_for_pod(
                        matcher, pod).items()
                }
                for name, samples in self._fetch_memory_for_pod(
                        matcher, pod).items():
                    per_container[name][
                        'memoryMean'] = compute_average_memory_usage(samples)
                per_pod[pod] = per_container
            results[role] = per_pod
        return results
150
151
def construct_container_matcher(container_list: List[str]) -> str:
    """Constructs the container label matcher used in the Prometheus query.

    Args:
        container_list: Container names to match; must be non-empty.

    Returns:
        An exact matcher ('"name"') for a single container, or a regex
        matcher ('~"name1|name2"') when multiple containers are given.

    Raises:
        Exception: If no container name is provided.
    """
    if not container_list:
        raise Exception('no container name provided')
    if len(container_list) == 1:
        # Quote the single name: an unquoted label value such as
        # container=main is invalid PromQL.
        return '"' + container_list[0] + '"'
    return '~"' + '|'.join(container_list) + '"'
167
168
def get_data_list_from_timeseries(data: Any) -> Dict[str, List[float]]:
    """Flattens a Prometheus matrix response into per-container samples.

    Returns a dict keyed by container name whose values are the float
    samples extracted from each result's time series.

    Raises:
        Exception: If the response status is not 'success' or the result
            type is not 'matrix'.
    """
    if data['status'] != 'success':
        raise Exception('command failed: ' + data['status'] + str(data))
    if data['data']['resultType'] != 'matrix':
        raise Exception('resultType is not matrix: ' +
                        data['data']['resultType'])

    # Each sample is a [timestamp, value] pair; keep only the value.
    return {
        result['metric']['container']:
        [float(sample[1]) for sample in result['values']]
        for result in data['data']['result']
    }
188
189
def compute_total_cpu_seconds(cpu_data_list: List[float]) -> float:
    """Computes the total cpu seconds used as CPUs[end] - CPUs[start].

    The input is a cumulative cpu-seconds counter sampled over time, so
    the usage within the window is the last sample minus the first.
    """
    # Negative indexing replaces the non-idiomatic xs[len(xs) - 1].
    return cpu_data_list[-1] - cpu_data_list[0]
193
194
def compute_average_memory_usage(memory_data_list: List[float]) -> float:
    """Returns the arithmetic mean of the sampled memory usage values."""
    average = statistics.mean(memory_data_list)
    return average
198
199
def construct_pod_dict(node_info_file: str,
                       pod_types: List[str]) -> Dict[str, List[str]]:
    """Constructs a dict of pod names to be queried.

    Args:
        node_info_file: Path of a JSON file listing the pods by role
            (Clients, Servers and Driver).
        pod_types: The roles to keep in the returned dict; each must be
            one of 'clients', 'servers' or 'driver'.

    Returns:
        A dict keyed by role name with the list of pod names per role,
        restricted to the requested pod_types.
    """
    with open(node_info_file, 'r') as f:
        node_info = json.load(f)

    pod_type_to_name = {
        'clients': [client['Name'] for client in node_info['Clients']],
        'servers': [server['Name'] for server in node_info['Servers']],
        'driver': [node_info['Driver']['Name']],
    }
    return {pod_type: pod_type_to_name[pod_type] for pod_type in pod_types}
224
225
def convert_UTC_to_epoch(utc_timestamp: str) -> str:
    """Converts a utc timestamp string to an epoch-seconds string.

    Uses datetime.timestamp() rather than strftime('%s'): '%s' is a
    non-portable glibc extension (absent on Windows) that ignores the
    parsed tzinfo and formats in local time, skewing the epoch on
    non-UTC machines for timezone-aware inputs.
    """
    parsed_time = parser.parse(utc_timestamp)
    return str(int(parsed_time.timestamp()))
231
232
def main() -> None:
    """Parses CLI flags, queries Prometheus and exports results as JSON."""
    argp = argparse.ArgumentParser(
        description='Fetch cpu and memory stats from prometheus')
    argp.add_argument('--url', help='Prometheus base url', required=True)
    argp.add_argument(
        '--scenario_result_file',
        default='scenario_result.json',
        type=str,
        help='File contains epoch seconds for start and end time',
    )
    argp.add_argument(
        '--node_info_file',
        default='/var/data/qps_workers/node_info.json',
        help='File contains pod name to query the metrics for',
    )
    argp.add_argument(
        '--pod_type',
        action='append',
        help=
        'Pod type to query the metrics for, the options are driver, clients and servers',
        choices=['driver', 'clients', 'servers'],
        required=True,
    )
    argp.add_argument(
        '--container_name',
        action='append',
        help='The container names to query the metrics for',
        required=True,
    )
    argp.add_argument(
        '--export_file_name',
        default='prometheus_query_result.json',
        type=str,
        help='Name of exported JSON file.',
    )
    argp.add_argument(
        '--quiet',
        # store_true makes this a proper boolean flag; previously the
        # option required an argument and any value (even the string
        # 'False') counted as truthy.
        action='store_true',
        default=False,
        help='Suppress informative output',
    )
    argp.add_argument(
        '--delay_seconds',
        default=0,
        type=int,
        help=
        'Configure delay in seconds to perform Prometheus queries, default is 0',
    )
    args = argp.parse_args()

    if not args.quiet:
        logging.getLogger().setLevel(logging.DEBUG)

    # The test window comes from the scenario result summary, converted
    # to epoch seconds for the Prometheus range-query API.
    with open(args.scenario_result_file, 'r') as q:
        scenario_result = json.load(q)
        start_time = convert_UTC_to_epoch(
            scenario_result['summary']['startTime'])
        end_time = convert_UTC_to_epoch(scenario_result['summary']['endTime'])
        p = Prometheus(
            url=args.url,
            start=start_time,
            end=end_time,
        )

    # Optional wait so late-arriving samples are scraped before querying.
    time.sleep(args.delay_seconds)

    pod_dict = construct_pod_dict(args.node_info_file, args.pod_type)
    processed_data = p.fetch_cpu_and_memory_data(
        container_list=args.container_name, pod_dict=pod_dict)
    processed_data['testDurationSeconds'] = float(end_time) - float(start_time)

    logging.debug(json.dumps(processed_data, sort_keys=True, indent=4))

    with open(args.export_file_name, 'w', encoding='utf8') as export_file:
        json.dump(processed_data, export_file, sort_keys=True, indent=4)


if __name__ == '__main__':
    main()
311