rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r542830098
########## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ########## @@ -139,4 +151,126 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * 1. Filter out all the resource pattern corresponding to the requestContext, AclOperation, + * and ResourceType + * 2. If wildcard deny exists, return deny directly + * 3. For any literal allowed resource, if there's no dominant literal denied resource, and + * no dominant prefixed denied resource, return allow + * 4. For any prefixed allowed resource, if there's no dominant denied resource, return allow + * 5. For any other cases, return deny Review comment: Since this is the javadoc of a public API, we should move the details on how the default implementation works outside of the javadoc. We can move this list of comments inside the method. ########## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ########## @@ -139,4 +151,126 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * 1. Filter out all the resource pattern corresponding to the requestContext, AclOperation, + * and ResourceType + * 2. If wildcard deny exists, return deny directly + * 3. For any literal allowed resource, if there's no dominant literal denied resource, and + * no dominant prefixed denied resource, return allow + * 4. For any prefixed allowed resource, if there's no dominant denied resource, return allow + * 5. For any other cases, return deny + * + * It is important to override this interface default in implementations because + * 1. 
The interface default iterates all AclBindings multiple times, without any indexing, + * which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ + default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); Review comment: We don't currently have anything in the default implementation to support super.users, right? Unlike `allow.everyone.if.no.acl.found` which is not particularly suitable for production use, `super.users` is a commonly used config that is likely to be in use in a lot of deployments. The simplest fix may be to call `authorize()` with a hard-coded name and return ALLOWED if `authorize()` returns ALLOWED before any of the logic below is executed. ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ########## @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.net.InetAddress + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{RequestContext, RequestHeader} +import org.apache.kafka.common.resource.{ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.{After, Before, Test} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness { + + private val interfaceDefaultAuthorizer = new DelegateAuthorizer + private val superUsers = "User:superuser1; User:superuser2" + private val username = "alice" + private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + private val requestContext = newRequestContext(principal, InetAddress.getByName("192.168.0.1")) + private var config: KafkaConfig = _ + private var zooKeeperClient: ZooKeeperClient = _ + private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], ResourcePattern)] = ArrayBuffer() + private val authorizerTestFactory = new AuthorizerTestFactory( + newRequestContext, addAcls, authorizeByResourceType, removeAcls) + + class 
CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) { + override def equals(o: scala.Any): Boolean = false + } + + @Before + override def setUp(): Unit = { + super.setUp() + + val authorizers = Seq(interfaceDefaultAuthorizer.authorizer) + + // Increase maxUpdateRetries to avoid transient failures + authorizers.foreach(a => a.maxUpdateRetries = Int.MaxValue) + + val props = TestUtils.createBrokerConfig(0, zkConnect) + props.put(AclAuthorizer.SuperUsersProp, superUsers) + + config = KafkaConfig.fromProps(props) + authorizers.foreach(a => a.configure(config.originals)) + + zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, + Time.SYSTEM, "kafka.test", "AclAuthorizerTest") + } + + @After + override def tearDown(): Unit = { + val authorizers = Seq(interfaceDefaultAuthorizer) + authorizers.foreach(a => { + a.acls(AclBindingFilter.ANY).forEach(bd => { + removeAcls(interfaceDefaultAuthorizer, Set(bd.entry), bd.pattern()) Review comment: As in the other class, we don't need this in tearDown ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ########## @@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { Review comment: Why? ########## File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; + +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourceType; + +import java.util.Objects; + +public class ResourceIndex { Review comment: For Map, you would say `key` rather than `index`. But this is not a `resource` or `resourceName` - it has no resource name, it is not a filter, but it includes AccessControlEntry. Maybe just ResourceTypeKey is sufficient, but you could also include something to indicate it includes the AccessControlEntry if you want. Either way, putting it along with AclAuthorizer would make naming less critical. ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ########## @@ -100,8 +106,15 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { @After override def tearDown(): Unit = { - aclAuthorizer.close() - aclAuthorizer2.close() + val authorizers = Seq(aclAuthorizer, aclAuthorizer2) + authorizers.foreach(a => { + a.acls(AclBindingFilter.ANY).forEach(bd => { + removeAcls(aclAuthorizer, Set(bd.entry), bd.pattern()) Review comment: ZK is reinstantiated for every test. 
########## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ########## @@ -71,15 +74,22 @@ object AuthorizerWrapper { } def convertToResource(resourcePattern: ResourcePattern): Resource = { - Resource(ResourceType.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) + Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) } } @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { + var shouldAllowEveryoneIfNoAclIsFound = false + override def configure(configs: util.Map[String, _]): Unit = { baseAuthorizer.configure(configs) + shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( + AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + && baseAuthorizer.authorize( + new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getByName("1.2.3.4")), + Read, new Resource(Topic, "hi", PatternType.LITERAL))) Review comment: This is too hacky. And it breaks if ANONYMOUS has all access (e.g. because inter-broker listener alone uses PLAINTEXT). We could check `baseAuthorizer.isInstanceOf[SimpleAclAuthorizer]` perhaps. It is not perfect since it would break if there was a custom authorizer that extended SimpleAclAuthorizer, but doesn't support AllowEveryoneIfNoAclIsFoundProp and the prop was set to true. But that seems like an unlikely scenario. 
########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +308,131 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString Review comment: We need to check if the principal is a super.user and return ALLOWED for super users before executing any of the logic below. ########## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ########## @@ -175,4 +185,39 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + if (denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.DENIED + } else if (shouldAllowEveryoneIfNoAclIsFound) { + AuthorizationResult.ALLOWED + } else { + super.authorizeByResourceType(requestContext, op, resourceType) + } + } + + private def denyAllResource(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): Boolean = { + val resourceTypeFilter = new ResourcePatternFilter( + resourceType, Resource.WildCardResource, PatternType.LITERAL) + val principal = new KafkaPrincipal(requestContext.principal.getPrincipalType, requestContext.principal.getName) + val host = requestContext.clientAddress().getHostAddress + val accessControlEntry = new AccessControlEntryFilter(null, null, op, AclPermissionType.DENY) + val 
aclFilter = new AclBindingFilter(resourceTypeFilter, accessControlEntry) + + acls(aclFilter).asScala.exists(b => principalHostMatch(b.entry(), principal, host)) + } + + private def principalHostMatch(ace: AccessControlEntry, + principal: KafkaPrincipal, + host: String): Boolean = { + ((ace.host() == AclEntry.WildcardHost || ace.host() == host) + && (ace.principal() == AclEntry.WildcardPrincipalString || ace.principal() == principal.toString)) Review comment: We could have done principal.toString() once in the caller rather than converting it every time. ########## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ########## @@ -175,4 +185,39 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + if (denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.DENIED + } else if (shouldAllowEveryoneIfNoAclIsFound) { + AuthorizationResult.ALLOWED + } else { + super.authorizeByResourceType(requestContext, op, resourceType) + } Review comment: This sequence doesn't work with super.users. 
We probably should do something like: ``` if (super.authorizeByResourceType(requestContext, op, resourceType) == AuthorizationResult.ALLOWED) AuthorizationResult.ALLOWED else if (denyAllResource(requestContext, op, resourceType) || !shouldAllowEveryoneIfNoAclIsFound) AuthorizationResult.DENIED else AuthorizationResult.ALLOWED ``` ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -130,6 +130,10 @@ class AclAuthorizer extends Authorizer with Logging { @volatile private var aclCache = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering) + + private val resourceCache = new scala.collection.mutable.HashMap[ResourceIndex, + scala.collection.mutable.HashSet[String]]() Review comment: This needs to be an immutable map or a ConcurrentHashMap since we read this without lock. ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ########## @@ -74,6 +75,10 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { private var config: KafkaConfig = _ private var zooKeeperClient: ZooKeeperClient = _ + private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], ResourcePattern)] = ArrayBuffer() Review comment: why are we storing these? ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ########## @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.net.InetAddress + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{RequestContext, RequestHeader} +import org.apache.kafka.common.resource.{ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.{After, Before, Test} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness { + + private val interfaceDefaultAuthorizer = new DelegateAuthorizer + private val superUsers = "User:superuser1; User:superuser2" Review comment: We should add tests for super users. 
########## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ########## @@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { val action = new Action(operation, resource, 1, true, true) authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED } - private def addAcls(authorizer: AclAuthorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { + private def authorizeByResourceType(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean = { + authorizer.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED + } + + private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { Review comment: why was this changed from AclAuthorizer to Authorizer? 
########## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ########## @@ -1040,19 +1116,24 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { securityProtocol, ClientInformation.EMPTY, false) } - private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { + private def authorize(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { val action = new Action(operation, resource, 1, true, true) authorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED } - private def addAcls(authorizer: AclAuthorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { + private def authorizeByResourceType(authorizer: Authorizer, requestContext: RequestContext, operation: AclOperation, resourceType: ResourceType) : Boolean = { + authorizer.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED + } + + private def addAcls(authorizer: Authorizer, aces: Set[AccessControlEntry], resourcePattern: ResourcePattern): Unit = { val bindings = aces.map { ace => new AclBinding(resourcePattern, ace) } authorizer.createAcls(requestContext, bindings.toList.asJava).asScala .map(_.toCompletableFuture.get) .foreach { result => result.exception.ifPresent { e => throw e } } + aclAdded += Tuple3(authorizer, aces, resourcePattern) Review comment: why are we storing this? ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. 
Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED + } + + val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { Review comment: ok, makes sense ########## File path: 
core/src/test/scala/unit/kafka/security/authorizer/DelegateAuthorizer.scala ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.{lang, util} +import java.util.concurrent.CompletionStage + +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} +import org.apache.kafka.server.authorizer.{AclCreateResult, AclDeleteResult, Action, AuthorizableRequestContext, AuthorizationResult, Authorizer, AuthorizerServerInfo} + +/** + * For testing the interface default + */ +class DelegateAuthorizer extends Authorizer { Review comment: It would be better to move this inside AuthorizerInterfaceDefaultTest since it is specific to that test. ########## File path: clients/src/main/java/org/apache/kafka/common/acl/ResourceIndex.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; Review comment: We have to move the class outside of the public package, so putting it alongside AclAuthorizer makes sense. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +308,105 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. 
Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED + } + + val allowLiterals = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principal, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + val denyPrefixes = matchingResources( + principal, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[mutable.HashSet[String]] = { + var matched = List[mutable.HashSet[String]]() + for (p <- Set(principal, 
AclEntry.WildcardPrincipal.toString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val ace = new AccessControlEntry(p, h, o, permission) + val resourceIndex = new ResourceIndex(ace, resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def denyAll(denyLiterals: List[mutable.HashSet[String]]): Boolean = + denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[mutable.Set[String]], allowPrefixes: List[mutable.Set[String]], + denyLiterals: List[mutable.Set[String]], denyPrefixes: List[mutable.Set[String]]): Boolean = { + (allowPrefixes.exists(prefixes => + prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes))) + || allowLiterals.exists(literals => + literals.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes)))) + } + + private def allowLiteral(literalName: String, + denyLiterals: List[mutable.Set[String]], denyPrefixes: List[mutable.Set[String]]): Boolean = { + literalName match{ + case ResourcePattern.WILDCARD_RESOURCE => true + case _ => (denyLiterals.forall(denyLiterals => !denyLiterals.contains(literalName)) + && !hasDominantPrefixedDeny(literalName, denyPrefixes)) + } + } + + private def allowPrefix(prefixName: String, + denyPrefixes: List[mutable.Set[String]]): Boolean = { + !hasDominantPrefixedDeny(prefixName, denyPrefixes) + } + + private def hasDominantPrefixedDeny(resourceName: String, denyPrefixes: List[mutable.Set[String]]): Boolean = { Review comment: ok, let's leave as is. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org