import argparse
import logging
from datetime import datetime
from pathlib import Path

from structured_logger import StructuredLogger


class CustomLogger:
    """Helper for recording DUT (Device Under Test) job data in a structured log.

    All state lives in ``self.logger.data``; every mutation is performed inside
    ``self.logger.edit_context()``.
    NOTE(review): presumably ``edit_context`` persists the changes to
    ``log_file`` on exit -- confirm against ``StructuredLogger``.
    """

    def __init__(self, log_file):
        """
        Binds the logger to a log file.

        Args:
            log_file: Path of the log file, passed to StructuredLogger as ``file_name``.
        """
        self.log_file = log_file
        self.logger = StructuredLogger(file_name=self.log_file)

    @staticmethod
    def _end_open_phase(job):
        """Stamp the current time on the job's last phase if it has no end time yet."""
        phases = job["dut_job_phases"]
        if phases and phases[-1]["end_time"] == "":
            phases[-1]["end_time"] = datetime.now().isoformat()

    def get_last_dut_job(self):
        """
        Gets the details of the most recent DUT job.

        Returns:
            dict: Details of the most recent DUT job.

        Raises:
            ValueError: If no DUT jobs are found in the logger's data, i.e. the
                "dut_jobs" key is missing or its list is empty.
        """
        try:
            job = self.logger.data["dut_jobs"][-1]
        except (KeyError, IndexError) as err:
            # KeyError: "dut_jobs" was never created.
            # IndexError: the list exists but holds no job.
            raise ValueError(
                "No DUT jobs found. Please create a job via create_dut_job call."
            ) from err

        return job

    def update(self, **kwargs):
        """
        Updates the top level of the log data with provided key-value pairs.

        Args:
            **kwargs: Key-value pairs to be updated.

        """
        with self.logger.edit_context():
            for key, value in kwargs.items():
                self.logger.data[key] = value

    def create_dut_job(self, **kwargs):
        """
        Creates a new DUT job with provided key-value pairs.

        The job starts from a template (empty status and times, state
        "pending", no phases, submitter start time set to now); any key given
        in kwargs overrides the template value.

        Args:
            **kwargs: Key-value pairs for the new DUT job.

        """
        with self.logger.edit_context():
            if "dut_jobs" not in self.logger.data:
                self.logger.data["dut_jobs"] = []
            new_job = {
                "status": "",
                "submitter_start_time": datetime.now().isoformat(),
                "dut_submit_time": "",
                "dut_start_time": "",
                "dut_end_time": "",
                "dut_name": "",
                "dut_state": "pending",
                "dut_job_phases": [],
                **kwargs,
            }
            self.logger.data["dut_jobs"].append(new_job)

    def update_dut_job(self, key, value):
        """
        Updates the last DUT job with a key-value pair.

        Args:
            key : The key to be updated.
            value: The value to be assigned.

        Raises:
            ValueError: If no DUT jobs are found in the logger's data.

        """
        with self.logger.edit_context():
            job = self.get_last_dut_job()
            job[key] = value

    def update_status_fail(self, reason=""):
        """
        Sets the status of the last DUT job to 'fail' and logs the failure reason.

        Args:
            reason (str, optional): The reason for the failure. Defaults to "".

        Raises:
            ValueError: If no DUT jobs are found in the logger's data.

        """
        with self.logger.edit_context():
            job = self.get_last_dut_job()
            job["status"] = "fail"
            job["dut_job_fail_reason"] = reason

    def create_job_phase(self, phase_name):
        """
        Creates a new job phase for the last DUT job.

        If the previous phase is still open (empty end time), it is ended at
        the current time before the new phase is appended.

        Args:
            phase_name : The name of the new job phase.

        Raises:
            ValueError: If no DUT jobs are found in the logger's data.

        """
        with self.logger.edit_context():
            job = self.get_last_dut_job()
            self._end_open_phase(job)

            # Create a new phase
            phase_data = {
                "name": phase_name,
                "start_time": datetime.now().isoformat(),
                "end_time": "",
            }
            job["dut_job_phases"].append(phase_data)

    def check_dut_timings(self, job):
        """
        Check the timing sequence of a job to ensure logical consistency.

        An error is logged (nothing is raised) for each of these conditions:
            - 'dut_start_time' and 'dut_submit_time' are both set and the start
              time compares lower than the submit time.
            - 'dut_start_time' and 'dut_end_time' are both set and the end time
              compares lower than the start time.

        The values are compared as stored; for ISO-8601 strings of the same
        format this matches chronological order.

        Args:
            job (dict): A dictionary containing timing information of a job. Expected keys are 'dut_start_time',
                        'dut_submit_time', and 'dut_end_time'.

        Returns:
            None: This function does not return a value; it logs errors if timing inconsistencies are detected.
        """

        # Only compare when both the start time and submit time are set
        if job.get("dut_start_time") and job.get("dut_submit_time"):
            if job["dut_start_time"] < job["dut_submit_time"]:
                logging.error("Job submission is happening before job start.")

        # Only compare when both the start time and end time are set
        if job.get("dut_start_time") and job.get("dut_end_time"):
            if job["dut_end_time"] < job["dut_start_time"]:
                logging.error("Job ended before it started.")

    # Method to update DUT start, submit and end time
    def update_dut_time(self, value, custom_time=None):
        """
        Updates DUT start, submit, and end times, and the matching DUT state.

        Args:
            value : Specifies which DUT time to update. Options: 'start', 'submit', 'end'.
            custom_time : Custom time to set. If None (or empty), use current time.

        Raises:
            ValueError: If an invalid argument is provided for value, or if no
                DUT jobs are found in the logger's data.

        """
        with self.logger.edit_context():
            job = self.get_last_dut_job()
            timestamp = custom_time if custom_time else datetime.now().isoformat()
            if value == "start":
                job["dut_start_time"] = timestamp
                job["dut_state"] = "running"
            elif value == "submit":
                job["dut_submit_time"] = timestamp
                job["dut_state"] = "submitted"
            elif value == "end":
                job["dut_end_time"] = timestamp
                job["dut_state"] = "finished"
            else:
                raise ValueError(
                    "Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'."
                )
            # check the sanity of the partial structured log
            self.check_dut_timings(job)

    def close_dut_job(self):
        """
        Closes the most recent DUT (Device Under Test) job in the logger's data.

        The method performs the following operations:
        1. Validates if there are any DUT jobs in the logger's data.
        2. If the last phase of the most recent DUT job has an empty end time, it sets the end time to the current time.

        Raises:
            ValueError: If no DUT jobs are found in the logger's data.
        """
        with self.logger.edit_context():
            job = self.get_last_dut_job()
            self._end_open_phase(job)

    def close(self):
        """
        Finalizes the log by writing the summary fields.

        The method performs the following operations:
        1. Determines the combined status of all DUT jobs: "null" when no job
           has a "status" key, "pass" when at least one job passed, otherwise "fail".
        2. Updates the DUT attempt counter to reflect the total number of DUT jobs.
        3. Sets the submitter's end time to the current time; it is stored on
           the last DUT job and skipped when there are no jobs.

        Raises:
            KeyError: If the logger's data has no "dut_jobs" entry.
        """
        with self.logger.edit_context():
            jobs = self.logger.data["dut_jobs"]
            job_status = [job["status"] for job in jobs if "status" in job]

            if not job_status:
                job_combined_status = "null"
            elif "pass" in job_status:
                job_combined_status = "pass"
            else:
                job_combined_status = "fail"

            self.logger.data["job_combined_status"] = job_combined_status
            self.logger.data["dut_attempt_counter"] = len(jobs)
            # Guard against an empty job list: there is no job to stamp.
            if jobs:
                jobs[-1]["submitter_end_time"] = datetime.now().isoformat()


def process_args(args):
    """
    Runs the logger actions selected on the command line.

    Actions run in a fixed order: update, create-dut-job, update-dut-job,
    create-job-phase, update-status-fail, update-dut-time, close-dut-job, close.
    Options whose value is falsy (not given, or given without values) are skipped.

    Args:
        args: Parsed arguments as produced by the parser built in main().

    Raises:
        ValueError: If an option received a malformed list of values, or if an
            action requires a DUT job and none exists.
    """

    # Function to process key-value pairs and call corresponding logger methods
    def process_key_value_pairs(args_list, action_func):
        if not args_list:
            raise ValueError(
                f"No key-value pairs provided for {action_func.__name__.replace('_', '-')}"
            )
        if len(args_list) % 2 != 0:
            raise ValueError(
                f"Incomplete key-value pairs for {action_func.__name__.replace('_', '-')}"
            )
        # Even positions are keys, odd positions are their values
        kwargs = dict(zip(args_list[::2], args_list[1::2]))
        action_func(**kwargs)

    # Create a CustomLogger object with the specified log file path
    custom_logger = CustomLogger(Path(args.log_file))

    if args.update:
        process_key_value_pairs(args.update, custom_logger.update)

    if args.create_dut_job:
        process_key_value_pairs(args.create_dut_job, custom_logger.create_dut_job)

    if args.update_dut_job:
        # The option accepts any number of values; require exactly one pair so
        # the failure is explicit instead of an opaque unpacking error.
        if len(args.update_dut_job) != 2:
            raise ValueError(
                "Invalid number of values for --update-dut-job. Provide one key and one value."
            )
        key, value = args.update_dut_job
        custom_logger.update_dut_job(key, value)

    if args.create_job_phase:
        custom_logger.create_job_phase(args.create_job_phase)

    if args.update_status_fail:
        custom_logger.update_status_fail(args.update_status_fail)

    if args.update_dut_time:
        if len(args.update_dut_time) == 2:
            action, custom_time = args.update_dut_time
        elif len(args.update_dut_time) == 1:
            action, custom_time = args.update_dut_time[0], None
        else:
            raise ValueError("Invalid number of values for --update-dut-time")

        if action in ["start", "end", "submit"]:
            custom_logger.update_dut_time(action, custom_time)
        else:
            raise ValueError(
                "Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'."
            )

    if args.close_dut_job:
        custom_logger.close_dut_job()

    if args.close:
        custom_logger.close()


def main():
    """Parses the command line and applies the requested logger actions."""
    parser = argparse.ArgumentParser(description="Custom Logger Command Line Tool")
    parser.add_argument("log_file", help="Path to the log file")
    parser.add_argument(
        "--update",
        nargs=argparse.ZERO_OR_MORE,
        metavar=("key", "value"),
        help="Update a key-value pair e.g., --update key1 value1 key2 value2)",
    )
    parser.add_argument(
        "--create-dut-job",
        nargs=argparse.ZERO_OR_MORE,
        metavar=("key", "value"),
        help="Create a new DUT job with key-value pairs (e.g., --create-dut-job key1 value1 key2 value2)",
    )
    parser.add_argument(
        "--update-dut-job",
        nargs=argparse.ZERO_OR_MORE,
        metavar=("key", "value"),
        help="Update a key-value pair in DUT job",
    )
    parser.add_argument(
        "--create-job-phase",
        help="Create a new job phase (e.g., --create-job-phase name)",
    )
    parser.add_argument(
        "--update-status-fail",
        help="Update fail as the status and log the failure reason (e.g., --update-status-fail reason)",
    )
    parser.add_argument(
        "--update-dut-time",
        nargs=argparse.ZERO_OR_MORE,
        metavar=("action", "custom_time"),
        help="Update DUT start and end time. Provide action ('start', 'submit', 'end') and custom_time (e.g., '2023-01-01T12:00:00')",
    )
    parser.add_argument(
        "--close-dut-job",
        action="store_true",
        help="Close the dut job by updating end time of last dut job)",
    )
    parser.add_argument(
        "--close",
        action="store_true",
        help="Updates combined status, submitter's end time and DUT attempt counter",
    )
    args = parser.parse_args()

    process_args(args)


if __name__ == "__main__":
    main()