bbotella commented on code in PR #165:
URL: https://github.com/apache/cassandra-sidecar/pull/165#discussion_r1893169508


##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java:
##########
@@ -32,8 +32,10 @@ public final class ApiEndpointsV1
 
     public static final String NATIVE = "/native";
     public static final String JMX = "/jmx";
-    public static final String KEYSPACE_PATH_PARAM = ":keyspace";
-    public static final String TABLE_PATH_PARAM = ":table";
+    public static final String KEYSPACE = "keyspace";
+    public static final String TABLE = "table";
+    public static final String KEYSPACE_PATH_PARAM = ":" + KEYSPACE;

Review Comment:
   If this is needed, what about extracting the `:` to its own static variable 
as well? And maybe refactoring the rest of params?
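
   A minimal sketch of what I have in mind, not a definitive layout. The class 
name `PathParamsSketch`, the constant `PATH_PARAM_PREFIX` and the helper 
`pathParam` are hypothetical; only `KEYSPACE`, `TABLE` and the `:` prefix come 
from the diff.

```java
// Hypothetical holder class, used here only to illustrate the suggestion.
final class PathParamsSketch
{
    // The ':' prefix extracted into its own constant (name is an assumption).
    static final String PATH_PARAM_PREFIX = ":";

    static final String KEYSPACE = "keyspace";
    static final String TABLE = "table";

    // Both path params are now derived the same way.
    static final String KEYSPACE_PATH_PARAM = PATH_PARAM_PREFIX + KEYSPACE;
    static final String TABLE_PATH_PARAM = PATH_PARAM_PREFIX + TABLE;

    private PathParamsSketch()
    {
    }

    // Optional helper so the remaining params can be refactored uniformly.
    static String pathParam(String name)
    {
        return PATH_PARAM_PREFIX + name;
    }
}
```

   The other path params in the class could then be built from the same prefix.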



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/Action.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import io.vertx.ext.auth.authorization.Authorization;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.acl.authorization.WildcardAction.WILDCARD_PART_DIVIDER_TOKEN;
+import static 
org.apache.cassandra.sidecar.acl.authorization.WildcardAction.WILDCARD_TOKEN;
+
+/**
+ * Represents an action that can be granted to a user on a resource or across 
resources.
+ */
+public interface Action
+{
+    /**
+     * @return {@link Authorization}.
+     */
+    default Authorization toAuthorization()
+    {
+        return toAuthorization(null);
+    }
+
+    /**
+     * @return {@link Authorization} created for a resource
+     */
+    Authorization toAuthorization(String resource);
+
+    /**
+     * @return a new instance of {@link Action}
+     */
+    static Action fromName(@NotNull String name)
+    {
+        boolean isWildCard = name.equals(WILDCARD_TOKEN) || 
name.contains(WILDCARD_PART_DIVIDER_TOKEN);

Review Comment:
   Don't we want to support partial wildcards? Things like: `foo*`?
   
   Also, I don't quite love that the static variables are part of 
`WildcardAction`. If we are going to share them between an interface and one of 
its implementations, we should extract them to somewhere else.
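
   To make both points concrete, here is a rough sketch under assumptions. The 
holder class `ActionTokensSketch` and both method names are hypothetical, and 
the token values `*` and `:` are assumed from the javadoc in 
`SidecarActions`. It only changes how a name is classified; whether the 
underlying wildcard matching would honor `foo*` is a separate question.

```java
// Hypothetical shared holder, so that neither Action nor WildcardAction owns the tokens.
final class ActionTokensSketch
{
    // Values assumed from the SidecarActions javadoc ('*' wildcard, ':' parts divider).
    static final String WILDCARD_TOKEN = "*";
    static final String WILDCARD_PART_DIVIDER_TOKEN = ":";

    private ActionTokensSketch()
    {
    }

    // Same condition as in the diff: exact "*" or any name containing ':'.
    static boolean isWildcardAsInDiff(String name)
    {
        return name.equals(WILDCARD_TOKEN) || name.contains(WILDCARD_PART_DIVIDER_TOKEN);
    }

    // Variant that also classifies partial wildcards such as "foo*" as wildcard actions.
    static boolean isWildcardWithPartial(String name)
    {
        return name.contains(WILDCARD_TOKEN) || name.contains(WILDCARD_PART_DIVIDER_TOKEN);
    }
}
```

   With the condition from the diff, `foo*` is classified as a standard action; 
with the variant it is classified as a wildcard action.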



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationProvider.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+
+import static org.apache.cassandra.sidecar.utils.AuthUtils.extractIdentities;
+
+/**
+ * Provides authorizations based on user's role. Extracts permissions user 
holds from Cassandra's
+ * system_auth.role_permissions table and from Sidecar's 
sidecar_internal.role_permissions_v1 table and sets
+ * them in user.
+ */
+public class RoleBasedAuthorizationProvider implements AuthorizationProvider
+{
+    private final IdentityToRoleCache identityToRoleCache;
+    private final RoleAuthorizationsCache roleAuthorizationsCache;
+
+    public RoleBasedAuthorizationProvider(IdentityToRoleCache 
identityToRoleCache,
+                                          RoleAuthorizationsCache 
roleAuthorizationsCache)
+    {
+        this.identityToRoleCache = identityToRoleCache;
+        this.roleAuthorizationsCache = roleAuthorizationsCache;
+    }
+
+    public String getId()
+    {
+        return "RoleBasedAccessControl";
+    }
+
+    @Override
+    public void getAuthorizations(User user, Handler<AsyncResult<Void>> 
handler)
+    {
+        getAuthorizations(user).onComplete(handler);
+    }
+
+    @Override
+    public Future<Void> getAuthorizations(User user)
+    {
+        List<String> identities = extractIdentities(user);
+
+        if (identities.isEmpty())
+        {
+            throw new HttpException(HttpResponseStatus.FORBIDDEN.code(), 
"Missing client identities");
+        }
+
+        // First identity in chain is usually client identity
+        String role = identityToRoleCache.get(identities.get(0));
+        if (role == null)
+        {
+            throw new HttpException(HttpResponseStatus.FORBIDDEN.code(), "No 
matching Cassandra role found");
+        }
+
+        // when entries in cache are not found, null is returned. We can not 
add null in user.authorizations()

Review Comment:
   When that happens, should we somehow trigger a cache refresh?



##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/SchemaUnavailableException.java:
##########
@@ -16,21 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.exceptions;
+package org.apache.cassandra.sidecar.common.server.exceptions;
 
 /**
- * Exception thrown when {@link 
org.apache.cassandra.sidecar.db.schema.TableSchema} is not prepared or expected
- * operations are unavailable.
+ * Exception thrown when {@link 
org.apache.cassandra.sidecar.db.schema.TableSchema} does not exist.
+ * For instance, the connected Cassandra no longer has such table
  */
 public class SchemaUnavailableException extends RuntimeException
 {
-    public SchemaUnavailableException(String message)
+    public SchemaUnavailableException(String keyspace, String table)
     {
-        super(message);
+        super(makeErrorMessage(keyspace, table));
     }
 
-    public SchemaUnavailableException(String message, Throwable cause)
+    private static String makeErrorMessage(String keyspace, String table)
     {
-        super(message, cause);
+        return "Table " + keyspace + '/' + table + " does not exist";

Review Comment:
   Nit: Should the separator be a `.`?



##########
server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/ActionTest.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import org.junit.jupiter.api.Test;
+
+import io.vertx.ext.auth.authorization.PermissionBasedAuthorization;
+import io.vertx.ext.auth.authorization.WildcardPermissionBasedAuthorization;
+import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Test for {@link Action}
+ */
+class ActionTest
+{
+    @Test
+    void testValidActions()
+    {
+        
assertThat(Action.fromName("CREATE_SNAPSHOT")).isInstanceOf(StandardAction.class);
+        
assertThat(Action.fromName("OPERATE")).isInstanceOf(StandardAction.class);
+        
assertThat(Action.fromName("CREATESNAPSHOT")).isInstanceOf(StandardAction.class);
+        
assertThat(Action.fromName("CREATE:SNAPSHOT")).isInstanceOf(WildcardAction.class);
+        
assertThat(Action.fromName("*:SNAPSHOT")).isInstanceOf(WildcardAction.class);
+        
assertThat(Action.fromName("STREAM:*")).isInstanceOf(WildcardAction.class);
+        assertThat(Action.fromName("*")).isInstanceOf(WildcardAction.class);
+        assertThat(Action.fromName("*:*")).isInstanceOf(WildcardAction.class);
+        
assertThat(Action.fromName("CREATE:SNAPSHOT:*")).isInstanceOf(WildcardAction.class);
+    }
+
+    @Test
+    void testInvalidActions()
+    {
+        assertThatThrownBy(() -> 
Action.fromName("")).isInstanceOf(ConfigurationException.class);

Review Comment:
   `null` should also fail, right?



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AllowAllAuthorizationProvider.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+
+/**
+ * Authorizer to allow all requests.

Review Comment:
   Technically, this is an authorization provider, not an authorizer. Right?



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/AuthCache.java:
##########
@@ -164,6 +165,10 @@ protected void warmUp(int availableRetries)
         {
             cache.putAll(bulkLoadFunction.get());
         }
+        catch (SchemaUnavailableException sue)

Review Comment:
   Will all the implementations of `cache` throw the same exception?



##########
server-common/src/main/java/org/apache/cassandra/sidecar/db/schema/AbstractSchema.java:
##########
@@ -69,13 +81,16 @@ protected boolean initializeInternal(@NotNull Session 
session)
         }
 
         prepareStatements(session);
+        logger.info("{} is initialized!", this.getClass().getSimpleName());

Review Comment:
   Should this be a debug instead of info?



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AllowAllAuthorization.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationContext;
+
+/**
+ * {@code Authorization} implementation to allow access for all users 
regardless of their authorizations.
+ */
+public class AllowAllAuthorization implements Authorization
+{
+    public static final AllowAllAuthorization INSTANCE = new 
AllowAllAuthorization();
+
+    @Override
+    public boolean match(AuthorizationContext context)

Review Comment:
   Javadoc?



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleAuthorizationsCache.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.ext.auth.authorization.Authorization;
+import org.apache.cassandra.sidecar.acl.AuthCache;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.SidecarPermissionsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.SystemAuthDatabaseAccessor;
+
+/**
+ * Caches role and authorizations held by it. Entries from 
system_auth.role_permissions table in Cassandra and
+ * sidecar_internal.role_permissions_v1 table are processed into 
authorizations and cached here. All table entries are
+ * stored against a unique cache key. Caching against UNIQUE_CACHE_ENTRY is 
done to make sure new entries in the table
+ * are picked up during cache refreshes.
+ */
+@Singleton
+public class RoleAuthorizationsCache extends AuthCache<String, Map<String, 
Set<Authorization>>>
+{
+    private static final String NAME = "role_permissions_cache";
+    protected static final String UNIQUE_CACHE_ENTRY = 
"unique_cache_entry_key";
+
+    @Inject
+    public RoleAuthorizationsCache(Vertx vertx,
+                                   ExecutorPools executorPools,
+                                   SidecarConfiguration sidecarConfiguration,
+                                   SystemAuthDatabaseAccessor 
systemAuthDatabaseAccessor,
+                                   SidecarPermissionsDatabaseAccessor 
sidecarPermissionsDatabaseAccessor)
+    {
+        super(NAME,
+              vertx,
+              executorPools,
+              k -> loadAuthorizations(systemAuthDatabaseAccessor,
+                                      
sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled(),
+                                      sidecarPermissionsDatabaseAccessor),
+              () -> Collections.singletonMap(UNIQUE_CACHE_ENTRY,
+                                             
loadAuthorizations(systemAuthDatabaseAccessor,
+                                                                
sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled(),
+                                                                
sidecarPermissionsDatabaseAccessor)),
+              
sidecarConfiguration.accessControlConfiguration().permissionCacheConfiguration());
+    }
+
+    public Set<Authorization> getAuthorizations(String role)

Review Comment:
   Javadoc?



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationWithAdminBypassHandler.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.List;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import io.vertx.ext.web.handler.impl.AuthorizationHandlerImpl;
+
+import static org.apache.cassandra.sidecar.utils.AuthUtils.extractIdentities;
+
+/**
+ * Verifies user has required authorizations. Allows admin identities to 
bypass authorization checks.
+ */
+public class AuthorizationWithAdminBypassHandler extends 
AuthorizationHandlerImpl
+{
+    private final AdminIdentityResolver adminIdentityResolver;
+
+    public AuthorizationWithAdminBypassHandler(AdminIdentityResolver 
adminIdentityResolver,
+                                               Authorization authorization)
+    {
+        super(authorization);
+        this.adminIdentityResolver = adminIdentityResolver;
+    }
+
+    @Override
+    public void handle(RoutingContext ctx)
+    {
+        List<String> identities = extractIdentities(ctx.user());
+
+        if (identities.isEmpty())
+        {
+            throw new HttpException(HttpResponseStatus.FORBIDDEN.code(), 
"Missing client identities");
+        }
+
+        // Admin identities bypas route specific authorization checks
+        if (identities.stream().anyMatch(adminIdentityResolver::isAdmin))

Review Comment:
   I understand this is by no means as hot a path as the ones the [dev 
thread](https://lists.apache.org/thread/65glsjzkmpktzmns6j9wvr4nczvskx36) was 
referring to (even though this is a potentially hot path for Sidecar), but I 
wonder if we should keep Sidecar's convention of avoiding streams in non-test 
code.
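
   For reference, the stream-free equivalent could look roughly like the 
following. The class `AdminBypassSketch` and the method `anyAdmin` are 
hypothetical, and a plain `Predicate` stands in for 
`adminIdentityResolver::isAdmin`.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper; the Predicate stands in for adminIdentityResolver::isAdmin.
final class AdminBypassSketch
{
    private AdminBypassSketch()
    {
    }

    // Plain loop with early exit, equivalent to identities.stream().anyMatch(isAdmin).
    static boolean anyAdmin(List<String> identities, Predicate<String> isAdmin)
    {
        for (String identity : identities)
        {
            if (isAdmin.test(identity))
            {
                return true;
            }
        }
        return false;
    }
}
```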



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AdminIdentityResolver.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+
+/**
+ * Evaluates if provided identity is an admin identity.
+ */
+@Singleton
+public class AdminIdentityResolver
+{
+    private final IdentityToRoleCache identityToRoleCache;
+    private final SuperUserCache superUserCache;
+    private final Set<String> adminIdentities;
+
+    @Inject
+    public AdminIdentityResolver(IdentityToRoleCache identityToRoleCache,
+                                 SuperUserCache superUserCache,
+                                 SidecarConfiguration sidecarConfiguration)
+    {
+        this.identityToRoleCache = identityToRoleCache;
+        this.superUserCache = superUserCache;
+        this.adminIdentities = 
sidecarConfiguration.accessControlConfiguration().adminIdentities();
+    }
+
+    public Boolean isAdmin(String identity)
+    {
+        String role = identityToRoleCache.get(identity);
+        Boolean isSuperuser = superUserCache.isSuperUser(role);
+        // Cassandra superusers have admin privileges. For fallback, sidecar 
configured admin identities are honored
+        return isSuperuser || adminIdentities.contains(identity);

Review Comment:
   Does `adminIdentities.contains(identity)` need to be a fallback? I mean, the 
contains on a set is an O(1) operation that is always going to be called anyway 
in this implementation. If we checked it first, we might save the two cache 
checks (unless they are way cheaper than checking the set).
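
   Something along these lines, as a sketch only. `AdminResolverSketch` is a 
hypothetical stand-in: the `Function` and `Predicate` replace 
`IdentityToRoleCache` and `SuperUserCache`, whose real signatures I am not 
assuming here. The `null` guard on the role is also an assumption about what 
the cache returns for unknown identities.

```java
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for AdminIdentityResolver with the set check moved first.
final class AdminResolverSketch
{
    private final Set<String> adminIdentities;
    private final Function<String, String> identityToRole; // stands in for IdentityToRoleCache
    private final Predicate<String> isSuperUser;           // stands in for SuperUserCache

    AdminResolverSketch(Set<String> adminIdentities,
                        Function<String, String> identityToRole,
                        Predicate<String> isSuperUser)
    {
        this.adminIdentities = adminIdentities;
        this.identityToRole = identityToRole;
        this.isSuperUser = isSuperUser;
    }

    boolean isAdmin(String identity)
    {
        // Cheap set lookup first; configured admins never touch the caches.
        if (adminIdentities.contains(identity))
        {
            return true;
        }
        // Only now consult the caches. The null guard is an assumption.
        String role = identityToRole.apply(identity);
        return role != null && isSuperUser.test(role);
    }
}
```

   Returning a primitive `boolean` would also sidestep any unboxing surprises in 
`isSuperuser || ...`, in case the lookup can ever return `null`.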



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleAuthorizationsCache.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.ext.auth.authorization.Authorization;
+import org.apache.cassandra.sidecar.acl.AuthCache;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.SidecarPermissionsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.SystemAuthDatabaseAccessor;
+
+/**
+ * Caches role and authorizations held by it. Entries from 
system_auth.role_permissions table in Cassandra and
+ * sidecar_internal.role_permissions_v1 table are processed into 
authorizations and cached here. All table entries are
+ * stored against a unique cache key. Caching against UNIQUE_CACHE_ENTRY is 
done to make sure new entries in the table
+ * are picked up during cache refreshes.
+ */
+@Singleton
+public class RoleAuthorizationsCache extends AuthCache<String, Map<String, 
Set<Authorization>>>
+{
+    private static final String NAME = "role_permissions_cache";
+    protected static final String UNIQUE_CACHE_ENTRY = 
"unique_cache_entry_key";
+
+    @Inject
+    public RoleAuthorizationsCache(Vertx vertx,
+                                   ExecutorPools executorPools,
+                                   SidecarConfiguration sidecarConfiguration,
+                                   SystemAuthDatabaseAccessor 
systemAuthDatabaseAccessor,
+                                   SidecarPermissionsDatabaseAccessor 
sidecarPermissionsDatabaseAccessor)
+    {
+        super(NAME,
+              vertx,
+              executorPools,
+              k -> loadAuthorizations(systemAuthDatabaseAccessor,
+                                      
sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled(),
+                                      sidecarPermissionsDatabaseAccessor),
+              () -> Collections.singletonMap(UNIQUE_CACHE_ENTRY,
+                                             
loadAuthorizations(systemAuthDatabaseAccessor,
+                                                                
sidecarConfiguration.serviceConfiguration().schemaKeyspaceConfiguration().isEnabled(),
+                                                                
sidecarPermissionsDatabaseAccessor)),
+              
sidecarConfiguration.accessControlConfiguration().permissionCacheConfiguration());
+    }
+
+    public Set<Authorization> getAuthorizations(String role)
+    {
+        return get(UNIQUE_CACHE_ENTRY).get(role);
+    }
+
+    private static Map<String, Set<Authorization>> 
loadAuthorizations(SystemAuthDatabaseAccessor systemAuthDatabaseAccessor,
+                                                                      boolean 
isSidecarSchemaEnabled,
+                                                                      
SidecarPermissionsDatabaseAccessor sidecarPermissionsDatabaseAccessor)
+    {
+
+        // when entries in cache are not found, null is returned. We can not 
add null in Map
+        Map<String, Set<Authorization>>  roleAuthorizations
+        = 
Optional.ofNullable(systemAuthDatabaseAccessor.getAllRolesAndPermissions()).orElse(Collections.emptyMap());

Review Comment:
   Fix indent



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleAuthorizationsCache.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.ext.auth.authorization.Authorization;
+import org.apache.cassandra.sidecar.acl.AuthCache;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.SidecarPermissionsDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.SystemAuthDatabaseAccessor;
+
+/**
+ * Caches role and authorizations held by it. Entries from 
system_auth.role_permissions table in Cassandra and
+ * sidecar_internal.role_permissions_v1 table are processed into 
authorizations and cached here. All table entries are
+ * stored against a unique cache key. Caching against UNIQUE_CACHE_ENTRY is 
done to make sure new entries in the table
+ * are picked up during cache refreshes.
+ */
+@Singleton
+public class RoleAuthorizationsCache extends AuthCache<String, Map<String, 
Set<Authorization>>>
+{
+    private static final String NAME = "role_permissions_cache";
+    protected static final String UNIQUE_CACHE_ENTRY = 
"unique_cache_entry_key";
+
+    @Inject
+    public RoleAuthorizationsCache(Vertx vertx,
+                                   ExecutorPools executorPools,
+                                   SidecarConfiguration sidecarConfiguration,
+                                   SystemAuthDatabaseAccessor 
systemAuthDatabaseAccessor,
+                                   SidecarPermissionsDatabaseAccessor 
sidecarPermissionsDatabaseAccessor)
+    {
+        super(NAME,
+              vertx,
+              executorPools,
+              k -> loadAuthorizations(systemAuthDatabaseAccessor,

Review Comment:
   Can we avoid calling loadAuthorizations twice? (here and on line 62)
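
   What I mean is building both functions from a single loader, so the call 
expression is written once. A sketch under assumptions: `SingleLoaderSketch` 
is hypothetical and does not extend `AuthCache`; in the real class the shared 
loader would probably have to come from a static helper or a delegating 
constructor, since it is needed inside the `super(...)` call.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration; V stands in for Map<String, Set<Authorization>>.
final class SingleLoaderSketch<V>
{
    static final String UNIQUE_CACHE_ENTRY = "unique_cache_entry_key";

    final Function<String, V> loadFunction;
    final Supplier<Map<String, V>> bulkLoadFunction;

    SingleLoaderSketch(Supplier<V> loader)
    {
        // Per-key load ignores the key, as in the diff, and delegates to the shared loader.
        this.loadFunction = key -> loader.get();
        // Bulk load wraps the same loader under the single cache key.
        this.bulkLoadFunction = () -> Collections.singletonMap(UNIQUE_CACHE_ENTRY, loader.get());
    }
}
```

   The loader still runs once per invocation of either function; only the 
duplicated argument list goes away.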



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationWithAdminBypassHandler.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.List;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import io.vertx.ext.web.handler.impl.AuthorizationHandlerImpl;
+
+import static org.apache.cassandra.sidecar.utils.AuthUtils.extractIdentities;
+
+/**
+ * Verifies user has required authorizations. Allows admin identities to bypass authorization checks.
+ */
+public class AuthorizationWithAdminBypassHandler extends AuthorizationHandlerImpl
+{
+    private final AdminIdentityResolver adminIdentityResolver;
+
+    public AuthorizationWithAdminBypassHandler(AdminIdentityResolver adminIdentityResolver,
+                                               Authorization authorization)
+    {
+        super(authorization);
+        this.adminIdentityResolver = adminIdentityResolver;
+    }
+
+    @Override
+    public void handle(RoutingContext ctx)
+    {
+        List<String> identities = extractIdentities(ctx.user());
+
+        if (identities.isEmpty())
+        {
+            throw new HttpException(HttpResponseStatus.FORBIDDEN.code(), "Missing client identities");
+        }
+
+        // Admin identities bypas route specific authorization checks

Review Comment:
   typo: `bypass`



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SidecarActions.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+/**
+ * Sidecar actions allowed on specific targets are listed here. Majority of sidecar actions are represented in
+ * format <action_allowed>:<action_target>.
+ * <p>
+ * Example, with CREATE:SNAPSHOT permission, CREATE action is allowed for SNAPSHOT target. Sample actions are
+ * CREATE, VIEW, UPDATE, DELETE, STREAM, IMPORT, UPLOAD, START, ABORT etc.
+ * <p>
+ * Wildcard actions are supported with ':' wildcard parts divider and '*' wildcard token to match parts:
+ * <p>
+ * - *:SNAPSHOT allows CREATE:SNAPSHOT, VIEW:SNAPSHOT and DELETE:SNAPSHOT.
+ * - CREATE:* allows CREATE action on all possible targets.
+ * - *:* allows all possible permissions for specified resource
+ */
+public class SidecarActions
+{
+    // cassandra cluster related actions
+    public static final Action VIEW_CLUSTER = new WildcardAction("VIEW:CLUSTER");
+
+    // SSTable related actions
+    public static final Action UPLOAD_SSTABLE = new WildcardAction("UPLOAD:SSTABLE");

Review Comment:
   Nit: Maybe extract static variables for the actions and for the targets, so we can reuse them?
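
   A minimal sketch of that idea, under stated assumptions: `Actions`, `Targets`, `PermissionNames` and `permission` are hypothetical names that do not appear in the PR, and plain strings stand in for `WildcardAction` so the snippet compiles on its own.

   ```java
   // Hypothetical holder for the action verbs used in permission names.
   final class Actions
   {
       static final String VIEW = "VIEW";
       static final String UPLOAD = "UPLOAD";
       static final String IMPORT = "IMPORT";

       private Actions() {}
   }

   // Hypothetical holder for the targets those verbs apply to.
   final class Targets
   {
       static final String CLUSTER = "CLUSTER";
       static final String SSTABLE = "SSTABLE";

       private Targets() {}
   }

   // Hypothetical stand-in for SidecarActions; strings replace WildcardAction here.
   final class PermissionNames
   {
       // Same ':' divider that the SidecarActions javadoc describes.
       static final String DIVIDER = ":";

       static final String VIEW_CLUSTER = permission(Actions.VIEW, Targets.CLUSTER);
       static final String UPLOAD_SSTABLE = permission(Actions.UPLOAD, Targets.SSTABLE);
       static final String IMPORT_SSTABLE = permission(Actions.IMPORT, Targets.SSTABLE);

       private PermissionNames() {}

       // Joins an action and a target into the <action>:<target> form.
       static String permission(String action, String target)
       {
           return action + DIVIDER + target;
       }
   }
   ```

   In the real class, each constant would presumably wrap the joined name in `new WildcardAction(...)`, so every verb and target is spelled out only once.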



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationProvider.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+
+import static org.apache.cassandra.sidecar.utils.AuthUtils.extractIdentities;
+
+/**
+ * Provides authorizations based on user's role. Extracts permissions user holds from Cassandra's
+ * system_auth.role_permissions table and from Sidecar's sidecar_internal.role_permissions_v1 table and sets
+ * them in user.
+ */
+public class RoleBasedAuthorizationProvider implements AuthorizationProvider
+{
+    private final IdentityToRoleCache identityToRoleCache;
+    private final RoleAuthorizationsCache roleAuthorizationsCache;
+
+    public RoleBasedAuthorizationProvider(IdentityToRoleCache identityToRoleCache,
+                                          RoleAuthorizationsCache roleAuthorizationsCache)
+    {
+        this.identityToRoleCache = identityToRoleCache;
+        this.roleAuthorizationsCache = roleAuthorizationsCache;
+    }
+
+    public String getId()
+    {
+        return "RoleBasedAccessControl";
+    }
+
+    @Override
+    public void getAuthorizations(User user, Handler<AsyncResult<Void>> handler)
+    {
+        getAuthorizations(user).onComplete(handler);
+    }
+
+    @Override
+    public Future<Void> getAuthorizations(User user)
+    {
+        List<String> identities = extractIdentities(user);
+
+        if (identities.isEmpty())
+        {
+            throw new HttpException(HttpResponseStatus.FORBIDDEN.code(), "Missing client identities");
+        }
+
+        // First identity in chain is usually client identity

Review Comment:
   How does this "usually" impact the logic? Is there a valid case in which the client identity is not the first one but is still valid? If so, we need a fallback to handle it.



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationProvider.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+
+import static org.apache.cassandra.sidecar.utils.AuthUtils.extractIdentities;
+
+/**
+ * Provides authorizations based on user's role. Extracts permissions user holds from Cassandra's
+ * system_auth.role_permissions table and from Sidecar's sidecar_internal.role_permissions_v1 table and sets
+ * them in user.
+ */
+public class RoleBasedAuthorizationProvider implements AuthorizationProvider
+{
+    private final IdentityToRoleCache identityToRoleCache;
+    private final RoleAuthorizationsCache roleAuthorizationsCache;
+
+    public RoleBasedAuthorizationProvider(IdentityToRoleCache identityToRoleCache,
+                                          RoleAuthorizationsCache roleAuthorizationsCache)
+    {
+        this.identityToRoleCache = identityToRoleCache;
+        this.roleAuthorizationsCache = roleAuthorizationsCache;
+    }
+
+    public String getId()

Review Comment:
   I think we are missing the `@Override`



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SidecarActions.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+/**
+ * Sidecar actions allowed on specific targets are listed here. Majority of sidecar actions are represented in
+ * format <action_allowed>:<action_target>.
+ * <p>
+ * Example, with CREATE:SNAPSHOT permission, CREATE action is allowed for SNAPSHOT target. Sample actions are
+ * CREATE, VIEW, UPDATE, DELETE, STREAM, IMPORT, UPLOAD, START, ABORT etc.
+ * <p>
+ * Wildcard actions are supported with ':' wildcard parts divider and '*' wildcard token to match parts:
+ * <p>
+ * - *:SNAPSHOT allows CREATE:SNAPSHOT, VIEW:SNAPSHOT and DELETE:SNAPSHOT.
+ * - CREATE:* allows CREATE action on all possible targets.
+ * - *:* allows all possible permissions for specified resource
+ */
+public class SidecarActions
+{
+    // cassandra cluster related actions
+    public static final Action VIEW_CLUSTER = new WildcardAction("VIEW:CLUSTER");
+
+    // SSTable related actions
+    public static final Action UPLOAD_SSTABLE = new WildcardAction("UPLOAD:SSTABLE");
+    public static final Action IMPORT_SSTABLE = new WildcardAction("IMPORT:SSTABLE");
+    public static final Action STREAM_SSTABLE = new WildcardAction("STREAM:SSTABLE");
+
+    // Upload related actions
+    public static final Action DELETE_UPLOAD = new WildcardAction("DELETE:UPLOAD");
+
+    // snapshot related actions
+    public static final Action CREATE_SNAPSHOT = new WildcardAction("CREATE:SNAPSHOT");
+    public static final Action VIEW_SNAPSHOT = new WildcardAction("VIEW:SNAPSHOT");
+    public static final Action DELETE_SNAPSHOT = new WildcardAction("DELETE:SNAPSHOT");
+
+    // restore related actions
+    public static final Action CREATE_RESTORE = new WildcardAction("CREATE:RESTORE");

Review Comment:
   Nit: I'd keep the full `RESTORE_JOB` target name. Otherwise, `RESTORE` may 
look like an action instead of a target.



##########
server/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java:
##########
@@ -239,6 +247,7 @@ public void close()
             try
             {
                 localSession.getCluster().closeAsync().get(1, TimeUnit.MINUTES);
+                vertx.eventBus().publish(ON_CASSANDRA_DRIVER_CLOSED.address(), null);

Review Comment:
   Is this an unrelated bug fix?



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/StandardAction.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.PermissionBasedAuthorization;
+import io.vertx.ext.auth.authorization.impl.PermissionBasedAuthorizationImpl;
+import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+
+import static org.apache.cassandra.sidecar.acl.authorization.WildcardAction.WILDCARD_TOKEN;
+
+/**
+ * Standard actions need an exact match between allowed actions.
+ */
+public class StandardAction implements Action
+{
+    protected final String name;
+
+    public StandardAction(String name)
+    {
+        if (name == null || name.isEmpty())

Review Comment:
   Should we also prevent the `WILDCARD_TOKEN` from being present in the name?
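
   A sketch of the extra check, with assumptions labeled: `ActionNames` and `requireExactName` are hypothetical, the token value `"*"` is taken from the `SidecarActions` javadoc rather than from `WildcardAction` itself, and `IllegalArgumentException` stands in for the `ConfigurationException` that the class imports.

   ```java
   final class ActionNames
   {
       // Assumed value of WildcardAction.WILDCARD_TOKEN, per the SidecarActions javadoc.
       static final String WILDCARD_TOKEN = "*";

       private ActionNames() {}

       // Rejects null, empty, and wildcard-bearing names; returns the name otherwise.
       static String requireExactName(String name)
       {
           if (name == null || name.isEmpty())
           {
               throw new IllegalArgumentException("Action name can not be null or empty");
           }
           if (name.contains(WILDCARD_TOKEN))
           {
               throw new IllegalArgumentException("Standard action can not contain wildcard token: " + name);
           }
           return name;
       }
   }
   ```

   With a check like this in the constructor, a name such as `*:SNAPSHOT` would fail fast instead of silently being treated as an exact-match action.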



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/RingHandler.java:
##########
@@ -95,15 +107,15 @@ protected void processFailure(Throwable cause,
             return;
         }
 
-        super.processFailure(cause, context, host, remoteAddress, keyspace);
+        super.processFailure(cause, context, host, remoteAddress, request);
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    protected Name extractParamsOrThrow(RoutingContext context)
+    protected Void extractParamsOrThrow(RoutingContext context)
     {
-        return keyspace(context, false);
+        return null;

Review Comment:
   Why are we returning null?



##########
server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SidecarActions.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+/**
+ * Sidecar actions allowed on specific targets are listed here. Majority of sidecar actions are represented in
+ * format <action_allowed>:<action_target>.
+ * <p>
+ * Example, with CREATE:SNAPSHOT permission, CREATE action is allowed for SNAPSHOT target. Sample actions are
+ * CREATE, VIEW, UPDATE, DELETE, STREAM, IMPORT, UPLOAD, START, ABORT etc.
+ * <p>
+ * Wildcard actions are supported with ':' wildcard parts divider and '*' wildcard token to match parts:
+ * <p>
+ * - *:SNAPSHOT allows CREATE:SNAPSHOT, VIEW:SNAPSHOT and DELETE:SNAPSHOT.
+ * - CREATE:* allows CREATE action on all possible targets.
+ * - *:* allows all possible permissions for specified resource
+ */
+public class SidecarActions
+{
+    // cassandra cluster related actions
+    public static final Action VIEW_CLUSTER = new WildcardAction("VIEW:CLUSTER");

Review Comment:
   What do you think about reusing the `Action.fromName()` you have implemented 
as default on the `Action` interface?



##########
server/src/test/integration/org/apache/cassandra/sidecar/acl/RoleBasedAuthorizationIntegrationTest.java:
##########


Review Comment:
   Do we have tests for "deny access" scenarios?



##########
server/src/main/java/org/apache/cassandra/sidecar/db/SystemAuthDatabaseAccessor.java:
##########
@@ -66,9 +68,7 @@ public String findRoleFromIdentity(String identity)
      */
     public Map<String, String> findAllIdentityToRoles()
     {
-        ensureIdentityToRoleTableAccess();

Review Comment:
   Why are we removing this check?



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/AccessProtectedRouteBuilder.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import io.vertx.core.Handler;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.auth.authorization.AndAuthorization;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationContext;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.BodyHandler;
+import org.apache.cassandra.sidecar.acl.authorization.AdminIdentityResolver;
+import org.apache.cassandra.sidecar.acl.authorization.AuthorizationWithAdminBypassHandler;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
+import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.KEYSPACE;
+import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.TABLE;
+
+/**
+ * Builder for building authorized routes
+ */
+public class AccessProtectedRouteBuilder
+{
+    private final AccessControlConfiguration accessControlConfiguration;
+    private final AuthorizationProvider authorizationProvider;
+    private final AdminIdentityResolver adminIdentityResolver;
+
+    private Router router;
+    private HttpMethod method;
+    private String endpoint;
+    private boolean setBodyHandler;
+    private final List<Handler<RoutingContext>> handlers = new ArrayList<>();
+
+    public AccessProtectedRouteBuilder(AccessControlConfiguration accessControlConfiguration,
+                                       AuthorizationProvider authorizationProvider,
+                                       AdminIdentityResolver adminIdentityResolver)
+    {
+        this.accessControlConfiguration = accessControlConfiguration;
+        this.authorizationProvider = authorizationProvider;
+        this.adminIdentityResolver = adminIdentityResolver;
+    }
+
+    /**
+     * Sets {@link Router} authorized route is built for
+     *
+     * @param router Router authorized route is built for
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder router(Router router)
+    {
+        this.router = router;
+        return this;
+    }
+
+    /**
+     * Sets {@link HttpMethod} for route
+     *
+     * @param method HttpMethod set for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder method(HttpMethod method)
+    {
+        this.method = method;
+        return this;
+    }
+
+    /**
+     * Sets path for route
+     *
+     * @param endpoint REST path for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder endpoint(String endpoint)
+    {
+        this.endpoint = endpoint;
+        return this;
+    }
+
+    /**
+     * Sets if BodyHandler should be created for the route.
+     *
+     * @param setBodyHandler boolean flag indicating if route requires BodyHandler
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder setBodyHandler(Boolean setBodyHandler)
+    {
+        this.setBodyHandler = setBodyHandler;
+        return this;
+    }
+
+    /**
+     * Adds handler to handler chain of route. Handlers are ordered, they are called in order they are set in chain.
+     *
+     * @param handler handler for route
+     * @return a reference to {@link AccessProtectedRouteBuilder} for chaining
+     */
+    public AccessProtectedRouteBuilder handler(Handler<RoutingContext> handler)
+    {
+        this.handlers.add(handler);
+        return this;
+    }
+
+    /**
+     * Builds an authorized route. Adds {@link io.vertx.ext.web.handler.AuthorizationHandler} at top of handler
+     * chain if access control is enabled.
+     */
+    public void build()
+    {
+        Preconditions.checkArgument(router != null, "Router must be set");
+        Preconditions.checkArgument(method != null, "Http method must be set");
+        Preconditions.checkArgument(endpoint != null && !endpoint.isEmpty(), "Endpoint must be set");
+        Preconditions.checkArgument(!handlers.isEmpty(), "Handler chain can not be empty");
+
+        Route route = router.route(method, endpoint);
+
+        // BodyHandler should be at index 0 in handler chain
+        if (setBodyHandler)
+        {
+            route.handler(BodyHandler.create());
+        }
+
+        if (accessControlConfiguration.enabled())
+        {
+            // authorization handler added before route specific handler chain
+            AuthorizationWithAdminBypassHandler authorizationHandler
+            = new AuthorizationWithAdminBypassHandler(adminIdentityResolver, requiredAuthorization());
+            authorizationHandler.addAuthorizationProvider(authorizationProvider);
+            authorizationHandler.variableConsumer(routeGenericVariableConsumer());
+
+            route.handler(authorizationHandler);
+        }
+        handlers.forEach(route::handler);
+    }
+
+    private Authorization requiredAuthorization()
+    {
+        AndAuthorization and = AndAuthorization.create();

Review Comment:
   Nit: `andAuthorization`? Reading only `and` can be confusing.
   
   Also, moving the creation after the `requiredAuthorization.isEmpty()` check 
will save us from creating it if an exception is going to be thrown.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
