Module: Mesa
Branch: main
Commit: 664e6addeabd3ce209b05a454616c3df4a09b2b7
URL:    http://cgit.freedesktop.org/mesa/mesa/commit/?id=664e6addeabd3ce209b05a454616c3df4a09b2b7
Author: Guilherme Gallo <guilherme.ga...@collabora.com>
Date:   Mon Nov 6 22:25:40 2023 -0300

ci/bin: gql: Implement pagination

Make query support pagination by supplying the paginated key.

In the following toy example, the paginated key is: ["levels", "cars"]

```graphql
query vehicle_store($location: ID!) {
  levels {
    cars {
      pageInfo {
        hasNextPage
        endCursor
      }
      ...
    }
  }
}
```

Signed-off-by: Guilherme Gallo <guilherme.ga...@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/25940>

---
 bin/ci/gitlab_gql.py | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 81 insertions(+), 1 deletion(-)

diff --git a/bin/ci/gitlab_gql.py b/bin/ci/gitlab_gql.py
index 605a0c852d9..404fb9cccac 100755
--- a/bin/ci/gitlab_gql.py
+++ b/bin/ci/gitlab_gql.py
@@ -68,16 +68,27 @@ class GitlabGQL:
         gql_file: Union[Path, str],
         params: dict[str, Any] = {},
         operation_name: Optional[str] = None,
+        paginated_key_loc: Iterable[str] = [],
         disable_cache: bool = False,
     ) -> dict[str, Any]:
         def run_uncached() -> dict[str, Any]:
+            if paginated_key_loc:
+                return self._sweep_pages(gql_file, params, operation_name, paginated_key_loc)
             return self._query(gql_file, params, operation_name)
 
         if disable_cache:
             return run_uncached()
 
         try:
-            return self._query_cached(gql_file, params, operation_name)
+            # Create an auxiliary variable to deliver a cached result and enable catching exceptions
+            # Decorate the query to be cached
+            if paginated_key_loc:
+                result = self._sweep_pages_cached(
+                    gql_file, params, operation_name, paginated_key_loc
+                )
+            else:
+                result = self._query_cached(gql_file, params, operation_name)
+            return result  # type: ignore
         except Exception as ex:
             logging.error(f"Cached query failed with {ex}")
             # print exception traceback
@@ -108,13 +119,82 @@ class GitlabGQL:
             query, variable_values=params, operation_name=operation_name
         )
 
+    @filecache(DAY)
+    def _sweep_pages_cached(self, *args, **kwargs):
+        return self._sweep_pages(*args, **kwargs)
+
     @filecache(DAY)
     def _query_cached(self, *args, **kwargs):
         return self._query(*args, **kwargs)
 
+    def _sweep_pages(
+        self, query, params, operation_name=None, paginated_key_loc: Iterable[str] = []
+    ) -> dict[str, Any]:
+        """
+        Retrieve paginated data from a GraphQL API and concatenate the results into a single
+        response.
+
+        Args:
+            query: represents a filepath with the GraphQL query to be executed.
+            params: a dictionary that contains the parameters to be passed to the query. These
+                parameters can be used to filter or modify the results of the query.
+            operation_name: The `operation_name` parameter is an optional parameter that
+                specifies the name of the GraphQL operation to be executed. It is used when
+                making a GraphQL query to specify which operation to execute if there are
+                multiple operations defined in the GraphQL schema. If not provided, the
+                default operation will be executed.
+            paginated_key_loc (Iterable[str]): The `paginated_key_loc` parameter is an
+                iterable of strings that represents the location of the paginated field
+                within the response. It is used to extract the paginated field from the
+                response and append it to the final result. The node has to be a list of
+                objects with a `pageInfo` field that contains at least the `hasNextPage`
+                and `endCursor` fields.
+
+        Returns:
+            a dictionary containing the response from the query with the paginated field
+            concatenated.
+        """
+
+        def fetch_page(cursor: str | None = None) -> dict[str, Any]:
+            if cursor:
+                params["cursor"] = cursor
+                logging.info(
+                    f"Found more than 100 elements, paginating. "
+                    f"Current cursor at {cursor}"
+                )
+
+            return self._query(query, params, operation_name)
+
+        # Execute the initial query
+        response: dict[str, Any] = fetch_page()
+
+        # Initialize an empty list to store the final result
+        final_partial_field: list[dict[str, Any]] = []
+
+        # Loop until all pages have been retrieved
+        while True:
+            # Get the partial field to be appended to the final result
+            partial_field = response
+            for key in paginated_key_loc:
+                partial_field = partial_field[key]
+
+            # Append the partial field to the final result
+            final_partial_field += partial_field["nodes"]
+
+            # Check if there are more pages to retrieve
+            page_info = partial_field["pageInfo"]
+            if not page_info["hasNextPage"]:
+                break
+
+            # Execute the query with the updated cursor parameter
+            response = fetch_page(page_info["endCursor"])
+
+        # Replace the "nodes" field in the original response with the final result
+        partial_field["nodes"] = final_partial_field
+        return response
+
     def invalidate_query_cache(self) -> None:
         logging.warning("Invalidating query cache")
         try:
+            self._sweep_pages._db.clear()
             self._query._db.clear()
         except AttributeError as ex:
             logging.warning(f"Could not invalidate cache, maybe it was not used in {ex.args}?")