#!/usr/bin/env python3
# Copyright 2022 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# A script to fetch total CPU seconds and memory data from Prometheus.
# example usage: python3 prometheus.py
# --url=http://prometheus.prometheus.svc.cluster.local:9090
# --pod_type=driver --pod_type=clients --container_name=main
# --container_name=sidecar
"""Perform Prometheus range queries to obtain CPU and memory data.

This module performs range queries through the Prometheus API to
obtain total CPU seconds and memory usage during a test run for given
containers in given pods. The CPU figure is the total CPU seconds
used within the given period of time; the memory figures are the
instantaneous memory usage sampled at each query step.
"""
import argparse
import json
import logging
import statistics
import time
from typing import Any, Dict, List

from dateutil import parser
import requests


class Prometheus:
    """Object that holds the start time, end time and query URL."""

    def __init__(
        self,
        url: str,
        start: str,
        end: str,
    ):
        self.url = url
        self.start = start
        self.end = end

    def _fetch_by_query(self, query: str) -> Dict[str, Any]:
        """Fetches the given query over the configured time range.

        The query is evaluated at a 5s step, so the data returned
        for each matching series is a time series of samples.
        """
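        # Issues GET <url>/api/v1/query_range with query, start, end and
        # step=5; the response follows the Prometheus range-query format:
        #   {"status": "success",
        #    "data": {"resultType": "matrix",
        #             "result": [{"metric": {...},
        #                         "values": [[ts, "v"], ...]}]}}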
        resp = requests.get(
            self.url + "/api/v1/query_range",
            {"query": query, "start": self.start, "end": self.end, "step": 5},
        )
        resp.raise_for_status()
        return resp.json()

    def _fetch_cpu_for_pod(
        self, container_matcher: str, pod_name: str
    ) -> Dict[str, List[float]]:
        """Fetches the CPU data for each pod.

        Fetches total CPU seconds during the time range specified in the
        Prometheus instance for a pod. The time series obtained is then
        trimmed to a plain list of values and saved in a Dict keyed by
        container name.

        Args:
            container_matcher: A string consisting of one or more container
                names separated by |.
        """
        query = (
            'container_cpu_usage_seconds_total{job="kubernetes-cadvisor",pod="'
            + pod_name
            + '",container='
            + container_matcher
            + "}"
        )
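        # The rendered query looks like the following (pod name illustrative):
        # container_cpu_usage_seconds_total{job="kubernetes-cadvisor",
        #   pod="driver-0",container=~"main|sidecar"}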
        logging.debug("running prometheus query for cpu: %s", query)
        cpu_data = self._fetch_by_query(query)
        logging.debug("raw cpu data: %s", str(cpu_data))
        cpu_container_name_to_data_list = get_data_list_from_timeseries(
            cpu_data
        )
        return cpu_container_name_to_data_list

    def _fetch_memory_for_pod(
        self, container_matcher: str, pod_name: str
    ) -> Dict[str, List[float]]:
        """Fetches memory data for each pod.

        Fetches memory usage during the time range specified in the
        Prometheus instance for a pod. The time series obtained is then
        trimmed to a plain list of values and saved in a Dict keyed by
        container name.

        Args:
            container_matcher: A string consisting of one or more container
                names separated by |.
        """
        query = (
            'container_memory_usage_bytes{job="kubernetes-cadvisor",pod="'
            + pod_name
            + '",container='
            + container_matcher
            + "}"
        )
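        # Unlike container_cpu_usage_seconds_total (a cumulative counter),
        # container_memory_usage_bytes is a gauge: each sample is an
        # instantaneous reading, so downstream processing takes the mean
        # rather than a last-minus-first difference.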

        logging.debug("running prometheus query for memory: %s", query)
        memory_data = self._fetch_by_query(query)

        logging.debug("raw memory data: %s", str(memory_data))
        memory_container_name_to_data_list = get_data_list_from_timeseries(
            memory_data
        )

        return memory_container_name_to_data_list

    def fetch_cpu_and_memory_data(
        self, container_list: List[str], pod_dict: Dict[str, List[str]]
    ) -> Dict[str, Any]:
        """Fetches total CPU seconds and memory data for multiple pods.

        Args:
            container_list: A list of container names to fetch the data for.
            pod_dict: The pods to fetch data for, keyed by the role of the
                pod: clients, driver and servers. Each value is the list of
                pod names that share the role given in the key.
        """
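        # The returned structure nests role -> pod -> container, e.g.
        # (names and numbers illustrative):
        # {"clients": {"client-pod-0": {"main": {"cpuSeconds": 12.3,
        #                                        "memoryMean": 4.5e8}}}}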
        container_matcher = construct_container_matcher(container_list)
        processed_data = {}
        for role, pod_names in pod_dict.items():
            pod_data = {}
            for pod in pod_names:
                container_data = {}
                for container, data in self._fetch_cpu_for_pod(
                    container_matcher, pod
                ).items():
                    container_data[container] = {}
                    container_data[container][
                        "cpuSeconds"
                    ] = compute_total_cpu_seconds(data)

                for container, data in self._fetch_memory_for_pod(
                    container_matcher, pod
                ).items():
                    container_data[container][
                        "memoryMean"
                    ] = compute_average_memory_usage(data)

                pod_data[pod] = container_data
            processed_data[role] = pod_data
        return processed_data


def construct_container_matcher(container_list: List[str]) -> str:
    """Constructs the container matching string used in the
    Prometheus query."""
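    # e.g. ["main"] -> '"main"' and ["main", "sidecar"] -> '~"main|sidecar"',
    # yielding container="main" or container=~"main|sidecar" once substituted
    # into the query.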
    if len(container_list) == 0:
        raise Exception("no container name provided")

    if len(container_list) == 1:
        # A single name must still be quoted to form a valid PromQL matcher.
        return '"' + container_list[0] + '"'
    return '~"' + "|".join(container_list) + '"'


def get_data_list_from_timeseries(data: Any) -> Dict[str, List[float]]:
    """Constructs a Dict whose keys are the container names and whose
    values are lists of data points taken from the given time series."""
    if data["status"] != "success":
        raise Exception("command failed: " + data["status"] + str(data))
    if data["data"]["resultType"] != "matrix":
        raise Exception(
            "resultType is not matrix: " + data["data"]["resultType"]
        )

    container_name_to_data_list = {}
    for res in data["data"]["result"]:
        container_name = res["metric"]["container"]
        container_data_timeseries = res["values"]

        container_data = []
        for d in container_data_timeseries:
            container_data.append(float(d[1]))
        container_name_to_data_list[container_name] = container_data
    return container_name_to_data_list


def compute_total_cpu_seconds(cpu_data_list: List[float]) -> float:
    """Computes the total CPU seconds as CPUs[end] - CPUs[start]."""
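    # Valid because the underlying metric is a monotonically increasing
    # counter; this assumes no counter reset (e.g. a container restart)
    # within the window.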
    return cpu_data_list[-1] - cpu_data_list[0]


def compute_average_memory_usage(memory_data_list: List[float]) -> float:
    """Computes the mean for a given list of data."""
    return statistics.mean(memory_data_list)


def construct_pod_dict(
    node_info_file: str, pod_types: List[str]
) -> Dict[str, List[str]]:
    """Constructs a dict of pod names to be queried.

    Args:
        node_info_file: Path of the file containing the pod names to query.
            The pod names are put into a Dict of lists keyed by the
            role name: clients, servers and driver.
        pod_types: The roles to include in the returned Dict.
    """
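    # The node info file is expected to look like (names illustrative):
    # {"Clients": [{"Name": "client-0"}, ...],
    #  "Servers": [{"Name": "server-0"}, ...],
    #  "Driver": {"Name": "driver-0"}}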
    with open(node_info_file, "r") as f:
        pod_names = json.load(f)
        pod_type_to_name = {"clients": [], "driver": [], "servers": []}

        for client in pod_names["Clients"]:
            pod_type_to_name["clients"].append(client["Name"])
        for server in pod_names["Servers"]:
            pod_type_to_name["servers"].append(server["Name"])

        pod_type_to_name["driver"].append(pod_names["Driver"]["Name"])

    pod_names_to_query = {}
    for pod_type in pod_types:
        pod_names_to_query[pod_type] = pod_type_to_name[pod_type]
    return pod_names_to_query


def convert_UTC_to_epoch(utc_timestamp: str) -> str:
    """Converts a UTC timestamp string to an epoch time string."""
    parsed_time = parser.parse(utc_timestamp)
    # datetime.timestamp() honors the parsed timezone; strftime("%s") is a
    # non-portable glibc extension that ignores tzinfo.
    epoch = str(int(parsed_time.timestamp()))
    return epoch


def main() -> None:
    argp = argparse.ArgumentParser(
        description="Fetch CPU and memory stats from Prometheus"
    )
    argp.add_argument("--url", help="Prometheus base URL", required=True)
    argp.add_argument(
        "--scenario_result_file",
        default="scenario_result.json",
        type=str,
        help="File containing the start and end time of the test run",
    )
    argp.add_argument(
        "--node_info_file",
        default="/var/data/qps_workers/node_info.json",
        help="File containing the pod names to query the metrics for",
    )
    argp.add_argument(
        "--pod_type",
        action="append",
        help=(
            "Pod type to query the metrics for; the options are driver,"
            " clients and servers"
        ),
        choices=["driver", "clients", "servers"],
        required=True,
    )
    argp.add_argument(
        "--container_name",
        action="append",
        help="The container names to query the metrics for",
        required=True,
    )
    argp.add_argument(
        "--export_file_name",
        default="prometheus_query_result.json",
        type=str,
        help="Name of exported JSON file.",
    )
    argp.add_argument(
        "--quiet",
        default=False,
        action="store_true",
        help="Suppress informative output",
    )
    argp.add_argument(
        "--delay_seconds",
        default=0,
        type=int,
        help=(
            "Delay in seconds before performing the Prometheus queries,"
            " default is 0"
        ),
    )
    args = argp.parse_args()

    if not args.quiet:
        logging.getLogger().setLevel(logging.DEBUG)

    with open(args.scenario_result_file, "r") as q:
        scenario_result = json.load(q)
        start_time = convert_UTC_to_epoch(
            scenario_result["summary"]["startTime"]
        )
        end_time = convert_UTC_to_epoch(scenario_result["summary"]["endTime"])
        p = Prometheus(
            url=args.url,
            start=start_time,
            end=end_time,
        )

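    # Optional pause before querying, e.g. to let Prometheus finish scraping
    # the final samples of the run.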
    time.sleep(args.delay_seconds)

    pod_dict = construct_pod_dict(args.node_info_file, args.pod_type)
    processed_data = p.fetch_cpu_and_memory_data(
        container_list=args.container_name, pod_dict=pod_dict
    )
    processed_data["testDurationSeconds"] = float(end_time) - float(start_time)

    logging.debug(
        "%s: %s",
        args.export_file_name,
        json.dumps(processed_data, sort_keys=True, indent=4),
    )

    with open(args.export_file_name, "w", encoding="utf8") as export_file:
        json.dump(processed_data, export_file, sort_keys=True, indent=4)


if __name__ == "__main__":
    main()