villebro commented on code in PR #36529:
URL: https://github.com/apache/superset/pull/36529#discussion_r2614941430

##########
superset/sql/execution/celery_task.py:
##########
@@ -0,0 +1,473 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Celery task for async SQL execution.
+
+This module provides the Celery task for executing SQL queries asynchronously.
+It is used by SQLExecutor.execute_async() to run queries in the background.
+"""
+
+from __future__ import annotations
+
+import dataclasses
+import logging
+import sys
+import uuid
+from sys import getsizeof
+from typing import Any, cast, TYPE_CHECKING
+
+import backoff
+import msgpack
+from celery.exceptions import SoftTimeLimitExceeded
+from flask import current_app as app, has_app_context
+from flask_babel import gettext as __
+
+from superset import (
+    db,
+    results_backend,
+    results_backend_use_msgpack,
+    security_manager,
+)
+from superset.common.db_query_status import QueryStatus
+from superset.constants import QUERY_CANCEL_KEY
+from superset.dataframe import df_to_records
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import (
+    SupersetErrorException,
+    SupersetErrorsException,
+)
+from superset.extensions import celery_app
+from superset.models.sql_lab import Query
+from superset.result_set import SupersetResultSet
+from superset.sql.execution.executor import execute_sql_with_cursor
+from superset.sql.parse import SQLScript
+from superset.sqllab.utils import write_ipc_buffer
+from superset.utils import json
+from superset.utils.core import override_user, zlib_compress
+from superset.utils.dates import now_as_float
+from superset.utils.decorators import stats_timing
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+BYTES_IN_MB = 1024 * 1024
+
+
+def _get_query_backoff_handler(details: dict[Any, Any]) -> None:
+    """Handler for backoff retry logging."""
+    stats_logger = app.config["STATS_LOGGER"]
+    query_id = details["kwargs"]["query_id"]
+    stats_logger.incr(f"error_attempting_orm_query_{details['tries'] - 1}")
+    logger.warning(
+        "Query with id `%s` could not be retrieved, retrying...",
+        str(query_id),
+        exc_info=True,
+    )
+
+
+def _get_query_giveup_handler(_: Any) -> None:
+    """Handler for backoff giveup logging."""
+    stats_logger = app.config["STATS_LOGGER"]
+    stats_logger.incr("error_failed_at_getting_orm_query")
+
+
+@backoff.on_exception(
+    backoff.constant,
+    Exception,
+    interval=1,
+    on_backoff=_get_query_backoff_handler,
+    on_giveup=_get_query_giveup_handler,
+    max_tries=5,
+)
+def _get_query(query_id: int) -> Query:
+    """Attempt to get the query with retry logic."""
+    return db.session.query(Query).filter_by(id=query_id).one()

Review Comment:
   I wonder what types of retryable exceptions we could realistically run into here? I think typically, if this raises an `Exception`, there's some more fundamental issue going on (metastore down, query object missing, or similar), which is unlikely to be resolved by 5 x 1 sec of constant backoff.
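   To make the trade-off concrete, here is a minimal sketch of what narrowing the retry scope might look like, assuming SQLAlchemy's `OperationalError` covers the transient connectivity cases worth retrying (an illustration only, not what the PR implements):

   ```python
   # Hypothetical narrowing: retry only transient DB connectivity errors;
   # anything else (e.g. NoResultFound for a missing query row) fails fast.
   from sqlalchemy.exc import OperationalError

   @backoff.on_exception(
       backoff.constant,
       OperationalError,  # e.g. metastore briefly unreachable
       interval=1,
       on_backoff=_get_query_backoff_handler,
       on_giveup=_get_query_giveup_handler,
       max_tries=5,
   )
   def _get_query(query_id: int) -> Query:
       """Fetch the query, retrying only transient database errors."""
       return db.session.query(Query).filter_by(id=query_id).one()
   ```

   With a narrowing like this, a missing query object would surface immediately instead of after five attempts.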
##########
superset-core/src/superset_core/api/types.py:
##########
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Query execution types for superset-core.
+
+Provides type definitions for query execution that are partially aligned
+with frontend types in superset-ui-core/src/query/types/.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from datetime import datetime
+from enum import Enum
+from typing import Any, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import pandas as pd
+
+
+class QueryStatus(Enum):
+    """
+    Status of query execution.
+
+    Aligned with frontend QueryState values but using Python conventions.
+    """
+
+    PENDING = "pending"
+    RUNNING = "running"
+    SUCCESS = "success"
+    FAILED = "failed"
+    TIMED_OUT = "timed_out"
+    STOPPED = "stopped"
+
+
+@dataclass
+class CacheOptions:
+    """
+    Options for query result caching.
+    """
+
+    timeout: int | None = None  # Override default cache timeout (seconds)
+    force_refresh: bool = False  # Bypass cache and re-execute query
+
+
+@dataclass
+class QueryOptions:
+    """
+    Options for query execution via Database.execute() and execute_async().
+
+    Supports customization of:
+    - Basic: catalog, schema, limit, timeout
+    - Templates: Jinja2 template parameters
+    - Caching: Cache timeout and refresh control
+    - Dry run: Return transformed SQL without execution
+    """
+
+    # Basic options
+    catalog: str | None = None
+    schema: str | None = None
+    limit: int | None = None
+    timeout_seconds: int | None = None
+
+    # Template options
+    template_params: dict[str, Any] | None = None  # For Jinja2 rendering
+
+    # Caching options
+    cache: CacheOptions | None = None
+
+    # Dry run option
+    dry_run: bool = False  # Return transformed SQL without executing
+
+
+@dataclass
+class QueryResult:
+    """
+    Result of a query execution.
+
+    Aligned with frontend ChartDataResponseResult structure.
+    For column information, use df.columns and df.dtypes on the data DataFrame.
+
+    Fields:
+        status: Query execution status
+        data: Result DataFrame (None if dry_run=True or error)
+        row_count: Number of rows in result

Review Comment:
   For multi-statement queries, it's not obvious which statement the result/row count relates to. If we have a single API for single and multi-statement queries, should we opt for slimming down this class and having a more comprehensive `results: list[StatementResult]`, where

   ```python
   class StatementResult:
       statement: str  # this would be the single statement that was executed
       data: DataFrame | None
       row_count: int | None
       error_message: str | None
       execution_time_ms: float | None
   ```

   ?
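   Put together, the slimmed-down shape being suggested might look roughly like this (a sketch only; the `results` field name and the defaults are assumptions, and `QueryStatus` is the enum from the hunk above):

   ```python
   from dataclasses import dataclass, field

   import pandas as pd


   @dataclass
   class StatementResult:
       statement: str  # the single statement that was executed
       data: pd.DataFrame | None = None
       row_count: int | None = None
       error_message: str | None = None
       execution_time_ms: float | None = None


   @dataclass
   class QueryResult:
       status: QueryStatus  # assumes the QueryStatus enum quoted above
       results: list[StatementResult] = field(default_factory=list)
       is_cached: bool = False
   ```

   This keeps query-level state on `QueryResult` while making per-statement data, row counts, and errors unambiguous.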
##########
superset-core/src/superset_core/api/models.py:
##########
@@ -75,6 +84,83 @@ def backend(self) -> str:
     def data(self) -> dict[str, Any]:
         raise NotImplementedError
 
+    def execute(
+        self,
+        sql: str,
+        options: QueryOptions | None = None,
+    ) -> QueryResult:

Review Comment:
   Design consideration: it's not uncommon to have a separate API for only executing queries vs executing and expecting a result. I'm not necessarily recommending two separate methods for this, but it's something to keep in mind — a sketch of such a split follows below.
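   For illustration, the split could look something like this (both method names and the stub class are hypothetical, not part of the PR):

   ```python
   class Database:  # illustrative stub of the base model
       def execute(
           self,
           sql: str,
           options: QueryOptions | None = None,
       ) -> None:
           """Run the statement(s) for side effects only; no result returned."""
           ...

       def execute_and_fetch(
           self,
           sql: str,
           options: QueryOptions | None = None,
       ) -> QueryResult:
           """Run the query and return its result set."""
           ...
   ```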
##########
superset-core/src/superset_core/api/models.py:
##########
@@ -75,6 +84,83 @@ def backend(self) -> str:
     def data(self) -> dict[str, Any]:
         raise NotImplementedError
 
+    def execute(
+        self,
+        sql: str,
+        options: QueryOptions | None = None,
+    ) -> QueryResult:
+        """
+        Execute SQL synchronously.
+
+        :param sql: SQL query to execute
+        :param options: Optional QueryOptions for catalog, schema, limit,
+            template_params, cache settings, timeout, dry_run

Review Comment:
   A few nits:
   - We don't necessarily need to mention that it's optional, as that's already conveyed by the type annotation.
   - Unpacking what parameters `QueryOptions` has may easily fall out of sync when updated later (I'm sure this type will evolve over time). I'd prefer something simple like `Query execution options`, or ``:param options: Query execution options (see :class:`QueryOptions`). If not provided, defaults are used.``

##########
superset-core/src/superset_core/api/types.py:
##########
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Query execution types for superset-core.
+
+Provides type definitions for query execution that are partially aligned
+with frontend types in superset-ui-core/src/query/types/.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from datetime import datetime
+from enum import Enum
+from typing import Any, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import pandas as pd
+
+
+class QueryStatus(Enum):
+    """
+    Status of query execution.
+
+    Aligned with frontend QueryState values but using Python conventions.

Review Comment:
   What are these conventions? I see `QueryState` appears to be a superset of these.
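   It may be worth stating explicitly which subset of frontend states is intentionally supported. A sketch of an explicit mapping (the frontend-only values listed here are from memory of superset-ui-core and should be verified against the frontend source):

   ```python
   # Hypothetical explicit mapping from frontend QueryState strings to the
   # backend enum; unmapped frontend states are intentionally excluded.
   FRONTEND_STATE_TO_QUERY_STATUS: dict[str, QueryStatus] = {
       "pending": QueryStatus.PENDING,
       "running": QueryStatus.RUNNING,
       "success": QueryStatus.SUCCESS,
       "failed": QueryStatus.FAILED,
       "timed_out": QueryStatus.TIMED_OUT,
       "stopped": QueryStatus.STOPPED,
       # "started", "scheduled", "fetching": frontend-only (assumption)
   }
   ```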
##########
superset/sql/execution/celery_task.py:
##########
@@ -0,0 +1,473 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Celery task for async SQL execution.
+
+This module provides the Celery task for executing SQL queries asynchronously.
+It is used by SQLExecutor.execute_async() to run queries in the background.
+"""
+
+from __future__ import annotations
+
+import dataclasses
+import logging
+import sys
+import uuid
+from sys import getsizeof
+from typing import Any, cast, TYPE_CHECKING
+
+import backoff
+import msgpack
+from celery.exceptions import SoftTimeLimitExceeded
+from flask import current_app as app, has_app_context
+from flask_babel import gettext as __
+
+from superset import (
+    db,
+    results_backend,
+    results_backend_use_msgpack,
+    security_manager,
+)
+from superset.common.db_query_status import QueryStatus
+from superset.constants import QUERY_CANCEL_KEY
+from superset.dataframe import df_to_records
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import (
+    SupersetErrorException,
+    SupersetErrorsException,
+)
+from superset.extensions import celery_app
+from superset.models.sql_lab import Query
+from superset.result_set import SupersetResultSet
+from superset.sql.execution.executor import execute_sql_with_cursor
+from superset.sql.parse import SQLScript
+from superset.sqllab.utils import write_ipc_buffer
+from superset.utils import json
+from superset.utils.core import override_user, zlib_compress
+from superset.utils.dates import now_as_float
+from superset.utils.decorators import stats_timing
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+BYTES_IN_MB = 1024 * 1024
+
+
+def _get_query_backoff_handler(details: dict[Any, Any]) -> None:
+    """Handler for backoff retry logging."""
+    stats_logger = app.config["STATS_LOGGER"]
+    query_id = details["kwargs"]["query_id"]
+    stats_logger.incr(f"error_attempting_orm_query_{details['tries'] - 1}")
+    logger.warning(
+        "Query with id `%s` could not be retrieved, retrying...",
+        str(query_id),
+        exc_info=True,
+    )
+
+
+def _get_query_giveup_handler(_: Any) -> None:
+    """Handler for backoff giveup logging."""
+    stats_logger = app.config["STATS_LOGGER"]
+    stats_logger.incr("error_failed_at_getting_orm_query")
+
+
+@backoff.on_exception(
+    backoff.constant,
+    Exception,
+    interval=1,
+    on_backoff=_get_query_backoff_handler,
+    on_giveup=_get_query_giveup_handler,
+    max_tries=5,
+)
+def _get_query(query_id: int) -> Query:
+    """Attempt to get the query with retry logic."""
+    return db.session.query(Query).filter_by(id=query_id).one()
+
+
+def _handle_query_error(
+    ex: Exception,
+    query: Query,
+    payload: dict[str, Any] | None = None,
+    prefix_message: str = "",
+) -> dict[str, Any]:
+    """Handle error while processing the SQL query."""
+    payload = payload or {}
+    msg = f"{prefix_message} {str(ex)}".strip()
+    query.error_message = msg
+    query.tmp_table_name = None
+    query.status = QueryStatus.FAILED
+
+    if not query.end_time:
+        query.end_time = now_as_float()
+
+    # Extract DB-specific errors
+    if isinstance(ex, SupersetErrorException):
+        errors = [ex.error]
+    elif isinstance(ex, SupersetErrorsException):
+        errors = ex.errors
+    else:
+        errors = query.database.db_engine_spec.extract_errors(
+            str(ex), database_name=query.database.unique_name
+        )
+
+    errors_payload = [dataclasses.asdict(error) for error in errors]
+    if errors:
+        query.set_extra_json_key("errors", errors_payload)
+
+    db.session.commit()
+    payload.update({"status": query.status, "error": msg, "errors": errors_payload})
+    if troubleshooting_link := app.config["TROUBLESHOOTING_LINK"]:
+        payload["link"] = troubleshooting_link
+    return payload
+
+
+def _serialize_payload(
+    payload: dict[Any, Any], use_msgpack: bool | None = False
+) -> bytes | str:
+    """Serialize payload for storage."""
+    logger.debug("Serializing to msgpack: %r", use_msgpack)
+    if use_msgpack:
+        return msgpack.dumps(payload, default=json.json_iso_dttm_ser, use_bin_type=True)
+    return json.dumps(payload, default=json.json_iso_dttm_ser, ignore_nan=True)
+
+
+def _prepare_statement_blocks(
+    rendered_query: str,
+    db_engine_spec: Any,
+) -> tuple[SQLScript, list[str]]:
+    """
+    Parse SQL and build statement blocks for execution.
+
+    Note: RLS, security checks, and other preprocessing are handled by
+    SQLExecutor before the query reaches this task.
+    """
+    parsed_script = SQLScript(rendered_query, engine=db_engine_spec.engine)
+
+    # Build statement blocks for execution
+    if db_engine_spec.run_multiple_statements_as_one:
+        blocks = [parsed_script.format(comments=db_engine_spec.allows_sql_comments)]
+    else:
+        blocks = [
+            statement.format(comments=db_engine_spec.allows_sql_comments)
+            for statement in parsed_script.statements
+        ]
+
+    return parsed_script, blocks
+
+
+def _finalize_successful_query(
+    query: Query,
+    result_set: SupersetResultSet,
+    db_engine_spec: Any,
+    payload: dict[str, Any],
+) -> None:
+    """Update query metadata and payload after successful execution."""
+    query.rows = result_set.size
+    query.progress = 100
+    query.set_extra_json_key("progress", None)
+    query.set_extra_json_key("columns", result_set.columns)
+    query.end_time = now_as_float()
+
+    use_arrow_data = cast(bool, results_backend_use_msgpack)
+    data, selected_columns, all_columns, expanded_columns = _serialize_and_expand_data(
+        result_set, db_engine_spec, use_arrow_data
+    )
+
+    payload.update(
+        {
+            "status": QueryStatus.SUCCESS,
+            "data": data,
+            "columns": all_columns,
+            "selected_columns": selected_columns,
+            "expanded_columns": expanded_columns,
+            "query": query.to_dict(),
+        }
+    )
+    payload["query"]["state"] = QueryStatus.SUCCESS
+
+
+def _store_results_in_backend(
+    query: Query,
+    payload: dict[str, Any],
+    database: Any,
+) -> None:
+    """Store query results in the results backend."""
+    key = str(uuid.uuid4())
+    payload["query"]["resultsKey"] = key
+    logger.info(
+        "Query %s: Storing results in results backend, key: %s",
+        str(query.id),
+        key,
+    )
+    stats_logger = app.config["STATS_LOGGER"]
+    with stats_timing("sqllab.query.results_backend_write", stats_logger):
+        with stats_timing(
+            "sqllab.query.results_backend_write_serialization", stats_logger
+        ):
+            serialized_payload = _serialize_payload(
+                payload, cast(bool, results_backend_use_msgpack)
+            )
+
+        # Check payload size limit
+        if sql_lab_payload_max_mb := app.config.get("SQLLAB_PAYLOAD_MAX_MB"):
+            serialized_payload_size = sys.getsizeof(serialized_payload)
+            max_bytes = sql_lab_payload_max_mb * BYTES_IN_MB
+
+            if serialized_payload_size > max_bytes:
+                logger.info("Result size exceeds the allowed limit.")
+                raise SupersetErrorException(
+                    SupersetError(
+                        message=(
+                            f"Result size "
+                            f"({serialized_payload_size / BYTES_IN_MB:.2f} MB) "
+                            f"exceeds the allowed limit of "
+                            f"{sql_lab_payload_max_mb} MB."
+                        ),
+                        error_type=SupersetErrorType.RESULT_TOO_LARGE_ERROR,
+                        level=ErrorLevel.ERROR,
+                    )
+                )
+
+        cache_timeout = database.cache_timeout
+        if cache_timeout is None:
+            cache_timeout = app.config["CACHE_DEFAULT_TIMEOUT"]
+
+        compressed = zlib_compress(serialized_payload)
+        logger.debug("*** serialized payload size: %i", getsizeof(serialized_payload))
+        logger.debug("*** compressed payload size: %i", getsizeof(compressed))
+
+        write_success = results_backend.set(key, compressed, cache_timeout)
+        if not write_success:
+            logger.error(
+                "Query %s: Failed to store results in backend, key: %s",
+                str(query.id),
+                key,
+            )
+            stats_logger.incr("sqllab.results_backend.write_failure")
+            query.results_key = None
+            query.status = QueryStatus.FAILED
+            query.error_message = (
+                "Failed to store query results in the results backend. "
+                "Please try again or contact your administrator."
+            )
+            db.session.commit()
+            raise SupersetErrorException(
+                SupersetError(
+                    message=__("Failed to store query results. Please try again."),
+                    error_type=SupersetErrorType.RESULTS_BACKEND_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            )
+        else:
+            query.results_key = key
+            logger.info(
+                "Query %s: Successfully stored results in backend, key: %s",
+                str(query.id),
+                key,
+            )
+
+
+def _serialize_and_expand_data(
+    result_set: SupersetResultSet,
+    db_engine_spec: Any,
+    use_msgpack: bool | None = False,

Review Comment:
   Thought: I assume this is for backwards compatibility, but to simplify this, I think we should always/never use `msgpack` rather than making it optional. But this relates to future work, so no need to change this now.
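   For reference, the always-msgpack variant gestured at above could be as small as this sketch (a future-work illustration reusing `msgpack` and `json.json_iso_dttm_ser` from the module above, not what the PR implements):

   ```python
   def _serialize_payload(payload: dict[Any, Any]) -> bytes:
       """Serialize payload for storage, always using msgpack."""
       return msgpack.dumps(
           payload, default=json.json_iso_dttm_ser, use_bin_type=True
       )
   ```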
##########
superset-core/src/superset_core/api/types.py:
##########
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Query execution types for superset-core.
+
+Provides type definitions for query execution that are partially aligned
+with frontend types in superset-ui-core/src/query/types/.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from datetime import datetime
+from enum import Enum
+from typing import Any, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import pandas as pd
+
+
+class QueryStatus(Enum):
+    """
+    Status of query execution.
+
+    Aligned with frontend QueryState values but using Python conventions.
+    """
+
+    PENDING = "pending"
+    RUNNING = "running"
+    SUCCESS = "success"
+    FAILED = "failed"
+    TIMED_OUT = "timed_out"
+    STOPPED = "stopped"
+
+
+@dataclass
+class CacheOptions:
+    """
+    Options for query result caching.
+    """
+
+    timeout: int | None = None  # Override default cache timeout (seconds)
+    force_refresh: bool = False  # Bypass cache and re-execute query
+
+
+@dataclass
+class QueryOptions:
+    """
+    Options for query execution via Database.execute() and execute_async().
+
+    Supports customization of:
+    - Basic: catalog, schema, limit, timeout
+    - Templates: Jinja2 template parameters
+    - Caching: Cache timeout and refresh control
+    - Dry run: Return transformed SQL without execution
+    """
+
+    # Basic options
+    catalog: str | None = None
+    schema: str | None = None
+    limit: int | None = None
+    timeout_seconds: int | None = None
+
+    # Template options
+    template_params: dict[str, Any] | None = None  # For Jinja2 rendering
+
+    # Caching options
+    cache: CacheOptions | None = None
+
+    # Dry run option
+    dry_run: bool = False  # Return transformed SQL without executing
+
+
+@dataclass
+class QueryResult:
+    """
+    Result of a query execution.
+
+    Aligned with frontend ChartDataResponseResult structure.
+    For column information, use df.columns and df.dtypes on the data DataFrame.
+
+    Fields:
+        status: Query execution status
+        data: Result DataFrame (None if dry_run=True or error)
+        row_count: Number of rows in result
+        error_message: Error message if failed
+        query: SQL after all transformations (RLS, templates, limits)
+        query_id: Query model ID for audit trail (None if dry_run=True)
+        execution_time_ms: Query execution time in milliseconds
+        is_cached: Whether result came from cache
+    """
+
+    status: QueryStatus
+    data: pd.DataFrame | None = None
+    row_count: int = 0
+    error_message: str | None = None
+    query: str | None = None  # SQL after all transformations (RLS, templates, limits)

Review Comment:
   Related to the earlier comment about a more comprehensive `StatementResult` with a `statement` property, I suggest removing this field from the main `QueryResult`.
+ """ + + PENDING = "pending" + RUNNING = "running" + SUCCESS = "success" + FAILED = "failed" + TIMED_OUT = "timed_out" + STOPPED = "stopped" + + +@dataclass +class CacheOptions: + """ + Options for query result caching. + """ + + timeout: int | None = None # Override default cache timeout (seconds) + force_refresh: bool = False # Bypass cache and re-execute query + + +@dataclass +class QueryOptions: + """ + Options for query execution via Database.execute() and execute_async(). + + Supports customization of: + - Basic: catalog, schema, limit, timeout + - Templates: Jinja2 template parameters + - Caching: Cache timeout and refresh control + - Dry run: Return transformed SQL without execution + """ + + # Basic options + catalog: str | None = None + schema: str | None = None + limit: int | None = None + timeout_seconds: int | None = None + + # Template options + template_params: dict[str, Any] | None = None # For Jinja2 rendering + + # Caching options + cache: CacheOptions | None = None + + # Dry run option + dry_run: bool = False # Return transformed SQL without executing + + +@dataclass +class QueryResult: + """ + Result of a query execution. + + Aligned with frontend ChartDataResponseResult structure. + For column information, use df.columns and df.dtypes on the data DataFrame. + + Fields: + status: Query execution status + data: Result DataFrame (None if dry_run=True or error) + row_count: Number of rows in result + error_message: Error message if failed + query: SQL after all transformations (RLS, templates, limits) + query_id: Query model ID for audit trail (None if dry_run=True) + execution_time_ms: Query execution time in milliseconds + is_cached: Whether result came from cache + """ + + status: QueryStatus + data: pd.DataFrame | None = None + row_count: int = 0 + error_message: str | None = None + query: str | None = None # SQL after all transformations (RLS, templates, limits) Review Comment: Related to the comment about a more comprehensive `StatementResult` and having a `statement` property on that, I suggest removing this in the main `QueryResult`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
