aminghadersohi commented on code in PR #39162:
URL: https://github.com/apache/superset/pull/39162#discussion_r3343538648


##########
superset/db_engine_specs/databricks.py:
##########
@@ -107,13 +132,144 @@ def literal_processor(
         pass
 
 
+class DatabricksOAuth2Mixin:
+    """Mixin adding OAuth2 per-user impersonation to all Databricks engine 
specs.
+
+    When OAuth2 is configured, each user authenticates individually with the
+    Databricks workspace. Their personal OAuth2 token replaces the shared PAT
+    in the connection URI, so queries execute under the user's identity.
+
+    Databricks OIDC endpoints are auto-derived from a ``host`` key in the
+    ``DATABASE_OAUTH2_CLIENTS`` config, so admins only need to provide
+    client_id, secret, and workspace host.
+    """
+
+    supports_oauth2 = True
+    oauth2_scope = "sql offline_access"
+    oauth2_exception = DatabricksAuthError
+    oauth2_token_request_type = "data"  # noqa: S105
+
+    @classmethod
+    def get_oauth2_config(cls) -> OAuth2ClientConfig | None:
+        oauth2_config = app.config["DATABASE_OAUTH2_CLIENTS"]
+        # Look up by exact engine_name first, then fall back to "Databricks"
+        # so a single config entry works for all Databricks specs (e.g.,
+        # "Databricks SQL Endpoint", "Databricks Interactive Cluster", etc.)
+        spec_config = oauth2_config.get(
+            cls.engine_name,  # type: ignore[attr-defined]
+            oauth2_config.get("Databricks"),
+        )
+        if spec_config is None:
+            return None
+        host = spec_config.get("host", "")
+        redirect_uri = app.config.get(
+            "DATABASE_OAUTH2_REDIRECT_URI",

Review Comment:
   **HIGH**: Python evaluates all function arguments before calling `.get()`, 
so `url_for("DatabaseRestApi.oauth2", _external=True)` runs unconditionally — 
even when `DATABASE_OAUTH2_REDIRECT_URI` is configured and its result will be 
discarded. `url_for` with `_external=True` requires an active request context; 
calling `get_oauth2_config()` from a unit test without a full request setup (or 
any non-request context) raises `RuntimeError: Working outside of request 
context`.
   
   ```python
   # lazy — url_for only evaluated when the config key is absent
   redirect_uri = (
       app.config.get("DATABASE_OAUTH2_REDIRECT_URI")
       or url_for("DatabaseRestApi.oauth2", _external=True)
   )
   ```



##########
superset/db_engine_specs/databricks.py:
##########
@@ -107,13 +132,144 @@ def literal_processor(
         pass
 
 
+class DatabricksOAuth2Mixin:
+    """Mixin adding OAuth2 per-user impersonation to all Databricks engine 
specs.
+
+    When OAuth2 is configured, each user authenticates individually with the
+    Databricks workspace. Their personal OAuth2 token replaces the shared PAT
+    in the connection URI, so queries execute under the user's identity.
+
+    Databricks OIDC endpoints are auto-derived from a ``host`` key in the
+    ``DATABASE_OAUTH2_CLIENTS`` config, so admins only need to provide
+    client_id, secret, and workspace host.
+    """
+
+    supports_oauth2 = True
+    oauth2_scope = "sql offline_access"
+    oauth2_exception = DatabricksAuthError
+    oauth2_token_request_type = "data"  # noqa: S105
+
+    @classmethod
+    def get_oauth2_config(cls) -> OAuth2ClientConfig | None:
+        oauth2_config = app.config["DATABASE_OAUTH2_CLIENTS"]
+        # Look up by exact engine_name first, then fall back to "Databricks"
+        # so a single config entry works for all Databricks specs (e.g.,
+        # "Databricks SQL Endpoint", "Databricks Interactive Cluster", etc.)
+        spec_config = oauth2_config.get(
+            cls.engine_name,  # type: ignore[attr-defined]
+            oauth2_config.get("Databricks"),
+        )
+        if spec_config is None:
+            return None
+        host = spec_config.get("host", "")
+        redirect_uri = app.config.get(
+            "DATABASE_OAUTH2_REDIRECT_URI",
+            url_for("DatabaseRestApi.oauth2", _external=True),
+        )
+
+        return {
+            "id": spec_config["id"],
+            "secret": spec_config["secret"],
+            "scope": spec_config.get("scope") or cls.oauth2_scope,
+            "redirect_uri": redirect_uri,
+            "authorization_request_uri": spec_config.get(
+                "authorization_request_uri",

Review Comment:
   **MEDIUM**: When `host` is absent or empty, both `authorization_request_uri` 
and `token_request_uri` silently become `None`. The OAuth2 framework receives 
`None` for required URLs and fails later with an opaque error. Fail fast:
   
   ```python
   if not host and not spec_config.get("authorization_request_uri"):
       raise ValueError(
           'Databricks OAuth2 config requires either "host" or explicit '
           '"authorization_request_uri" / "token_request_uri"'
       )
   ```



##########
superset/db_engine_specs/databricks.py:
##########
@@ -107,13 +132,144 @@ def literal_processor(
         pass
 
 
+class DatabricksOAuth2Mixin:
+    """Mixin adding OAuth2 per-user impersonation to all Databricks engine 
specs.
+
+    When OAuth2 is configured, each user authenticates individually with the
+    Databricks workspace. Their personal OAuth2 token replaces the shared PAT
+    in the connection URI, so queries execute under the user's identity.
+
+    Databricks OIDC endpoints are auto-derived from a ``host`` key in the
+    ``DATABASE_OAUTH2_CLIENTS`` config, so admins only need to provide
+    client_id, secret, and workspace host.
+    """
+
+    supports_oauth2 = True
+    oauth2_scope = "sql offline_access"
+    oauth2_exception = DatabricksAuthError
+    oauth2_token_request_type = "data"  # noqa: S105
+
+    @classmethod
+    def get_oauth2_config(cls) -> OAuth2ClientConfig | None:
+        oauth2_config = app.config["DATABASE_OAUTH2_CLIENTS"]
+        # Look up by exact engine_name first, then fall back to "Databricks"
+        # so a single config entry works for all Databricks specs (e.g.,
+        # "Databricks SQL Endpoint", "Databricks Interactive Cluster", etc.)
+        spec_config = oauth2_config.get(
+            cls.engine_name,  # type: ignore[attr-defined]
+            oauth2_config.get("Databricks"),
+        )
+        if spec_config is None:
+            return None
+        host = spec_config.get("host", "")
+        redirect_uri = app.config.get(
+            "DATABASE_OAUTH2_REDIRECT_URI",
+            url_for("DatabaseRestApi.oauth2", _external=True),
+        )
+
+        return {
+            "id": spec_config["id"],
+            "secret": spec_config["secret"],
+            "scope": spec_config.get("scope") or cls.oauth2_scope,
+            "redirect_uri": redirect_uri,
+            "authorization_request_uri": spec_config.get(
+                "authorization_request_uri",
+                f"https://{host}/oidc/v1/authorize"; if host else None,
+            ),
+            "token_request_uri": spec_config.get(
+                "token_request_uri",
+                f"https://{host}/oidc/v1/token"; if host else None,
+            ),
+            "request_content_type": spec_config.get(
+                "request_content_type",
+                cls.oauth2_token_request_type,
+            ),
+        }
+
+    @classmethod
+    def needs_oauth2(cls, ex: Exception) -> bool:
+        """Check if the exception indicates OAuth2 authentication is needed.
+
+        The databricks-sql-connector raises RequestError which gets wrapped
+        by SQLAlchemy into a DBAPIError. We check both the original exception
+        and the string representation for auth failure indicators.
+        """
+        if not has_request_context() or not hasattr(g, "user"):
+            return False
+
+        # Check the full exception chain for auth errors
+        current: BaseException | None = ex
+        while current is not None:
+            if isinstance(current, DatabricksAuthError):
+                return True
+            # Also check string for wrapped exceptions
+            ex_str = str(current).lower()
+            if any(
+                s in ex_str
+                for s in (
+                    "error 401",
+                    "status code 401",
+                    "error 403",
+                    "status code 403",
+                    "unauthorized",
+                    "forbidden",
+                    "credential was not sent",
+                )
+            ):
+                return True
+            current = current.__cause__
+
+        return False
+
+    @classmethod
+    def impersonate_user(
+        cls,
+        database: Database,
+        username: str | None,
+        user_token: str | None,
+        url: URL,
+        engine_kwargs: dict[str, Any],
+    ) -> tuple[URL, dict[str, Any]]:
+        if username is None:
+            return url, engine_kwargs
+
+        if user_token is not None:
+            # Replace the static PAT with the per-user OAuth2 access token.
+            # The databricks-sql-connector accepts OAuth tokens as the password
+            # in the URI: databricks://token:{oauth_token}@host:port
+            url = url.set(password=user_token)
+        elif database.is_oauth2_enabled():
+            # No stored OAuth token for this user yet. Proactively trigger the
+            # OAuth dance instead of falling back to the shared PAT, which 
would
+            # run queries as the PAT owner rather than the logged-in user.
+            cls.start_oauth2_dance(database)  # type: ignore[attr-defined]

Review Comment:
   **HIGH**: `start_oauth2_dance` is called without a `has_request_context()` 
guard. When a Celery worker handles an async SQL Lab query and `user_token is 
None` (no token stored yet for this user), this branch fires inside the worker 
process — which has no HTTP request context. `start_oauth2_dance` 
(base.py:657–659) eagerly calls `url_for(...)`, which raises `RuntimeError: 
Working outside of request context` inside the worker. The query dies with an 
opaque runtime error instead of a useful auth message.
   
   ```python
   elif database.is_oauth2_enabled():
       if has_request_context():
           cls.start_oauth2_dance(database)  # type: ignore[attr-defined]
       # Outside request context (Celery/async): let the connection attempt fail
       # with the underlying auth error rather than crashing on url_for
   ```



##########
tests/unit_tests/db_engine_specs/test_databricks.py:
##########
@@ -284,3 +295,296 @@ def test_get_prequeries(mocker: MockerFixture) -> None:
         "USE CATALOG `escaped-hyphen`",
         "USE SCHEMA `hyphen-escaped`",
     ]
+
+
+# ============================================================================
+# OAuth2 and Impersonation Tests
+# ============================================================================
+
+
+ALL_DATABRICKS_SPECS: list[type[BaseEngineSpec]] = [
+    DatabricksHiveEngineSpec,
+    DatabricksBaseEngineSpec,
+    DatabricksODBCEngineSpec,
+    DatabricksNativeEngineSpec,
+    DatabricksPythonConnectorEngineSpec,
+]
+
+
[email protected]("spec", ALL_DATABRICKS_SPECS)
+def test_oauth2_class_vars(spec: type[BaseEngineSpec]) -> None:
+    """All Databricks specs should have OAuth2 support enabled."""
+    assert spec.supports_oauth2 is True
+    assert spec.oauth2_scope == "sql offline_access"
+    assert spec.oauth2_exception is DatabricksAuthError
+    assert spec.oauth2_token_request_type == "data"
+
+
+def test_impersonate_user_with_oauth_token() -> None:
+    """When user_token is provided, the URL password should be replaced."""
+    database = MagicMock()
+    url = URL.create(
+        "databricks",
+        username="token",
+        password="original-pat",
+        host="workspace.cloud.databricks.com",
+        port=443,
+        query={"http_path": "sql/1.0/warehouses/abc123", "catalog": "gold"},
+    )
+    engine_kwargs: dict[str, Any] = {
+        "connect_args": {"http_path": "sql/1.0/warehouses/abc123"},
+    }
+
+    new_url, new_kwargs = DatabricksPythonConnectorEngineSpec.impersonate_user(
+        database, "test_user", "oauth-access-token-xyz", url, engine_kwargs
+    )
+
+    assert new_url.password == "oauth-access-token-xyz"
+    assert new_url.username == "token"
+    assert new_url.host == "workspace.cloud.databricks.com"
+    # connect_args should be unchanged
+    assert new_kwargs["connect_args"]["http_path"] == 
"sql/1.0/warehouses/abc123"
+
+
+def test_impersonate_user_without_token() -> None:
+    """When user_token is None and OAuth not enabled, URL stays."""
+    database = MagicMock()
+    database.is_oauth2_enabled.return_value = False
+    url = URL.create(
+        "databricks",
+        username="token",
+        password="original-pat",
+        host="workspace.cloud.databricks.com",
+        port=443,
+    )
+    engine_kwargs: dict[str, Any] = {}
+
+    new_url, _ = DatabricksPythonConnectorEngineSpec.impersonate_user(
+        database, "test_user", None, url, engine_kwargs
+    )
+
+    assert new_url.password == "original-pat"
+
+
+def test_impersonate_user_no_username() -> None:
+    """When username is None, impersonation should be a no-op."""
+    database = MagicMock()
+    url = URL.create(
+        "databricks",
+        username="token",
+        password="original-pat",
+        host="workspace.cloud.databricks.com",
+        port=443,
+    )
+    engine_kwargs: dict[str, Any] = {"connect_args": {"foo": "bar"}}
+
+    new_url, new_kwargs = DatabricksPythonConnectorEngineSpec.impersonate_user(
+        database, None, "some-token", url, engine_kwargs
+    )
+
+    assert new_url.password == "original-pat"
+    assert new_kwargs == {"connect_args": {"foo": "bar"}}
+
+
+def test_impersonate_user_no_token_triggers_oauth_dance() -> None:
+    """When OAuth is enabled but no token exists, should trigger OAuth 
dance."""
+    from unittest.mock import patch

Review Comment:
   **NIT**: `from unittest.mock import patch` is imported inline here and in 
two other test functions (lines 538, 555). No circular-import concern — move 
all three to the module-level imports alongside `MagicMock`.



##########
superset/db_engine_specs/databricks.py:
##########
@@ -107,13 +132,144 @@ def literal_processor(
         pass
 
 
+class DatabricksOAuth2Mixin:
+    """Mixin adding OAuth2 per-user impersonation to all Databricks engine 
specs.
+
+    When OAuth2 is configured, each user authenticates individually with the
+    Databricks workspace. Their personal OAuth2 token replaces the shared PAT
+    in the connection URI, so queries execute under the user's identity.
+
+    Databricks OIDC endpoints are auto-derived from a ``host`` key in the
+    ``DATABASE_OAUTH2_CLIENTS`` config, so admins only need to provide
+    client_id, secret, and workspace host.
+    """
+
+    supports_oauth2 = True
+    oauth2_scope = "sql offline_access"
+    oauth2_exception = DatabricksAuthError
+    oauth2_token_request_type = "data"  # noqa: S105
+
+    @classmethod
+    def get_oauth2_config(cls) -> OAuth2ClientConfig | None:
+        oauth2_config = app.config["DATABASE_OAUTH2_CLIENTS"]
+        # Look up by exact engine_name first, then fall back to "Databricks"
+        # so a single config entry works for all Databricks specs (e.g.,
+        # "Databricks SQL Endpoint", "Databricks Interactive Cluster", etc.)
+        spec_config = oauth2_config.get(
+            cls.engine_name,  # type: ignore[attr-defined]
+            oauth2_config.get("Databricks"),
+        )
+        if spec_config is None:
+            return None
+        host = spec_config.get("host", "")
+        redirect_uri = app.config.get(
+            "DATABASE_OAUTH2_REDIRECT_URI",
+            url_for("DatabaseRestApi.oauth2", _external=True),
+        )
+
+        return {
+            "id": spec_config["id"],
+            "secret": spec_config["secret"],
+            "scope": spec_config.get("scope") or cls.oauth2_scope,
+            "redirect_uri": redirect_uri,
+            "authorization_request_uri": spec_config.get(
+                "authorization_request_uri",
+                f"https://{host}/oidc/v1/authorize"; if host else None,
+            ),
+            "token_request_uri": spec_config.get(
+                "token_request_uri",
+                f"https://{host}/oidc/v1/token"; if host else None,
+            ),
+            "request_content_type": spec_config.get(
+                "request_content_type",
+                cls.oauth2_token_request_type,
+            ),
+        }
+
+    @classmethod
+    def needs_oauth2(cls, ex: Exception) -> bool:
+        """Check if the exception indicates OAuth2 authentication is needed.
+
+        The databricks-sql-connector raises RequestError which gets wrapped
+        by SQLAlchemy into a DBAPIError. We check both the original exception
+        and the string representation for auth failure indicators.
+        """
+        if not has_request_context() or not hasattr(g, "user"):
+            return False
+
+        # Check the full exception chain for auth errors
+        current: BaseException | None = ex
+        while current is not None:
+            if isinstance(current, DatabricksAuthError):
+                return True
+            # Also check string for wrapped exceptions
+            ex_str = str(current).lower()
+            if any(
+                s in ex_str
+                for s in (
+                    "error 401",
+                    "status code 401",
+                    "error 403",
+                    "status code 403",
+                    "unauthorized",
+                    "forbidden",

Review Comment:
   **MEDIUM**: `"forbidden"` matches network ACL rejections, proxy errors, and 
firewall messages (e.g. `"Connection to X.X.X.X is forbidden by security 
policy"`). A false positive triggers the full OAuth2 dance for a non-auth 
failure, sending the user through Databricks OIDC login when the real problem 
is network connectivity.
   
   Consider removing the bare `"forbidden"` term and relying on the more 
specific HTTP status strings already present (`"error 403"`, `"status code 
403"`). `"credential was not sent"` is Databricks-specific enough to keep.



##########
superset/db_engine_specs/databricks.py:
##########
@@ -221,7 +377,7 @@ class DatabricksPythonConnectorPropertiesType(TypedDict):
 }
 
 
-class DatabricksHiveEngineSpec(HiveEngineSpec):
+class DatabricksHiveEngineSpec(DatabricksOAuth2Mixin, HiveEngineSpec):  # 
type: ignore[misc]

Review Comment:
   **NIT**: `# type: ignore[misc]` with no explanation. The suppressed error is 
mypy's "Definition of 'X' in base class 'Y' is incompatible with definition in 
base class 'Z'", caused by `update_params_from_encrypted_extra` and 
`impersonate_user` differing between the mixin and the parent specs. A brief 
note prevents future readers from wondering whether it's safe to remove:
   
   ```python
   class DatabricksHiveEngineSpec(DatabricksOAuth2Mixin, HiveEngineSpec):  # 
type: ignore[misc]  # mixin overrides 
impersonate_user/update_params_from_encrypted_extra
   ```



##########
superset/db_engine_specs/databricks.py:
##########
@@ -107,13 +132,144 @@ def literal_processor(
         pass
 
 
+class DatabricksOAuth2Mixin:
+    """Mixin adding OAuth2 per-user impersonation to all Databricks engine 
specs.
+
+    When OAuth2 is configured, each user authenticates individually with the
+    Databricks workspace. Their personal OAuth2 token replaces the shared PAT
+    in the connection URI, so queries execute under the user's identity.
+
+    Databricks OIDC endpoints are auto-derived from a ``host`` key in the
+    ``DATABASE_OAUTH2_CLIENTS`` config, so admins only need to provide
+    client_id, secret, and workspace host.
+    """
+
+    supports_oauth2 = True
+    oauth2_scope = "sql offline_access"
+    oauth2_exception = DatabricksAuthError
+    oauth2_token_request_type = "data"  # noqa: S105
+
+    @classmethod
+    def get_oauth2_config(cls) -> OAuth2ClientConfig | None:
+        oauth2_config = app.config["DATABASE_OAUTH2_CLIENTS"]
+        # Look up by exact engine_name first, then fall back to "Databricks"
+        # so a single config entry works for all Databricks specs (e.g.,
+        # "Databricks SQL Endpoint", "Databricks Interactive Cluster", etc.)
+        spec_config = oauth2_config.get(
+            cls.engine_name,  # type: ignore[attr-defined]
+            oauth2_config.get("Databricks"),
+        )
+        if spec_config is None:
+            return None
+        host = spec_config.get("host", "")
+        redirect_uri = app.config.get(
+            "DATABASE_OAUTH2_REDIRECT_URI",
+            url_for("DatabaseRestApi.oauth2", _external=True),
+        )
+
+        return {
+            "id": spec_config["id"],
+            "secret": spec_config["secret"],
+            "scope": spec_config.get("scope") or cls.oauth2_scope,
+            "redirect_uri": redirect_uri,
+            "authorization_request_uri": spec_config.get(
+                "authorization_request_uri",
+                f"https://{host}/oidc/v1/authorize"; if host else None,
+            ),
+            "token_request_uri": spec_config.get(
+                "token_request_uri",
+                f"https://{host}/oidc/v1/token"; if host else None,
+            ),
+            "request_content_type": spec_config.get(
+                "request_content_type",
+                cls.oauth2_token_request_type,
+            ),
+        }
+
+    @classmethod
+    def needs_oauth2(cls, ex: Exception) -> bool:
+        """Check if the exception indicates OAuth2 authentication is needed.
+
+        The databricks-sql-connector raises RequestError which gets wrapped
+        by SQLAlchemy into a DBAPIError. We check both the original exception
+        and the string representation for auth failure indicators.
+        """
+        if not has_request_context() or not hasattr(g, "user"):

Review Comment:
   **MEDIUM**: `hasattr(g, "user")` returns `True` when `g.user = None`. 
Flask-AppBuilder can set the attribute to `None` for anonymous requests rather 
than leaving it unset, so the guard passes for unauthenticated callers. An auth 
failure on an anonymous request then walks the exception chain and can reach 
`start_oauth2_dance` for a user who is not logged in.
   
   ```python
   if not has_request_context() or not getattr(g, "user", None):
       return False
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to