#!/usr/bin/env python3

# Copyright 2022 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# A script to fetch total cpu seconds and memory data from prometheus.
# example usage: python3 prometheus.py
# --url=http://prometheus.prometheus.svc.cluster.local:9090
# --pod_type=driver --pod_type=clients --container_name=main
# --container_name=sidecar
"""Perform Prometheus range queries to obtain cpu and memory data.

This module performs range queries through the Prometheus API to obtain
total cpu seconds and memory during a test run for given containers
in given pods. The cpu data obtained is the total cpu seconds used within
the given period of time. The memory data is the instant memory usage
at each sample point.
"""
import argparse
import json
import logging
import statistics
import time
from typing import Any, Dict, List

from dateutil import parser
import requests


class Prometheus:
    """Holds the query base URL and the start/end of the query time range."""

    def __init__(
        self,
        url: str,
        start: str,
        end: str,
    ):
        # url: Prometheus base URL, e.g. http://host:9090
        # start/end: epoch-second strings bounding the range queries
        self.url = url
        self.start = start
        self.end = end

    def _fetch_by_query(self, query: str) -> Dict[str, Any]:
        """Fetches the given query with time range.

        Fetch the given query within the configured time range. The pulling
        interval is every 5s, so the data returned by the query is a time
        series.

        Args:
            query: A PromQL query string.

        Raises:
            requests.HTTPError: if the Prometheus API responds with an
                error status code.
        """
        resp = requests.get(
            self.url + '/api/v1/query_range',
            {
                'query': query,
                'start': self.start,
                'end': self.end,
                'step': 5
            },
        )
        resp.raise_for_status()
        return resp.json()

    def _fetch_cpu_for_pod(self, container_matcher: str,
                           pod_name: str) -> Dict[str, List[float]]:
        """Fetches the cpu data for each pod.

        Fetch total cpu seconds during the time range specified in the
        Prometheus instance for a pod. After obtaining the cpu seconds,
        the data are trimmed from time series to a data list and saved
        in a Dict keyed by the container names.

        Args:
            container_matcher: A string consisting of one or more container
                names separated by |, as produced by
                construct_container_matcher().
            pod_name: The pod to query the data for.
        """
        query = (
            'container_cpu_usage_seconds_total{job="kubernetes-cadvisor",pod="'
            + pod_name + '",container=' + container_matcher + '}')
        logging.debug('running prometheus query for cpu: %s', query)
        cpu_data = self._fetch_by_query(query)
        logging.debug('raw cpu data: %s', str(cpu_data))
        return get_data_list_from_timeseries(cpu_data)

    def _fetch_memory_for_pod(self, container_matcher: str,
                              pod_name: str) -> Dict[str, List[float]]:
        """Fetches memory data for each pod.

        Fetch memory usage data during the time range specified in the
        Prometheus instance for a pod. After obtaining the memory data,
        the data are trimmed from time series to a data list and saved
        in a Dict keyed by the container names.

        Args:
            container_matcher: A string consisting of one or more container
                names separated by |, as produced by
                construct_container_matcher().
            pod_name: The pod to query the data for.
        """
        query = (
            'container_memory_usage_bytes{job="kubernetes-cadvisor",pod="' +
            pod_name + '",container=' + container_matcher + "}")

        logging.debug('running prometheus query for memory: %s', query)
        memory_data = self._fetch_by_query(query)

        logging.debug('raw memory data: %s', str(memory_data))
        return get_data_list_from_timeseries(memory_data)

    def fetch_cpu_and_memory_data(
            self, container_list: List[str],
            pod_dict: Dict[str, List[str]]) -> Dict[str, Any]:
        """Fetch total cpu seconds and memory data for multiple pods.

        Args:
            container_list: A list of container names to fetch the data for.
            pod_dict: the pods to fetch data for, the pod_dict is keyed by
                role of the pod: clients, driver and servers. The values
                for the pod_dict are the list of pod names that consist
                the same role specified in the key.

        Returns:
            A nested dict: role -> pod name -> container name ->
            {'cpuSeconds': float, 'memoryMean': float}.
        """
        container_matcher = construct_container_matcher(container_list)
        processed_data = {}
        for role, pod_names in pod_dict.items():
            pod_data = {}
            for pod in pod_names:
                container_data = {}
                # cpu query is run first and creates the per-container
                # entries; the memory query fills in the same entries.
                for container, data in self._fetch_cpu_for_pod(
                        container_matcher, pod).items():
                    container_data[container] = {}
                    container_data[container][
                        'cpuSeconds'] = compute_total_cpu_seconds(data)

                for container, data in self._fetch_memory_for_pod(
                        container_matcher, pod).items():
                    container_data[container][
                        'memoryMean'] = compute_average_memory_usage(data)

                pod_data[pod] = container_data
            processed_data[role] = pod_data
        return processed_data


def construct_container_matcher(container_list: List[str]) -> str:
    """Constructs the container label matcher used in the prometheus query.

    A single container name yields an exact-match value ("name"); multiple
    names yield a regex-match value (~"a|b|c").

    Raises:
        Exception: if container_list is empty.
    """
    if not container_list:
        raise Exception('no container name provided')

    if len(container_list) == 1:
        # BUG FIX: the name must be wrapped in quotes; previously the
        # opening '"' was overwritten and the generated label matcher
        # (container=main) was not valid PromQL.
        return '"' + container_list[0] + '"'
    return '~"' + '|'.join(container_list) + '"'


def get_data_list_from_timeseries(data: Any) -> Dict[str, List[float]]:
    """Constructs a Dict whose keys are the container names and whose
    values are lists of data points taken from the given timeseries data.

    Raises:
        Exception: if the query failed or did not return a matrix result.
    """
    if data['status'] != 'success':
        raise Exception('command failed: ' + data['status'] + str(data))
    if data['data']['resultType'] != 'matrix':
        raise Exception('resultType is not matrix: ' +
                        data['data']['resultType'])

    container_name_to_data_list = {}
    for res in data["data"]["result"]:
        container_name = res["metric"]["container"]
        # Each timeseries value is a [timestamp, value-string] pair; only
        # the value is kept.
        container_name_to_data_list[container_name] = [
            float(d[1]) for d in res["values"]
        ]
    return container_name_to_data_list


def compute_total_cpu_seconds(cpu_data_list: List[float]) -> float:
    """Computes the total cpu seconds by CPUs[end]-CPUs[start].

    The counter is cumulative, so the usage over the window is the
    difference between the last and first samples.
    """
    return cpu_data_list[-1] - cpu_data_list[0]


def compute_average_memory_usage(memory_data_list: List[float]) -> float:
    """Computes the mean for a given list of memory data points."""
    return statistics.mean(memory_data_list)


def construct_pod_dict(node_info_file: str,
                       pod_types: List[str]) -> Dict[str, List[str]]:
    """Constructs a dict of pod names to be queried.

    Args:
        node_info_file: The file path that contains the pod names to query.
            The pods' names are put into a Dict of lists keyed by the
            role name: clients, servers and driver.
        pod_types: The roles to keep in the returned dict.
    """
    with open(node_info_file, 'r', encoding='utf8') as f:
        pod_names = json.load(f)
        pod_type_to_name = {'clients': [], 'driver': [], 'servers': []}

        for client in pod_names['Clients']:
            pod_type_to_name['clients'].append(client['Name'])
        for server in pod_names['Servers']:
            pod_type_to_name['servers'].append(server['Name'])

        pod_type_to_name["driver"].append(pod_names['Driver']['Name'])

    # Keep only the roles the caller asked for.
    pod_names_to_query = {}
    for pod_type in pod_types:
        pod_names_to_query[pod_type] = pod_type_to_name[pod_type]
    return pod_names_to_query


def convert_UTC_to_epoch(utc_timestamp: str) -> str:
    """Converts a utc timestamp string to an epoch-seconds string.

    BUG FIX: the previous implementation used strftime('%s'), which is a
    non-portable glibc extension (absent on Windows) and ignores the
    timestamp's own timezone offset, silently interpreting the time in
    the machine's local timezone. datetime.timestamp() honors the offset
    parsed by dateutil.
    """
    parsed_time = parser.parse(utc_timestamp)
    return str(int(parsed_time.timestamp()))


def main() -> None:
    argp = argparse.ArgumentParser(
        description='Fetch cpu and memory stats from prometheus')
    argp.add_argument('--url', help='Prometheus base url', required=True)
    argp.add_argument(
        '--scenario_result_file',
        default='scenario_result.json',
        type=str,
        help='File contains epoch seconds for start and end time',
    )
    argp.add_argument(
        '--node_info_file',
        default='/var/data/qps_workers/node_info.json',
        help='File contains pod name to query the metrics for',
    )
    argp.add_argument(
        '--pod_type',
        action='append',
        help=
        'Pod type to query the metrics for, the options are driver, clients and servers',
        choices=['driver', 'clients', 'servers'],
        required=True,
    )
    argp.add_argument(
        '--container_name',
        action='append',
        help='The container names to query the metrics for',
        required=True,
    )
    argp.add_argument(
        '--export_file_name',
        default='prometheus_query_result.json',
        type=str,
        help='Name of exported JSON file.',
    )
    argp.add_argument(
        '--quiet',
        default=False,
        # BUG FIX: without an action the flag required a value and ANY
        # non-empty string (even "--quiet false") evaluated truthy.
        action='store_true',
        help='Suppress informative output',
    )
    argp.add_argument(
        '--delay_seconds',
        default=0,
        type=int,
        help=
        'Configure delay in seconds to perform Prometheus queries, default is 0',
    )
    args = argp.parse_args()

    if not args.quiet:
        logging.getLogger().setLevel(logging.DEBUG)

    with open(args.scenario_result_file, 'r', encoding='utf8') as q:
        scenario_result = json.load(q)
        start_time = convert_UTC_to_epoch(
            scenario_result['summary']['startTime'])
        end_time = convert_UTC_to_epoch(scenario_result['summary']['endTime'])
        p = Prometheus(
            url=args.url,
            start=start_time,
            end=end_time,
        )

    # Optionally wait so that Prometheus has scraped the final samples.
    time.sleep(args.delay_seconds)

    pod_dict = construct_pod_dict(args.node_info_file, args.pod_type)
    processed_data = p.fetch_cpu_and_memory_data(
        container_list=args.container_name, pod_dict=pod_dict)
    processed_data['testDurationSeconds'] = float(end_time) - float(start_time)

    logging.debug(json.dumps(processed_data, sort_keys=True, indent=4))

    with open(args.export_file_name, 'w', encoding='utf8') as export_file:
        json.dump(processed_data, export_file, sort_keys=True, indent=4)


if __name__ == '__main__':
    main()