ktmud commented on a change in pull request #12875:
URL: https://github.com/apache/superset/pull/12875#discussion_r569205111
##########
File path: superset/models/dashboard.py
##########
@@ -407,3 +408,30 @@ def clear_dashboard_cache(
sqla.event.listen(TableColumn, "after_update", clear_dashboard_cache)
sqla.event.listen(DruidMetric, "after_update", clear_dashboard_cache)
sqla.event.listen(DruidColumn, "after_update", clear_dashboard_cache)
+
+
+def get_dashboard(id_or_slug: str) -> Dashboard:
Review comment:
This is a good candidate for class method. In fact, it would be nice to
add a `get_by_id_or_slug` to all entities with slug support for simplicity and
consistency. Imagine the following API:
```python
Dashboard.get_by_id(111)
Dashboard.get_by_id_or_slug(xxxx)
# or simply
Dashboard.get(id_or_slug)
Chart.get(id_or_slug)
Tag.get(id_or_slug)
```
##########
File path: superset/security/manager.py
##########
@@ -417,6 +418,27 @@ def can_access_table(self, database: "Database", table:
"Table") -> bool:
return True
+ def raise_for_dashboard_access(self, dashboard: "Dashboard") -> None:
Review comment:
It's worth adding feature flag checks here, too, just in case this
method was accidentally invoked outside of a feature flag enabled brach.
As a general note, unless it's refactoring that will also benefit the base
case, new code for feature flags should all wrap behind the flag because it
would also make cleaning up the feature flag easier (hope we don't have to do
it for this case!).
##########
File path: superset/security/manager.py
##########
@@ -417,6 +418,27 @@ def can_access_table(self, database: "Database", table:
"Table") -> bool:
return True
+ def raise_for_dashboard_access(self, dashboard: "Dashboard") -> None:
+ from superset.views.base import is_user_admin
+ from superset.views.utils import is_owner
+ from superset.models.dashboard import get_dashboard_access_error_object
+ from superset.views.base import get_user_roles
+
+ has_rbac_access = any(
+ dashboard_role.id in [user_role.id for user_role in
get_user_roles()]
+ for dashboard_role in dashboard.roles
+ )
+ can_access = (
+ is_user_admin()
+ or is_owner(dashboard, g.user)
+ or (dashboard.published and has_rbac_access)
+ )
+
+ if not can_access:
+ raise SupersetSecurityException(
+ get_dashboard_access_error_object(dashboard)
+ )
Review comment:
I believe `SupersetSecurityException` is reserved for when users haven't
logged-in or the login is invalid, because it returns `401` status code.
##########
File path: tests/dashboards/security/security_rbac_tests.py
##########
@@ -48,6 +178,20 @@ def
test_get_dashboards_list__admin_get_all_dashboards(self):
def test_get_dashboards_list__owner_get_all_owned_dashboards(self):
# arrange
+ (
+ not_owned_dashboards,
+ owned_dashboards,
+ ) = self.arrange_all_owned_dashboards_scenario()
+
+ # act
+ response = self.get_dashboards_list_response()
+
+ # assert
+ self.assert_dashboards_list_view_response(
+ response, 2, owned_dashboards, not_owned_dashboards
+ )
+
+ def arrange_all_owned_dashboards_scenario(self):
Review comment:
Ditto: `_create_sample_dashboards_with_owner_access`
##########
File path: superset/models/dashboard.py
##########
@@ -407,3 +408,30 @@ def clear_dashboard_cache(
sqla.event.listen(TableColumn, "after_update", clear_dashboard_cache)
sqla.event.listen(DruidMetric, "after_update", clear_dashboard_cache)
sqla.event.listen(DruidColumn, "after_update", clear_dashboard_cache)
+
+
+def get_dashboard(id_or_slug: str) -> Dashboard:
+ session = db.session()
+ qry = session.query(Dashboard)
+ if id_or_slug.isdigit():
+ qry = qry.filter_by(id=int(id_or_slug))
+ else:
+ qry = qry.filter_by(slug=id_or_slug)
+
+ return qry.one_or_none()
+
+
+def get_dashboard_access_error_object(
+ dashboard: Dashboard, # pylint: disable=invalid-name
+) -> SupersetError:
+ """
+ Return the error object for the denied Superset dashboard.
+ :param dashboard: The denied Superset dashboard
+ :returns: The error object
+ """
+ return SupersetError(
+ error_type=SupersetErrorType.DASHBOARD_SECURITY_ACCESS_ERROR,
+ message=f"This dashboard requires to have one of the access roles
assigned it",
Review comment:
Can we add the exception to `superset.dashboard.exceptions` instead?
```python
class DashboardAccessDeniedError(ForbiddenError):
message = _("You don't have access to this dashboard.")
```
Since `dashboard_id` is not used anywhere for now, I wouldn't worry about
adding it using a custom method, either.
##########
File path: superset/views/core.py
##########
@@ -1785,6 +1786,11 @@ def publish( # pylint: disable=no-self-use
@has_access
@expose("/dashboard/<dashboard_id_or_slug>/")
@event_logger.log_this_with_extra_payload
+ @check_permissions(
+ on_error=lambda self, ex: Response(
+ utils.error_msg_from_exception(ex), status=403
+ )
+ )
Review comment:
This is an HTML page. Returning a simple 403 response is probably not
enough. We should have a decorator similar to `handle_api_exception` to handle
exceptions on HTML pages.
Ideally all exceptions should be handled in a single final handler for
consistent error message rendering.
But maybe the dashboard page is complex enough to warrant its own access
denied page, too.
cc @junlincc
##########
File path: superset/views/core.py
##########
@@ -1811,7 +1810,7 @@ def dashboard( # pylint: disable=too-many-locals
datasource = ConnectorRegistry.get_datasource(
datasource_type=datasource["type"],
datasource_id=datasource["id"],
- session=session,
+ session=db.session(),
Review comment:
Could you explain why is this change necessary?
##########
File path: tests/dashboards/security/security_rbac_tests.py
##########
@@ -96,23 +232,11 @@ def
test_get_dashboards_list__user_without_any_permissions_get_empty_list(self):
def
test_get_dashboards_list__user_get_only_published_permitted_dashboards(self):
# arrange
- username = random_str()
- new_role = f"role_{random_str()}"
- self.create_user_with_roles(username, [new_role],
should_create_roles=True)
-
- published_dashboards = [
- create_dashboard_to_db(published=True),
- create_dashboard_to_db(published=True),
- ]
- not_published_dashboards = [
- create_dashboard_to_db(published=False),
- create_dashboard_to_db(published=False),
- ]
-
- for dash in published_dashboards + not_published_dashboards:
- grant_access_to_dashboard(dash, new_role)
-
- self.login(username)
+ (
+ new_role,
+ not_published_dashboards,
Review comment:
Maybe we can call this `draft_dashboard`?
##########
File path: tests/dashboards/security/security_rbac_tests.py
##########
@@ -129,6 +253,23 @@ def
test_get_dashboards_list__user_get_only_published_permitted_dashboards(self)
for dash in published_dashboards + not_published_dashboards:
revoke_access_to_dashboard(dash, new_role)
+ def __arrange_only_published_permitted(self):
Review comment:
Is double underscore a new convention for private methods? The method
name is also a little unclear. How about `def
_create_sample_dashboards_with_role_access`
##########
File path: superset/utils/decorators.py
##########
@@ -69,3 +74,34 @@ def wrapped(*args: Any, **kwargs: Any) -> Any:
return wrapped
return decorate
+
+
+def on_security_exception(self, ex) -> Response:
+ return self.response(403, **{"message":
utils.error_msg_from_exception(ex)})
+
+
+def check_permissions(
Review comment:
The name of this decorator is too generic for a check that's only on
dashboards.
##########
File path: tests/dashboards/security/security_rbac_tests.py
##########
@@ -31,6 +31,136 @@
"superset.extensions.feature_flag_manager._feature_flags",
DASHBOARD_RBAC=True,
)
class TestDashboardRoleBasedSecurity(BaseTestDashboardSecurity):
+ def test_get_dashboard_view__admin_can_access(self):
+ # arrange
+ dashboard_to_access = create_dashboard_to_db(
+ owners=[], slices=[create_slice_to_db()], published=False
+ )
+ self.login("admin")
+
+ # act
+ response = self.get_dashboard_view_response(dashboard_to_access)
+
+ # assert
+ self.assert_dashboard_view_response(response, dashboard_to_access)
+
+ def test_get_dashboard_view__owner_can_access(self):
+ # arrange
+ username = random_str()
+ new_role = f"role_{random_str()}"
+ owner = self.create_user_with_roles(
+ username, [new_role], should_create_roles=True
+ )
+ dashboard_to_access = create_dashboard_to_db(
+ owners=[owner], slices=[create_slice_to_db()], published=False
+ )
+ self.login(username)
+
+ # act
+ response = self.get_dashboard_view_response(dashboard_to_access)
+
+ # assert
+ self.assert_dashboard_view_response(response, dashboard_to_access)
+
+ def test_get_dashboard_view__user_can_not_access_without_permission(self):
+ username = random_str()
+ new_role = f"role_{random_str()}"
+ self.create_user_with_roles(username, [new_role],
should_create_roles=True)
+ dashboard_to_access = create_dashboard_to_db(published=True)
+ self.login(username)
+
+ # act
+ response = self.get_dashboard_view_response(dashboard_to_access)
+
+ # assert
+ self.assert403(response)
+
+ def
test_get_dashboard_view__user_with_dashboard_permission_can_not_access_not_published(
+ self,
+ ):
+ # arrange
+ dashboard_to_access = create_dashboard_to_db(published=False)
+ username = random_str()
+ new_role = f"role_{random_str()}"
+ self.create_user_with_roles(username, [new_role],
should_create_roles=True)
+ grant_access_to_dashboard(dashboard_to_access, new_role)
+ self.login(username)
+
+ # act
+ response = self.get_dashboard_view_response(dashboard_to_access)
+
+ # assert
+ self.assert403(response)
+
+ # post
+ revoke_access_to_dashboard(dashboard_to_access, new_role)
+
+ def test_get_dashboard_view__user_access_with_dashboard_permission(self):
+ # arrange
+
+ username = random_str()
+ new_role = f"role_{random_str()}"
+ self.create_user_with_roles(username, [new_role],
should_create_roles=True)
+
+ dashboard_to_access = create_dashboard_to_db(
+ published=True, slices=[create_slice_to_db()]
+ )
+ self.login(username)
+ grant_access_to_dashboard(dashboard_to_access, new_role)
+
+ # act
+ response = self.get_dashboard_view_response(dashboard_to_access)
+
+ # assert
+ self.assert_dashboard_view_response(response, dashboard_to_access)
+
+ # post
+ revoke_access_to_dashboard(dashboard_to_access, new_role)
+
+ def
test_get_dashboard_view__public_user_can_not_access_without_permission(self):
+ dashboard_to_access = create_dashboard_to_db(published=True)
+ self.logout()
+
+ # act
+ response = self.get_dashboard_view_response(dashboard_to_access)
+
+ # assert
+ self.assert403(response)
+
+ def
test_get_dashboard_view__public_user_with_dashboard_permission_can_not_access_not_published(
Review comment:
nit: `replaceAll('not_published', 'draft')`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]