codeant-ai-for-open-source[bot] commented on code in PR #34826: URL: https://github.com/apache/superset/pull/34826#discussion_r2764796309
########## superset/extensions/engine_manager.py: ########## @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +from datetime import timedelta + +from flask import Flask + +from superset.engines.manager import EngineManager, EngineModes + +logger = logging.getLogger(__name__) + + +class EngineManagerExtension: + """ + Flask extension for managing SQLAlchemy engines in Superset. + + This extension creates and configures an EngineManager instance based on + Flask configuration, handling startup and shutdown of background cleanup + threads as needed. + """ + + def __init__(self) -> None: + self.engine_manager: EngineManager | None = None + + def init_app(self, app: Flask) -> None: + """ + Initialize the EngineManager with Flask app configuration. + """ + engine_context_manager = app.config["ENGINE_CONTEXT_MANAGER"] + db_connection_mutator = app.config["DB_CONNECTION_MUTATOR"] + mode = app.config["ENGINE_MANAGER_MODE"] + cleanup_interval = app.config["ENGINE_MANAGER_CLEANUP_INTERVAL"] Review Comment: **Suggestion:** The code assumes `ENGINE_MANAGER_CLEANUP_INTERVAL` is a `timedelta`, but configuration comments describe it in seconds and users may set it to an int/float; in that case `cleanup_interval.total_seconds()` in logging and in `EngineManager` will raise an AttributeError because numeric types don't have `total_seconds`. [type error] <details> <summary><b>Severity Level:</b> Major ⚠️</summary> ```mdx - ❌ init_app() raises AttributeError on numeric config. - ⚠️ Misconfigured deployments fail startup. - ⚠️ Users confused by config type mismatch. - ⚠️ Logging of cleanup interval broken. ``` </details> ```suggestion cleanup_interval_config = app.config["ENGINE_MANAGER_CLEANUP_INTERVAL"] if isinstance(cleanup_interval_config, timedelta): cleanup_interval = cleanup_interval_config else: cleanup_interval = timedelta(seconds=cleanup_interval_config) ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Inspect init_app() where cleanup_interval is read from app.config at superset/extensions/engine_manager.py line 47. 2. Real-world config: per PR description, ENGINE_MANAGER_CLEANUP_INTERVAL may be configured in seconds (an int/float) by users. If a numeric value is provided, cleanup_interval becomes a number, not a timedelta. 3. Trigger code paths that use cleanup_interval: logging at lines 81-85 calls cleanup_interval.total_seconds() which will raise AttributeError if cleanup_interval is numeric. Reproduce by setting app.config["ENGINE_MANAGER_CLEANUP_INTERVAL"] = 300 (int) and calling init_app(); observing an AttributeError on init when logger tries to call total_seconds(). 4. 
Also the EngineManager constructor (lines 54-62) may expect a timedelta; passing a numeric type could cause further errors inside EngineManager when it calls timedelta methods. Note: This reproduction is derived from the final file state where cleanup_interval is used with .total_seconds() at logger invocation (lines 81-85). ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** superset/extensions/engine_manager.py **Line:** 47:47 **Comment:** *Type Error: The code assumes `ENGINE_MANAGER_CLEANUP_INTERVAL` is a `timedelta`, but configuration comments describe it in seconds and users may set it to an int/float; in that case `cleanup_interval.total_seconds()` in logging and in `EngineManager` will raise an AttributeError because numeric types don't have `total_seconds`. Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise. ``` </details> ########## superset/engines/manager.py: ########## @@ -0,0 +1,630 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import enum +import hashlib +import logging +import threading +from contextlib import contextmanager +from datetime import timedelta +from io import StringIO +from typing import Any, Iterator, TYPE_CHECKING + +import sshtunnel +from paramiko import RSAKey +from sqlalchemy import create_engine, event, pool +from sqlalchemy.engine import Engine +from sqlalchemy.engine.url import URL +from sshtunnel import SSHTunnelForwarder + +from superset.databases.utils import make_url_safe +from superset.superset_typing import DBConnectionMutator, EngineContextManager +from superset.utils.core import get_query_source_from_request, get_user_id, QuerySource + +if TYPE_CHECKING: + from superset.databases.ssh_tunnel.models import SSHTunnel + from superset.models.core import Database + + +logger = logging.getLogger(__name__) + + +class _LockManager: + """ + Manages per-key locks safely without defaultdict race conditions. + + This class provides a thread-safe way to create and manage locks for specific keys, + avoiding the race conditions that occur when using defaultdict with threading.Lock. + + The implementation uses a two-level locking strategy: + 1. A meta-lock to protect the lock dictionary itself + 2. Per-key locks to protect specific resources + + This ensures that: + - Different keys can be locked concurrently (scalability) + - Lock creation is thread-safe (no race conditions) + - The same key always gets the same lock instance + """ + + def __init__(self) -> None: + self._locks: dict[str, threading.RLock] = {} + self._meta_lock = threading.Lock() + + def get_lock(self, key: str) -> threading.RLock: + """ + Get or create a lock for the given key. 
+ + This method uses double-checked locking to ensure thread safety: + 1. First check without lock (fast path) + 2. Acquire meta-lock if needed + 3. Double-check inside the lock to prevent race conditions + + This approach minimizes lock contention while ensuring correctness. + + :param key: The key to get a lock for + :returns: An RLock instance for the given key + """ + if lock := self._locks.get(key): + return lock + + with self._meta_lock: + # Double-check inside the lock + lock = self._locks.get(key) + if lock is None: + lock = threading.RLock() + self._locks[key] = lock + return lock + + def cleanup(self, active_keys: set[str]) -> None: + """ + Remove locks for keys that are no longer in use. + + This prevents memory leaks from accumulating locks for resources + that have been disposed. + + :param active_keys: Set of keys that are still active + """ + with self._meta_lock: + # Find locks to remove + locks_to_remove = self._locks.keys() - active_keys + for key in locks_to_remove: + self._locks.pop(key, None) + + +EngineKey = str +TunnelKey = str + + +def _generate_cache_key(*args: Any) -> str: + """ + Generate a deterministic cache key from arbitrary arguments. + + Uses repr() for serialization and SHA-256 for hashing. The resulting key + is a 32-character hex string that: + 1. Is deterministic for the same inputs + 2. Does not expose sensitive data (everything is hashed) + 3. Has sufficient entropy to avoid collisions + + :param args: Arguments to include in the cache key + :returns: 32-character hex string + """ + # Use repr() which works with most Python objects and is deterministic + serialized = repr(args).encode("utf-8") + return hashlib.sha256(serialized).hexdigest()[:32] + + +class EngineModes(enum.Enum): + # reuse existing engine if available, otherwise create a new one; this mode should + # have a connection pool configured in the database + SINGLETON = enum.auto() + + # always create a new engine for every connection; this mode will use a NullPool + # and is the default behavior for Superset + NEW = enum.auto() + + +class EngineManager: + """ + A manager for SQLAlchemy engines. + + This class handles the creation and management of SQLAlchemy engines, allowing them + to be configured with connection pools and reused across requests. The default mode + is the original behavior for Superset, where we create a new engine for every + connection, using a NullPool. The `SINGLETON` mode, on the other hand, allows for + reusing of the engines, as well as configuring the pool through the database + settings. 
+ """ + + def __init__( + self, + engine_context_manager: EngineContextManager, + db_connection_mutator: DBConnectionMutator | None = None, + mode: EngineModes = EngineModes.NEW, + cleanup_interval: timedelta = timedelta(minutes=5), + local_bind_address: str = "127.0.0.1", + tunnel_timeout: timedelta = timedelta(seconds=30), + ssh_timeout: timedelta = timedelta(seconds=1), + ) -> None: + self.engine_context_manager = engine_context_manager + self.db_connection_mutator = db_connection_mutator + self.mode = mode + self.cleanup_interval = cleanup_interval + self.local_bind_address = local_bind_address + + sshtunnel.TUNNEL_TIMEOUT = tunnel_timeout.total_seconds() + sshtunnel.SSH_TIMEOUT = ssh_timeout.total_seconds() + + self._engines: dict[EngineKey, Engine] = {} + self._engine_locks = _LockManager() + self._tunnels: dict[TunnelKey, SSHTunnelForwarder] = {} + self._tunnel_locks = _LockManager() + + # Background cleanup thread management + self._cleanup_thread: threading.Thread | None = None + self._cleanup_stop_event = threading.Event() + self._cleanup_thread_lock = threading.Lock() + + def __del__(self) -> None: + """ + Ensure cleanup thread is stopped when the manager is destroyed. + """ + try: + self.stop_cleanup_thread() + except Exception as ex: + # Avoid exceptions during garbage collection, but log if possible + try: + logger.warning("Error stopping cleanup thread: %s", ex) + except Exception: # noqa: S110 + # If logging fails during destruction, we can't do anything + pass + + @contextmanager + def get_engine( + self, + database: "Database", + catalog: str | None, + schema: str | None, + source: QuerySource | None, + ) -> Iterator[Engine]: + """ + Context manager to get a SQLAlchemy engine. + """ + # users can wrap the engine in their own context manager for different + # reasons + with self.engine_context_manager(database, catalog, schema): + # we need to check for errors indicating that OAuth2 is needed, and + # return the proper exception so it starts the authentication flow + from superset.utils.oauth2 import check_for_oauth2 + + with check_for_oauth2(database): + yield self._get_engine(database, catalog, schema, source) + + def _get_engine( + self, + database: "Database", + catalog: str | None, + schema: str | None, + source: QuerySource | None, + ) -> Engine: + """ + Get a specific engine, or create it if none exists. + """ + source = source or get_query_source_from_request() + user_id = get_user_id() + + if self.mode == EngineModes.NEW: + return self._create_engine( + database, + catalog, + schema, + source, + user_id, + ) + + engine_key = self._get_engine_key( + database, + catalog, + schema, + source, + user_id, + ) + + if engine := self._engines.get(engine_key): + return engine + + lock = self._engine_locks.get_lock(engine_key) + with lock: + # Double-check inside the lock + if engine := self._engines.get(engine_key): + return engine + + # Create and cache the engine + engine = self._create_engine( + database, + catalog, + schema, + source, + user_id, + ) + self._engines[engine_key] = engine + self._add_disposal_listener(engine, engine_key) + return engine + + def _get_engine_key( + self, + database: "Database", + catalog: str | None, + schema: str | None, + source: QuerySource | None, + user_id: int | None, + ) -> EngineKey: + """ + Generate a cache key for the engine. + + The key is a hash of all parameters that affect the engine, ensuring + proper cache isolation without exposing sensitive data. 
+ + :returns: 32-character hex string + """ + uri, kwargs = self._get_engine_args( + database, + catalog, + schema, + source, + user_id, + ) + + return _generate_cache_key( + database.id, + catalog, + schema, + str(uri), + source, + user_id, + kwargs, + ) + + def _get_engine_args( + self, + database: "Database", + catalog: str | None, + schema: str | None, + source: QuerySource | None, + user_id: int | None, + ) -> tuple[URL, dict[str, Any]]: + """ + Build the almost final SQLAlchemy URI and engine kwargs. + + "Almost" final because we may still need to mutate the URI if an SSH tunnel is + needed, since it needs to connect to the tunnel instead of the original DB. But + that information is only available after the tunnel is created. + """ + # Import here to avoid circular imports + from superset.extensions import security_manager + from superset.utils.feature_flag_manager import FeatureFlagManager + + uri = make_url_safe(database.sqlalchemy_uri_decrypted) + + extra = database.get_extra(source) + # Make a copy to avoid mutating the original extra dict + kwargs = dict(extra.get("engine_params", {})) + + # get pool class + if self.mode == EngineModes.NEW or "poolclass" not in kwargs: + kwargs["poolclass"] = pool.NullPool + else: + pools = { + "queue": pool.QueuePool, + "singleton": pool.SingletonThreadPool, + "assertion": pool.AssertionPool, + "null": pool.NullPool, + "static": pool.StaticPool, + } + kwargs["poolclass"] = pools.get(extra["poolclass"], pool.QueuePool) + + # update URI for specific catalog/schema + connect_args = dict(extra.get("connect_args", {})) + uri, connect_args = database.db_engine_spec.adjust_engine_params( + uri, + connect_args, + catalog, + schema, + ) Review Comment: **Suggestion:** Connection-level `connect_args` options configured on the database (for example SSL, timeouts, driver-specific flags) are silently ignored: they are read from the wrong place in `extra` and never written back into `kwargs`, so `create_engine` is called without them, changing behavior compared to the previous implementation and potentially breaking connections. [logic error] <details> <summary><b>Severity Level:</b> Critical 🚨</summary> ```mdx - ❌ DB connections fail when connect_args required. - ⚠️ SSL and driver options ignored causing misconfigurations. - ⚠️ Affects all code paths creating engines for databases. ``` </details> ```suggestion connect_args = dict(kwargs.get("connect_args", {})) uri, connect_args = database.db_engine_spec.adjust_engine_params( uri, connect_args, catalog, schema, ) kwargs["connect_args"] = connect_args ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Configure a Database with connect_args (for example SSL, timeout flags) so they exist in the DB extra and should be passed to SQLAlchemy. The manager reads extras at superset/engines/manager.py:320-325 where `extra = database.get_extra(source)` and `kwargs = dict(extra.get("engine_params", {}))`. 2. Trigger EngineManager._create_engine (superset/engines/manager.py:406-439) — used by _get_engine when obtaining an engine for queries (lines 216-266). This is a standard path whenever a DB connection is established. 3. Execution reaches the connect_args handling at superset/engines/manager.py:340-346. The code reads connect_args from `extra` into a local variable, passes them to db_engine_spec.adjust_engine_params, but never writes the resulting connect_args back into `kwargs`. 4. Later, create_engine is called with `kwargs` at superset/engines/manager.py:434-436. 
Because kwargs["connect_args"] was never set, the connection-level options (SSL, driver flags) are missing from create_engine and thus not applied. 5. Reproducible symptom: a database that required connect_args (for example forcing TLS cipher or driver option) will fail to connect or behave differently; logs will show create_engine called without expected connect_args. The code shown in the PR (final file) proves kwargs is the dict passed to create_engine but is never updated with connect_args in this function. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** superset/engines/manager.py **Line:** 341:346 **Comment:** *Logic Error: Connection-level `connect_args` options configured on the database (for example SSL, timeouts, driver-specific flags) are silently ignored: they are read from the wrong place in `extra` and never written back into `kwargs`, so `create_engine` is called without them, changing behavior compared to the previous implementation and potentially breaking connections. Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise. ``` </details> ########## superset/models/core.py: ########## @@ -424,130 +419,31 @@ def get_effective_user(self, object_url: URL) -> str | None: ) @contextmanager - def get_sqla_engine( # pylint: disable=too-many-arguments + def get_sqla_engine( self, catalog: str | None = None, schema: str | None = None, - nullpool: bool = True, source: utils.QuerySource | None = None, - ) -> Engine: + ) -> Iterator[Engine]: """ Context manager for a SQLAlchemy engine. - This method will return a context manager for a SQLAlchemy engine. Using the - context manager (as opposed to the engine directly) is important because we need - to potentially establish SSH tunnels before the connection is created, and clean - them up once the engine is no longer used. + This method will return a context manager for a SQLAlchemy engine. The engine + manager handles connection pooling, SSH tunnels, and other connection details + based on the configured mode (NEW or SINGLETON). 
""" + # Import here to avoid circular imports + from superset.extensions import engine_manager_extension - sqlalchemy_uri = self.sqlalchemy_uri_decrypted - - ssh_context_manager = ( - ssh_manager_factory.instance.create_tunnel( - ssh_tunnel=self.ssh_tunnel, - sqlalchemy_database_uri=sqlalchemy_uri, - ) - if self.ssh_tunnel - else nullcontext() - ) - - with ssh_context_manager as ssh_context: - if ssh_context: - logger.info( - "[SSH] Successfully created tunnel w/ %s tunnel_timeout + %s " - "ssh_timeout at %s", - sshtunnel.TUNNEL_TIMEOUT, - sshtunnel.SSH_TIMEOUT, - ssh_context.local_bind_address, - ) - sqlalchemy_uri = ssh_manager_factory.instance.build_sqla_url( - sqlalchemy_uri, - ssh_context, - ) - - engine_context_manager = app.config["ENGINE_CONTEXT_MANAGER"] - with engine_context_manager(self, catalog, schema): - with check_for_oauth2(self): - yield self._get_sqla_engine( - catalog=catalog, - schema=schema, - nullpool=nullpool, - source=source, - sqlalchemy_uri=sqlalchemy_uri, - ) - - def _get_sqla_engine( # pylint: disable=too-many-locals # noqa: C901 - self, - catalog: str | None = None, - schema: str | None = None, - nullpool: bool = True, - source: utils.QuerySource | None = None, - sqlalchemy_uri: str | None = None, - ) -> Engine: - sqlalchemy_url = make_url_safe( - sqlalchemy_uri if sqlalchemy_uri else self.sqlalchemy_uri_decrypted - ) - self.db_engine_spec.validate_database_uri(sqlalchemy_url) - - extra = self.get_extra(source) - engine_kwargs = extra.get("engine_params", {}) - if nullpool: - engine_kwargs["poolclass"] = NullPool - connect_args = engine_kwargs.setdefault("connect_args", {}) - - # modify URL/args for a specific catalog/schema - sqlalchemy_url, connect_args = self.db_engine_spec.adjust_engine_params( - uri=sqlalchemy_url, - connect_args=connect_args, + # Use the engine manager to get the engine + engine_manager = engine_manager_extension.manager Review Comment: **Suggestion:** The new `get_sqla_engine` implementation calls the engine manager directly without wrapping engine creation in `check_for_oauth2`, so for OAuth2-enabled databases that raise during engine/connection creation (e.g. BigQuery, Snowflake) the OAuth2 exceptions will no longer be converted into the expected OAuth2 redirect flow and will instead surface as generic errors, regressing the behavior that `get_raw_connection` still maintains. [logic error] <details> <summary><b>Severity Level:</b> Major ⚠️</summary> ```mdx - ❌ SQL execution paths may surface generic errors. - ❌ Inspector/catalog endpoints fail without OAuth2 redirect. - ⚠️ Query compilation may expose 500s to users. - ⚠️ UX OAuth2 dance not triggered for affected DBs. ``` </details> ```suggestion # Use the engine manager to get the engine, wrapping creation with OAuth2 handling engine_manager = engine_manager_extension.manager with check_for_oauth2(self): ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Ensure a Database row is configured with OAuth2 credentials (encrypted_extra contains oauth2_client_info). Confirm via superset/models/core.py: get_oauth2_config() which returns a non-None config if present (see Database.get_oauth2_config in superset/models/core.py). 2. Trigger any code path that calls Database.get_sqla_engine(), for example: - compile_sqla_query() calls with self.get_sqla_engine(catalog=catalog, schema=schema) (superset/models/core.py around compile_sqla_query). - get_inspector() calls with self.get_sqla_engine(catalog=catalog, schema=schema) (superset/models/core.py: get_inspector). 
- _execute_sql_with_mutation_and_logging() calls get_sqla_engine to obtain engine.url (superset/models/core.py: lines where engine_url = engine.url). 3. With the PR code as-is, calling Database.get_sqla_engine() (superset/models/core.py: get_sqla_engine block starting at line 435) will call engine_manager.get_engine(...) directly without wrapping the call in check_for_oauth2(self). 4. If engine_manager.get_engine(...) raises an exception that the DB engine spec class deems to require OAuth2 (i.e., db_engine_spec.needs_oauth2(ex) would return True), the exception will bubble up as a generic error because get_sqla_engine did not call check_for_oauth2(self). The equivalent earlier protection exists in get_raw_connection() where check_for_oauth2(self) wraps raw_connection acquisition (see superset/models/core.py: get_raw_connection). 5. Observed behavior: instead of initiating the OAuth2 redirect/dance (the UI flow triggered by start_oauth2_dance()), the system surfaces a generic exception to the caller (500/error), regressing previous behavior where OAuth2-required errors were intercepted and the OAuth2 flow was started. Note: This reproduction is based on concrete call sites in this file (compile_sqla_query, get_inspector, _execute_sql_with_mutation_and_logging, get_raw_connection) and the presence of check_for_oauth2 wrapping in get_raw_connection but not in the new get_sqla_engine implementation. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** superset/models/core.py **Line:** 438:439 **Comment:** *Logic Error: The new `get_sqla_engine` implementation calls the engine manager directly without wrapping engine creation in `check_for_oauth2`, so for OAuth2-enabled databases that raise during engine/connection creation (e.g. BigQuery, Snowflake) the OAuth2 exceptions will no longer be converted into the expected OAuth2 redirect flow and will instead surface as generic errors, regressing the behavior that `get_raw_connection` still maintains. Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise. ``` </details> ########## tests/unit_tests/engines/manager_test.py: ########## @@ -0,0 +1,527 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +"""Unit tests for EngineManager.""" + +import threading +from collections.abc import Iterator +from unittest.mock import MagicMock, patch + +import pytest + +from superset.engines.manager import _LockManager, EngineManager, EngineModes + Review Comment: **Suggestion:** The tests call `EngineManager._get_engine` and `get_engine` without a Flask request or application context, but those methods internally use `get_query_source_from_request` and `get_user_id`, which rely on Flask's `request` and `g` and will raise "working outside of request context"; patching these functions in the EngineManager module for tests avoids spurious failures unrelated to engine logic. [logic error] <details> <summary><b>Severity Level:</b> Major ⚠️</summary> ```mdx - ❌ Unit tests fail with Flask context errors. - ⚠️ CI pipeline test suite blocked by failures. - ⚠️ Developer workflow slowed by spurious test noise. ``` </details> ```suggestion @pytest.fixture(autouse=True) def _mock_engine_manager_dependencies(monkeypatch): """ Avoid Flask request/g dependencies when calling EngineManager in unit tests. """ import superset.engines.manager as manager monkeypatch.setattr(manager, "get_query_source_from_request", lambda: None) monkeypatch.setattr(manager, "get_user_id", lambda: None) ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Run the unit tests for the engine manager: pytest tests/unit_tests/engines/manager_test.py. The test module imports EngineManager (file: tests/unit_tests/engines/manager_test.py) and executes tests such as test_get_engine_new_mode and test_engine_oauth2_error_handling. 2. In test_get_engine_new_mode (defined in the same file, added around lines 128-149 in the PR), the test calls engine_manager._get_engine(mock_database, "catalog1", "schema1", None). That code path inside EngineManager uses helper functions tied to Flask request context (e.g., get_query_source_from_request / get_user_id) which access flask.request or flask.g. 3. Because pytest runs these tests outside of any Flask request/app context, calling those helpers triggers a "working outside of request context" runtime error coming from Flask internals when EngineManager calls get_query_source_from_request/get_user_id. The failure surface is seen during test collection/execution of the test functions in this file. 4. The suggested change (add an autouse fixture patching get_query_source_from_request and get_user_id) prevents the Flask-dependent helpers from executing during tests so the EngineManager logic is exercised without needing a Flask request context. This reproduces locally by removing the fixture and re-running pytest to observe the Flask "working outside of request context" error. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** tests/unit_tests/engines/manager_test.py **Line:** 27:27 **Comment:** *Logic Error: The tests call `EngineManager._get_engine` and `get_engine` without a Flask request or application context, but those methods internally use `get_query_source_from_request` and `get_user_id`, which rely on Flask's `request` and `g` and will raise "working outside of request context"; patching these functions in the EngineManager module for tests avoids spurious failures unrelated to engine logic. Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise. 
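```

</details>

As a quick sanity check of the fixture above, a test like the following would then run without any Flask app or request context. This is a minimal sketch only: it assumes the autouse fixture lives in the same test module, and the test name and mock wiring are illustrative rather than code from the PR.

```python
from unittest.mock import MagicMock, patch

from superset.engines.manager import EngineManager, EngineModes


def test_get_engine_outside_request_context() -> None:
    # EngineModes.NEW bypasses the engine cache, so only _create_engine runs.
    manager = EngineManager(
        engine_context_manager=MagicMock(),
        mode=EngineModes.NEW,
    )
    database = MagicMock()

    # With get_query_source_from_request and get_user_id patched by the
    # autouse fixture, no Flask request context is required here.
    with patch.object(manager, "_create_engine") as create_engine_mock:
        engine = manager._get_engine(database, "catalog1", "schema1", None)

    assert engine is create_engine_mock.return_value
```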
########## superset/engines/manager.py: ########## @@ -0,0 +1,630 @@ (hunk repeated verbatim from the quote of this file above; the flagged pool-selection block is:) + # get pool class + if self.mode == EngineModes.NEW or "poolclass" not in kwargs: + kwargs["poolclass"] = pool.NullPool + else: + pools = { + "queue": pool.QueuePool, + "singleton": pool.SingletonThreadPool, + "assertion": pool.AssertionPool, + "null": pool.NullPool, + "static": pool.StaticPool, + } + kwargs["poolclass"] = pools.get(extra["poolclass"], pool.QueuePool)
Review Comment: **Suggestion:** The engine pool configuration path is broken: when a custom `poolclass` is configured in the database `extra.engine_params`, the code checks for `"poolclass"` in `kwargs` but then reads `extra["poolclass"]` instead of `kwargs["poolclass"]`, which will raise a `KeyError` at runtime and prevent the engine from being created. [logic error] <details> <summary><b>Severity Level:</b> Major ⚠️</summary> ```mdx - ❌ Engine creation fails for databases with engine_params.poolclass. - ⚠️ Singleton pooling configuration becomes unusable. - ⚠️ Affects any feature using pooled DB connections. ``` </details> ```suggestion pool_name = kwargs["poolclass"] if isinstance(pool_name, str): kwargs["poolclass"] = pools.get(pool_name, pool.QueuePool) ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Construct a Database record whose extra contains engine_params with a string poolclass (common user configuration). This is read at superset/engines/manager.py:_get_engine_args around lines 320-326 where `extra = database.get_extra(source)` and `kwargs = dict(extra.get("engine_params", {}))`. 2. Call EngineManager._get_engine_args (superset/engines/manager.py:301-404) — reached indirectly from EngineManager._create_engine (lines 406-439) which is used by EngineManager._get_engine (lines 216-266) when EngineModes.SINGLETON is enabled. 3. Execution reaches the pool selection block at superset/engines/manager.py:326-337. Since kwargs contains "poolclass" (copied from engine_params), the code goes into the else branch but then attempts to read extra["poolclass"] (line 337) instead of kwargs["poolclass"]. 4. Because extra does not have a top-level "poolclass" key (it's under engine_params), Python raises a KeyError at superset/engines/manager.py:337 and engine creation fails, preventing create_engine from being called and blocking any DB connection for that database configuration. 5. Evidence: the file in the PR shows kwargs is built from extra["engine_params"] at line 324, so looking up extra["poolclass"] is inconsistent and provably incorrect in the same function. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** superset/engines/manager.py **Line:** 337:337 **Comment:** *Logic Error: The engine pool configuration path is broken: when a custom `poolclass` is configured in the database `extra.engine_params`, the code checks for `"poolclass"` in `kwargs` but then reads `extra["poolclass"]` instead of `kwargs["poolclass"]`, which will raise a `KeyError` at runtime and prevent the engine from being created. Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
########## superset/extensions/engine_manager.py: ########## @@ -0,0 +1,100 @@ (license header and the beginning of init_app identical to the first quote of this file above; the tail relevant to this comment is:) + local_bind_address = app.config["SSH_TUNNEL_LOCAL_BIND_ADDRESS"] + tunnel_timeout = timedelta(seconds=app.config["SSH_TUNNEL_TIMEOUT_SEC"]) + ssh_timeout = timedelta(seconds=app.config["SSH_TUNNEL_PACKET_TIMEOUT_SEC"]) + auto_start_cleanup = app.config["ENGINE_MANAGER_AUTO_START_CLEANUP"] + + # Create the engine manager + self.engine_manager = EngineManager( + engine_context_manager, + db_connection_mutator, + mode, + cleanup_interval, + local_bind_address, + tunnel_timeout, + ssh_timeout, + ) + + # Start cleanup thread if requested and in SINGLETON mode + if auto_start_cleanup and mode == EngineModes.SINGLETON: + self.engine_manager.start_cleanup_thread() + logger.info("Started EngineManager cleanup thread") + + # Register shutdown handler + def shutdown_engine_manager() -> None: + if self.engine_manager: + self.engine_manager.stop_cleanup_thread() + + app.teardown_appcontext_funcs.append(lambda exc: None)
Review Comment: **Suggestion:** The shutdown handler is only registered with `atexit` and never hooked into Flask's teardown mechanism; instead a no-op lambda is added to `app.teardown_appcontext_funcs`, so the cleanup thread is not stopped on app teardown and the intended `shutdown_engine_manager` is never invoked until interpreter exit. [resource leak] <details> <summary><b>Severity Level:</b> Major ⚠️</summary> ```mdx - ⚠️ Background cleanup thread continues after app teardown. - ⚠️ Tests creating multiple app contexts may hang. - ⚠️ Worker processes retain threads until process exit. - ⚠️ Resource leaks during app reloads in dev.
``` </details> ```suggestion def shutdown_engine_manager(exc: BaseException | None = None) -> None: if self.engine_manager: self.engine_manager.stop_cleanup_thread() app.teardown_appcontext(shutdown_engine_manager) ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Inspect extension initialization at superset/extensions/engine_manager.py:40-63 where EngineManagerExtension.init_app() creates self.engine_manager and may start a cleanup thread (see lines 54-66 in the PR). 2. Observe shutdown registration code at lines 70-79: a shutdown function shutdown_engine_manager() is defined (70) but the code appends a no-op lambda to app.teardown_appcontext_funcs (line 74) instead of registering shutdown_engine_manager with Flask teardown, and only registers shutdown_engine_manager with atexit (line 79). 3. Reproduce by running a Flask app, calling init_app(app) so the cleanup thread is started (SINGLETON mode + ENGINE_MANAGER_AUTO_START_CLEANUP True). Then trigger Flask app context teardown (for example, when the Flask test client finishes a request or when an application context is popped). Because shutdown_engine_manager is not registered with Flask's teardown, the cleanup thread remains running after the app context ends. 4. Confirm behavior: the cleanup thread continues until process exit (atexit) rather than stopping on app teardown — observable by checking running threads after app.pop(): the EngineManager cleanup thread is still present and only stops when interpreter exits (atexit handler). Note: This reproduction is based on the final file contents in superset/extensions/engine_manager.py where app.teardown_appcontext is not called and a no-op is appended instead. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review. **Path:** superset/extensions/engine_manager.py **Line:** 71:74 **Comment:** *Resource Leak: The shutdown handler is only registered with `atexit` and never hooked into Flask's teardown mechanism; instead a no-op lambda is added to `app.teardown_appcontext_funcs`, so the cleanup thread is not stopped on app teardown and the intended `shutdown_engine_manager` is never invoked until interpreter exit. Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise. ``` </details>
########## superset/extensions/engine_manager.py: ########## @@ -0,0 +1,100 @@ (hunk identical to the quotes of this file above; the flagged line is:) + # Create the engine manager + self.engine_manager = EngineManager(
Review Comment: **Suggestion:** Recalling `init_app` multiple times on the same `EngineManagerExtension` (for example when creating multiple Flask apps in a single process during tests) will replace `self.engine_manager` with a new instance without stopping the old one's cleanup thread, leaving background threads running with no way to stop them. [resource leak] <details> <summary><b>Severity Level:</b> Major ⚠️</summary> ```mdx - ⚠️ Tests reusing extension leak threads across runs. - ⚠️ Long-running processes accumulate background threads. - ⚠️ Increased memory and FD usage over time. - ⚠️ Hard-to-debug intermittent test hangs. ``` </details> ```suggestion # Stop any existing manager's cleanup thread before creating a new one if self.engine_manager: self.engine_manager.stop_cleanup_thread() ``` <details> <summary><b>Steps of Reproduction ✅ </b></summary> ```mdx 1. Locate init_app() in superset/extensions/engine_manager.py where self.engine_manager is assigned (lines 40-63). The current code unconditionally overwrites self.engine_manager at lines 54-62. 2. Reproduce in tests or multi-app process: create first Flask app and call EngineManagerExtension.init_app(app1) which starts an EngineManager and (when configured) starts its cleanup thread (lines 64-66). Then create a second Flask app and call init_app(app2) on the same EngineManagerExtension instance. 3. Because the code replaces self.engine_manager without stopping the old instance, the original EngineManager cleanup thread continues running with no reference in the extension and cannot be stopped except at process exit. Verify by listing threads after second init_app: a stale cleanup thread exists alongside the new one. 4. This scenario is realistic in test suites or embedding environments where the same extension instance is reused for multiple app instances. ``` </details> <details> <summary><b>Prompt for AI Agent 🤖 </b></summary> ```mdx This is a comment left during a code review.
**Path:** superset/extensions/engine_manager.py **Line:** 54:54 **Comment:** *Resource Leak: Recalling `init_app` multiple times on the same `EngineManagerExtension` (for example when creating multiple Flask apps in a single process during tests) will replace `self.engine_manager` with a new instance without stopping the old one's cleanup thread, leaving background threads running with no way to stop them. Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise. ``` </details>
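For completeness, a regression-style sketch of the re-initialization scenario. This is hedged: it assumes the config keys read by `init_app` in the hunks above, and it pokes the private `_cleanup_thread` attribute shown in the `superset/engines/manager.py` hunk, so it is illustrative rather than a drop-in test. It should pass only once the guard from the suggestion is applied:

```python
from contextlib import contextmanager
from datetime import timedelta

from flask import Flask

from superset.engines.manager import EngineModes
from superset.extensions.engine_manager import EngineManagerExtension


@contextmanager
def _noop_engine_context(database, catalog, schema):
    yield


def _make_app() -> Flask:
    # Config keys taken from the init_app hunk quoted above.
    app = Flask(__name__)
    app.config.update(
        ENGINE_CONTEXT_MANAGER=_noop_engine_context,
        DB_CONNECTION_MUTATOR=None,
        ENGINE_MANAGER_MODE=EngineModes.SINGLETON,
        ENGINE_MANAGER_CLEANUP_INTERVAL=timedelta(minutes=5),
        SSH_TUNNEL_LOCAL_BIND_ADDRESS="127.0.0.1",
        SSH_TUNNEL_TIMEOUT_SEC=30,
        SSH_TUNNEL_PACKET_TIMEOUT_SEC=1,
        ENGINE_MANAGER_AUTO_START_CLEANUP=True,
    )
    return app


def test_reinit_stops_previous_cleanup_thread() -> None:
    extension = EngineManagerExtension()
    extension.init_app(_make_app())
    first_manager = extension.engine_manager

    # Second init_app replaces the manager; with the guard in place the
    # first manager's cleanup thread must be stopped, not orphaned.
    extension.init_app(_make_app())

    assert first_manager is not extension.engine_manager
    thread = first_manager._cleanup_thread  # illustrative: private attribute
    assert thread is None or not thread.is_alive()
```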
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]