GitHub user rsaleev added a comment to the discussion: How to enable SSO login 
in Superset using Keycloak access token?

It looks like OAuth no longer works in 5.0. This is the approach that worked
in 4.1.1:

```python
class KeycloakOAuthView(AuthOAuthView):
    """OAuth login/logout view for Keycloak that also accepts bearer tokens.

    ``login`` signs the user in directly when the request carries an
    ``Authorization: Bearer <token>`` header; otherwise it starts the regular
    authorization-code redirect.  ``oauth_authorized`` is the redirect
    callback and ``logout`` ends the local session and, when an ID token is
    known, the Keycloak session as well.
    """

    @staticmethod
    def _state_signing_key() -> str:
        """Return the key used to sign and verify the OAuth ``state`` JWT.

        The key is read from the environment on every call and is never
        copied into the session, so it cannot end up in a client-side cookie.

        Raises:
            KeyError: if ``SUPERSET_SECRET_KEY`` is not set.
        """
        return os.environ["SUPERSET_SECRET_KEY"]

    @staticmethod
    def _bearer_token(auth_header: Optional[str]) -> Optional[str]:
        """Extract the credential from a ``Bearer`` authorization header.

        Returns ``None`` when the header is missing, uses another scheme, or
        carries an empty credential.  The scheme is matched
        case-insensitively.
        """
        if not auth_header:
            return None
        scheme, _, credentials = auth_header.partition(" ")
        if scheme.lower() != "bearer":
            return None
        return credentials.strip() or None

    @expose("/login/")
    @expose("/login/<provider>")
    def login(self, provider: Optional[str] = "keycloak") -> WerkzeugResponse:
        """Log the user in via bearer token or start the OAuth redirect.

        Args:
            provider: key into ``self.appbuilder.sm.oauth_remotes``.

        Returns:
            A redirect to the index page (already authenticated, token login,
            or failure to start the OAuth flow) or to the provider's
            authorization endpoint.
        """
        if g.user is not None and g.user.is_authenticated:
            session.pop("_flashes", None)
            return redirect(self.appbuilder.get_url_for_index)

        # Header lookup is case-insensitive, so one lookup is enough.
        auth_header = request.headers.get("Authorization")
        if auth_header:
            log.info("Authorization header provided")
            token = self._bearer_token(auth_header)
            if token:
                # TODO: check request with header
                log.info("Authorization with token")
                if request.headers.get("X-Elevate"):
                    log.info("Elevating permissions")
                    self.appbuilder.sm.auth_with_elevate(provider, token)
                else:
                    self.appbuilder.sm.auth_by_token(provider, token)
                session.pop("_flashes", None)
                return redirect(self.appbuilder.get_url_for_index)
            # Not a usable Bearer credential: fall through to the
            # interactive OAuth flow instead of failing on the parse.
            log.warning("Authorization header is not a Bearer token; ignoring it")

        # Drop any signing key an earlier version of this view stored in the
        # session; the key must stay on the server.
        session.pop("oauth_state", None)
        state = jwt.encode(
            request.args.to_dict(flat=False),
            self._state_signing_key(),
            algorithm="HS256",
        )
        try:
            remote = self.appbuilder.sm.oauth_remotes[provider]
            return remote.authorize_redirect(
                redirect_uri=url_for(
                    ".oauth_authorized", provider=provider, _external=True
                ),
                # jwt.encode may return bytes or str; pass str either way.
                state=state.decode("ascii") if isinstance(state, bytes) else state,
            )
        except Exception as e:
            log.error("Error on OAuth authorize: %s", e)
            flash(as_unicode(self.invalid_login_message), "warning")
            return redirect(self.appbuilder.get_url_for_index)

    @expose("/oauth-authorized/<provider>")
    def oauth_authorized(self, provider: str) -> WerkzeugResponse:
        """Handle the provider's redirect back after authorization.

        Exchanges the authorization response for a token, derives user info
        from the access token, authenticates the user, verifies the signed
        ``state`` parameter and finally logs the user in.

        Args:
            provider: key into ``self.appbuilder.sm.oauth_remotes``.

        Returns:
            A redirect to the ``next`` URL carried in ``state`` (or the index
            page) on success, or to the login page on failure.

        Raises:
            Exception: whatever ``set_oauth_session`` or
                ``get_userinfo_from_token`` raises is logged and re-raised.
        """
        if provider not in self.appbuilder.sm.oauth_remotes:
            flash("Provider not supported.", "warning")
            log.warning("OAuth authorized got an unknown provider %s", provider)
            return redirect(self.appbuilder.get_url_for_login)
        try:
            remote = self.appbuilder.sm.oauth_remotes[provider]
            resp = remote.authorize_access_token()
        except Exception as e:
            log.error("Error authorizing OAuth access token: %s", e)
            return redirect(self.appbuilder.get_url_for_login)
        if resp is None:
            return redirect(self.appbuilder.get_url_for_login)

        # The ID token is only needed later as a logout hint; a response
        # without one must not abort the login.
        id_token = resp.get("id_token")
        if id_token:
            session["id_token_hint"] = id_token

        try:
            self.appbuilder.sm.set_oauth_session(provider, resp)
            userinfo = self.appbuilder.sm.get_userinfo_from_token(
                resp["access_token"]
            )
        except Exception as e:
            log.error("Error returning OAuth user info: %s", e)
            raise

        user = self.appbuilder.sm.auth_user_oauth(userinfo)
        if user is None:
            flash(as_unicode(self.invalid_login_message), "warning")
            return redirect(self.appbuilder.get_url_for_login)

        # Resolve the key outside the try so a missing environment variable
        # is not misreported as an invalid state signature.
        signing_key = self._state_signing_key()
        try:
            state = jwt.decode(
                request.args["state"],
                signing_key,
                algorithms=["HS256"],
            )
        except (jwt.InvalidTokenError, KeyError):
            flash(as_unicode("Invalid state signature"), "warning")
            return redirect(self.appbuilder.get_url_for_login)

        login_user(user)
        next_url = self.appbuilder.get_url_for_index
        # Check if there is a next url on state
        if "next" in state and len(state["next"]) > 0:
            next_url = get_safe_redirect(state["next"][0])
        session.pop("_flashes", None)
        return redirect(next_url)

    @expose("/logout/")
    def logout(self) -> WerkzeugResponse:
        """Log the user out locally and, when possible, at Keycloak.

        Returns:
            A redirect to Keycloak's OpenID Connect logout endpoint when an
            ID token was stored at login, otherwise a redirect to the local
            login page.
        """
        from urllib.parse import urlencode

        logout_user()
        login_url = self.appbuilder.get_url_for_login
        id_token_hint = session.get("id_token_hint")
        session.clear()

        if not id_token_hint:
            # Token-based logins never store an ID token, so there is no
            # hint to send to Keycloak; just end the local session.
            return redirect(login_url)

        realm_url = (
            f"{os.environ['KEYCLOAK_BASE_URL']}"
            f"/auth/realms/{os.environ['KEYCLOAK_REALM']}"
        )
        # host_url ends with "/" and the login URL starts with one; strip it
        # to avoid a "//" in the redirect URI.
        redirect_url = f"{request.host_url.rstrip('/')}{login_url}"
        query = urlencode(
            {
                "id_token_hint": id_token_hint,
                "post_logout_redirect_uri": redirect_url,
            }
        )
        return redirect(f"{realm_url}/protocol/openid-connect/logout?{query}")


class KeycloakSecurityManager(SupersetSecurityManager):
    """Security manager that authenticates users from Keycloak access tokens.

    Installs ``KeycloakOAuthView`` as the OAuth view and builds Superset user
    info from the claims of a verified Keycloak JWT.
    """

    # Passed as ``verify=`` when fetching the realm public key.
    # NOTE(review): ``False`` disables TLS certificate verification for the
    # request that fetches the token-signing key. It is kept as the default
    # to preserve existing behaviour; set it to ``True`` or a CA bundle path
    # wherever possible.
    keycloak_tls_verify = False
    # Seconds to wait for Keycloak before giving up instead of hanging.
    keycloak_request_timeout = 10

    def __init__(self, appbuilder):
        super().__init__(appbuilder)
        self.authoauthview = KeycloakOAuthView

    def get_keycloak_public_key(self, token) -> str:
        """Fetch the realm's public key from Keycloak.

        Args:
            token: access token sent as the ``Authorization`` bearer header.

        Returns:
            The ``public_key`` field of the realm endpoint's JSON response.

        Raises:
            requests.HTTPError: on a non-success status.
            requests.Timeout: if Keycloak does not answer in time.
            KeyError: if a required environment variable or the
                ``public_key`` field is missing.
        """
        url = (
            f"{os.environ['KEYCLOAK_BASE_URL']}"
            f"/auth/realms/{os.environ['KEYCLOAK_REALM']}"
        )
        resp = requests.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            verify=self.keycloak_tls_verify,
            timeout=self.keycloak_request_timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        return data["public_key"]

    def decode_keycloak_jwt(self, token, verify: bool = False):
        """Verify a Keycloak JWT against the realm public key and decode it.

        The signature is always verified; audience verification is disabled.

        Args:
            token: the encoded JWT.
            verify: kept for backward compatibility only and no longer
                forwarded to ``jwt.decode``.  It must not be used to turn
                signature verification off.

        Returns:
            The decoded claims.

        Raises:
            Exception: any failure while fetching the key or decoding is
                logged and re-raised.
        """
        log.info("Decoding token...")
        try:
            key_der = b64decode(self.get_keycloak_public_key(token).encode())
            public_key = serialization.load_der_public_key(key_der)
            return jwt.decode(
                token,
                key=public_key,
                options={"verify_aud": False},
                # Only the asymmetric algorithm matching the public key is
                # accepted; HMAC algorithms must never be combined with a
                # public key.
                algorithms=["RS256"],
            )
        except Exception as e:
            log.exception(e)
            raise

    def get_userinfo_from_token(self, token):
        """Build the Superset user-info dict from a Keycloak access token.

        Roles come from ``realm_access.roles``, overridden by
        ``realm_access.userinfo_roles`` when present, and default to
        ``["undefined"]``.  When more than one role is found they are passed
        through ``map_multiple_roles``.

        Args:
            token: the encoded access token.

        Returns:
            ``asdict`` of a ``SupersetUserInfo`` with ``first_name``,
            ``last_name``, ``username``, ``email`` and ``role_keys``.
        """
        decoded_token: dict = self.decode_keycloak_jwt(token)
        roles = ["undefined"]
        if realm_access := decoded_token.get("realm_access"):
            if realm_roles := realm_access.get("roles"):
                roles = realm_roles
            if realm_user_roles := realm_access.get("userinfo_roles"):
                roles = realm_user_roles
        log.info("IdP user roles %s", ";".join(roles))
        if len(roles) > 1:
            roles = self.map_multiple_roles(roles)
        log.info("Dedicated user roles after mapping %s", ";".join(roles))
        fake_userinfo = FakeUserInfo.from_kwargs(**decoded_token)
        # map keys
        superset_userinfo = SupersetUserInfo(
            first_name=fake_userinfo.given_name,
            last_name=fake_userinfo.family_name,
            username=fake_userinfo.preferred_username,
            email=fake_userinfo.email,
            role_keys=roles,
        )
        return asdict(superset_userinfo)

    def map_multiple_roles(self, roles: list[str]) -> list[str]:
        """Collapse several roles to the first one listed in the priority map.

        Every entry of ``AUTH_ROLES_PRIORITY`` is considered, in ascending
        key order, including the last one.

        Args:
            roles: role names taken from the token.

        Returns:
            A single-element list with the first prioritised role found in
            ``roles``, or ``roles`` unchanged when none matches.
        """
        # presumably AUTH_ROLES_PRIORITY maps an integer rank to a role name,
        # lowest rank first; verify against its definition.
        for rank in sorted(AUTH_ROLES_PRIORITY):
            candidate = AUTH_ROLES_PRIORITY[rank]
            if candidate in roles:
                return [candidate]
        return roles

    def auth_by_token(self, provider: str, token: str):
        """Log in the user described by a Keycloak access token.

        Args:
            provider: OAuth provider name passed to ``set_oauth_session``.
            token: the encoded access token.
        """
        userinfo = self.appbuilder.sm.get_userinfo_from_token(token)
        # Shaped like a provider token response so set_oauth_session can
        # consume it.
        fake_resp = {
            "access_token": token,
            "token_type": "Bearer",
            "userinfo": userinfo,
        }
        self.appbuilder.sm.set_oauth_session(provider, fake_resp)
        user = self.appbuilder.sm.auth_user_oauth(userinfo)
        login_user(user, force=True)

    def auth_with_elevate(self, provider, token):
        """Log the caller in as the shared ``SERVICE`` account.

        The ``SERVICE`` user is created on first use.  The token is still
        decoded and verified, but its identity fields are replaced with the
        service account's before authentication.

        NOTE(review): every holder of a valid token who reaches this method
        is logged in as ``SERVICE``; confirm that callers restrict access.

        Args:
            provider: OAuth provider name passed to ``set_oauth_session``.
            token: the encoded access token.
        """
        user = self.find_user("SERVICE")
        if not user:
            user = self.add_user(
                username="SERVICE",
                first_name="Service",
                last_name="Account",
                role=list(self.get_roles_from_keys(["SERVICE"])),
                email="[email protected]",
            )
        userinfo = self.appbuilder.sm.get_userinfo_from_token(token)
        userinfo["username"] = user.username
        userinfo["first_name"] = user.first_name
        userinfo["last_name"] = user.last_name
        userinfo["role_keys"] = ["ROLE"]
        fake_resp = {
            "access_token": token,
            "token_type": "Bearer",
            "userinfo": userinfo,
        }
        self.appbuilder.sm.set_oauth_session(provider, fake_resp)
        user = self.appbuilder.sm.auth_user_oauth(userinfo)
        # NOTE(review): flask-login documents ``duration`` as the lifetime of
        # the remember cookie; without ``remember=True`` it presumably has no
        # effect - confirm the intended 10-minute limit is enforced.
        login_user(user, force=True, duration=timedelta(minutes=10))
```
Overridden config:

```
# Authenticate through OAuth using the custom Keycloak security manager.
AUTH_TYPE = AUTH_OAUTH
CUSTOM_SECURITY_MANAGER = KeycloakSecurityManager

# Every endpoint below lives under the realm URL, so build it once.  Both
# variables are required; a missing one raises KeyError at import time.
_KEYCLOAK_REALM_URL = (
    f"{os.environ['KEYCLOAK_BASE_URL']}/auth/realms/{os.environ['KEYCLOAK_REALM']}"
)
_KEYCLOAK_OIDC_URL = f"{_KEYCLOAK_REALM_URL}/protocol/openid-connect"

OAUTH_PROVIDERS = [
    {
        "name": "keycloak",
        "icon": "fa-key",
        "token_key": "access_token",
        "remote_app": {
            "client_id": os.environ["OAUTH_CLIENT_ID"],
            "client_secret": os.environ["OAUTH_CLIENT_SECRET"],
            # NOTE(review): "verify": False presumably disables TLS
            # certificate verification for calls to Keycloak - confirm this
            # is intended outside of test environments.
            "client_kwargs": {"scope": "email profile openid", "verify": False},
            "api_base_url": _KEYCLOAK_OIDC_URL,
            "access_token_url": f"{_KEYCLOAK_OIDC_URL}/token",
            "authorize_url": f"{_KEYCLOAK_OIDC_URL}/auth",
            "request_token_url": None,
            "jwks_uri": f"{_KEYCLOAK_OIDC_URL}/certs",
            "server_metadata_url": (
                f"{_KEYCLOAK_REALM_URL}/.well-known/openid-configuration"
            ),
        },
    }
]
```
Despite this configuration, Superset 5.0 just uses database authentication
(AUTH_DB).


GitHub link: 
https://github.com/apache/superset/discussions/36203#discussioncomment-15124908

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: 
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to