aminghadersohi commented on code in PR #39162:
URL: https://github.com/apache/superset/pull/39162#discussion_r3343538648
##########
superset/db_engine_specs/databricks.py:
##########
@@ -107,13 +132,144 @@ def literal_processor(
pass
+class DatabricksOAuth2Mixin:
+ """Mixin adding OAuth2 per-user impersonation to all Databricks engine
specs.
+
+ When OAuth2 is configured, each user authenticates individually with the
+ Databricks workspace. Their personal OAuth2 token replaces the shared PAT
+ in the connection URI, so queries execute under the user's identity.
+
+ Databricks OIDC endpoints are auto-derived from a ``host`` key in the
+ ``DATABASE_OAUTH2_CLIENTS`` config, so admins only need to provide
+ client_id, secret, and workspace host.
+ """
+
+ supports_oauth2 = True
+ oauth2_scope = "sql offline_access"
+ oauth2_exception = DatabricksAuthError
+ oauth2_token_request_type = "data" # noqa: S105
+
+ @classmethod
+ def get_oauth2_config(cls) -> OAuth2ClientConfig | None:
+ oauth2_config = app.config["DATABASE_OAUTH2_CLIENTS"]
+ # Look up by exact engine_name first, then fall back to "Databricks"
+ # so a single config entry works for all Databricks specs (e.g.,
+ # "Databricks SQL Endpoint", "Databricks Interactive Cluster", etc.)
+ spec_config = oauth2_config.get(
+ cls.engine_name, # type: ignore[attr-defined]
+ oauth2_config.get("Databricks"),
+ )
+ if spec_config is None:
+ return None
+ host = spec_config.get("host", "")
+ redirect_uri = app.config.get(
+ "DATABASE_OAUTH2_REDIRECT_URI",
Review Comment:
**HIGH**: Python evaluates all function arguments before calling `.get()`,
so `url_for("DatabaseRestApi.oauth2", _external=True)` runs unconditionally —
even when `DATABASE_OAUTH2_REDIRECT_URI` is configured and its result will be
discarded. `url_for` with `_external=True` requires an active request context;
calling `get_oauth2_config()` from a unit test without a full request setup (or
any non-request context) raises `RuntimeError: Working outside of request
context`.
```python
# lazy — url_for only evaluated when the config key is absent
redirect_uri = (
app.config.get("DATABASE_OAUTH2_REDIRECT_URI")
or url_for("DatabaseRestApi.oauth2", _external=True)
)
```
##########
superset/db_engine_specs/databricks.py:
##########
@@ -107,13 +132,144 @@ def literal_processor(
pass
+class DatabricksOAuth2Mixin:
+ """Mixin adding OAuth2 per-user impersonation to all Databricks engine
specs.
+
+ When OAuth2 is configured, each user authenticates individually with the
+ Databricks workspace. Their personal OAuth2 token replaces the shared PAT
+ in the connection URI, so queries execute under the user's identity.
+
+ Databricks OIDC endpoints are auto-derived from a ``host`` key in the
+ ``DATABASE_OAUTH2_CLIENTS`` config, so admins only need to provide
+ client_id, secret, and workspace host.
+ """
+
+ supports_oauth2 = True
+ oauth2_scope = "sql offline_access"
+ oauth2_exception = DatabricksAuthError
+ oauth2_token_request_type = "data" # noqa: S105
+
+ @classmethod
+ def get_oauth2_config(cls) -> OAuth2ClientConfig | None:
+ oauth2_config = app.config["DATABASE_OAUTH2_CLIENTS"]
+ # Look up by exact engine_name first, then fall back to "Databricks"
+ # so a single config entry works for all Databricks specs (e.g.,
+ # "Databricks SQL Endpoint", "Databricks Interactive Cluster", etc.)
+ spec_config = oauth2_config.get(
+ cls.engine_name, # type: ignore[attr-defined]
+ oauth2_config.get("Databricks"),
+ )
+ if spec_config is None:
+ return None
+ host = spec_config.get("host", "")
+ redirect_uri = app.config.get(
+ "DATABASE_OAUTH2_REDIRECT_URI",
+ url_for("DatabaseRestApi.oauth2", _external=True),
+ )
+
+ return {
+ "id": spec_config["id"],
+ "secret": spec_config["secret"],
+ "scope": spec_config.get("scope") or cls.oauth2_scope,
+ "redirect_uri": redirect_uri,
+ "authorization_request_uri": spec_config.get(
+ "authorization_request_uri",
Review Comment:
**MEDIUM**: When `host` is absent or empty, both `authorization_request_uri`
and `token_request_uri` silently become `None`. The OAuth2 framework receives
`None` for required URLs and fails later with an opaque error. Fail fast:
```python
if not host and not spec_config.get("authorization_request_uri"):
raise ValueError(
'Databricks OAuth2 config requires either "host" or explicit '
'"authorization_request_uri" / "token_request_uri"'
)
```
##########
superset/db_engine_specs/databricks.py:
##########
@@ -107,13 +132,144 @@ def literal_processor(
pass
+class DatabricksOAuth2Mixin:
+ """Mixin adding OAuth2 per-user impersonation to all Databricks engine
specs.
+
+ When OAuth2 is configured, each user authenticates individually with the
+ Databricks workspace. Their personal OAuth2 token replaces the shared PAT
+ in the connection URI, so queries execute under the user's identity.
+
+ Databricks OIDC endpoints are auto-derived from a ``host`` key in the
+ ``DATABASE_OAUTH2_CLIENTS`` config, so admins only need to provide
+ client_id, secret, and workspace host.
+ """
+
+ supports_oauth2 = True
+ oauth2_scope = "sql offline_access"
+ oauth2_exception = DatabricksAuthError
+ oauth2_token_request_type = "data" # noqa: S105
+
+ @classmethod
+ def get_oauth2_config(cls) -> OAuth2ClientConfig | None:
+ oauth2_config = app.config["DATABASE_OAUTH2_CLIENTS"]
+ # Look up by exact engine_name first, then fall back to "Databricks"
+ # so a single config entry works for all Databricks specs (e.g.,
+ # "Databricks SQL Endpoint", "Databricks Interactive Cluster", etc.)
+ spec_config = oauth2_config.get(
+ cls.engine_name, # type: ignore[attr-defined]
+ oauth2_config.get("Databricks"),
+ )
+ if spec_config is None:
+ return None
+ host = spec_config.get("host", "")
+ redirect_uri = app.config.get(
+ "DATABASE_OAUTH2_REDIRECT_URI",
+ url_for("DatabaseRestApi.oauth2", _external=True),
+ )
+
+ return {
+ "id": spec_config["id"],
+ "secret": spec_config["secret"],
+ "scope": spec_config.get("scope") or cls.oauth2_scope,
+ "redirect_uri": redirect_uri,
+ "authorization_request_uri": spec_config.get(
+ "authorization_request_uri",
+ f"https://{host}/oidc/v1/authorize" if host else None,
+ ),
+ "token_request_uri": spec_config.get(
+ "token_request_uri",
+ f"https://{host}/oidc/v1/token" if host else None,
+ ),
+ "request_content_type": spec_config.get(
+ "request_content_type",
+ cls.oauth2_token_request_type,
+ ),
+ }
+
+ @classmethod
+ def needs_oauth2(cls, ex: Exception) -> bool:
+ """Check if the exception indicates OAuth2 authentication is needed.
+
+ The databricks-sql-connector raises RequestError which gets wrapped
+ by SQLAlchemy into a DBAPIError. We check both the original exception
+ and the string representation for auth failure indicators.
+ """
+ if not has_request_context() or not hasattr(g, "user"):
+ return False
+
+ # Check the full exception chain for auth errors
+ current: BaseException | None = ex
+ while current is not None:
+ if isinstance(current, DatabricksAuthError):
+ return True
+ # Also check string for wrapped exceptions
+ ex_str = str(current).lower()
+ if any(
+ s in ex_str
+ for s in (
+ "error 401",
+ "status code 401",
+ "error 403",
+ "status code 403",
+ "unauthorized",
+ "forbidden",
+ "credential was not sent",
+ )
+ ):
+ return True
+ current = current.__cause__
+
+ return False
+
+ @classmethod
+ def impersonate_user(
+ cls,
+ database: Database,
+ username: str | None,
+ user_token: str | None,
+ url: URL,
+ engine_kwargs: dict[str, Any],
+ ) -> tuple[URL, dict[str, Any]]:
+ if username is None:
+ return url, engine_kwargs
+
+ if user_token is not None:
+ # Replace the static PAT with the per-user OAuth2 access token.
+ # The databricks-sql-connector accepts OAuth tokens as the password
+ # in the URI: databricks://token:{oauth_token}@host:port
+ url = url.set(password=user_token)
+ elif database.is_oauth2_enabled():
+ # No stored OAuth token for this user yet. Proactively trigger the
+ # OAuth dance instead of falling back to the shared PAT, which
would
+ # run queries as the PAT owner rather than the logged-in user.
+ cls.start_oauth2_dance(database) # type: ignore[attr-defined]
Review Comment:
**HIGH**: `start_oauth2_dance` is called without a `has_request_context()`
guard. When a Celery worker handles an async SQL Lab query and `user_token is
None` (no token stored yet for this user), this branch fires inside the worker
process — which has no HTTP request context. `start_oauth2_dance`
(base.py:657–659) eagerly calls `url_for(...)`, which raises `RuntimeError:
Working outside of request context` inside the worker. The query dies with an
opaque runtime error instead of a useful auth message.
```python
elif database.is_oauth2_enabled():
if has_request_context():
cls.start_oauth2_dance(database) # type: ignore[attr-defined]
# Outside request context (Celery/async): let the connection attempt fail
# with the underlying auth error rather than crashing on url_for
```
##########
tests/unit_tests/db_engine_specs/test_databricks.py:
##########
@@ -284,3 +295,296 @@ def test_get_prequeries(mocker: MockerFixture) -> None:
"USE CATALOG `escaped-hyphen`",
"USE SCHEMA `hyphen-escaped`",
]
+
+
+# ============================================================================
+# OAuth2 and Impersonation Tests
+# ============================================================================
+
+
+ALL_DATABRICKS_SPECS: list[type[BaseEngineSpec]] = [
+ DatabricksHiveEngineSpec,
+ DatabricksBaseEngineSpec,
+ DatabricksODBCEngineSpec,
+ DatabricksNativeEngineSpec,
+ DatabricksPythonConnectorEngineSpec,
+]
+
+
[email protected]("spec", ALL_DATABRICKS_SPECS)
+def test_oauth2_class_vars(spec: type[BaseEngineSpec]) -> None:
+ """All Databricks specs should have OAuth2 support enabled."""
+ assert spec.supports_oauth2 is True
+ assert spec.oauth2_scope == "sql offline_access"
+ assert spec.oauth2_exception is DatabricksAuthError
+ assert spec.oauth2_token_request_type == "data"
+
+
+def test_impersonate_user_with_oauth_token() -> None:
+ """When user_token is provided, the URL password should be replaced."""
+ database = MagicMock()
+ url = URL.create(
+ "databricks",
+ username="token",
+ password="original-pat",
+ host="workspace.cloud.databricks.com",
+ port=443,
+ query={"http_path": "sql/1.0/warehouses/abc123", "catalog": "gold"},
+ )
+ engine_kwargs: dict[str, Any] = {
+ "connect_args": {"http_path": "sql/1.0/warehouses/abc123"},
+ }
+
+ new_url, new_kwargs = DatabricksPythonConnectorEngineSpec.impersonate_user(
+ database, "test_user", "oauth-access-token-xyz", url, engine_kwargs
+ )
+
+ assert new_url.password == "oauth-access-token-xyz"
+ assert new_url.username == "token"
+ assert new_url.host == "workspace.cloud.databricks.com"
+ # connect_args should be unchanged
+ assert new_kwargs["connect_args"]["http_path"] ==
"sql/1.0/warehouses/abc123"
+
+
+def test_impersonate_user_without_token() -> None:
+ """When user_token is None and OAuth not enabled, URL stays."""
+ database = MagicMock()
+ database.is_oauth2_enabled.return_value = False
+ url = URL.create(
+ "databricks",
+ username="token",
+ password="original-pat",
+ host="workspace.cloud.databricks.com",
+ port=443,
+ )
+ engine_kwargs: dict[str, Any] = {}
+
+ new_url, _ = DatabricksPythonConnectorEngineSpec.impersonate_user(
+ database, "test_user", None, url, engine_kwargs
+ )
+
+ assert new_url.password == "original-pat"
+
+
+def test_impersonate_user_no_username() -> None:
+ """When username is None, impersonation should be a no-op."""
+ database = MagicMock()
+ url = URL.create(
+ "databricks",
+ username="token",
+ password="original-pat",
+ host="workspace.cloud.databricks.com",
+ port=443,
+ )
+ engine_kwargs: dict[str, Any] = {"connect_args": {"foo": "bar"}}
+
+ new_url, new_kwargs = DatabricksPythonConnectorEngineSpec.impersonate_user(
+ database, None, "some-token", url, engine_kwargs
+ )
+
+ assert new_url.password == "original-pat"
+ assert new_kwargs == {"connect_args": {"foo": "bar"}}
+
+
+def test_impersonate_user_no_token_triggers_oauth_dance() -> None:
+ """When OAuth is enabled but no token exists, should trigger OAuth
dance."""
+ from unittest.mock import patch
Review Comment:
**NIT**: `from unittest.mock import patch` is imported inline here and in
two other test functions (lines 538, 555). No circular-import concern — move
all three to the module-level imports alongside `MagicMock`.
##########
superset/db_engine_specs/databricks.py:
##########
@@ -107,13 +132,144 @@ def literal_processor(
pass
+class DatabricksOAuth2Mixin:
+ """Mixin adding OAuth2 per-user impersonation to all Databricks engine
specs.
+
+ When OAuth2 is configured, each user authenticates individually with the
+ Databricks workspace. Their personal OAuth2 token replaces the shared PAT
+ in the connection URI, so queries execute under the user's identity.
+
+ Databricks OIDC endpoints are auto-derived from a ``host`` key in the
+ ``DATABASE_OAUTH2_CLIENTS`` config, so admins only need to provide
+ client_id, secret, and workspace host.
+ """
+
+ supports_oauth2 = True
+ oauth2_scope = "sql offline_access"
+ oauth2_exception = DatabricksAuthError
+ oauth2_token_request_type = "data" # noqa: S105
+
+ @classmethod
+ def get_oauth2_config(cls) -> OAuth2ClientConfig | None:
+ oauth2_config = app.config["DATABASE_OAUTH2_CLIENTS"]
+ # Look up by exact engine_name first, then fall back to "Databricks"
+ # so a single config entry works for all Databricks specs (e.g.,
+ # "Databricks SQL Endpoint", "Databricks Interactive Cluster", etc.)
+ spec_config = oauth2_config.get(
+ cls.engine_name, # type: ignore[attr-defined]
+ oauth2_config.get("Databricks"),
+ )
+ if spec_config is None:
+ return None
+ host = spec_config.get("host", "")
+ redirect_uri = app.config.get(
+ "DATABASE_OAUTH2_REDIRECT_URI",
+ url_for("DatabaseRestApi.oauth2", _external=True),
+ )
+
+ return {
+ "id": spec_config["id"],
+ "secret": spec_config["secret"],
+ "scope": spec_config.get("scope") or cls.oauth2_scope,
+ "redirect_uri": redirect_uri,
+ "authorization_request_uri": spec_config.get(
+ "authorization_request_uri",
+ f"https://{host}/oidc/v1/authorize" if host else None,
+ ),
+ "token_request_uri": spec_config.get(
+ "token_request_uri",
+ f"https://{host}/oidc/v1/token" if host else None,
+ ),
+ "request_content_type": spec_config.get(
+ "request_content_type",
+ cls.oauth2_token_request_type,
+ ),
+ }
+
+ @classmethod
+ def needs_oauth2(cls, ex: Exception) -> bool:
+ """Check if the exception indicates OAuth2 authentication is needed.
+
+ The databricks-sql-connector raises RequestError which gets wrapped
+ by SQLAlchemy into a DBAPIError. We check both the original exception
+ and the string representation for auth failure indicators.
+ """
+ if not has_request_context() or not hasattr(g, "user"):
+ return False
+
+ # Check the full exception chain for auth errors
+ current: BaseException | None = ex
+ while current is not None:
+ if isinstance(current, DatabricksAuthError):
+ return True
+ # Also check string for wrapped exceptions
+ ex_str = str(current).lower()
+ if any(
+ s in ex_str
+ for s in (
+ "error 401",
+ "status code 401",
+ "error 403",
+ "status code 403",
+ "unauthorized",
+ "forbidden",
Review Comment:
**MEDIUM**: `"forbidden"` matches network ACL rejections, proxy errors, and
firewall messages (e.g. `"Connection to X.X.X.X is forbidden by security
policy"`). A false positive triggers the full OAuth2 dance for a non-auth
failure, sending the user through Databricks OIDC login when the real problem
is network connectivity.
Consider removing the bare `"forbidden"` term and relying on the more
specific HTTP status strings already present (`"error 403"`, `"status code
403"`). `"credential was not sent"` is Databricks-specific enough to keep.
##########
superset/db_engine_specs/databricks.py:
##########
@@ -221,7 +377,7 @@ class DatabricksPythonConnectorPropertiesType(TypedDict):
}
-class DatabricksHiveEngineSpec(HiveEngineSpec):
+class DatabricksHiveEngineSpec(DatabricksOAuth2Mixin, HiveEngineSpec): #
type: ignore[misc]
Review Comment:
**NIT**: `# type: ignore[misc]` with no explanation. The suppressed error is
mypy's "Definition of 'X' in base class 'Y' is incompatible with definition in
base class 'Z'", caused by `update_params_from_encrypted_extra` and
`impersonate_user` differing between the mixin and the parent specs. A brief
note prevents future readers from wondering whether it's safe to remove:
```python
class DatabricksHiveEngineSpec(DatabricksOAuth2Mixin, HiveEngineSpec): #
type: ignore[misc] # mixin overrides
impersonate_user/update_params_from_encrypted_extra
```
##########
superset/db_engine_specs/databricks.py:
##########
@@ -107,13 +132,144 @@ def literal_processor(
pass
+class DatabricksOAuth2Mixin:
+ """Mixin adding OAuth2 per-user impersonation to all Databricks engine
specs.
+
+ When OAuth2 is configured, each user authenticates individually with the
+ Databricks workspace. Their personal OAuth2 token replaces the shared PAT
+ in the connection URI, so queries execute under the user's identity.
+
+ Databricks OIDC endpoints are auto-derived from a ``host`` key in the
+ ``DATABASE_OAUTH2_CLIENTS`` config, so admins only need to provide
+ client_id, secret, and workspace host.
+ """
+
+ supports_oauth2 = True
+ oauth2_scope = "sql offline_access"
+ oauth2_exception = DatabricksAuthError
+ oauth2_token_request_type = "data" # noqa: S105
+
+ @classmethod
+ def get_oauth2_config(cls) -> OAuth2ClientConfig | None:
+ oauth2_config = app.config["DATABASE_OAUTH2_CLIENTS"]
+ # Look up by exact engine_name first, then fall back to "Databricks"
+ # so a single config entry works for all Databricks specs (e.g.,
+ # "Databricks SQL Endpoint", "Databricks Interactive Cluster", etc.)
+ spec_config = oauth2_config.get(
+ cls.engine_name, # type: ignore[attr-defined]
+ oauth2_config.get("Databricks"),
+ )
+ if spec_config is None:
+ return None
+ host = spec_config.get("host", "")
+ redirect_uri = app.config.get(
+ "DATABASE_OAUTH2_REDIRECT_URI",
+ url_for("DatabaseRestApi.oauth2", _external=True),
+ )
+
+ return {
+ "id": spec_config["id"],
+ "secret": spec_config["secret"],
+ "scope": spec_config.get("scope") or cls.oauth2_scope,
+ "redirect_uri": redirect_uri,
+ "authorization_request_uri": spec_config.get(
+ "authorization_request_uri",
+ f"https://{host}/oidc/v1/authorize" if host else None,
+ ),
+ "token_request_uri": spec_config.get(
+ "token_request_uri",
+ f"https://{host}/oidc/v1/token" if host else None,
+ ),
+ "request_content_type": spec_config.get(
+ "request_content_type",
+ cls.oauth2_token_request_type,
+ ),
+ }
+
+ @classmethod
+ def needs_oauth2(cls, ex: Exception) -> bool:
+ """Check if the exception indicates OAuth2 authentication is needed.
+
+ The databricks-sql-connector raises RequestError which gets wrapped
+ by SQLAlchemy into a DBAPIError. We check both the original exception
+ and the string representation for auth failure indicators.
+ """
+ if not has_request_context() or not hasattr(g, "user"):
Review Comment:
**MEDIUM**: `hasattr(g, "user")` returns `True` when `g.user = None`.
Flask-AppBuilder can set the attribute to `None` for anonymous requests rather
than leaving it unset, so the guard passes for unauthenticated callers. An auth
failure on an anonymous request then walks the exception chain and can reach
`start_oauth2_dance` for a user who is not logged in.
```python
if not has_request_context() or not getattr(g, "user", None):
return False
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]