This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 1780f2660e4 KAFKA-14265: Prefix ACLs may shadow other prefix ACLs 1780f2660e4 is described below commit 1780f2660e4b45ca8895f1614a27613f922aad22 Author: Colin P. McCabe <cmcc...@apache.org> AuthorDate: Wed Sep 28 17:02:04 2022 -0700 KAFKA-14265: Prefix ACLs may shadow other prefix ACLs --- .../kafka/api/AuthorizerIntegrationTest.scala | 14 +++++- .../authorizer/StandardAuthorizerData.java | 55 +++++++++++++++++----- .../authorizer/StandardAuthorizerTest.java | 27 +++++++++++ 3 files changed, 83 insertions(+), 13 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index a109ae8ce4c..ff1b2f5934d 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -25,7 +25,7 @@ import kafka.security.authorizer.AclEntry.WildcardHost import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.{TestInfoUtils, TestUtils} import kafka.utils.TestUtils.waitUntilTrue -import org.apache.kafka.clients.admin.{Admin, AlterConfigOp} +import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, NewTopic} import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.producer._ @@ -2619,4 +2619,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testPrefixAcls(quorum: String): Unit = { + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, CREATE, ALLOW)), + new ResourcePattern(TOPIC, "f", PREFIXED)) + addAndVerifyAcls(Set(new AccessControlEntry("User:otherPrincipal", WildcardHost, CREATE, DENY)), + new ResourcePattern(TOPIC, "fooa", PREFIXED)) + addAndVerifyAcls(Set(new AccessControlEntry("User:otherPrincipal", WildcardHost, CREATE, ALLOW)), + new ResourcePattern(TOPIC, "foob", PREFIXED)) + createAdminClient().createTopics(Collections. + singletonList(new NewTopic("foobar", 1, 1.toShort))).all().get() + } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java index c6e3b74a2ab..6e9efc3cd5d 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java @@ -352,8 +352,10 @@ public class StandardAuthorizerData { // This code relies on the ordering of StandardAcl within the NavigableMap. // Entries are sorted by resource type first, then REVERSE resource name. // Therefore, we can find all the applicable ACLs by starting at - // (resource_type, resource_name) and stepping forwards until we reach an ACL with - // a resource name which is not a prefix of the current one. + // (resource_type, resource_name) and stepping forwards until we reach + // an ACL with a resource name which is not a prefix of the current one. + // At that point, we need to search for if there are any more ACLs at + // the first divergence point. // // For example, when trying to authorize a TOPIC resource named foobar, we would // start at element 2, and continue on to 3 and 4 following map: @@ -362,9 +364,12 @@ public class StandardAuthorizerData { // 2. rs=TOPIC rn=foobar pt=PREFIX // 3. rs=TOPIC rn=foob pt=LITERAL // 4. rs=TOPIC rn=foo pt=PREFIX - // 5. rs=TOPIC rn=eeee pt=LITERAL + // 5. rs=TOPIC rn=fb pt=PREFIX + // 6. rs=TOPIC rn=fa pt=PREFIX + // 7. rs=TOPIC rn=f pt=PREFIX + // 8. rs=TOPIC rn=eeee pt=LITERAL // - // Once we reached element 5, we would stop scanning. + // Once we reached element 5, we would jump to element 7. MatchingAclBuilder matchingAclBuilder = new MatchingAclBuilder(); StandardAcl exemplar = new StandardAcl( action.resourcePattern().resourceType(), @@ -394,6 +399,20 @@ public class StandardAuthorizerData { return matchingAclBuilder.build(); } + static int matchesUpTo( + String resource, + String pattern + ) { + int i = 0; + while (true) { + if (resource.length() == i) break; + if (pattern.length() == i) break; + if (resource.charAt(i) != pattern.charAt(i)) break; + i++; + } + return i; + } + private void checkSection( Action action, StandardAcl exemplar, @@ -401,28 +420,40 @@ public class StandardAuthorizerData { String host, MatchingAclBuilder matchingAclBuilder ) { - NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true); String resourceName = action.resourcePattern().name(); - for (Iterator<StandardAcl> iterator = tailSet.iterator(); - iterator.hasNext(); ) { + NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true); + Iterator<StandardAcl> iterator = tailSet.iterator(); + while (iterator.hasNext()) { StandardAcl acl = iterator.next(); if (!acl.resourceType().equals(action.resourcePattern().resourceType())) { // We've stepped outside the section for the resource type we care about and // should stop scanning. break; } - if (resourceName.startsWith(acl.resourceName())) { - if (acl.patternType() == LITERAL && !resourceName.equals(acl.resourceName())) { + int matchesUpTo = matchesUpTo(resourceName, acl.resourceName()); + if (matchesUpTo == acl.resourceName().length()) { + if (acl.patternType() == LITERAL && matchesUpTo != resourceName.length()) { // This is a literal ACL whose name is a prefix of the resource name, but // which doesn't match it exactly. We should skip over this ACL, but keep // scanning in case there are any relevant PREFIX ACLs. continue; } + } else if (!(acl.resourceName().equals(WILDCARD) && acl.patternType() == LITERAL)) { // If the ACL resource name is NOT a prefix of the current resource name, // and we're not dealing with the special case of a wildcard ACL, we've - // stepped outside of the section we care about and should stop scanning. - break; + // stepped outside of the section we care about. Scan for any other potential + // prefix matches. + exemplar = new StandardAcl(exemplar.resourceType(), + exemplar.resourceName().substring(0, matchesUpTo), + exemplar.patternType(), + exemplar.principal(), + exemplar.host(), + exemplar.operation(), + exemplar.permissionType()); + tailSet = aclsByResource.tailSet(exemplar, true); + iterator = tailSet.iterator(); + continue; } AuthorizationResult result = findResult(action, matchingPrincipals, host, acl); if (ALLOWED == result) { @@ -625,4 +656,4 @@ public class StandardAuthorizerData { HashMap<Uuid, StandardAcl> getAclsById() { return aclsById; } -} +} \ No newline at end of file diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java index 987c00155c4..a26eb3d50b5 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java @@ -640,4 +640,31 @@ public class StandardAuthorizerTest { assertTrue(futures.get(CONTROLLER).toCompletableFuture().isDone()); assertFalse(futures.get(CONTROLLER).toCompletableFuture().isCompletedExceptionally()); } + + @Test + public void testPrefixAcls() throws Exception { + StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer(); + List<StandardAcl> acls = Arrays.asList( + new StandardAcl(TOPIC, "fooa", PREFIXED, "User:alice", "*", ALL, ALLOW), + new StandardAcl(TOPIC, "foobar", LITERAL, "User:bob", "*", ALL, ALLOW), + new StandardAcl(TOPIC, "f", PREFIXED, "User:bob", "*", ALL, ALLOW) + ); + acls.forEach(acl -> { + StandardAclWithId aclWithId = withId(acl); + authorizer.addAcl(aclWithId.id(), aclWithId.acl()); + }); + assertEquals(Arrays.asList(ALLOWED, DENIED, ALLOWED), authorizer.authorize( + newRequestContext("bob"), + Arrays.asList( + newAction(WRITE, TOPIC, "foobarr"), + newAction(READ, TOPIC, "goobar"), + newAction(READ, TOPIC, "fooa")))); + + assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize( + newRequestContext("alice"), + Arrays.asList( + newAction(DESCRIBE, TOPIC, "fooa"), + newAction(WRITE, TOPIC, "bar"), + newAction(READ, TOPIC, "baz")))); + } }