d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r545584468
########## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ########## @@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + + val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + + if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + + val principalStr = principal.toString + + val host = requestContext.clientAddress().getHostAddress + val action = new Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + + val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + + if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + + if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + val denyPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.PREFIXED) + + if (denyLiterals.isEmpty && denyPrefixes.isEmpty) { + if (hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + || hasMatchingResources(principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } else { + logAuditMessage(requestContext, action, authorized = false) + return AuthorizationResult.DENIED + } + } + + val allowLiterals = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL) + val allowPrefixes = matchingResources( + principalStr, host, op, AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED) + + if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) { + logAuditMessage(requestContext, action, authorized = true) + return AuthorizationResult.ALLOWED + } + + logAuditMessage(requestContext, action, authorized = false) + AuthorizationResult.DENIED + } + + def matchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): List[immutable.HashSet[String]] = { + var matched = List[immutable.HashSet[String]]() + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(resources) => matched = matched :+ resources + case None => + } + } + } + } + matched + } + + def hasMatchingResources(principal: String, host: String, op: AclOperation, permission: AclPermissionType, + resourceType: ResourceType, patternType: PatternType): Boolean = { + for (p <- Set(principal, AclEntry.WildcardPrincipalString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { + for (o <- Set(op, AclOperation.ALL)) { + val resourceIndex = new ResourceTypeKey( + new AccessControlEntry(p, h, o, permission), resourceType, patternType) + resourceCache.get(resourceIndex) match { + case Some(_) => return true + case None => + } + } + } + } + false + } + + private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean = + denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE)) + + + private def allowAny(allowLiterals: List[immutable.Set[String]], allowPrefixes: List[immutable.Set[String]], + denyLiterals: List[immutable.Set[String]], denyPrefixes: 
List[immutable.Set[String]]): Boolean = { + (allowPrefixes.exists(prefixes => + prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes))) + || allowLiterals.exists(literals => + literals.exists(literal => allowLiteral(literal, denyLiterals, denyPrefixes)))) + } + + private def allowLiteral(literalName: String, denyLiterals: List[immutable.Set[String]], + denyPrefixes: List[immutable.Set[String]]): Boolean = { + literalName match{ + case ResourcePattern.WILDCARD_RESOURCE => true + case _ => (denyLiterals.forall(denyLiterals => !denyLiterals.contains(literalName)) Review comment: Right. We can bring the `!` to the front, i.e. write `!denyLiterals.exists(_.contains(literalName))` instead of `denyLiterals.forall(... => !...contains(literalName))`. commit 9407b1697d976fc6cff90703573a64f7a3c9f348 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org