Vitor-Avila commented on code in PR #37585: URL: https://github.com/apache/superset/pull/37585#discussion_r2748515938
########## superset/db_engine_specs/aws_iam.py: ########## @@ -0,0 +1,660 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +AWS IAM Authentication Mixin for database engine specs. + +This mixin provides cross-account IAM authentication support for AWS databases +(Aurora PostgreSQL, Aurora MySQL, Redshift). 
It handles: +- Assuming IAM roles via STS AssumeRole +- Generating RDS IAM auth tokens +- Generating Redshift Serverless credentials +- Configuring SSL (required for IAM auth) +- Caching STS credentials to reduce API calls +""" + +from __future__ import annotations + +import logging +import threading +from typing import Any, TYPE_CHECKING, TypedDict + +from cachetools import TTLCache + +from superset.databases.utils import make_url_safe +from superset.errors import ErrorLevel, SupersetError, SupersetErrorType +from superset.exceptions import SupersetSecurityException + +if TYPE_CHECKING: + from superset.models.core import Database + +logger = logging.getLogger(__name__) + +# Default session duration for STS AssumeRole (1 hour) +DEFAULT_SESSION_DURATION = 3600 + +# Default ports +DEFAULT_POSTGRES_PORT = 5432 +DEFAULT_MYSQL_PORT = 3306 +DEFAULT_REDSHIFT_PORT = 5439 + +# Cache STS credentials: key = (role_arn, region, external_id), TTL = 10 min +# Using a TTL shorter than the minimum supported session duration (900s) avoids +# reusing expired STS credentials when a short session_duration is configured. +_credentials_cache: TTLCache[tuple[str, str, str | None], dict[str, Any]] = TTLCache( + maxsize=100, ttl=600 +) +_credentials_lock = threading.RLock() + + +class AWSIAMConfig(TypedDict, total=False): + """Configuration for AWS IAM authentication.""" + + enabled: bool Review Comment: Out of curiosity, what would be the use-case for setting it up with `enabled=False`? ########## superset/db_engine_specs/aws_iam.py: ########## @@ -0,0 +1,660 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +AWS IAM Authentication Mixin for database engine specs. + +This mixin provides cross-account IAM authentication support for AWS databases +(Aurora PostgreSQL, Aurora MySQL, Redshift). It handles: +- Assuming IAM roles via STS AssumeRole +- Generating RDS IAM auth tokens +- Generating Redshift Serverless credentials +- Configuring SSL (required for IAM auth) +- Caching STS credentials to reduce API calls +""" + +from __future__ import annotations + +import logging +import threading +from typing import Any, TYPE_CHECKING, TypedDict + +from cachetools import TTLCache + +from superset.databases.utils import make_url_safe +from superset.errors import ErrorLevel, SupersetError, SupersetErrorType +from superset.exceptions import SupersetSecurityException + +if TYPE_CHECKING: + from superset.models.core import Database + +logger = logging.getLogger(__name__) + +# Default session duration for STS AssumeRole (1 hour) +DEFAULT_SESSION_DURATION = 3600 + +# Default ports +DEFAULT_POSTGRES_PORT = 5432 +DEFAULT_MYSQL_PORT = 3306 +DEFAULT_REDSHIFT_PORT = 5439 + +# Cache STS credentials: key = (role_arn, region, external_id), TTL = 10 min +# Using a TTL shorter than the minimum supported session duration (900s) avoids +# reusing expired STS credentials when a short session_duration is configured. 
+_credentials_cache: TTLCache[tuple[str, str, str | None], dict[str, Any]] = TTLCache( + maxsize=100, ttl=600 +) +_credentials_lock = threading.RLock() + + +class AWSIAMConfig(TypedDict, total=False): + """Configuration for AWS IAM authentication.""" + + enabled: bool + role_arn: str + external_id: str + region: str + db_username: str + session_duration: int + # Redshift Serverless fields + workgroup_name: str + db_name: str + # Redshift provisioned cluster fields + cluster_identifier: str + + +class AWSIAMAuthMixin: + """ + Mixin that provides AWS IAM authentication for database connections. + + This mixin can be used with database engine specs that support IAM + authentication (Aurora PostgreSQL, Aurora MySQL, Redshift). + + Configuration is provided via the database's encrypted_extra JSON: + + { + "aws_iam": { + "enabled": true, + "role_arn": "arn:aws:iam::222222222222:role/SupersetDatabaseAccess", + "external_id": "superset-prod-12345", # optional + "region": "us-east-1", + "db_username": "superset_iam_user", + "session_duration": 3600 # optional, defaults to 3600 + } + } + """ + + # AWS error patterns for actionable error messages + aws_iam_custom_errors: dict[str, tuple[SupersetErrorType, str]] = { + "AccessDenied": ( + SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + "Unable to assume IAM role. Verify the role ARN and trust policy " + "allow access from Superset's IAM role.", + ), + "InvalidIdentityToken": ( + SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + "Invalid IAM credentials. Ensure Superset has a valid IAM role " + "with permissions to assume the target role.", + ), + "MalformedPolicyDocument": ( + SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR, + "Invalid IAM role ARN format. Please verify the role ARN.", + ), + "ExpiredTokenException": ( + SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + "AWS credentials have expired. 
Please refresh the connection.", + ), + } + + @classmethod + def get_iam_credentials( + cls, + role_arn: str, + region: str, + external_id: str | None = None, + session_duration: int = DEFAULT_SESSION_DURATION, + ) -> dict[str, Any]: + """ + Assume cross-account IAM role via STS AssumeRole with credential caching. + + Credentials are cached by (role_arn, region, external_id) with a 50-minute + TTL to reduce STS API calls while ensuring tokens are refreshed before the + default 1-hour expiration. + + :param role_arn: The ARN of the IAM role to assume + :param region: AWS region for the STS client + :param external_id: External ID for the role assumption (optional) + :param session_duration: Duration of the session in seconds + :returns: Dictionary with AccessKeyId, SecretAccessKey, SessionToken + :raises SupersetSecurityException: If role assumption fails + """ + cache_key = (role_arn, region, external_id) + + with _credentials_lock: + cached = _credentials_cache.get(cache_key) + if cached is not None: + return cached + + try: + # Lazy import to avoid errors when boto3 is not installed + import boto3 + from botocore.exceptions import ClientError + except ImportError as ex: + raise SupersetSecurityException( Review Comment: Nice! Do you think it makes sense to add `boto3` as an entry to `optional-dependencies` as well? https://github.com/apache/superset/blob/master/pyproject.toml#L114 ########## superset/db_engine_specs/aws_iam.py: ########## @@ -0,0 +1,660 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +AWS IAM Authentication Mixin for database engine specs. + +This mixin provides cross-account IAM authentication support for AWS databases +(Aurora PostgreSQL, Aurora MySQL, Redshift). It handles: +- Assuming IAM roles via STS AssumeRole +- Generating RDS IAM auth tokens +- Generating Redshift Serverless credentials +- Configuring SSL (required for IAM auth) +- Caching STS credentials to reduce API calls +""" + +from __future__ import annotations + +import logging +import threading +from typing import Any, TYPE_CHECKING, TypedDict + +from cachetools import TTLCache + +from superset.databases.utils import make_url_safe +from superset.errors import ErrorLevel, SupersetError, SupersetErrorType +from superset.exceptions import SupersetSecurityException + +if TYPE_CHECKING: + from superset.models.core import Database + +logger = logging.getLogger(__name__) + +# Default session duration for STS AssumeRole (1 hour) +DEFAULT_SESSION_DURATION = 3600 + +# Default ports +DEFAULT_POSTGRES_PORT = 5432 +DEFAULT_MYSQL_PORT = 3306 +DEFAULT_REDSHIFT_PORT = 5439 + +# Cache STS credentials: key = (role_arn, region, external_id), TTL = 10 min +# Using a TTL shorter than the minimum supported session duration (900s) avoids +# reusing expired STS credentials when a short session_duration is configured. 
+_credentials_cache: TTLCache[tuple[str, str, str | None], dict[str, Any]] = TTLCache( + maxsize=100, ttl=600 +) +_credentials_lock = threading.RLock() + + +class AWSIAMConfig(TypedDict, total=False): + """Configuration for AWS IAM authentication.""" + + enabled: bool + role_arn: str + external_id: str + region: str + db_username: str + session_duration: int + # Redshift Serverless fields + workgroup_name: str + db_name: str + # Redshift provisioned cluster fields + cluster_identifier: str + + +class AWSIAMAuthMixin: + """ + Mixin that provides AWS IAM authentication for database connections. + + This mixin can be used with database engine specs that support IAM + authentication (Aurora PostgreSQL, Aurora MySQL, Redshift). + + Configuration is provided via the database's encrypted_extra JSON: + + { + "aws_iam": { + "enabled": true, + "role_arn": "arn:aws:iam::222222222222:role/SupersetDatabaseAccess", + "external_id": "superset-prod-12345", # optional + "region": "us-east-1", + "db_username": "superset_iam_user", + "session_duration": 3600 # optional, defaults to 3600 + } + } + """ + + # AWS error patterns for actionable error messages + aws_iam_custom_errors: dict[str, tuple[SupersetErrorType, str]] = { + "AccessDenied": ( + SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + "Unable to assume IAM role. Verify the role ARN and trust policy " + "allow access from Superset's IAM role.", + ), + "InvalidIdentityToken": ( + SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + "Invalid IAM credentials. Ensure Superset has a valid IAM role " + "with permissions to assume the target role.", + ), + "MalformedPolicyDocument": ( + SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR, + "Invalid IAM role ARN format. Please verify the role ARN.", + ), + "ExpiredTokenException": ( + SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + "AWS credentials have expired. 
Please refresh the connection.", + ), + } + + @classmethod + def get_iam_credentials( + cls, + role_arn: str, + region: str, + external_id: str | None = None, + session_duration: int = DEFAULT_SESSION_DURATION, + ) -> dict[str, Any]: + """ + Assume cross-account IAM role via STS AssumeRole with credential caching. + + Credentials are cached by (role_arn, region, external_id) with a 50-minute + TTL to reduce STS API calls while ensuring tokens are refreshed before the + default 1-hour expiration. + + :param role_arn: The ARN of the IAM role to assume + :param region: AWS region for the STS client + :param external_id: External ID for the role assumption (optional) + :param session_duration: Duration of the session in seconds + :returns: Dictionary with AccessKeyId, SecretAccessKey, SessionToken + :raises SupersetSecurityException: If role assumption fails + """ + cache_key = (role_arn, region, external_id) + + with _credentials_lock: + cached = _credentials_cache.get(cache_key) + if cached is not None: + return cached + + try: + # Lazy import to avoid errors when boto3 is not installed + import boto3 + from botocore.exceptions import ClientError + except ImportError as ex: + raise SupersetSecurityException( + SupersetError( + message="boto3 is required for AWS IAM authentication. 
" + "Install it with: pip install boto3", + error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR, + level=ErrorLevel.ERROR, + ) + ) from ex + + try: + sts_client = boto3.client("sts", region_name=region) + + assume_role_kwargs: dict[str, Any] = { + "RoleArn": role_arn, + "RoleSessionName": "superset-iam-session", + "DurationSeconds": session_duration, + } + if external_id: + assume_role_kwargs["ExternalId"] = external_id + + response = sts_client.assume_role(**assume_role_kwargs) + credentials = response["Credentials"] + + with _credentials_lock: + _credentials_cache[cache_key] = credentials + + return credentials + + except ClientError as ex: + error_code = ex.response.get("Error", {}).get("Code", "") + error_message = ex.response.get("Error", {}).get("Message", "") + + # Handle ExternalId mismatch (shows as AccessDenied with specific message) + # Check this first before generic AccessDenied handling + if "external id" in error_message.lower(): + raise SupersetSecurityException( + SupersetError( + message="External ID mismatch. Verify the external_id " + "configuration matches the trust policy.", + error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + level=ErrorLevel.ERROR, + ) + ) from ex + + if error_code in cls.aws_iam_custom_errors: + error_type, message = cls.aws_iam_custom_errors[error_code] + raise SupersetSecurityException( + SupersetError( + message=message, + error_type=error_type, + level=ErrorLevel.ERROR, + ) + ) from ex + + raise SupersetSecurityException( + SupersetError( + message=f"Failed to assume IAM role: {ex}", + error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + level=ErrorLevel.ERROR, + ) + ) from ex + + @classmethod + def generate_rds_auth_token( + cls, + credentials: dict[str, Any], + hostname: str, + port: int, + username: str, + region: str, + ) -> str: + """ + Generate RDS IAM auth token using temporary credentials. 
+ + :param credentials: STS credentials from assume_role + :param hostname: RDS/Aurora endpoint hostname + :param port: Database port + :param username: Database username configured for IAM auth + :param region: AWS region + :returns: IAM auth token to use as database password + :raises SupersetSecurityException: If token generation fails + """ + try: + import boto3 + from botocore.exceptions import ClientError + except ImportError as ex: + raise SupersetSecurityException( + SupersetError( + message="boto3 is required for AWS IAM authentication.", + error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR, + level=ErrorLevel.ERROR, + ) + ) from ex Review Comment: I see that this message doesn't include the `"Install it with: pip install boto3"` portion. Is that intentional? I was wondering whether there is a way to make this DRYer (e.g., with a util method or decorator), but I think the local import needs to live in the method that uses it, right? ########## superset/db_engine_specs/aws_iam.py: ########## @@ -0,0 +1,660 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +AWS IAM Authentication Mixin for database engine specs. + +This mixin provides cross-account IAM authentication support for AWS databases +(Aurora PostgreSQL, Aurora MySQL, Redshift). 
It handles: +- Assuming IAM roles via STS AssumeRole +- Generating RDS IAM auth tokens +- Generating Redshift Serverless credentials +- Configuring SSL (required for IAM auth) +- Caching STS credentials to reduce API calls +""" + +from __future__ import annotations + +import logging +import threading +from typing import Any, TYPE_CHECKING, TypedDict + +from cachetools import TTLCache + +from superset.databases.utils import make_url_safe +from superset.errors import ErrorLevel, SupersetError, SupersetErrorType +from superset.exceptions import SupersetSecurityException + +if TYPE_CHECKING: + from superset.models.core import Database + +logger = logging.getLogger(__name__) + +# Default session duration for STS AssumeRole (1 hour) +DEFAULT_SESSION_DURATION = 3600 + +# Default ports +DEFAULT_POSTGRES_PORT = 5432 +DEFAULT_MYSQL_PORT = 3306 +DEFAULT_REDSHIFT_PORT = 5439 + +# Cache STS credentials: key = (role_arn, region, external_id), TTL = 10 min +# Using a TTL shorter than the minimum supported session duration (900s) avoids +# reusing expired STS credentials when a short session_duration is configured. +_credentials_cache: TTLCache[tuple[str, str, str | None], dict[str, Any]] = TTLCache( + maxsize=100, ttl=600 +) +_credentials_lock = threading.RLock() + + +class AWSIAMConfig(TypedDict, total=False): + """Configuration for AWS IAM authentication.""" + + enabled: bool + role_arn: str + external_id: str + region: str + db_username: str + session_duration: int + # Redshift Serverless fields + workgroup_name: str + db_name: str + # Redshift provisioned cluster fields + cluster_identifier: str + + +class AWSIAMAuthMixin: + """ + Mixin that provides AWS IAM authentication for database connections. + + This mixin can be used with database engine specs that support IAM + authentication (Aurora PostgreSQL, Aurora MySQL, Redshift). 
+ + Configuration is provided via the database's encrypted_extra JSON: + + { + "aws_iam": { + "enabled": true, + "role_arn": "arn:aws:iam::222222222222:role/SupersetDatabaseAccess", + "external_id": "superset-prod-12345", # optional + "region": "us-east-1", + "db_username": "superset_iam_user", + "session_duration": 3600 # optional, defaults to 3600 + } + } + """ + + # AWS error patterns for actionable error messages + aws_iam_custom_errors: dict[str, tuple[SupersetErrorType, str]] = { + "AccessDenied": ( + SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + "Unable to assume IAM role. Verify the role ARN and trust policy " + "allow access from Superset's IAM role.", + ), + "InvalidIdentityToken": ( + SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + "Invalid IAM credentials. Ensure Superset has a valid IAM role " + "with permissions to assume the target role.", + ), + "MalformedPolicyDocument": ( + SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR, + "Invalid IAM role ARN format. Please verify the role ARN.", + ), + "ExpiredTokenException": ( + SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + "AWS credentials have expired. Please refresh the connection.", + ), + } + + @classmethod + def get_iam_credentials( + cls, + role_arn: str, + region: str, + external_id: str | None = None, + session_duration: int = DEFAULT_SESSION_DURATION, + ) -> dict[str, Any]: + """ + Assume cross-account IAM role via STS AssumeRole with credential caching. + + Credentials are cached by (role_arn, region, external_id) with a 50-minute + TTL to reduce STS API calls while ensuring tokens are refreshed before the + default 1-hour expiration. 
+ + :param role_arn: The ARN of the IAM role to assume + :param region: AWS region for the STS client + :param external_id: External ID for the role assumption (optional) + :param session_duration: Duration of the session in seconds + :returns: Dictionary with AccessKeyId, SecretAccessKey, SessionToken + :raises SupersetSecurityException: If role assumption fails + """ + cache_key = (role_arn, region, external_id) + + with _credentials_lock: + cached = _credentials_cache.get(cache_key) + if cached is not None: + return cached + + try: + # Lazy import to avoid errors when boto3 is not installed + import boto3 + from botocore.exceptions import ClientError + except ImportError as ex: + raise SupersetSecurityException( Review Comment: that could also update the `pip install` message -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
