#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import os
import re
from collections import defaultdict
from difflib import SequenceMatcher
from typing import Any

import requests

ALL_SKIPPED_THRESHOLD = 100
SIMILARITY_THRESHOLD = 0.75
FAILURE_CHAIN_THRESHOLD = 2
MAX_CONCURRENT_ALERTS = 1
FAILED_JOB_PATTERN = (
    r"^- \[(.*)\]\(.*\) failed consecutively starting with commit \[.*\]\(.*\)$"
)

PENDING = "pending"
NEUTRAL = "neutral"
SKIPPED = "skipped"
SUCCESS = "success"
FAILURE = "failure"
CANCELED = "canceled"

ISSUES_WITH_LABEL_QUERY = """
query ($owner: String!, $name: String!, $labels: [String!]) {
  repository(owner: $owner, name: $name, followRenames: false) {
    issues(last: 10, labels: $labels, states: [OPEN]) {
      nodes {
        id
        title
        closed
        number
        body
        createdAt
        comments(first: 100) {
          nodes {
            bodyText
            databaseId
          }
        }
      }
    }
  }
}
"""

NUM_ISSUES_QUERY = """
query ($query: String!) {
  search(type: ISSUE, query: $query) {
    issueCount
  }
}
"""

DISABLED_ALERTS = [
    "rerun_disabled_tests",
    "unstable",
]


class JobStatus:
    # Annotations only: mutable defaults at class level would be shared
    # across instances. Every attribute is assigned in __init__.
    job_name: str
    jobs: list[Any]
    current_status: Any
    job_statuses: list[Any]
    filtered_statuses: list[Any]
    failure_chain: list[Any]
    flaky_jobs: list[Any]

    def __init__(self, job_name: str, job_statuses: list[Any]) -> None:
        self.job_name = job_name
        self.job_statuses = job_statuses
        self.filtered_statuses = [j for j in job_statuses if not is_job_skipped(j)]
        self.current_status = self.get_current_status()
        self.failure_chain = self.get_most_recent_failure_chain()
        self.flaky_jobs = self.get_flaky_jobs()

    def get_current_status(self) -> Any:
        """
        Returns the latest status that is not pending, be it a success or a
        failure, or None if every status is still pending.
        """
        for status in self.filtered_statuses:
            if status["conclusion"] != PENDING:
                return status
        return None

    def get_unique_failures(self, jobs: list[Any]) -> dict[str, list[Any]]:
        """
        Returns the failed jobs from the input list, grouped by similar
        failureCaptures.
        """
        failures = defaultdict(list)
        for job in jobs:
            if job["conclusion"] == FAILURE:
                found_similar_failure = False
                if "failureCaptures" not in job:
                    failures["unclassified"].append(job)
                    continue

                # failureCaptures is now a list returned by the HUD API, not a string
                failure_captures = " ".join(job["failureCaptures"])

                for failure in failures:
                    seq = SequenceMatcher(None, failure_captures, failure)
                    if seq.ratio() > SIMILARITY_THRESHOLD:
                        failures[failure].append(job)
                        found_similar_failure = True
                        break
                if not found_similar_failure:
                    failures[failure_captures] = [job]

        return failures

    # A job is considered flaky if it is the only job with its failureCapture
    # and it is not the most recent job.
    def get_flaky_jobs(self) -> list[Any]:
        unique_failures = self.get_unique_failures(self.filtered_statuses)
        flaky_jobs = []
        for failure_list in unique_failures.values():
            if (
                len(failure_list) == 1
                and self.current_status is not None
                and failure_list[0]["sha"] != self.current_status["sha"]
            ):
                flaky_jobs.append(failure_list[0])
        return flaky_jobs

    # The most recent failure chain is the run of consecutive failed jobs,
    # starting from the most recent failure. A success in the middle of the
    # chain terminates the chain.
    def get_most_recent_failure_chain(self) -> list[Any]:
        failures = []
        found_most_recent_failure = False

        for job in self.filtered_statuses:
            if is_job_failed(job):
                failures.append(job)
                found_most_recent_failure = True
            elif found_most_recent_failure:
                break

        return failures

    def should_alert(self) -> bool:
        # Group the failure chain by unique failures. The length of the
        # failure chain is what raises the alert, so a simple tweak here is
        # to use the length of the longest chain of same-ish failures.
        unique_failures = self.get_unique_failures(self.failure_chain)

        return (
            self.current_status is not None
            and self.current_status["conclusion"] != SUCCESS
            and any(
                len(failure_chain) >= FAILURE_CHAIN_THRESHOLD
                for failure_chain in unique_failures.values()
            )
            and all(
                disabled_alert not in self.job_name
                for disabled_alert in DISABLED_ALERTS
            )
        )

    def __repr__(self) -> str:
        return f"jobName: {self.job_name}"
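
# Illustrative sketch, not wired into the alerting flow: how the
# SequenceMatcher grouping in get_unique_failures behaves. The two capture
# strings below are made-up examples, not real HUD output.
def _similarity_grouping_demo() -> None:
    a = "RuntimeError: CUDA error: out of memory on device 0"
    b = "RuntimeError: CUDA error: out of memory on device 1"
    ratio = SequenceMatcher(None, a, b).ratio()
    # The ratio is well above SIMILARITY_THRESHOLD (0.75) here, so jobs with
    # these two captures would land in the same failure bucket.
    print(f"similarity={ratio:.2f} grouped={ratio > SIMILARITY_THRESHOLD}")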
def fetch_hud_data(repo: str, branch: str) -> Any:
    response = requests.get(f"https://hud.pytorch.org/api/hud/{repo}/{branch}/0")
    response.raise_for_status()
    hud_data = response.json()
    return (hud_data["jobNames"], hud_data["shaGrid"])


# Creates a dict of job name -> [job data]. Essentially a column in HUD.
def map_job_data(jobNames: Any, shaGrid: Any) -> dict[str, Any]:
    jobData = defaultdict(list)
    for sha in shaGrid:
        for ind, job in enumerate(sha["jobs"]):
            jobData[jobNames[ind]].append(job)
    return jobData


def is_job_failed(job: Any) -> bool:
    conclusion = job.get("conclusion")
    return conclusion is not None and conclusion not in (SUCCESS, PENDING)


def is_job_skipped(job: Any) -> bool:
    conclusion = job.get("conclusion")
    return conclusion in (NEUTRAL, SKIPPED) or conclusion is None


def get_failed_jobs(job_data: list[Any]) -> list[Any]:
    return [job for job in job_data if job["conclusion"] == FAILURE]


def classify_jobs(
    all_job_names: list[str], sha_grid: Any, filtered_jobs_names: set[str]
) -> tuple[list[JobStatus], list[Any]]:
    """
    Creates a JobStatus for every job, which encapsulates the logic for
    whether to alert and whether a job is flaky, then classifies the jobs
    into jobs to alert on and flaky jobs.

    :param all_job_names: list of all job names as returned by the HUD
    :param sha_grid: list of all job data as returned by the HUD (parallel index to all_job_names)
    :param filtered_jobs_names: set of job names to actually consider
    :return: a tuple of (jobs to alert on, flaky jobs)
    """
    job_data = map_job_data(all_job_names, sha_grid)
    job_statuses: list[JobStatus] = [
        JobStatus(job_name, statuses) for job_name, statuses in job_data.items()
    ]

    jobs_to_alert_on = []
    flaky_jobs = []

    for job_status in job_statuses:
        if job_status.job_name not in filtered_jobs_names:
            continue
        if job_status.should_alert():
            jobs_to_alert_on.append(job_status)
        flaky_jobs.extend(job_status.flaky_jobs)

    return jobs_to_alert_on, flaky_jobs
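
# Minimal sketch of the shaGrid shape this script assumes from the HUD API,
# with hypothetical field values: one entry per commit, newest first, and jobs
# listed in the same order as jobNames. map_job_data transposes this grid into
# one column per job name, which JobStatus then consumes.
def _map_job_data_demo() -> None:
    job_names = ["linux-build", "linux-test"]
    sha_grid = [
        {
            "jobs": [
                {"sha": "abc123", "conclusion": SUCCESS},
                {"sha": "abc123", "conclusion": FAILURE, "failureCaptures": ["oom"]},
            ]
        },
        {
            "jobs": [
                {"sha": "def456", "conclusion": SUCCESS},
                {"sha": "def456", "conclusion": FAILURE, "failureCaptures": ["oom"]},
            ]
        },
    ]
    column = map_job_data(job_names, sha_grid)["linux-test"]
    status = JobStatus("linux-test", column)
    # Two consecutive same-ish failures meet FAILURE_CHAIN_THRESHOLD (2), so
    # this prints True.
    print(status.should_alert())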
# Keep only the job names that match the regex; an empty regex matches everything.
def filter_job_names(job_names: list[str], job_name_regex: str) -> list[str]:
    if job_name_regex:
        return [
            job_name for job_name in job_names if re.match(job_name_regex, job_name)
        ]
    return job_names


def get_recurrently_failing_jobs_alerts(
    repo: str, branch: str, job_name_regex: str
) -> list[dict[str, Any]]:
    job_names, sha_grid = fetch_hud_data(repo=repo, branch=branch)

    filtered_job_names = set(filter_job_names(job_names, job_name_regex))
    if job_name_regex:
        print()
        print(f"Filtered to {len(filtered_job_names)} jobs:")
        if len(filtered_job_names) == 0:
            print("No jobs matched the regex")
        elif len(filtered_job_names) == len(job_names):
            print("All jobs matched the regex")
        else:
            print("\n".join(filtered_job_names))

    (recurrently_failing_jobs, flaky_jobs) = classify_jobs(
        job_names, sha_grid, filtered_job_names
    )

    alerts = []
    for job in recurrently_failing_jobs:
        entry = {
            "AlertType": "Recurrently Failing Job",
            "AlertObject": job.job_name,
            "OncallTeams": [],
            "OncallIndividuals": [],
            "Flags": [],
            "sha": job.failure_chain[-1]["sha"],
            "branch": branch,
        }
        alerts.append(entry)
    return alerts


def strtobool(value: str) -> bool:
    """
    Parses common truthy/falsy strings. Local replacement for
    distutils.util.strtobool, which was removed along with distutils in
    Python 3.12.
    """
    value = value.strip().lower()
    if value in ("y", "yes", "t", "true", "on", "1"):
        return True
    if value in ("n", "no", "f", "false", "off", "0"):
        return False
    raise ValueError(f"invalid truth value {value!r}")


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--repo",
        help="Repository to do checks for",
        type=str,
        default=os.getenv("REPO_TO_CHECK", "pytorch/pytorch"),
    )
    parser.add_argument(
        "--branch",
        help="Branch to do checks for",
        type=str,
        default=os.getenv("BRANCH_TO_CHECK", "main"),
    )
    parser.add_argument(
        "--job-name-regex",
        help="Consider only job names matching the given regex (if omitted, all jobs are matched)",
        type=str,
        default=os.getenv("JOB_NAME_REGEX", ""),
    )
    parser.add_argument(
        "--with-flaky-test-alert",
        help="Run this script with flaky test alerting",
        type=strtobool,
        default=os.getenv("WITH_FLAKY_TEST_ALERT", "YES"),
    )
    parser.add_argument(
        "--dry-run",
        help="Whether or not to actually post issues",
        type=strtobool,
        default=os.getenv("DRY_RUN", "YES"),
    )
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    data = json.dumps(
        get_recurrently_failing_jobs_alerts(args.repo, args.branch, args.job_name_regex)
    )
    print(data)
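
# Example invocation, assuming this script is saved as check_alerts.py (the
# alert values below are hypothetical, shown only to illustrate the shape):
#
#   python check_alerts.py --repo pytorch/pytorch --branch main \
#       --job-name-regex "linux.*"
#
#   [{"AlertType": "Recurrently Failing Job", "AlertObject": "linux-test",
#     "OncallTeams": [], "OncallIndividuals": [], "Flags": [],
#     "sha": "<sha of the oldest failure in the chain>", "branch": "main"}]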