Module: Mesa
Branch: main
Commit: 278fc1c22a71a880b41e70a17a143218fb40704d
URL:    http://cgit.freedesktop.org/mesa/mesa/commit/?id=278fc1c22a71a880b41e70a17a143218fb40704d

Author: Guilherme Gallo <guilherme.ga...@collabora.com>
Date:   Sat Oct 28 00:45:03 2023 -0300

ci/bin: gql: Improve queries for jobs/stages retrieval

Modify the GraphQL query used to fetch all jobs within a pipeline,
transitioning from fetching data via stage nodes to a direct job
retrieval approach.

The prior method was not paginated, which risked overloading the server,
and the nested structure of stage nodes complicated result parsing. The
new approach simplifies data interpretation and handles job lists
exceeding 100 elements by paginating the query and concatenating the
pages with helper functions (see the sketch after the change list
below).

- Transitioned from extracting jobs from stage nodes to a direct query
  for all jobs in the pipeline, improving data readability and server
  performance.
- With the enhanced data clarity from the updated query, removed the
  Dag+JobMetadata tuple, as it is now redundant: the refined query
  provides more comprehensive job data, including job name, stage, and
  dependencies.
- The previous query relied on a GraphQL node that will (or should) be
  paginated eventually anyway.
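
For reference, the cursor-based pagination pattern the new query relies
on looks roughly like the sketch below. This is a minimal illustration,
not the commit's implementation: the real logic lives in
GitlabGQL.query() via its paginated_key_loc argument, and the
fetch_all_nodes/run_query names are invented for the example.

    from typing import Any, Callable

    def fetch_all_nodes(
        run_query: Callable[[dict[str, Any]], dict],
        params: dict[str, Any],
        path: list[str],
    ) -> list[dict]:
        """Follow pageInfo.endCursor until hasNextPage is False,
        concatenating the `nodes` list of every page."""
        nodes: list[dict] = []
        cursor = None
        while True:
            result = run_query({**params, "cursor": cursor})
            # Walk down to the paginated connection,
            # e.g. ["project", "pipeline", "jobs"].
            connection = result
            for key in path:
                connection = connection[key]
            nodes.extend(connection["nodes"])
            page_info = connection["pageInfo"]
            if not page_info["hasNextPage"]:
                return nodes
            cursor = page_info["endCursor"]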

Closes: #10050
Signed-off-by: Guilherme Gallo <guilherme.ga...@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/25940>

---

 bin/ci/ci_run_n_monitor.py  |   7 +-
 bin/ci/gitlab_gql.py        | 156 ++++++++++++++++++++++++++++----------------
 bin/ci/pipeline_details.gql |  35 +++++-----
 3 files changed, 124 insertions(+), 74 deletions(-)

diff --git a/bin/ci/ci_run_n_monitor.py b/bin/ci/ci_run_n_monitor.py
index 7306a21d0e4..0be9c1d48dc 100755
--- a/bin/ci/ci_run_n_monitor.py
+++ b/bin/ci/ci_run_n_monitor.py
@@ -295,7 +295,7 @@ def parse_args() -> None:
 
 def find_dependencies(target_jobs_regex: re.Pattern, project_path: str, iid: int) -> set[str]:
     gql_instance = GitlabGQL()
-    dag, _ = create_job_needs_dag(
+    dag = create_job_needs_dag(
         gql_instance, {"projectPath": project_path.path_with_namespace, "iid": 
iid}
     )
 
@@ -308,7 +308,10 @@ def find_dependencies(target_jobs_regex: re.Pattern, project_path: str, iid: int
     print()
     print_dag(target_dep_dag)
     print(Fore.RESET)
-    return set(chain.from_iterable(target_dep_dag.values()))
+
+    dependency_jobs = set(chain.from_iterable(d["needs"] for d in target_dep_dag.values()))
+    target_jobs = set(target_dep_dag.keys())
+    return target_jobs.union(dependency_jobs)
 
 
 if __name__ == "__main__":
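
With this change, find_dependencies() returns the matched target jobs
together with their needs, instead of the needs alone. A toy run of the
new return logic, with made-up job names in the new DagNode shape:

    from itertools import chain

    target_dep_dag = {
        "test-a": {"needs": {"build-a"}, "stage": "test", "name": "test-a"},
        "test-b": {"needs": {"build-a", "build-b"}, "stage": "test", "name": "test-b"},
    }

    dependency_jobs = set(chain.from_iterable(d["needs"] for d in target_dep_dag.values()))
    target_jobs = set(target_dep_dag.keys())
    print(target_jobs.union(dependency_jobs))
    # -> {'build-a', 'build-b', 'test-a', 'test-b'}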
diff --git a/bin/ci/gitlab_gql.py b/bin/ci/gitlab_gql.py
index 404fb9cccac..b0299b07314 100755
--- a/bin/ci/gitlab_gql.py
+++ b/bin/ci/gitlab_gql.py
@@ -5,13 +5,13 @@ import logging
 import re
 import traceback
 from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
-from collections import OrderedDict, defaultdict
+from collections import OrderedDict
 from copy import deepcopy
 from dataclasses import dataclass, field
 from itertools import accumulate
 from os import getenv
 from pathlib import Path
-from typing import Any, Iterable, Optional, Pattern, Union
+from typing import Any, Iterable, Optional, Pattern, TypedDict, Union
 
 import yaml
 from filecache import DAY, filecache
@@ -19,7 +19,18 @@ from gql import Client, gql
 from gql.transport.requests import RequestsHTTPTransport
 from graphql import DocumentNode
 
-Dag = dict[str, set[str]]
+
+class DagNode(TypedDict):
+    needs: set[str]
+    stage: str
+    # `name` is redundant but is kept for backward compatibility
+    name: str
+
+
+# see create_job_needs_dag function for more details
+Dag = dict[str, DagNode]
+
+
 StageSeq = OrderedDict[str, set[str]]
 TOKEN_DIR = Path(getenv("XDG_CONFIG_HOME") or Path.home() / ".config")
 
@@ -200,104 +211,139 @@ class GitlabGQL:
             logging.warning(f"Could not invalidate cache, maybe it was not used in {ex.args}?")
 
 
-def insert_early_stage_jobs(dag: Dag, stage_sequence: StageSeq, jobs_metadata: dict) -> Dag:
-    pre_processed_dag: Dag = {}
+def insert_early_stage_jobs(stage_sequence: StageSeq, jobs_metadata: Dag) -> Dag:
+    pre_processed_dag: dict[str, set[str]] = {}
     jobs_from_early_stages = list(accumulate(stage_sequence.values(), set.union))
-    for job_name, needs in dag.items():
-        final_needs: set = deepcopy(needs)
+    for job_name, metadata in jobs_metadata.items():
+        final_needs: set[str] = deepcopy(metadata["needs"])
         # Pre-process jobs that are not based on needs field
         # e.g. sanity job in mesa MR pipelines
         if not final_needs:
-            job_stage = jobs_metadata[job_name]["stage"]["name"]
-            stage_index = list(stage_sequence.keys()).index(job_stage)
+            job_stage: str = jobs_metadata[job_name]["stage"]
+            stage_index: int = list(stage_sequence.keys()).index(job_stage)
             if stage_index > 0:
                 final_needs |= jobs_from_early_stages[stage_index - 1]
         pre_processed_dag[job_name] = final_needs
 
-    return pre_processed_dag
+    for job_name, needs in pre_processed_dag.items():
+        jobs_metadata[job_name]["needs"] = needs
 
+    return jobs_metadata
 
-def traverse_dag_needs(dag: Dag) -> None:
-    for job, needs in dag.items():
-        final_needs: set = deepcopy(needs)
+
+def traverse_dag_needs(jobs_metadata: Dag) -> None:
+    created_jobs = set(jobs_metadata.keys())
+    for job, metadata in jobs_metadata.items():
+        final_needs: set = deepcopy(metadata["needs"]) & created_jobs
         # Post process jobs that are based on needs field
         partial = True
 
         while partial:
-            next_depth = {n for dn in final_needs for n in dag[dn]}
-            partial = not final_needs.issuperset(next_depth)
+            next_depth: set[str] = {n for dn in final_needs for n in jobs_metadata[dn]["needs"]}
+            partial: bool = not final_needs.issuperset(next_depth)
             final_needs = final_needs.union(next_depth)
 
-        dag[job] = final_needs
+        jobs_metadata[job]["needs"] = final_needs
 
 
-def extract_stages_and_job_needs(pipeline_result: dict[str, Any]) -> tuple[Dag, StageSeq, dict]:
-    incomplete_dag = defaultdict(set)
-    jobs_metadata = {}
+def extract_stages_and_job_needs(
+    pipeline_jobs: dict[str, Any], pipeline_stages: dict[str, Any]
+) -> tuple[StageSeq, Dag]:
+    jobs_metadata = Dag()
     # Record the stage sequence to post process deps that are not based on needs
     # field, for example: sanity job
     stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
-    for stage in pipeline_result["stages"]["nodes"]:
-        stage_jobs: set[str] = set()
-        for stage_job in stage["groups"]["nodes"]:
-            for job in stage_job["jobs"]["nodes"]:
-                stage_jobs.add(job["name"])
-                needs = job.pop("needs")["nodes"]
-                jobs_metadata[job["name"]] = job
-                incomplete_dag[job["name"]] = {node["name"] for node in needs}
-                # ensure that all needed nodes its in the graph
-                [incomplete_dag[node["name"]] for node in needs]
-        stage_sequence[stage["name"]] = stage_jobs
-
-    return incomplete_dag, stage_sequence, jobs_metadata
-
-
-def create_job_needs_dag(gl_gql: GitlabGQL, params) -> tuple[Dag, dict[str, dict[str, Any]]]:
+    for stage in pipeline_stages["nodes"]:
+        stage_sequence[stage["name"]] = set()
+
+    for job in pipeline_jobs["nodes"]:
+        stage_sequence[job["stage"]["name"]].add(job["name"])
+        dag_job: DagNode = {
+            "name": job["name"],
+            "stage": job["stage"]["name"],
+            "needs": set([j["node"]["name"] for j in job["needs"]["edges"]]),
+        }
+        jobs_metadata[job["name"]] = dag_job
+
+    return stage_sequence, jobs_metadata
+
+
+def create_job_needs_dag(gl_gql: GitlabGQL, params, disable_cache: bool = True) -> Dag:
     """
-    The function `create_job_needs_dag` retrieves pipeline details from GitLab, extracts stages and
-    job needs, inserts early stage jobs, and returns the final DAG and job dictionary.
+    This function creates a Directed Acyclic Graph (DAG) to represent a sequence of jobs, where each
+    job has a set of jobs that it depends on (its "needs") and belongs to a certain "stage".
+    The "name" of the job is used as the key in the dictionary.
+
+    For example, consider the following pipeline:
+
+        1. build stage: job1 -> job2
+        2. test stage: job3, job4 (job4 needs job2)
+
+    - The job needs for job2 are: job1
+    - The job needs for job4 are: job2 (and, transitively, job1)
+    - job3 has no "needs" field, so it waits for every job from the build stage to finish: job1 and job2.
+
+    The resulting DAG would look like this:
+
+        dag = {
+            "job1": {"needs": set(), "stage": "build", "name": "job1"},
+            "job2": {"needs": {"job1"}, "stage": "build", "name": "job2"},
+            "job3": {"needs": {"job1", "job2"}, "stage": "test", "name": "job3"},
+            "job4": {"needs": {"job1", "job2"}, "stage": "test", "name": "job4"},
+        }
+
+    To access the job needs, one can do:
+
+        dag["job3"]["needs"]
+
+    This will return the set of jobs that job3 needs: {"job1", "job2"}
 
     Args:
         gl_gql (GitlabGQL): The `gl_gql` parameter is an instance of the `GitlabGQL` class, which is
             used to make GraphQL queries to the GitLab API.
-        params: The `params` parameter is a dictionary that contains the necessary parameters for
-            the GraphQL query. It is used to specify the details of the pipeline for which the job
-            needs DAG is being created.
+        params (dict): The `params` parameter is a dictionary that contains the necessary parameters
+            for the GraphQL query. It is used to specify the details of the pipeline for which the
+            job needs DAG is being created.
             The specific keys and values in the `params` dictionary will depend on
             the requirements of the GraphQL query being executed
+        disable_cache (bool): The `disable_cache` parameter is a boolean that specifies whether the cached query results should be ignored (it defaults to True).
 
     Returns:
-        The function `create_job_needs_dag` returns a tuple containing two elements.
-        The first element is the final DAG (Directed Acyclic Graph) representing the stages and job
-        dependencies.
-        The second element is a dictionary containing information about the jobs in the DAG, where
-        the keys are job names and the values are dictionaries containing additional job
-        information.
+        The final DAG (Directed Acyclic Graph) representing the job dependencies sourced from the
+        needs or stages rules.
     """
-    result = gl_gql.query("pipeline_details.gql", params)
-    pipeline = result["project"]["pipeline"]
-    if not pipeline:
+    stages_jobs_gql = gl_gql.query(
+        "pipeline_details.gql",
+        params=params,
+        paginated_key_loc=["project", "pipeline", "jobs"],
+        disable_cache=disable_cache,
+    )
+    pipeline_data = stages_jobs_gql["project"]["pipeline"]
+    if not pipeline_data:
         raise RuntimeError(f"Could not find any pipelines for {params}")
 
-    incomplete_dag, stage_sequence, jobs_metadata = extract_stages_and_job_needs(pipeline)
+    stage_sequence, jobs_metadata = extract_stages_and_job_needs(
+        pipeline_data["jobs"], pipeline_data["stages"]
+    )
     # Fill the DAG with the job needs from stages that don't have any needs but still need to wait
     # for previous stages
-    final_dag = insert_early_stage_jobs(incomplete_dag, stage_sequence, jobs_metadata)
+    final_dag = insert_early_stage_jobs(stage_sequence, jobs_metadata)
     # Now that each job has its direct needs filled correctly, update the "needs" field for each job
     # in the DAG by performing a topological traversal
     traverse_dag_needs(final_dag)
 
-    return final_dag, jobs_metadata
+    return final_dag
 
 
 def filter_dag(dag: Dag, regex: Pattern) -> Dag:
-    return {job: needs for job, needs in dag.items() if regex.match(job)}
+    jobs_with_regex: set[str] = {job for job in dag if regex.match(job)}
+    return Dag({job: data for job, data in dag.items() if job in sorted(jobs_with_regex)})
 
 
 def print_dag(dag: Dag) -> None:
-    for job, needs in dag.items():
+    for job, data in dag.items():
         print(f"{job}:")
-        print(f"\t{' '.join(needs)}")
+        print(f"\t{' '.join(data['needs'])}")
         print()
 
 
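
Taken together, a caller of the reworked module now looks roughly like
this. A usage sketch only: it assumes bin/ci is importable, and the
project path, pipeline iid, and job-name regex are placeholders.

    import re

    from gitlab_gql import GitlabGQL, create_job_needs_dag, filter_dag, print_dag

    gql = GitlabGQL()  # token discovery is unchanged
    dag = create_job_needs_dag(gql, {"projectPath": "mesa/mesa", "iid": "12345"})
    # Each value is a DagNode: {"name": ..., "stage": ..., "needs": {...}}
    zink_jobs = filter_dag(dag, re.compile(r"zink"))
    print_dag(zink_jobs)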
diff --git a/bin/ci/pipeline_details.gql b/bin/ci/pipeline_details.gql
index f723084e4cf..2c8be9fc5ba 100644
--- a/bin/ci/pipeline_details.gql
+++ b/bin/ci/pipeline_details.gql
@@ -1,4 +1,4 @@
-query getPipelineDetails($projectPath: ID!, $iid: ID!) {
+query jobs($projectPath: ID!, $iid: ID!, $cursor: String) {
   project(fullPath: $projectPath) {
     id
     pipeline(iid: $iid) {
@@ -8,25 +8,26 @@ query getPipelineDetails($projectPath: ID!, $iid: ID!) {
       stages {
         nodes {
           name
-          groups {
-            nodes {
-              jobs {
-                nodes {
-                  id
-                  name
-                  stage {
-                    name
-                  }
-                  needs {
-                    nodes {
-                      id
-                      name
-                    }
-                  }
-                }
+        }
+      }
+      jobs(after: $cursor) {
+        pageInfo {
+          hasNextPage
+          endCursor
+        }
+        count
+        nodes {
+          name
+          needs {
+            edges {
+              node {
+                name
               }
             }
           }
+          stage {
+            name
+          }
         }
       }
     }

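
For reference, one page of the reworked query comes back shaped roughly
as below (all values invented); this is the structure that
extract_stages_and_job_needs() walks, with the stage nodes feeding the
stage sequence and each job node carrying its stage and its needs edges:

    page = {
        "project": {
            "id": "gid://gitlab/Project/1",
            "pipeline": {
                "stages": {"nodes": [{"name": "build"}, {"name": "test"}]},
                "jobs": {
                    "pageInfo": {"hasNextPage": False, "endCursor": "abc123"},
                    "count": 2,
                    "nodes": [
                        {
                            "name": "build-job",
                            "needs": {"edges": []},
                            "stage": {"name": "build"},
                        },
                        {
                            "name": "test-job",
                            "needs": {"edges": [{"node": {"name": "build-job"}}]},
                            "stage": {"name": "test"},
                        },
                    ],
                },
            },
        },
    }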