#!/usr/bin/env python3
# Copyright 2022 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# A script to fetch total cpu seconds and memory data from prometheus.
# example usage: python3 prometheus.py
#     --url=http://prometheus.prometheus.svc.cluster.local:9090
#     --pod_type=driver --pod_type=clients --container_name=main
#     --container_name=sidecar
"""Perform Prometheus range queries to obtain cpu and memory data.

This module performs range queries through the Prometheus API to obtain
total cpu seconds and memory usage during a test run for given containers
in given pods. The cpu data obtained is the total cpu seconds used within
the given period of time. The memory data is the instantaneous memory
usage at each sample point in the query range.
"""
import argparse
import json
import logging
import statistics
import time
from typing import Any, Dict, List

from dateutil import parser
import requests


class Prometheus:
    """An object which holds the start time, end time and query URL."""

    def __init__(
        self,
        url: str,
        start: str,
        end: str,
    ):
        self.url = url
        self.start = start
        self.end = end

    def _fetch_by_query(self, query: str) -> Dict[str, Any]:
        """Fetches the given query within a time range.

        The query is evaluated over the time range held by this
        Prometheus instance with a sampling step of 5s, so the data
        returned for the query is a time series.
        """
        resp = requests.get(
            self.url + "/api/v1/query_range",
            {"query": query, "start": self.start, "end": self.end, "step": 5},
        )
        resp.raise_for_status()
        return resp.json()

    def _fetch_cpu_for_pod(
        self, container_matcher: str, pod_name: str
    ) -> Dict[str, List[float]]:
        """Fetches the cpu data for each pod.

        Fetch total cpu seconds during the time range specified in the
        Prometheus instance for a pod. After obtaining the cpu seconds,
        the data is trimmed from a time series to a list and saved in a
        Dict keyed by the container names.

        Args:
            container_matcher: A string consisting of one or more container
                names separated by |.
        """
        query = (
            'container_cpu_usage_seconds_total{job="kubernetes-cadvisor",pod="'
            + pod_name
            + '",container='
            + container_matcher
            + "}"
        )
        logging.debug("running prometheus query for cpu: %s", query)
        cpu_data = self._fetch_by_query(query)
        logging.debug("raw cpu data: %s", str(cpu_data))
        cpu_container_name_to_data_list = get_data_list_from_timeseries(
            cpu_data
        )
        return cpu_container_name_to_data_list

    def _fetch_memory_for_pod(
        self, container_matcher: str, pod_name: str
    ) -> Dict[str, List[float]]:
        """Fetches memory data for each pod.

        Fetch memory usage data during the time range specified in the
        Prometheus instance for a pod. After obtaining the memory data,
        the data is trimmed from a time series to a list and saved in a
        Dict keyed by the container names.

        Args:
            container_matcher: A string consisting of one or more container
                names separated by |.
        """
        query = (
            'container_memory_usage_bytes{job="kubernetes-cadvisor",pod="'
            + pod_name
            + '",container='
            + container_matcher
            + "}"
        )

        logging.debug("running prometheus query for memory: %s", query)
        memory_data = self._fetch_by_query(query)

        logging.debug("raw memory data: %s", str(memory_data))
        memory_container_name_to_data_list = get_data_list_from_timeseries(
            memory_data
        )

        return memory_container_name_to_data_list

    def fetch_cpu_and_memory_data(
        self, container_list: List[str], pod_dict: Dict[str, List[str]]
    ) -> Dict[str, Any]:
        """Fetch total cpu seconds and memory data for multiple pods.

        Args:
            container_list: A list of container names to fetch the data for.
            pod_dict: The pods to fetch data for, keyed by the role of the
                pod: clients, driver and servers. The values of pod_dict
                are the lists of pod names that share the role given in
                the key.
        """
        container_matcher = construct_container_matcher(container_list)
        processed_data = {}
        for role, pod_names in pod_dict.items():
            pod_data = {}
            for pod in pod_names:
                container_data = {}
                for container, data in self._fetch_cpu_for_pod(
                    container_matcher, pod
                ).items():
                    container_data[container] = {}
                    container_data[container][
                        "cpuSeconds"
                    ] = compute_total_cpu_seconds(data)

                for container, data in self._fetch_memory_for_pod(
                    container_matcher, pod
                ).items():
                    container_data[container][
                        "memoryMean"
                    ] = compute_average_memory_usage(data)

                pod_data[pod] = container_data
            processed_data[role] = pod_data
        return processed_data


def construct_container_matcher(container_list: List[str]) -> str:
    """Constructs the container matching string used in the
    prometheus query."""
    if len(container_list) == 0:
        raise Exception("no container name provided")

    if len(container_list) == 1:
        # A single container is matched by an exact, quoted label value.
        containers_to_fetch = '"' + container_list[0] + '"'
    else:
        # Multiple containers are matched by a regex: ~"name1|name2|...".
        containers_to_fetch = '~"' + container_list[0]
        for container in container_list[1:]:
            containers_to_fetch = containers_to_fetch + "|" + container
        containers_to_fetch = containers_to_fetch + '"'
    return containers_to_fetch


def get_data_list_from_timeseries(data: Any) -> Dict[str, List[float]]:
    """Constructs a Dict whose keys are the container names and whose
    values are lists of data points taken from the given timeseries data."""
    if data["status"] != "success":
        raise Exception("command failed: " + data["status"] + str(data))
    if data["data"]["resultType"] != "matrix":
        raise Exception(
            "resultType is not matrix: " + data["data"]["resultType"]
        )

    container_name_to_data_list = {}
    for res in data["data"]["result"]:
        container_name = res["metric"]["container"]
        container_data_timeseries = res["values"]

        container_data = []
        for d in container_data_timeseries:
            container_data.append(float(d[1]))
        container_name_to_data_list[container_name] = container_data
    return container_name_to_data_list


def compute_total_cpu_seconds(cpu_data_list: List[float]) -> float:
    """Computes the total cpu seconds by CPUs[end] - CPUs[start]."""
    return cpu_data_list[-1] - cpu_data_list[0]


def compute_average_memory_usage(memory_data_list: List[float]) -> float:
    """Computes the mean of a given list of data."""
    return statistics.mean(memory_data_list)
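# Note on the query helpers above, using hypothetical pod and container names:
# for container_list=["main", "sidecar"] and pod "driver-0", the matcher is
# ~"main|sidecar" and the cpu query becomes
#   container_cpu_usage_seconds_total{job="kubernetes-cadvisor",pod="driver-0",container=~"main|sidecar"}
# The matrix returned by the range query is trimmed by
# get_data_list_from_timeseries into {"main": [...], "sidecar": [...]};
# compute_total_cpu_seconds then takes last - first of each cpu series and
# compute_average_memory_usage takes the mean of each memory series.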


def construct_pod_dict(
    node_info_file: str, pod_types: List[str]
) -> Dict[str, List[str]]:
    """Constructs a dict of pod names to be queried.

    Args:
        node_info_file: Path to the file that contains the pod names to
            query. The pod names are put into a Dict of lists keyed by
            the role name: clients, servers and driver.
    """
    with open(node_info_file, "r") as f:
        pod_names = json.load(f)
        pod_type_to_name = {"clients": [], "driver": [], "servers": []}

        for client in pod_names["Clients"]:
            pod_type_to_name["clients"].append(client["Name"])
        for server in pod_names["Servers"]:
            pod_type_to_name["servers"].append(server["Name"])

        pod_type_to_name["driver"].append(pod_names["Driver"]["Name"])

    pod_names_to_query = {}
    for pod_type in pod_types:
        pod_names_to_query[pod_type] = pod_type_to_name[pod_type]
    return pod_names_to_query


def convert_UTC_to_epoch(utc_timestamp: str) -> str:
    """Converts a utc timestamp string to an epoch time string."""
    parsed_time = parser.parse(utc_timestamp)
    # timestamp() respects the timezone parsed from the string, whereas
    # strftime("%s") is platform-dependent and assumes local time.
    epoch = str(int(parsed_time.timestamp()))
    return epoch


def main() -> None:
    argp = argparse.ArgumentParser(
        description="Fetch cpu and memory stats from prometheus"
    )
    argp.add_argument("--url", help="Prometheus base url", required=True)
    argp.add_argument(
        "--scenario_result_file",
        default="scenario_result.json",
        type=str,
        help="File containing the start and end time of the test run",
    )
    argp.add_argument(
        "--node_info_file",
        default="/var/data/qps_workers/node_info.json",
        help="File containing the pod names to query the metrics for",
    )
    argp.add_argument(
        "--pod_type",
        action="append",
        help=(
            "Pod type to query the metrics for; the options are driver,"
            " clients and servers"
        ),
        choices=["driver", "clients", "servers"],
        required=True,
    )
    argp.add_argument(
        "--container_name",
        action="append",
        help="The container names to query the metrics for",
        required=True,
    )
    argp.add_argument(
        "--export_file_name",
        default="prometheus_query_result.json",
        type=str,
        help="Name of the exported JSON file.",
    )
    argp.add_argument(
        "--quiet",
        action="store_true",
        default=False,
        help="Suppress informative output",
    )
    argp.add_argument(
        "--delay_seconds",
        default=0,
        type=int,
        help=(
            "Delay in seconds before performing the Prometheus queries,"
            " default is 0"
        ),
    )
    args = argp.parse_args()

    if not args.quiet:
        logging.getLogger().setLevel(logging.DEBUG)

    with open(args.scenario_result_file, "r") as q:
        scenario_result = json.load(q)
        start_time = convert_UTC_to_epoch(
            scenario_result["summary"]["startTime"]
        )
        end_time = convert_UTC_to_epoch(scenario_result["summary"]["endTime"])
        p = Prometheus(
            url=args.url,
            start=start_time,
            end=end_time,
        )

    time.sleep(args.delay_seconds)

    pod_dict = construct_pod_dict(args.node_info_file, args.pod_type)
    processed_data = p.fetch_cpu_and_memory_data(
        container_list=args.container_name, pod_dict=pod_dict
    )
    processed_data["testDurationSeconds"] = float(end_time) - float(start_time)

    logging.debug(
        "%s: %s",
        args.export_file_name,
        json.dumps(processed_data, sort_keys=True, indent=4),
    )

    with open(args.export_file_name, "w", encoding="utf8") as export_file:
        json.dump(processed_data, export_file, sort_keys=True, indent=4)


if __name__ == "__main__":
    main()