m1a2st commented on code in PR #22261:
URL: https://github.com/apache/kafka/pull/22261#discussion_r3240245217


##########
server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java:
##########
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.internals.SecurityUtils;
+import org.apache.kafka.controller.MockAclMutator;
+import org.apache.kafka.metadata.authorizer.AuthorizerTestServerInfo;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.raft.KRaftConfigs;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.util.ServerTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.kafka.common.acl.AclOperation.ALL;
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.TWO_PHASE_COMMIT;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.acl.AclPermissionType.DENY;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.MATCH;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME;
+import static 
org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE;
+import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
+import static org.apache.kafka.common.resource.ResourceType.GROUP;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
+import static org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST;
+import static 
org.apache.kafka.security.authorizer.AclEntry.WILDCARD_PRINCIPAL_STRING;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AuthorizerTest {
+
+    private final Endpoint plaintext = new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "127.0.0.1", 9020);
+
+    private final AccessControlEntry allowReadAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, READ, ALLOW);
+    private final AccessControlEntry allowWriteAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW);
+    private final AccessControlEntry denyReadAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, READ, DENY);
+
+    private final ResourcePattern wildCardResource = new 
ResourcePattern(TOPIC, WILDCARD_RESOURCE, LITERAL);
+    private final ResourcePattern prefixedResource = new 
ResourcePattern(TOPIC, "foo", PREFIXED);
+    private final ResourcePattern clusterResource = new 
ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL);
+    private final KafkaPrincipal wildcardPrincipal = 
SecurityUtils.parseKafkaPrincipal(WILDCARD_PRINCIPAL_STRING);
+
+    private final String username = "alice";
+    private final KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, username);
+    private final List<Metrics> metricsInstances = new ArrayList<>();
+    private final List<PluginMetricsImpl> pluginMetricsInstances = new 
ArrayList<>();
+
+    private Authorizer authorizer;
+    private RequestContext requestContext;
+    private ResourcePattern resource;
+
+    static class CustomPrincipal extends KafkaPrincipal {
+
+        public CustomPrincipal(String principalType, String name) {
+            super(principalType, name);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }
+    }
+
+    @BeforeEach
+    public void setup() throws Exception {
+        requestContext = newRequestContext(principal, 
InetAddress.getByName("192.168.0.1"));
+        authorizer = createAuthorizer(configs());
+        resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), 
LITERAL);
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        authorizer.close();
+        for (PluginMetricsImpl pluginMetric : pluginMetricsInstances) {
+            pluginMetric.close();
+        }
+        for (Metrics metrics : metricsInstances) {
+            metrics.close();
+        }
+    }
+
+    private Map<String, Object> configs() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(StandardAuthorizer.SUPER_USERS_CONFIG, "User:superuser1; 
User:superuser2");
+        configs.put(KRaftConfigs.NODE_ID_CONFIG, "0");
+        return configs;
+    }
+
+    @Test
+    public void testAuthorizeByResourceTypeMultipleAddAndRemove() throws 
Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"user1");
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" + 
UUID.randomUUID(), LITERAL);
+        AccessControlEntry denyRead = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), READ, DENY);
+        AccessControlEntry allowRead = new 
AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
+        RequestContext u1h1Context = newRequestContext(user1, host1);
+
+        for (int i = 0; i < 10; i++) {
+            assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),

Review Comment:
   We can just use `TOPIC` here, since `ResourceType.TOPIC` is already statically 
imported at the top of the file. The same simplification should be applied 
throughout the file wherever `ResourceType.TOPIC` is written out in full.



##########
server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java:
##########
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.internals.SecurityUtils;
+import org.apache.kafka.controller.MockAclMutator;
+import org.apache.kafka.metadata.authorizer.AuthorizerTestServerInfo;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.raft.KRaftConfigs;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.util.ServerTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.kafka.common.acl.AclOperation.ALL;
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.TWO_PHASE_COMMIT;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.acl.AclPermissionType.DENY;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.MATCH;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.common.resource.Resource.CLUSTER_NAME;
+import static 
org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE;
+import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
+import static org.apache.kafka.common.resource.ResourceType.GROUP;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
+import static org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST;
+import static 
org.apache.kafka.security.authorizer.AclEntry.WILDCARD_PRINCIPAL_STRING;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AuthorizerTest {
+
+    private final Endpoint plaintext = new Endpoint("PLAINTEXT", 
SecurityProtocol.PLAINTEXT, "127.0.0.1", 9020);
+
+    private final AccessControlEntry allowReadAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, READ, ALLOW);
+    private final AccessControlEntry allowWriteAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW);
+    private final AccessControlEntry denyReadAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, READ, DENY);
+
+    private final ResourcePattern wildCardResource = new 
ResourcePattern(TOPIC, WILDCARD_RESOURCE, LITERAL);
+    private final ResourcePattern prefixedResource = new 
ResourcePattern(TOPIC, "foo", PREFIXED);
+    private final ResourcePattern clusterResource = new 
ResourcePattern(CLUSTER, CLUSTER_NAME, LITERAL);
+    private final KafkaPrincipal wildcardPrincipal = 
SecurityUtils.parseKafkaPrincipal(WILDCARD_PRINCIPAL_STRING);
+
+    private final String username = "alice";
+    private final KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, username);
+    private final List<Metrics> metricsInstances = new ArrayList<>();
+    private final List<PluginMetricsImpl> pluginMetricsInstances = new 
ArrayList<>();
+
+    private Authorizer authorizer;
+    private RequestContext requestContext;
+    private ResourcePattern resource;
+
+    static class CustomPrincipal extends KafkaPrincipal {
+
+        public CustomPrincipal(String principalType, String name) {
+            super(principalType, name);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }
+    }
+
+    @BeforeEach
+    public void setup() throws Exception {
+        requestContext = newRequestContext(principal, 
InetAddress.getByName("192.168.0.1"));
+        authorizer = createAuthorizer(configs());
+        resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), 
LITERAL);
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        authorizer.close();
+        for (PluginMetricsImpl pluginMetric : pluginMetricsInstances) {
+            pluginMetric.close();
+        }
+        for (Metrics metrics : metricsInstances) {
+            metrics.close();
+        }
+    }
+
+    private Map<String, Object> configs() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(StandardAuthorizer.SUPER_USERS_CONFIG, "User:superuser1; 
User:superuser2");
+        configs.put(KRaftConfigs.NODE_ID_CONFIG, "0");
+        return configs;
+    }
+
+    @Test
+    public void testAuthorizeByResourceTypeMultipleAddAndRemove() throws 
Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"user1");
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" + 
UUID.randomUUID(), LITERAL);
+        AccessControlEntry denyRead = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), READ, DENY);
+        AccessControlEntry allowRead = new 
AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
+        RequestContext u1h1Context = newRequestContext(user1, host1);
+
+        for (int i = 0; i < 10; i++) {
+            assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                    "User1 from host1 should not have READ access to any topic 
when no ACL exists");
+
+            addAcls(authorizer, Set.of(allowRead), resource1);
+            assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                    "User1 from host1 now should have READ access to at least 
one topic");
+
+            for (int j = 0; j < 10; j++) {
+                addAcls(authorizer, Set.of(denyRead), resource1);
+                assertFalse(authorizeByResourceType(authorizer, u1h1Context, 
READ, ResourceType.TOPIC),
+                        "User1 from host1 now should not have READ access to 
any topic");
+
+                removeAcls(authorizer, Set.of(denyRead), resource1);
+                addAcls(authorizer, Set.of(allowRead), resource1);
+                assertTrue(authorizeByResourceType(authorizer, u1h1Context, 
READ, ResourceType.TOPIC),
+                        "User1 from host1 now should have READ access to at 
least one topic");
+            }
+
+            removeAcls(authorizer, Set.of(allowRead), resource1);
+            assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                    "User1 from host1 now should not have READ access to any 
topic");
+        }
+    }
+
+    @Test
+    public void 
testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow() throws 
Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"user1");
+        KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"user2");
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        InetAddress host2 = InetAddress.getByName("192.168.1.2");
+        ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" + 
UUID.randomUUID(), LITERAL);
+        ResourcePattern resource2 = new ResourcePattern(TOPIC, "sb2" + 
UUID.randomUUID(), LITERAL);
+        ResourcePattern resource3 = new ResourcePattern(GROUP, "s", PREFIXED);
+
+        AccessControlEntry acl1 = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), READ, DENY);
+        AccessControlEntry acl2 = new AccessControlEntry(user2.toString(), 
host1.getHostAddress(), READ, DENY);
+        AccessControlEntry acl3 = new AccessControlEntry(user1.toString(), 
host2.getHostAddress(), WRITE, DENY);
+        AccessControlEntry acl4 = new AccessControlEntry(user1.toString(), 
host2.getHostAddress(), READ, DENY);
+        AccessControlEntry acl5 = new AccessControlEntry(user1.toString(), 
host2.getHostAddress(), READ, DENY);
+        AccessControlEntry acl6 = new AccessControlEntry(user2.toString(), 
host2.getHostAddress(), READ, DENY);
+        AccessControlEntry acl7 = new AccessControlEntry(user1.toString(), 
host2.getHostAddress(), READ, ALLOW);
+
+        addAcls(authorizer, Set.of(acl1, acl2, acl3, acl6, acl7), resource1);
+        addAcls(authorizer, Set.of(acl4), resource2);
+        addAcls(authorizer, Set.of(acl5), resource3);
+
+        RequestContext u1h1Context = newRequestContext(user1, host1);
+        RequestContext u1h2Context = newRequestContext(user1, host2);
+
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                "User1 from host1 should not have READ access to any topic");
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
GROUP),
+                "User1 from host1 should not have READ access to any consumer 
group");
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
TRANSACTIONAL_ID),
+                "User1 from host1 should not have READ access to any 
transactional id");
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
CLUSTER),
+                "User1 from host1 should not have READ access to the cluster");
+        assertTrue(authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC),
+                "User1 from host2 should have READ access to at least one 
topic");
+    }
+
+    @Test
+    public void testAuthorizeByResourceTypeDenyTakesPrecedence() throws 
Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"user1");
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" + 
UUID.randomUUID(), LITERAL);
+
+        RequestContext u1h1Context = newRequestContext(user1, host1);
+        AccessControlEntry acl1 = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), WRITE, ALLOW);
+        AccessControlEntry acl2 = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), WRITE, DENY);
+
+        addAcls(authorizer, Set.of(acl1), resource1);
+        assertTrue(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.TOPIC),
+                "User1 from host1 should have WRITE access to at least one 
topic");
+
+        addAcls(authorizer, Set.of(acl2), resource1);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
ResourceType.TOPIC),
+                "User1 from host1 should not have WRITE access to any topic");
+    }
+
+    @Test
+    public void testAuthorizeByResourceTypePrefixedResourceDenyDominate() 
throws Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"user1");
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        ResourcePattern a = new ResourcePattern(GROUP, "a", PREFIXED);
+        ResourcePattern ab = new ResourcePattern(GROUP, "ab", PREFIXED);
+        ResourcePattern abc = new ResourcePattern(GROUP, "abc", PREFIXED);
+        ResourcePattern abcd = new ResourcePattern(GROUP, "abcd", PREFIXED);
+        ResourcePattern abcde = new ResourcePattern(GROUP, "abcde", PREFIXED);
+
+        RequestContext u1h1Context = newRequestContext(user1, host1);
+        AccessControlEntry allowAce = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), READ, ALLOW);
+        AccessControlEntry denyAce = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), READ, DENY);
+
+        addAcls(authorizer, Set.of(allowAce), abcde);
+        assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
GROUP),
+                "User1 from host1 should have READ access to at least one 
group");
+
+        addAcls(authorizer, Set.of(denyAce), abcd);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
GROUP),
+                "User1 from host1 now should not have READ access to any 
group");
+
+        addAcls(authorizer, Set.of(allowAce), abc);
+        assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
GROUP),
+                "User1 from host1 now should have READ access to any group");
+
+        addAcls(authorizer, Set.of(denyAce), a);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
GROUP),
+                "User1 from host1 now should not have READ access to any 
group");
+
+        addAcls(authorizer, Set.of(allowAce), ab);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
GROUP),
+                "User1 from host1 still should not have READ access to any 
group");
+    }
+
+    @Test
+    public void testAuthorizeByResourceTypeWildcardResourceDenyDominate() 
throws Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"user1");
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        ResourcePattern wildcard = new ResourcePattern(GROUP, 
WILDCARD_RESOURCE, LITERAL);
+        ResourcePattern prefixed = new ResourcePattern(GROUP, "hello", 
PREFIXED);
+        ResourcePattern literal = new ResourcePattern(GROUP, "aloha", LITERAL);
+
+        RequestContext u1h1Context = newRequestContext(user1, host1);
+        AccessControlEntry allowAce = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), WRITE, ALLOW);
+        AccessControlEntry denyAce = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), WRITE, DENY);
+
+        addAcls(authorizer, Set.of(allowAce), prefixed);
+        assertTrue(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
GROUP),
+                "User1 from host1 should have WRITE access to at least one 
group");
+
+        addAcls(authorizer, Set.of(denyAce), wildcard);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
GROUP),
+                "User1 from host1 now should not have WRITE access to any 
group");
+
+        addAcls(authorizer, Set.of(allowAce), wildcard);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
GROUP),
+                "User1 from host1 still should not have WRITE access to any 
group");
+
+        addAcls(authorizer, Set.of(allowAce), literal);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, WRITE, 
GROUP),
+                "User1 from host1 still should not have WRITE access to any 
group");
+    }
+
+    @Test
+    public void testAuthorizeByResourceTypeWithAllOperationAce() throws 
Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"user1");
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" + 
UUID.randomUUID(), LITERAL);
+        AccessControlEntry denyAll = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), ALL, DENY);
+        AccessControlEntry allowAll = new AccessControlEntry(user1.toString(), 
host1.getHostAddress(), ALL, ALLOW);
+        AccessControlEntry denyWrite = new 
AccessControlEntry(user1.toString(), host1.getHostAddress(), WRITE, DENY);
+        RequestContext u1h1Context = newRequestContext(user1, host1);
+
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                "User1 from host1 should not have READ access to any topic 
when no ACL exists");
+
+        addAcls(authorizer, Set.of(denyWrite, allowAll), resource1);
+        assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                "User1 from host1 now should have READ access to at least one 
topic");
+
+        addAcls(authorizer, Set.of(denyAll), resource1);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                "User1 from host1 now should not have READ access to any 
topic");
+    }
+
+    @Test
+    public void testAuthorizeByResourceTypeWithAllHostAce() throws Exception {
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"user1");
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        InetAddress host2 = InetAddress.getByName("192.168.1.2");
+        String allHost = WILDCARD_HOST;
+        ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" + 
UUID.randomUUID(), LITERAL);
+        ResourcePattern resource2 = new ResourcePattern(TOPIC, "sb2" + 
UUID.randomUUID(), LITERAL);
+        AccessControlEntry allowHost1 = new 
AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
+        AccessControlEntry denyHost1 = new 
AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, DENY);
+        AccessControlEntry denyAllHost = new 
AccessControlEntry(user1.toString(), allHost, READ, DENY);
+        AccessControlEntry allowAllHost = new 
AccessControlEntry(user1.toString(), allHost, READ, ALLOW);
+        RequestContext u1h1Context = newRequestContext(user1, host1);
+        RequestContext u1h2Context = newRequestContext(user1, host2);
+
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                "User1 from host1 should not have READ access to any topic 
when no ACL exists");
+
+        addAcls(authorizer, Set.of(allowHost1), resource1);
+        assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                "User1 from host1 should now have READ access to at least one 
topic");
+
+        addAcls(authorizer, Set.of(denyAllHost), resource1);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                "User1 from host1 now shouldn't have READ access to any 
topic");
+
+        addAcls(authorizer, Set.of(denyHost1), resource2);
+        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, 
ResourceType.TOPIC),
+                "User1 from host1 still should not have READ access to any 
topic");
+        assertFalse(authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC),
+                "User1 from host2 should not have READ access to any topic");
+
+        addAcls(authorizer, Set.of(allowAllHost), resource2);
+        assertTrue(authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC),
+                "User1 from host2 should now have READ access to at least one 
topic");
+
+        addAcls(authorizer, Set.of(denyAllHost), resource2);
+        assertFalse(authorizeByResourceType(authorizer, u1h2Context, READ, 
ResourceType.TOPIC),
+                "User1 from host2 now shouldn't have READ access to any 
topic");
+    }
+
    /**
     * Verifies authorizeByResourceType evaluation when ACL entries use the wildcard
     * principal: a wildcard-principal DENY overrides a user-specific ALLOW on the
     * same resource, and a wildcard-principal ALLOW grants access to a user with no
     * entry of their own (unless a matching DENY exists).
     */
    @Test
    public void testAuthorizeByResourceTypeWithAllPrincipalAce() throws Exception {
        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");
        KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2");
        String allUser = WILDCARD_PRINCIPAL_STRING;
        InetAddress host1 = InetAddress.getByName("192.168.1.1");
        // Random suffixes keep these topic names unique so ACLs left behind by
        // other tests cannot interfere with this one.
        ResourcePattern resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL);
        ResourcePattern resource2 = new ResourcePattern(TOPIC, "sb2" + UUID.randomUUID(), LITERAL);
        // The four entries differ only in principal (user1 vs. wildcard) and permission type.
        AccessControlEntry allowUser1 = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
        AccessControlEntry denyUser1 = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, DENY);
        AccessControlEntry denyAllUser = new AccessControlEntry(allUser, host1.getHostAddress(), READ, DENY);
        AccessControlEntry allowAllUser = new AccessControlEntry(allUser, host1.getHostAddress(), READ, ALLOW);
        RequestContext u1h1Context = newRequestContext(user1, host1);
        RequestContext u2h1Context = newRequestContext(user2, host1);

        // No ACLs yet: the authorizer must fail closed.
        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC),
                "User1 from host1 should not have READ access to any topic when no ACL exists");

        addAcls(authorizer, Set.of(allowUser1), resource1);
        assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC),
                "User1 from host1 should now have READ access to at least one topic");

        // A wildcard-principal DENY on resource1 overrides user1's explicit ALLOW there.
        addAcls(authorizer, Set.of(denyAllUser), resource1);
        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC),
                "User1 from host1 now shouldn't have READ access to any topic");

        // A user1-specific DENY on resource2 blocks user1 there too; user2 still
        // has no ALLOW entry anywhere, so both users are denied.
        addAcls(authorizer, Set.of(denyUser1), resource2);
        assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC),
                "User1 from host1 still should not have READ access to any topic");
        assertFalse(authorizeByResourceType(authorizer, u2h1Context, READ, ResourceType.TOPIC),
                "User2 from host1 should not have READ access to any topic");

        // A wildcard-principal ALLOW on resource2 now covers user2, who has no DENY.
        addAcls(authorizer, Set.of(allowAllUser), resource2);
        assertTrue(authorizeByResourceType(authorizer, u2h1Context, READ, ResourceType.TOPIC),
                "User2 from host1 should now have READ access to at least one topic");

        // A wildcard-principal DENY on resource2 takes precedence over the wildcard ALLOW.
        addAcls(authorizer, Set.of(denyAllUser), resource2);
        assertFalse(authorizeByResourceType(authorizer, u2h1Context, READ, ResourceType.TOPIC),
                "User2 from host1 now shouldn't have READ access to any topic");
    }
+
+    @Test
+    public void testAuthorizeByResourceTypeSuperUserHasAccess() throws 
Exception {
+        AccessControlEntry denyAllAce = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, 
DENY);
+        String superUserName = "superuser1";
+        KafkaPrincipal superUser1 = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, superUserName);
+        InetAddress host1 = InetAddress.getByName("192.0.4.4");
+        ResourcePattern allTopicsResource = new ResourcePattern(TOPIC, 
WILDCARD_RESOURCE, LITERAL);
+        ResourcePattern clusterResource = new ResourcePattern(CLUSTER, 
WILDCARD_RESOURCE, LITERAL);
+        ResourcePattern groupResource = new ResourcePattern(GROUP, 
WILDCARD_RESOURCE, LITERAL);
+        ResourcePattern transactionIdResource = new 
ResourcePattern(TRANSACTIONAL_ID, WILDCARD_RESOURCE, LITERAL);
+
+        addAcls(authorizer, Set.of(denyAllAce), allTopicsResource);
+        addAcls(authorizer, Set.of(denyAllAce), clusterResource);
+        addAcls(authorizer, Set.of(denyAllAce), groupResource);
+        addAcls(authorizer, Set.of(denyAllAce), transactionIdResource);
+
+        RequestContext superUserContext = newRequestContext(superUser1, host1);
+
+        assertTrue(authorizeByResourceType(authorizer, superUserContext, READ, 
ResourceType.TOPIC),
+                "superuser always has access, no matter what acls.");
+        assertTrue(authorizeByResourceType(authorizer, superUserContext, READ, 
CLUSTER),
+                "superuser always has access, no matter what acls.");
+        assertTrue(authorizeByResourceType(authorizer, superUserContext, READ, 
GROUP),
+                "superuser always has access, no matter what acls.");
+        assertTrue(authorizeByResourceType(authorizer, superUserContext, READ, 
TRANSACTIONAL_ID),
+                "superuser always has access, no matter what acls.");
+    }
+
+    @Test
+    public void testAuthorizeThrowsOnNonLiteralResource() {
+        assertThrows(IllegalArgumentException.class, () -> 
authorize(authorizer, requestContext, READ,
+                new ResourcePattern(TOPIC, "something", PREFIXED)));
+    }
+
+    @Test
+    public void testAuthorizeWithEmptyResourceName() throws Exception {
+        assertFalse(authorize(authorizer, requestContext, READ, new 
ResourcePattern(GROUP, "", LITERAL)));
+        addAcls(authorizer, Set.of(allowReadAcl), new ResourcePattern(GROUP, 
WILDCARD_RESOURCE, LITERAL));
+        assertTrue(authorize(authorizer, requestContext, READ, new 
ResourcePattern(GROUP, "", LITERAL)));
+    }
+
+    // Authorizing the empty resource is not supported because empty resource 
name is invalid.
+    @Test
+    public void testEmptyAclThrowsException() {
+        assertThrows(ApiException.class,
+                () -> addAcls(authorizer, Set.of(allowReadAcl), new 
ResourcePattern(GROUP, "", LITERAL)));
+    }
+
    /**
     * Exercises per-user, per-host topic ACL evaluation: host-specific ALLOW and
     * DENY entries, wildcard-host entries, and the implied DESCRIBE permission
     * that READ/WRITE allow ACLs carry.
     */
    @Test
    public void testTopicAcl() throws Exception {
        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username);
        KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob");
        KafkaPrincipal user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman");
        InetAddress host1 = InetAddress.getByName("192.168.1.1");
        InetAddress host2 = InetAddress.getByName("192.168.1.2");

        // user1 has READ access from host1 and host2.
        AccessControlEntry acl1 = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
        AccessControlEntry acl2 = new AccessControlEntry(user1.toString(), host2.getHostAddress(), READ, ALLOW);

        // user1 does not have READ access from host1 (DENY overrides acl1's ALLOW).
        AccessControlEntry acl3 = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, DENY);

        // user1 has WRITE access from host1 only.
        AccessControlEntry acl4 = new AccessControlEntry(user1.toString(), host1.getHostAddress(), WRITE, ALLOW);

        // user1 has DESCRIBE access from all hosts.
        AccessControlEntry acl5 = new AccessControlEntry(user1.toString(), WILDCARD_HOST, DESCRIBE, ALLOW);

        // user2 has READ access from all hosts.
        AccessControlEntry acl6 = new AccessControlEntry(user2.toString(), WILDCARD_HOST, READ, ALLOW);

        // user3 has WRITE access from all hosts.
        AccessControlEntry acl7 = new AccessControlEntry(user3.toString(), WILDCARD_HOST, WRITE, ALLOW);

        Set<AccessControlEntry> acls = Set.of(acl1, acl2, acl3, acl4, acl5, acl6, acl7);

        changeAclAndVerify(Set.of(), acls, Set.of());

        RequestContext host1Context = newRequestContext(user1, host1);
        RequestContext host2Context = newRequestContext(user1, host2);

        assertTrue(authorize(authorizer, host2Context, READ, resource), "User1 should have READ access from host2");
        assertFalse(authorize(authorizer, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl");
        assertTrue(authorize(authorizer, host1Context, WRITE, resource), "User1 should have WRITE access from host1");
        assertFalse(authorize(authorizer, host2Context, WRITE, resource), "User1 should not have WRITE access from host2 as no allow acl is defined");
        assertTrue(authorize(authorizer, host1Context, DESCRIBE, resource), "User1 should have DESCRIBE access from host1");
        assertTrue(authorize(authorizer, host2Context, DESCRIBE, resource), "User1 should have DESCRIBE access from host2");
        assertFalse(authorize(authorizer, host1Context, ALTER, resource), "User1 should not have edit access from host1");
        assertFalse(authorize(authorizer, host2Context, ALTER, resource), "User1 should not have edit access from host2");

        // Test that if a user has READ or WRITE access they also get DESCRIBE access
        // (operation implication, see AclOperation).
        RequestContext user2Context = newRequestContext(user2, host1);
        RequestContext user3Context = newRequestContext(user3, host1);
        assertTrue(authorize(authorizer, user2Context, DESCRIBE, resource), "User2 should have DESCRIBE access from host1");
        assertTrue(authorize(authorizer, user3Context, DESCRIBE, resource), "User3 should have DESCRIBE access from host1");
        assertTrue(authorize(authorizer, user2Context, READ, resource), "User2 should have READ access from host1");
        assertTrue(authorize(authorizer, user3Context, WRITE, resource), "User3 should have WRITE access from host1");
    }
+
+    /**
+     * CustomPrincipals should be compared with their principal type and name
+     */
+    @Test
+    public void testAllowAccessWithCustomPrincipal() throws Exception {
+        KafkaPrincipal user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username);
+        CustomPrincipal customUserPrincipal = new 
CustomPrincipal(KafkaPrincipal.USER_TYPE, username);
+        InetAddress host1 = InetAddress.getByName("192.168.1.1");
+        InetAddress host2 = InetAddress.getByName("192.168.1.2");
+
+        // user has READ access from host2 but not from host1
+        AccessControlEntry acl1 = new AccessControlEntry(user.toString(), 
host1.getHostAddress(), READ, DENY);
+        AccessControlEntry acl2 = new AccessControlEntry(user.toString(), 
host2.getHostAddress(), READ, ALLOW);
+        Set<AccessControlEntry> acls = Set.of(acl1, acl2);
+        changeAclAndVerify(Set.of(), acls, Set.of());
+
+        RequestContext host1Context = newRequestContext(customUserPrincipal, 
host1);
+        RequestContext host2Context = newRequestContext(customUserPrincipal, 
host2);
+
+        assertTrue(authorize(authorizer, host2Context, READ, resource), "User1 
should have READ access from host2");
+        assertFalse(authorize(authorizer, host1Context, READ, resource), 
"User1 should not have READ access from host1 due to denyAcl");
+    }
+
+    @Test
+    public void testDenyTakesPrecedence() throws Exception {
+        KafkaPrincipal user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username);
+        InetAddress host = InetAddress.getByName("192.168.2.1");
+        RequestContext session = newRequestContext(user, host);
+
+        AccessControlEntry allowAll = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, 
ALLOW);
+        AccessControlEntry denyAcl = new AccessControlEntry(user.toString(), 
host.getHostAddress(), AclOperation.ALL, DENY);
+        Set<AccessControlEntry> acls = Set.of(allowAll, denyAcl);
+
+        changeAclAndVerify(Set.of(), acls, Set.of());
+
+        assertFalse(authorize(authorizer, session, READ, resource), "deny 
should take precedence over allow.");
+    }
+
+    @Test
+    public void testAllowAllAccess() throws Exception {
+        AccessControlEntry allowAllAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, 
ALLOW);
+
+        changeAclAndVerify(Set.of(), Set.of(allowAllAcl), Set.of());
+
+        RequestContext context = newRequestContext(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), 
InetAddress.getByName("192.0.4.4"));
+        assertTrue(authorize(authorizer, context, READ, resource), "allow all 
acl should allow access to all.");
+    }
+
+    @Test
+    public void testSuperUserHasAccess() throws Exception {
+        AccessControlEntry denyAllAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, 
DENY);
+
+        changeAclAndVerify(Set.of(), Set.of(denyAllAcl), Set.of());
+
+        RequestContext session1 = newRequestContext(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), 
InetAddress.getByName("192.0.4.4"));
+        RequestContext session2 = newRequestContext(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), 
InetAddress.getByName("192.0.4.4"));
+
+        assertTrue(authorize(authorizer, session1, READ, resource), "superuser 
always has access, no matter what acls.");
+        assertTrue(authorize(authorizer, session2, READ, resource), "superuser 
always has access, no matter what acls.");
+    }
+
+    /**
+     * CustomPrincipals should be compared with their principal type and name
+     */
+    @Test
+    public void testSuperUserWithCustomPrincipalHasAccess() throws Exception {
+        AccessControlEntry denyAllAcl = new 
AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, 
DENY);
+        changeAclAndVerify(Set.of(), Set.of(denyAllAcl), Set.of());
+
+        RequestContext session = newRequestContext(new 
CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), 
InetAddress.getByName("192.0.4.4"));
+
+        assertTrue(authorize(authorizer, session, READ, resource), "superuser 
with custom principal always has access, no matter what acls.");
+    }
+
    /**
     * Verifies that ACLs on the wildcard topic resource apply to a literal topic:
     * a wildcard ALLOW grants access, and a wildcard DENY overrides an ALLOW on
     * the specific topic.
     */
    @Test
    public void testWildCardAcls() throws Exception {
        assertFalse(authorize(authorizer, requestContext, READ, resource), "when acls = [], authorizer should fail close.");

        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username);
        InetAddress host1 = InetAddress.getByName("192.168.3.1");
        AccessControlEntry readAcl = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);

        // ALLOW READ on the wildcard resource, not on `resource` itself.
        Set<AccessControlEntry> acls = changeAclAndVerify(Set.of(), Set.of(readAcl), Set.of(), wildCardResource);

        RequestContext host1Context = newRequestContext(user1, host1);
        // The wildcard ACL must match the literal topic.
        assertTrue(authorize(authorizer, host1Context, READ, resource), "User1 should have READ access from host1");

        //allow WRITE to specific topic.
        AccessControlEntry writeAcl = new AccessControlEntry(user1.toString(), host1.getHostAddress(), WRITE, ALLOW);
        changeAclAndVerify(Set.of(), Set.of(writeAcl), Set.of());

        //deny WRITE to wild card topic.
        AccessControlEntry denyWriteOnWildCardResourceAcl = new AccessControlEntry(user1.toString(), host1.getHostAddress(), WRITE, DENY);
        changeAclAndVerify(acls, Set.of(denyWriteOnWildCardResourceAcl), Set.of(), wildCardResource);

        // The wildcard DENY wins over the topic-specific ALLOW.
        assertFalse(authorize(authorizer, host1Context, WRITE, resource), "User1 should not have WRITE access from host1");
    }
+
+    @Test
+    public void testNoAclFound() {
+        assertFalse(authorize(authorizer, requestContext, READ, resource), 
"when acls = [], authorizer should deny op.");
+    }
+
+    @Test
+    public void testNoAclFoundOverride() throws IOException {
+        Map<String, Object> cfg = configs();
+        cfg.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, 
"true");
+
+        try (Authorizer testAuthorizer = createAuthorizer(cfg)) {
+            assertTrue(authorize(testAuthorizer, requestContext, READ, 
resource),
+                    "when acls = null or [],  authorizer should allow op with 
allow.everyone = true.");
+        }
+    }
+
+    @Test
+    public void testAclConfigWithWhitespace() throws IOException {
+        Map<String, Object> cfg = configs();
+        cfg.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, " 
true");
+        // replace all property values with leading & trailing whitespaces
+        cfg.replaceAll((k, v) -> " " + v + " ");
+
+        try (Authorizer testAuthorizer = createAuthorizer(cfg)) {
+            assertTrue(authorize(testAuthorizer, requestContext, READ, 
resource),
+                    "when acls = null or [],  authorizer should allow op with 
allow.everyone = true.");
+        }
+    }
+
    /**
     * End-to-end exercise of the ACL management APIs: addAcls is additive, ACLs
     * can be listed by principal and globally, and removeAcls deletes individual
     * entries or everything on a resource.
     */
    @Test
    public void testAclManagementAPIs() throws Exception {
        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username);
        KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob");
        String host1 = "host1";
        String host2 = "host2";

        AccessControlEntry acl1 = new AccessControlEntry(user1.toString(), host1, READ, ALLOW);
        AccessControlEntry acl2 = new AccessControlEntry(user1.toString(), host1, WRITE, ALLOW);
        AccessControlEntry acl3 = new AccessControlEntry(user2.toString(), host2, READ, ALLOW);
        AccessControlEntry acl4 = new AccessControlEntry(user2.toString(), host2, WRITE, ALLOW);

        Set<AccessControlEntry> acls = changeAclAndVerify(Set.of(), Set.of(acl1, acl2, acl3, acl4), Set.of());

        //test addAcl is additive
        AccessControlEntry acl5 = new AccessControlEntry(user2.toString(), WILDCARD_HOST, READ, ALLOW);
        acls = changeAclAndVerify(acls, Set.of(acl5), Set.of());

        //test get by principal name.
        // Each principal's listing must contain exactly its own bindings on `resource`.
        TestUtils.waitForCondition(() -> Set.of(acl1, acl2).stream().map(acl -> new AclBinding(resource, acl)).collect(Collectors.toSet()).equals(getAcls(authorizer, user1)),
                "changes not propagated in timeout period");
        TestUtils.waitForCondition(() -> Set.of(acl3, acl4, acl5).stream().map(acl -> new AclBinding(resource, acl)).collect(Collectors.toSet()).equals(getAcls(authorizer, user2)),
                "changes not propagated in timeout period");

        // Spread further entries over several other resource patterns so that the
        // global listing below covers more than one resource.
        Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = Map.of(
                new ResourcePattern(TOPIC, WILDCARD_RESOURCE, LITERAL), Set.of(new AccessControlEntry(user2.toString(), WILDCARD_HOST, READ, ALLOW)),
                new ResourcePattern(CLUSTER, WILDCARD_RESOURCE, LITERAL), Set.of(new AccessControlEntry(user2.toString(), host1, READ, ALLOW)),
                new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL), acls,
                new ResourcePattern(GROUP, "test-ConsumerGroup", LITERAL), acls
        );

        for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry : resourceToAcls.entrySet()) {
            ResourcePattern key = entry.getKey();
            Set<AccessControlEntry> value = entry.getValue();
            changeAclAndVerify(Set.of(), value, Set.of(), key);
        }

        // The global listing must equal the union of everything added so far.
        Set<AclBinding> expectedAcls = new HashSet<>();
        resourceToAcls.forEach((res, aces) ->
            aces.forEach(ace -> expectedAcls.add(new AclBinding(res, ace)))
        );
        acls.forEach(acl -> expectedAcls.add(new AclBinding(resource, acl)));
        TestUtils.waitForCondition(() -> expectedAcls.equals(getAcls(authorizer)), "changes not propagated in timeout period.");

        //test remove acl from existing acls.
        changeAclAndVerify(acls, Set.of(), Set.of(acl1, acl5));

        //test remove all acls for resource
        removeAcls(authorizer, Set.of(), resource);
        ServerTestUtils.waitAndVerifyAcls(Set.of(), authorizer, resource, AccessControlEntryFilter.ANY);

        // Re-add a single entry and then remove it again to verify round-tripping.
        acls = changeAclAndVerify(Set.of(), Set.of(acl1), Set.of());
        changeAclAndVerify(acls, Set.of(), acls);
    }
+
+    @Test
+    public void testLocalConcurrentModificationOfResourceAcls() throws 
Exception {
+        ResourcePattern commonResource = new ResourcePattern(TOPIC, "test", 
LITERAL);
+
+        KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username);
+        AccessControlEntry acl1 = new AccessControlEntry(user1.toString(), 
WILDCARD_HOST, READ, ALLOW);
+
+        KafkaPrincipal user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"bob");
+        AccessControlEntry acl2 = new AccessControlEntry(user2.toString(), 
WILDCARD_HOST, READ, DENY);
+
+        addAcls(authorizer, Set.of(acl1), commonResource);
+        addAcls(authorizer, Set.of(acl2), commonResource);
+
+        ServerTestUtils.waitAndVerifyAcls(Set.of(acl1, acl2), authorizer, 
commonResource, AccessControlEntryFilter.ANY);
+    }
+
+    /**
+     * Test ACL inheritance, as described in {@link org.apache.kafka.common.acl.AclOperation}
+     */

Review Comment:
   ```suggestion
       /**
        * Test ACL inheritance, as described in {@link 
org.apache.kafka.common.acl.AclOperation}
        */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to