rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545261128
########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( Review comment: `resourceIndex` => `resourceTypeKey`, Also we can omit new for ResourceTypeKey since it is a case class. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( Review comment: `resourceIndex` => `resourceTypeKey` and omit `new`. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, Review comment: `private def` ########## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ########## @@ -139,4 +152,133 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because Review comment: Suggestions to improve this (feel free to ignore/update): ``` Custom authorizer implementations should consider overriding this default implementation because: 1) The default implementation iterates all AclBindings multiple times, without any caching for resource types. More efficient implementations may be added in custom authorizers that directly access cached entries. 2) The default implementation cannot integrate with any audit logging included in the authorizer implementation. 3) The default implementation does not support any custom authorizer configs or other access rules apart from ACLs. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) Review comment: `PatternType.UNKNOWN` looks odd in audit logs, `ANY` may be better. ########## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ########## @@ -139,4 +152,133 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + * which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ + default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + + if (authorize(requestContext, Collections.singletonList(new Action( Review comment: Add a comment to say that we check for one hard-coded name to ensure that super users are granted access regardless of DENY acls. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { Review comment: We should use the same pattern as the usage of `aclCache` where we get a `aclCacheSnapshot` at the start of the method and then use the same snapshot throughout the method rather than use a changing value of resourceCache within the loop. ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(_) => return true + case None => + } + } + } + } + false + } + + private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean = + denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[immutable.Set[String]], allowPrefixes: List[immutable.Set[String]], + denyLiterals: List[immutable.Set[String]], denyPrefixes: List[immutable.Set[String]]): Boolean = { + (allowPrefixes.exists(prefixes => + prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes))) + || allowLiterals.exists(literals => + literals.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes)))) + } + + private def allowLiteral(literalName: String, denyLiterals: List[immutable.Set[String]], + denyPrefixes: List[immutable.Set[String]]): Boolean = { + literalName match{ + case ResourcePattern.WILDCARD_RESOURCE => true + case _ => (denyLiterals.forall(denyLiterals => !denyLiterals.contains(literalName)) Review comment: Can be `!denyLiterals.exists(_.contains(literalName))`? ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ########## @@ -988,6 +980,30 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { } } + @Test + def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = { + testAuthorizeByResourceTypeNoAclFoundOverride(aclAuthorizer) + } + + private def testAuthorizeByResourceTypeNoAclFoundOverride(authorizer: Authorizer): Unit = { Review comment: The `authorizer` parameter is not used. Can't we just move this into the test method `testAuthorizeByResourceTypeNoAclFoundOverride` above? ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { Review comment: Couldn't we just check:`resourceCache.contains(resourceKey)` ? ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, Review comment: `private def` ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging { zkClient.getVersionedAclsForResource(resource) } - private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + // Visible for benchmark + def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match { + case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => aclEntry.ace) + case None => Set.empty Review comment: We can use `map` instead of `match`: ``` aclCache.get(resource).map(_.acls.map(_.ace)).getOrElse(Set.empty) ``` ########## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ########## @@ -139,4 +152,133 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + * which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ + default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + + if (authorize(requestContext, Collections.singletonList(new Action( + op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), + 0, true, false))) + .get(0) == AuthorizationResult.ALLOWED) { + return AuthorizationResult.ALLOWED; + } + + // Filter out all the resource pattern corresponding to the RequestContext, + // AclOperation, and ResourceType + ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( + resourceType, null, PatternType.ANY); + AclBindingFilter aclFilter = new AclBindingFilter( + resourceTypeFilter, AccessControlEntryFilter.ANY); + + EnumMap<PatternType, Set<String>> denyPatterns = + new EnumMap<PatternType, Set<String>>(PatternType.class) {{ + put(PatternType.LITERAL, new HashSet<>()); + put(PatternType.PREFIXED, new HashSet<>()); + }}; + EnumMap<PatternType, Set<String>> allowPatterns = + new EnumMap<PatternType, Set<String>>(PatternType.class) {{ + put(PatternType.LITERAL, new HashSet<>()); + put(PatternType.PREFIXED, new HashSet<>()); + }}; + + boolean hasWildCardAllow = false; + + KafkaPrincipal principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType(), + requestContext.principal().getName()); + String hostAddr = requestContext.clientAddress().getHostAddress(); + + for (AclBinding binding : acls(aclFilter)) { + if (!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*")) + continue; + + if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal) + && !binding.entry().principal().equals("User:*")) + continue; + + if (binding.entry().operation() != op + && binding.entry().operation() != AclOperation.ALL) + continue; + + if (binding.entry().permissionType() == AclPermissionType.DENY) { + switch (binding.pattern().patternType()) { + case LITERAL: + // If wildcard deny exists, return deny directly + if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) + return AuthorizationResult.DENIED; + denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name()); + break; + case PREFIXED: + denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name()); + break; + default: + } + continue; + } + + if (binding.entry().permissionType() != AclPermissionType.ALLOW) + continue; + + switch (binding.pattern().patternType()) { + case LITERAL: + if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) { + hasWildCardAllow = true; + continue; + } + allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name()); + break; + case PREFIXED: + allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name()); + break; + default: + } + } + + if (hasWildCardAllow) { + return AuthorizationResult.ALLOWED; + } + + // For any literal allowed, if there's no dominant literal + // and prefix denied, return allow. + // For any prefix allowed, if there's no dominant prefix + // denied, return allow. Review comment: Can we make this comment two lines instead of 4 since each sentence seems short enough to fit into a line? ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging { zkClient.getVersionedAclsForResource(resource) } - private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + // Visible for benchmark + def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match { + case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => aclEntry.ace) + case None => Set.empty + } + val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry => aclEntry.ace) + val acesToAdd = newAces.diff(currentAces) + val acesToRemove = currentAces.diff(newAces) + + acesToAdd.foreach(ace => { + val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), resource.patternType()) + resourceCache.get(resourceIndex) match { + case Some(resources) => resourceCache += (resourceIndex -> (resources + resource.name())) + case None => resourceCache += (resourceIndex -> immutable.HashSet(resource.name())) + } + }) + acesToRemove.foreach(ace => { + val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), resource.patternType()) Review comment: `resourceIndex` => `resourceTypeKey`, Omit `new` ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging { zkClient.getVersionedAclsForResource(resource) } - private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + // Visible for benchmark + def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = { + val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match { + case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => aclEntry.ace) + case None => Set.empty + } + val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry => aclEntry.ace) + val acesToAdd = newAces.diff(currentAces) + val acesToRemove = currentAces.diff(newAces) + + acesToAdd.foreach(ace => { + val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), resource.patternType()) Review comment: `resourceIndex` => `resourceTypeKey`, Omit `new` ########## File path: clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java ########## @@ -146,4 +148,32 @@ else if (capitalizeNext) { } return builder.toString(); } + + public static void authorizeByResourceTypeCheckArgs(AclOperation op, + ResourceType type) { + if (type == ResourceType.ANY) { + throw new IllegalArgumentException( + "Must specify a non-filter resource type for authorizeByResourceType"); + } + + if (type == ResourceType.UNKNOWN) { + throw new IllegalArgumentException( + "Unknown resource type"); + } + + if (op == AclOperation.ANY) { + throw new IllegalArgumentException( + "Must specify a non-filter operation type for authorizeByResourceType"); + } + + if (op == AclOperation.UNKNOWN) { + throw new IllegalArgumentException( + "Unknown operation type"); + } + } + + public static boolean canDenyAll(ResourcePattern pattern) { Review comment: `canDenyAll` => `denyAll` since `can` doesn't fit with `deny` ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { Review comment: The nested for loop can be replaced with: ``` for (p <- Set(principal, AclEntry.WildcardPrincipalString); h <- Set(host, AclEntry.WildcardHost); o <- Set(op, AclOperation.ALL)) ``` ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() Review comment: We can make this a `val` by using an ArrayBuffer instead of List that we keep recreating ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ########## @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.util.concurrent.CompletionStage +import java.{lang, util} + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.{After, Before} + +class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest { + + private val interfaceDefaultAuthorizer = new DelegateAuthorizer + + override def authorizer: Authorizer = interfaceDefaultAuthorizer + + @Before + override def setUp(): Unit = { + super.setUp() + + val authorizers = Seq(interfaceDefaultAuthorizer.authorizer) Review comment: There is only one authorizer, we could just use it directly instead of creating a Seq ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { Review comment: As before, we can use a single for loop instead of nested loop ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(_) => return true + case None => + } + } + } + } + false + } + + private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean = + denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[immutable.Set[String]], allowPrefixes: List[immutable.Set[String]], + denyLiterals: List[immutable.Set[String]], denyPrefixes: List[immutable.Set[String]]): Boolean = { + (allowPrefixes.exists(prefixes => Review comment: Can use `allowPrefixes.exists(_.exists`, similarly for `allowLiterals`. ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ########## @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.util.concurrent.CompletionStage +import java.{lang, util} + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.{After, Before} + +class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest { + + private val interfaceDefaultAuthorizer = new DelegateAuthorizer + + override def authorizer: Authorizer = interfaceDefaultAuthorizer + + @Before + override def setUp(): Unit = { + super.setUp() + + val authorizers = Seq(interfaceDefaultAuthorizer.authorizer) + + // Increase maxUpdateRetries to avoid transient failures + authorizers.foreach(a => a.maxUpdateRetries = Int.MaxValue) + + val props = TestUtils.createBrokerConfig(0, zkConnect) + props.put(AclAuthorizer.SuperUsersProp, superUsers) + + config = KafkaConfig.fromProps(props) + authorizers.foreach(a => a.configure(config.originals)) + + zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, + Time.SYSTEM, "kafka.test", "AuthorizerInterfaceDefaultTest") + } + + @After + override def tearDown(): Unit = { + val authorizers = Seq(interfaceDefaultAuthorizer) + authorizers.foreach(a => { + a.close() + }) Review comment: Could just close `interfaceDefaultAuthorizer` instead of creating an `authorizers` collection? ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(_) => return true + case None => + } + } + } + } + false + } + + private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean = + denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) Review comment: Can use `denyLiterals.exists(_.contains(ResourcePattern.WILDCARD_RESOURCE))` ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala ########## @@ -988,6 +980,30 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { } } + @Test + def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = { + testAuthorizeByResourceTypeNoAclFoundOverride(aclAuthorizer) + } + + private def testAuthorizeByResourceTypeNoAclFoundOverride(authorizer: Authorizer): Unit = { + val props = TestUtils.createBrokerConfig(1, zkConnect) + props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true") + + val cfg = KafkaConfig.fromProps(props) + val testAuthorizer = new AclAuthorizer Review comment: can't this be `aclAuthorizer`? ########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(_) => return true + case None => + } + } + } + } + false + } + + private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean = + denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[immutable.Set[String]], allowPrefixes: List[immutable.Set[String]], + denyLiterals: List[immutable.Set[String]], denyPrefixes: List[immutable.Set[String]]): Boolean = { + (allowPrefixes.exists(prefixes => + prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes))) + || allowLiterals.exists(literals => + literals.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes)))) + } + + private def allowLiteral(literalName: String, denyLiterals: List[immutable.Set[String]], + denyPrefixes: List[immutable.Set[String]]): Boolean = { + literalName match{ Review comment: nit: space before { ########## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala ########## @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.util.concurrent.CompletionStage +import java.{lang, util} + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.{After, Before} + +class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest { + + private val interfaceDefaultAuthorizer = new DelegateAuthorizer + + override def authorizer: Authorizer = interfaceDefaultAuthorizer + + @Before + override def setUp(): Unit = { + super.setUp() + + val authorizers = Seq(interfaceDefaultAuthorizer.authorizer) + + // Increase maxUpdateRetries to avoid transient failures + authorizers.foreach(a => a.maxUpdateRetries = Int.MaxValue) + + val props = TestUtils.createBrokerConfig(0, zkConnect) + props.put(AclAuthorizer.SuperUsersProp, superUsers) + + config = KafkaConfig.fromProps(props) + authorizers.foreach(a => a.configure(config.originals)) + + zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Review comment: Looks like there is opportunity to move some of this stuff into BaseAuthorizerTest, but we can do that in a follow-up later. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org