Module: Mesa
Branch: main
Commit: 278fc1c22a71a880b41e70a17a143218fb40704d
URL:    http://cgit.freedesktop.org/mesa/mesa/commit/?id=278fc1c22a71a880b41e70a17a143218fb40704d
Author: Guilherme Gallo <guilherme.ga...@collabora.com>
Date:   Sat Oct 28 00:45:03 2023 -0300

ci/bin: gql: Improve queries for jobs/stages retrieval

Modify the GraphQL query used to fetch all jobs within a pipeline,
transitioning from fetching data via stage nodes to a direct job
retrieval approach. The prior method was not paginated, which could
overload the server, and the structure of stage nodes complicated
result parsing. The new approach simplifies data interpretation and
handles job lists exceeding 100 elements by implementing pagination,
with helper functions to concatenate the paginated results.

- Transitioned from extracting jobs from stage nodes to a direct query
  for all jobs in the pipeline, improving data readability and server
  performance.
- With the enhanced data clarity from the updated query, removed the
  Dag+JobMetadata tuple, as it is now redundant. The refined query
  provides more comprehensive job data, including job name, stage, and
  dependencies.
- The previous graph query relied on a graph node that will (or should)
  be paginated anyway.

Closes: #10050
Signed-off-by: Guilherme Gallo <guilherme.ga...@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/25940>

---
 bin/ci/ci_run_n_monitor.py  |   7 +-
 bin/ci/gitlab_gql.py        | 156 ++++++++++++++++++++++++++++----------------
 bin/ci/pipeline_details.gql |  35 +++++-----
 3 files changed, 124 insertions(+), 74 deletions(-)
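The pagination mentioned above is the standard GraphQL cursor scheme: the query
is re-issued with the end cursor of the previous page, and each page's node
list is concatenated onto the accumulated result at the key path later passed
as paginated_key_loc. A minimal sketch of such a concatenation helper in
Python; the function names are illustrative, not the actual helpers in
bin/ci/gitlab_gql.py:

    from typing import Any

    def dig(result: dict[str, Any], key_path: list[str]) -> dict[str, Any]:
        # Walk the nested result down to the paginated connection,
        # e.g. key_path = ["project", "pipeline", "jobs"]
        for key in key_path:
            result = result[key]
        return result

    def merge_page(accumulated: dict[str, Any], page: dict[str, Any],
                   key_path: list[str]) -> dict[str, Any]:
        # Concatenate this page's node list onto the nodes gathered so far
        dig(accumulated, key_path)["nodes"] += dig(page, key_path)["nodes"]
        return accumulated

GitlabGQL.query can then loop over pages, merging each one until pageInfo
reports that no next page remains.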
diff --git a/bin/ci/ci_run_n_monitor.py b/bin/ci/ci_run_n_monitor.py
index 7306a21d0e4..0be9c1d48dc 100755
--- a/bin/ci/ci_run_n_monitor.py
+++ b/bin/ci/ci_run_n_monitor.py
@@ -295,7 +295,7 @@ def parse_args() -> None:
 
 def find_dependencies(target_jobs_regex: re.Pattern, project_path: str, iid: int) -> set[str]:
     gql_instance = GitlabGQL()
-    dag, _ = create_job_needs_dag(
+    dag = create_job_needs_dag(
         gql_instance, {"projectPath": project_path.path_with_namespace, "iid": iid}
     )
 
@@ -308,7 +308,10 @@ def find_dependencies(target_jobs_regex: re.Pattern, project_path: str, iid: int
     print()
     print_dag(target_dep_dag)
     print(Fore.RESET)
-    return set(chain.from_iterable(target_dep_dag.values()))
+
+    dependency_jobs = set(chain.from_iterable(d["needs"] for d in target_dep_dag.values()))
+    target_jobs = set(target_dep_dag.keys())
+    return target_jobs.union(dependency_jobs)
 
 
 if __name__ == "__main__":
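Note the behavioural change in find_dependencies(): the old code returned only
the union of the needs sets, while the new code also includes the matched
target jobs themselves. On a toy DAG (data invented for illustration):

    target_dep_dag = {
        "job3": {"needs": {"job1", "job2"}, "stage": "build", "name": "job3"},
    }
    # before: {"job1", "job2"}           (dependencies only)
    # after:  {"job1", "job2", "job3"}   (targets plus dependencies)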
diff --git a/bin/ci/gitlab_gql.py b/bin/ci/gitlab_gql.py
index 404fb9cccac..b0299b07314 100755
--- a/bin/ci/gitlab_gql.py
+++ b/bin/ci/gitlab_gql.py
@@ -5,13 +5,13 @@ import logging
 import re
 import traceback
 from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
-from collections import OrderedDict, defaultdict
+from collections import OrderedDict
 from copy import deepcopy
 from dataclasses import dataclass, field
 from itertools import accumulate
 from os import getenv
 from pathlib import Path
-from typing import Any, Iterable, Optional, Pattern, Union
+from typing import Any, Iterable, Optional, Pattern, TypedDict, Union
 
 import yaml
 from filecache import DAY, filecache
@@ -19,7 +19,18 @@ from gql import Client, gql
 from gql.transport.requests import RequestsHTTPTransport
 from graphql import DocumentNode
 
-Dag = dict[str, set[str]]
+
+class DagNode(TypedDict):
+    needs: set[str]
+    stage: str
+    # `name` is redundant but is here for retro-compatibility
+    name: str
+
+
+# see create_job_needs_dag function for more details
+Dag = dict[str, DagNode]
+
+
 StageSeq = OrderedDict[str, set[str]]
 TOKEN_DIR = Path(getenv("XDG_CONFIG_HOME") or Path.home() / ".config")
 
@@ -200,104 +211,139 @@ class GitlabGQL:
             logging.warning(f"Could not invalidate cache, maybe it was not used in {ex.args}?")
 
 
-def insert_early_stage_jobs(dag: Dag, stage_sequence: StageSeq, jobs_metadata: dict) -> Dag:
-    pre_processed_dag: Dag = {}
+def insert_early_stage_jobs(stage_sequence: StageSeq, jobs_metadata: Dag) -> Dag:
+    pre_processed_dag: dict[str, set[str]] = {}
     jobs_from_early_stages = list(accumulate(stage_sequence.values(), set.union))
-    for job_name, needs in dag.items():
-        final_needs: set = deepcopy(needs)
+    for job_name, metadata in jobs_metadata.items():
+        final_needs: set[str] = deepcopy(metadata["needs"])
         # Pre-process jobs that are not based on needs field
        # e.g. sanity job in mesa MR pipelines
         if not final_needs:
-            job_stage = jobs_metadata[job_name]["stage"]["name"]
-            stage_index = list(stage_sequence.keys()).index(job_stage)
+            job_stage: str = jobs_metadata[job_name]["stage"]
+            stage_index: int = list(stage_sequence.keys()).index(job_stage)
             if stage_index > 0:
                 final_needs |= jobs_from_early_stages[stage_index - 1]
 
         pre_processed_dag[job_name] = final_needs
 
-    return pre_processed_dag
+    for job_name, needs in pre_processed_dag.items():
+        jobs_metadata[job_name]["needs"] = needs
+
+    return jobs_metadata
 
-def traverse_dag_needs(dag: Dag) -> None:
-    for job, needs in dag.items():
-        final_needs: set = deepcopy(needs)
+
+def traverse_dag_needs(jobs_metadata: Dag) -> None:
+    created_jobs = set(jobs_metadata.keys())
+    for job, metadata in jobs_metadata.items():
+        final_needs: set = deepcopy(metadata["needs"]) & created_jobs
         # Post process jobs that are based on needs field
         partial = True
 
         while partial:
-            next_depth = {n for dn in final_needs for n in dag[dn]}
-            partial = not final_needs.issuperset(next_depth)
+            next_depth: set[str] = {n for dn in final_needs for n in jobs_metadata[dn]["needs"]}
+            partial: bool = not final_needs.issuperset(next_depth)
             final_needs = final_needs.union(next_depth)
 
-        dag[job] = final_needs
+        jobs_metadata[job]["needs"] = final_needs
 
 
-def extract_stages_and_job_needs(pipeline_result: dict[str, Any]) -> tuple[Dag, StageSeq, dict]:
-    incomplete_dag = defaultdict(set)
-    jobs_metadata = {}
+def extract_stages_and_job_needs(
+    pipeline_jobs: dict[str, Any], pipeline_stages: dict[str, Any]
+) -> tuple[StageSeq, Dag]:
+    jobs_metadata = Dag()
     # Record the stage sequence to post process deps that are not based on needs
     # field, for example: sanity job
     stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
-    for stage in pipeline_result["stages"]["nodes"]:
-        stage_jobs: set[str] = set()
-        for stage_job in stage["groups"]["nodes"]:
-            for job in stage_job["jobs"]["nodes"]:
-                stage_jobs.add(job["name"])
-                needs = job.pop("needs")["nodes"]
-                jobs_metadata[job["name"]] = job
-                incomplete_dag[job["name"]] = {node["name"] for node in needs}
-                # ensure that all needed nodes its in the graph
-                [incomplete_dag[node["name"]] for node in needs]
-        stage_sequence[stage["name"]] = stage_jobs
-
-    return incomplete_dag, stage_sequence, jobs_metadata
-
-
-def create_job_needs_dag(gl_gql: GitlabGQL, params) -> tuple[Dag, dict[str, dict[str, Any]]]:
+    for stage in pipeline_stages["nodes"]:
+        stage_sequence[stage["name"]] = set()
+
+    for job in pipeline_jobs["nodes"]:
+        stage_sequence[job["stage"]["name"]].add(job["name"])
+        dag_job: DagNode = {
+            "name": job["name"],
+            "stage": job["stage"]["name"],
+            "needs": set([j["node"]["name"] for j in job["needs"]["edges"]]),
+        }
+        jobs_metadata[job["name"]] = dag_job
+
+    return stage_sequence, jobs_metadata
+
+
+def create_job_needs_dag(gl_gql: GitlabGQL, params, disable_cache: bool = True) -> Dag:
     """
-    The function `create_job_needs_dag` retrieves pipeline details from GitLab, extracts stages and
-    job needs, inserts early stage jobs, and returns the final DAG and job dictionary.
+    This function creates a Directed Acyclic Graph (DAG) to represent a sequence of jobs, where each
+    job has a set of jobs that it depends on (its "needs") and belongs to a certain "stage".
+    The "name" of the job is used as the key in the dictionary.
+
+    For example, consider the following DAG:
+
+        1. build stage: job1 -> job2 -> job3
+        2. test stage: job2 -> job4
+
+    - The job needs for job3 are: job1, job2
+    - The job needs for job4 are: job2
+    - job2 needs to wait for all jobs from the build stage to finish.
+
+    The resulting DAG would look like this:
+
+        dag = {
+            "job1": {"needs": set(), "stage": "build", "name": "job1"},
+            "job2": {"needs": {"job1", "job2", "job3"}, "stage": "test", "name": "job2"},
+            "job3": {"needs": {"job1", "job2"}, "stage": "build", "name": "job3"},
+            "job4": {"needs": {"job2"}, "stage": "test", "name": "job4"},
+        }
+
+    To access the job needs, one can do:
+
+        dag["job3"]["needs"]
+
+    This will return the set of jobs that job3 needs: {"job1", "job2"}
 
     Args:
         gl_gql (GitlabGQL): The `gl_gql` parameter is an instance of the `GitlabGQL` class, which
            is used to make GraphQL queries to the GitLab API.
-        params: The `params` parameter is a dictionary that contains the necessary parameters for
-            the GraphQL query. It is used to specify the details of the pipeline for which the job
-            needs DAG is being created.
+        params (dict): The `params` parameter is a dictionary that contains the necessary parameters
+            for the GraphQL query. It is used to specify the details of the pipeline for which the
+            job needs DAG is being created.
            The specific keys and values in the `params` dictionary will depend on the
            requirements of the GraphQL query being executed
+        disable_cache (bool): The `disable_cache` parameter is a boolean that specifies whether
+            the cached query result should be bypassed; it defaults to `True` and is forwarded
+            to `GitlabGQL.query`.
 
     Returns:
-        The function `create_job_needs_dag` returns a tuple containing two elements.
-        The first element is the final DAG (Directed Acyclic Graph) representing the stages and job
-        dependencies.
-        The second element is a dictionary containing information about the jobs in the DAG, where
-        the keys are job names and the values are dictionaries containing additional job
-        information.
+        The final DAG (Directed Acyclic Graph) representing the job dependencies sourced from the
+        needs field or the stage rules.
     """
-    result = gl_gql.query("pipeline_details.gql", params)
-    pipeline = result["project"]["pipeline"]
-    if not pipeline:
+    stages_jobs_gql = gl_gql.query(
+        "pipeline_details.gql",
+        params=params,
+        paginated_key_loc=["project", "pipeline", "jobs"],
+        disable_cache=disable_cache,
+    )
+    pipeline_data = stages_jobs_gql["project"]["pipeline"]
+    if not pipeline_data:
         raise RuntimeError(f"Could not find any pipelines for {params}")
 
-    incomplete_dag, stage_sequence, jobs_metadata = extract_stages_and_job_needs(pipeline)
+    stage_sequence, jobs_metadata = extract_stages_and_job_needs(
+        pipeline_data["jobs"], pipeline_data["stages"]
+    )
     # Fill the DAG with the job needs from stages that don't have any needs but still need to wait
     # for previous stages
-    final_dag = insert_early_stage_jobs(incomplete_dag, stage_sequence, jobs_metadata)
+    final_dag = insert_early_stage_jobs(stage_sequence, jobs_metadata)
 
     # Now that each job has its direct needs filled correctly, update the "needs" field for each job
     # in the DAG by performing a topological traversal
     traverse_dag_needs(final_dag)
 
-    return final_dag, jobs_metadata
+    return final_dag
 
 
 def filter_dag(dag: Dag, regex: Pattern) -> Dag:
-    return {job: needs for job, needs in dag.items() if regex.match(job)}
+    jobs_with_regex: set[str] = {job for job in dag if regex.match(job)}
+    return Dag({job: data for job, data in dag.items() if job in sorted(jobs_with_regex)})
 
 
 def print_dag(dag: Dag) -> None:
-    for job, needs in dag.items():
+    for job, data in dag.items():
         print(f"{job}:")
-        print(f"\t{' '.join(needs)}")
+        print(f"\t{' '.join(data['needs'])}")
         print()
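With the new Dag shape, each entry is a DagNode carrying name, stage, and
needs, and traverse_dag_needs() expands the direct needs into the full
transitive set. A small walk-through of the committed function on invented
data:

    from gitlab_gql import traverse_dag_needs  # assumes bin/ci is on sys.path

    dag = {
        "job1": {"needs": set(), "stage": "build", "name": "job1"},
        "job2": {"needs": {"job1"}, "stage": "build", "name": "job2"},
        "job3": {"needs": {"job2"}, "stage": "test", "name": "job3"},
    }
    traverse_dag_needs(dag)
    # job3's direct need ("job2") has been expanded transitively:
    assert dag["job3"]["needs"] == {"job1", "job2"}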
""" - result = gl_gql.query("pipeline_details.gql", params) - pipeline = result["project"]["pipeline"] - if not pipeline: + stages_jobs_gql = gl_gql.query( + "pipeline_details.gql", + params=params, + paginated_key_loc=["project", "pipeline", "jobs"], + disable_cache=disable_cache, + ) + pipeline_data = stages_jobs_gql["project"]["pipeline"] + if not pipeline_data: raise RuntimeError(f"Could not find any pipelines for {params}") - incomplete_dag, stage_sequence, jobs_metadata = extract_stages_and_job_needs(pipeline) + stage_sequence, jobs_metadata = extract_stages_and_job_needs( + pipeline_data["jobs"], pipeline_data["stages"] + ) # Fill the DAG with the job needs from stages that don't have any needs but still need to wait # for previous stages - final_dag = insert_early_stage_jobs(incomplete_dag, stage_sequence, jobs_metadata) + final_dag = insert_early_stage_jobs(stage_sequence, jobs_metadata) # Now that each job has its direct needs filled correctly, update the "needs" field for each job # in the DAG by performing a topological traversal traverse_dag_needs(final_dag) - return final_dag, jobs_metadata + return final_dag def filter_dag(dag: Dag, regex: Pattern) -> Dag: - return {job: needs for job, needs in dag.items() if regex.match(job)} + jobs_with_regex: set[str] = {job for job in dag if regex.match(job)} + return Dag({job: data for job, data in dag.items() if job in sorted(jobs_with_regex)}) def print_dag(dag: Dag) -> None: - for job, needs in dag.items(): + for job, data in dag.items(): print(f"{job}:") - print(f"\t{' '.join(needs)}") + print(f"\t{' '.join(data['needs'])}") print() diff --git a/bin/ci/pipeline_details.gql b/bin/ci/pipeline_details.gql index f723084e4cf..2c8be9fc5ba 100644 --- a/bin/ci/pipeline_details.gql +++ b/bin/ci/pipeline_details.gql @@ -1,4 +1,4 @@ -query getPipelineDetails($projectPath: ID!, $iid: ID!) { +query jobs($projectPath: ID!, $iid: ID!, $cursor: String) { project(fullPath: $projectPath) { id pipeline(iid: $iid) { @@ -8,25 +8,26 @@ query getPipelineDetails($projectPath: ID!, $iid: ID!) { stages { nodes { name - groups { - nodes { - jobs { - nodes { - id - name - stage { - name - } - needs { - nodes { - id - name - } - } - } + } + } + jobs(after: $cursor) { + pageInfo { + hasNextPage + endCursor + } + count + nodes { + name + needs { + edges { + node { + name } } } + stage { + name + } } } }