aminghadersohi commented on code in PR #33976: URL: https://github.com/apache/superset/pull/33976#discussion_r2227226592
##########
superset/mcp_service/auth.py:
##########
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Optional
+
+logger = logging.getLogger(__name__)
+
+
+def get_user_from_request() -> Any:
+    """
+    Extract user info from the request context (e.g., from Bearer token, headers, etc.).
+    By default, returns admin user. Override for OIDC/OAuth/Okta integration.
+    """
+    from flask import current_app
+
+    from superset.extensions import security_manager
+
+    admin_username = current_app.config.get("MCP_ADMIN_USERNAME", "admin")
+    return security_manager.get_user_by_username(admin_username)
+
+
+def impersonate_user(user: Any, run_as: Optional[str] = None) -> Any:
+    """
+    Optionally impersonate another user if allowed. By default, returns the same user.
+    Override to enforce impersonation rules.
+    """
+    return user
+
+
+def has_permission(user: Any, tool_func: Any) -> bool:
+    """
+    Check if the user has permission to run the tool. By default, always True.
+    Override for RBAC.
+    """
+    return True
+
+
+def log_access(user: Any, tool_name: str, args: Any, kwargs: Any) -> None:
+    """
+    Log access/action for observability/audit. By default, does nothing.
+    Override to log to your system.
+    """
+    pass
+
+
+def mcp_auth_hook(tool_func: Any) -> Any:
+    """
+    Decorator for MCP tool functions to enforce auth, impersonation, RBAC, and logging.
+    Also sets up Flask user context (g.user) for downstream DAO/model code.
+    All logic is overridable for enterprise integration.
+    """
+    import functools
+
+    from flask import current_app, g
+    from flask_login import AnonymousUserMixin
+
+    from superset.extensions import security_manager
+
+    @functools.wraps(tool_func)
+    def wrapper(*args: Any, **kwargs: Any) -> Any:
+        # --- Setup user context (was _setup_user_context) ---
+        admin_username = current_app.config.get("MCP_ADMIN_USERNAME", "admin")
+        admin_user = security_manager.get_user_by_username(admin_username)

Review Comment:

Excellent observation! You're absolutely right - the current implementation hardcodes admin access without proper JWT verification. Here's how we can implement proper JWT-based authentication with user claims:

Option 1: FastMCP Bearer Token Integration

```
def get_user_from_request() -> Any:
    """Extract user from JWT token claims via FastMCP's BearerAuthProvider."""
    from fastmcp.auth import get_access_token
    from superset.extensions import security_manager

    try:
        # Get validated JWT token from FastMCP auth
        access_token = get_access_token()

        # Extract user identifier from JWT claims
        username = access_token.subject  # or access_token.client_id
        user_email = access_token.claims.get("email")

        # Look up actual Superset user
        user = security_manager.get_user_by_username(username)
        if not user and user_email:
            user = security_manager.get_user_by_email(user_email)

        return user or AnonymousUserMixin()

    except Exception:
        # Fallback to anonymous if no valid token
        return AnonymousUserMixin()
```

Option 2: Enhanced RBAC with Scope-Based Permissions

```
def has_permission(user: Any, tool_func: Any) -> bool:
    """Check permissions using JWT scopes + Superset RBAC."""
    from fastmcp.auth import get_access_token

    try:
        access_token = get_access_token()
        user_scopes = access_token.scopes

        # Map tool functions to required scopes
        required_scopes = {
            'list_dashboards': ['dashboard:read'],
            'create_chart': ['chart:write'],
            'get_dataset_info': ['dataset:read']
        }

        tool_name = tool_func.__name__
        if required := required_scopes.get(tool_name):
            if not any(scope in user_scopes for scope in required):
                return False

        # Also check Superset's native RBAC
        return user and hasattr(user, 'is_active') and user.is_active

    except Exception:
        # No token = anonymous user, check Superset perms only
        return user and hasattr(user, 'is_active') and user.is_active
```

Updated Wrapper Implementation

```
@functools.wraps(tool_func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
    # Get authenticated user from JWT (replaces hardcoded admin)
    user = get_user_from_request()

    # Set Flask context with actual authenticated user
    g.user = user

    # Apply impersonation if requested and allowed
    if run_as := kwargs.get("run_as"):
        user = impersonate_user(user, run_as)

    # Check both JWT scopes and Superset RBAC
    if not has_permission(user, tool_func):
        raise PermissionError(
            f"User {getattr(user, 'username', 'anonymous')} lacks permission for {tool_func.__name__}"
        )

    # Enhanced audit logging with JWT context
    log_access(user, tool_func.__name__, args, kwargs)

    return tool_func(*args, **kwargs)
```

This approach removes the hardcoded admin escalation and uses actual JWT-validated user identity with proper scope-based authorization!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@superset.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@superset.apache.org
For additional commands, e-mail: notifications-h...@superset.apache.org