rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r545261128



##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(

Review comment:
       `resourceIndex` => `resourceTypeKey`. Also, we can omit `new` for 
`ResourceTypeKey` since it is a case class.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                           resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(

Review comment:
       `resourceIndex` => `resourceTypeKey` and omit `new`.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,

Review comment:
       `private def`

##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,133 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on 
at least one
+     * resource of the given type.
+     *
+     * It is important to override this interface default in implementations 
because

Review comment:
       Suggestions to improve this (feel free to ignore/update):
   
   ```
   Custom authorizer implementations should consider overriding this default 
implementation because:
   1) The default implementation iterates all AclBindings multiple times, 
without any caching for resource types. More efficient implementations may be 
added in custom authorizers that directly access cached entries.
   2) The default implementation cannot integrate with any audit logging 
included in the authorizer implementation.
   3) The default implementation does not support any custom authorizer configs 
or other access rules apart from ACLs.
   ```

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)

Review comment:
       `PatternType.UNKNOWN` looks odd in audit logs, `ANY` may be better.

##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,133 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on 
at least one
+     * resource of the given type.
+     *
+     * It is important to override this interface default in implementations 
because
+     * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+     *    which is a CPU intense work.
+     * 2. The interface default rebuild several sets of strings, which is a 
memory intense work.
+     * 3. The interface default cannot perform the audit logging properly
+     *
+     * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+     *                       given ACL operation on at least one resource of 
the given type.
+     *                       Return {@link AuthorizationResult#DENIED} 
otherwise.
+     */
+    default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+        if (authorize(requestContext, Collections.singletonList(new Action(

Review comment:
       Add a comment to say that we check for one hard-coded name to ensure 
that super users are granted access regardless of DENY acls.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {

Review comment:
       We should use the same pattern as the usage of `aclCache`, where we get an 
`aclCacheSnapshot` at the start of the method and then use the same snapshot 
throughout the method, rather than using a changing value of `resourceCache` 
within the loop.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                           resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(_) => return true
+            case None =>
+          }
+        }
+      }
+    }
+    false
+  }
+
+  private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean =
+    denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+  private def allowAny(allowLiterals: List[immutable.Set[String]], 
allowPrefixes: List[immutable.Set[String]],
+                       denyLiterals: List[immutable.Set[String]], 
denyPrefixes: List[immutable.Set[String]]): Boolean = {
+    (allowPrefixes.exists(prefixes =>
+          prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes)))
+      || allowLiterals.exists(literals =>
+            literals.exists(literal => allowLiteral(literal, denyLiterals, 
denyPrefixes))))
+  }
+
+  private def allowLiteral(literalName: String, denyLiterals: 
List[immutable.Set[String]],
+                           denyPrefixes: List[immutable.Set[String]]): Boolean 
= {
+    literalName match{
+      case ResourcePattern.WILDCARD_RESOURCE => true
+      case _ => (denyLiterals.forall(denyLiterals => 
!denyLiterals.contains(literalName))

Review comment:
       Can be `!denyLiterals.exists(_.contains(literalName))`?

##########
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##########
@@ -988,6 +980,30 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
     }
   }
 
+  @Test
+  def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = {
+    testAuthorizeByResourceTypeNoAclFoundOverride(aclAuthorizer)
+  }
+
+  private def testAuthorizeByResourceTypeNoAclFoundOverride(authorizer: 
Authorizer): Unit = {

Review comment:
       The `authorizer` parameter is not used. Can't we just move this into the 
test method `testAuthorizeByResourceTypeNoAclFoundOverride` above?

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                           resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {

Review comment:
       Couldn't we just check `resourceCache.contains(resourceKey)`?

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,

Review comment:
       `private def`

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging {
     zkClient.getVersionedAclsForResource(resource)
   }
 
-  private def updateCache(resource: ResourcePattern, versionedAcls: 
VersionedAcls): Unit = {
+  // Visible for benchmark
+  def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): 
Unit = {
+    val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match {
+      case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => 
aclEntry.ace)
+      case None => Set.empty

Review comment:
       We can use `map` instead of `match`:
   ```
   aclCache.get(resource).map(_.acls.map(_.ace)).getOrElse(Set.empty)
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##########
@@ -139,4 +152,133 @@
      * @return Iterator for ACL bindings, which may be populated lazily.
      */
     Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on 
at least one
+     * resource of the given type.
+     *
+     * It is important to override this interface default in implementations 
because
+     * 1. The interface default iterates all AclBindings multiple times, 
without any indexing,
+     *    which is a CPU intense work.
+     * 2. The interface default rebuild several sets of strings, which is a 
memory intense work.
+     * 3. The interface default cannot perform the audit logging properly
+     *
+     * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+     * @param op             The ACL operation to check
+     * @param resourceType   The resource type to check
+     * @return               Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+     *                       given ACL operation on at least one resource of 
the given type.
+     *                       Return {@link AuthorizationResult#DENIED} 
otherwise.
+     */
+    default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+        if (authorize(requestContext, Collections.singletonList(new Action(
+                op, new ResourcePattern(resourceType, "hardcode", 
PatternType.LITERAL),
+                0, true, false)))
+                .get(0) == AuthorizationResult.ALLOWED) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        // Filter out all the resource pattern corresponding to the 
RequestContext,
+        // AclOperation, and ResourceType
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        EnumMap<PatternType, Set<String>> denyPatterns =
+            new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+                put(PatternType.LITERAL, new HashSet<>());
+                put(PatternType.PREFIXED, new HashSet<>());
+            }};
+        EnumMap<PatternType, Set<String>> allowPatterns =
+            new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+                put(PatternType.LITERAL, new HashSet<>());
+                put(PatternType.PREFIXED, new HashSet<>());
+            }};
+
+        boolean hasWildCardAllow = false;
+
+        KafkaPrincipal principal = new KafkaPrincipal(
+            requestContext.principal().getPrincipalType(),
+            requestContext.principal().getName());
+        String hostAddr = requestContext.clientAddress().getHostAddress();
+
+        for (AclBinding binding : acls(aclFilter)) {
+            if (!binding.entry().host().equals(hostAddr) && 
!binding.entry().host().equals("*"))
+                continue;
+
+            if 
(!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
+                    && !binding.entry().principal().equals("User:*"))
+                continue;
+
+            if (binding.entry().operation() != op
+                    && binding.entry().operation() != AclOperation.ALL)
+                continue;
+
+            if (binding.entry().permissionType() == AclPermissionType.DENY) {
+                switch (binding.pattern().patternType()) {
+                    case LITERAL:
+                        // If wildcard deny exists, return deny directly
+                        if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
+                            return AuthorizationResult.DENIED;
+                        
denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
+                        break;
+                    case PREFIXED:
+                        
denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
+                        break;
+                    default:
+                }
+                continue;
+            }
+
+            if (binding.entry().permissionType() != AclPermissionType.ALLOW)
+                continue;
+
+            switch (binding.pattern().patternType()) {
+                case LITERAL:
+                    if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
+                        hasWildCardAllow = true;
+                        continue;
+                    }
+                    
allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
+                    break;
+                case PREFIXED:
+                    
allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
+                    break;
+                default:
+            }
+        }
+
+        if (hasWildCardAllow) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        // For any literal allowed, if there's no dominant literal
+        // and prefix denied, return allow.
+        // For any prefix allowed, if there's no dominant prefix
+        // denied, return allow.

Review comment:
       Can we make this comment two lines instead of four, since each 
sentence seems short enough to fit on one line?

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging {
     zkClient.getVersionedAclsForResource(resource)
   }
 
-  private def updateCache(resource: ResourcePattern, versionedAcls: 
VersionedAcls): Unit = {
+  // Visible for benchmark
+  def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): 
Unit = {
+    val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match {
+      case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => 
aclEntry.ace)
+      case None => Set.empty
+    }
+    val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry => 
aclEntry.ace)
+    val acesToAdd = newAces.diff(currentAces)
+    val acesToRemove = currentAces.diff(newAces)
+
+    acesToAdd.foreach(ace => {
+      val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), 
resource.patternType())
+      resourceCache.get(resourceIndex) match {
+        case Some(resources) => resourceCache += (resourceIndex -> (resources 
+ resource.name()))
+        case None => resourceCache += (resourceIndex -> 
immutable.HashSet(resource.name()))
+      }
+    })
+    acesToRemove.foreach(ace => {
+      val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), 
resource.patternType())

Review comment:
       Rename `resourceIndex` to `resourceTypeKey`, and omit `new`.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -547,7 +683,37 @@ class AclAuthorizer extends Authorizer with Logging {
     zkClient.getVersionedAclsForResource(resource)
   }
 
-  private def updateCache(resource: ResourcePattern, versionedAcls: 
VersionedAcls): Unit = {
+  // Visible for benchmark
+  def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): 
Unit = {
+    val currentAces: Set[AccessControlEntry] = aclCache.get(resource) match {
+      case Some(versionedAcls) => versionedAcls.acls.map(aclEntry => 
aclEntry.ace)
+      case None => Set.empty
+    }
+    val newAces: Set[AccessControlEntry] = versionedAcls.acls.map(aclEntry => 
aclEntry.ace)
+    val acesToAdd = newAces.diff(currentAces)
+    val acesToRemove = currentAces.diff(newAces)
+
+    acesToAdd.foreach(ace => {
+      val resourceIndex = new ResourceTypeKey(ace, resource.resourceType(), 
resource.patternType())

Review comment:
       Rename `resourceIndex` to `resourceTypeKey`, and omit `new`.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
##########
@@ -146,4 +148,32 @@ else if (capitalizeNext) {
         }
         return builder.toString();
     }
+
+    public static void authorizeByResourceTypeCheckArgs(AclOperation op,
+                                                        ResourceType type) {
+        if (type == ResourceType.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter resource type for 
authorizeByResourceType");
+        }
+
+        if (type == ResourceType.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown resource type");
+        }
+
+        if (op == AclOperation.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter operation type for 
authorizeByResourceType");
+        }
+
+        if (op == AclOperation.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown operation type");
+        }
+    }
+
+    public static boolean canDenyAll(ResourcePattern pattern) {

Review comment:
       `canDenyAll` => `denyAll` since `can` doesn't fit with `deny`

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {

Review comment:
       The nested for loop can be replaced with:
   ```
   for (p <- Set(principal, AclEntry.WildcardPrincipalString); h <- Set(host, 
AclEntry.WildcardHost); o <- Set(op, AclOperation.ALL))
   ```

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()

Review comment:
       We can make this a `val` by using an `ArrayBuffer` instead of a `List` 
that we keep recreating.

##########
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.util.concurrent.CompletionStage
+import java.{lang, util}
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.{After, Before}
+
+class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with 
BaseAuthorizerTest {
+
+  private val interfaceDefaultAuthorizer = new DelegateAuthorizer
+
+  override def authorizer: Authorizer = interfaceDefaultAuthorizer
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+
+    val authorizers = Seq(interfaceDefaultAuthorizer.authorizer)

Review comment:
       There is only one authorizer, so we could just use it directly instead 
of creating a `Seq`.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                           resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {

Review comment:
       As before, we can use a single `for` loop instead of nested loops.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                           resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(_) => return true
+            case None =>
+          }
+        }
+      }
+    }
+    false
+  }
+
+  private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean =
+    denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+  private def allowAny(allowLiterals: List[immutable.Set[String]], 
allowPrefixes: List[immutable.Set[String]],
+                       denyLiterals: List[immutable.Set[String]], 
denyPrefixes: List[immutable.Set[String]]): Boolean = {
+    (allowPrefixes.exists(prefixes =>

Review comment:
       Can use `allowPrefixes.exists(_.exists(...))`, similarly for `allowLiterals`.

##########
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.util.concurrent.CompletionStage
+import java.{lang, util}
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.{After, Before}
+
+class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with 
BaseAuthorizerTest {
+
+  private val interfaceDefaultAuthorizer = new DelegateAuthorizer
+
+  override def authorizer: Authorizer = interfaceDefaultAuthorizer
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+
+    val authorizers = Seq(interfaceDefaultAuthorizer.authorizer)
+
+    // Increase maxUpdateRetries to avoid transient failures
+    authorizers.foreach(a => a.maxUpdateRetries = Int.MaxValue)
+
+    val props = TestUtils.createBrokerConfig(0, zkConnect)
+    props.put(AclAuthorizer.SuperUsersProp, superUsers)
+
+    config = KafkaConfig.fromProps(props)
+    authorizers.foreach(a => a.configure(config.originals))
+
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, zkMaxInFlightRequests,
+      Time.SYSTEM, "kafka.test", "AuthorizerInterfaceDefaultTest")
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    val authorizers = Seq(interfaceDefaultAuthorizer)
+    authorizers.foreach(a => {
+      a.close()
+    })

Review comment:
       Could just close `interfaceDefaultAuthorizer` instead of creating an 
`authorizers` collection?

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                           resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(_) => return true
+            case None =>
+          }
+        }
+      }
+    }
+    false
+  }
+
+  private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean =
+    denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))

Review comment:
       Can use 
`denyLiterals.exists(_.contains(ResourcePattern.WILDCARD_RESOURCE))`

##########
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
##########
@@ -988,6 +980,30 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
     }
   }
 
+  @Test
+  def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = {
+    testAuthorizeByResourceTypeNoAclFoundOverride(aclAuthorizer)
+  }
+
+  private def testAuthorizeByResourceTypeNoAclFoundOverride(authorizer: 
Authorizer): Unit = {
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+
+    val cfg = KafkaConfig.fromProps(props)
+    val testAuthorizer = new AclAuthorizer

Review comment:
       Can't this reuse the existing `aclAuthorizer` instead of creating a new `AclAuthorizer`?

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -304,6 +309,137 @@ class AclAuthorizer extends Authorizer with Logging {
     if (zkClient != null) zkClient.close()
   }
 
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+                                       op: AclOperation,
+                                       resourceType: ResourceType): 
AuthorizationResult = {
+    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
+
+    val principal = new KafkaPrincipal(
+      requestContext.principal().getPrincipalType,
+      requestContext.principal().getName)
+
+    if (isSuperUser(principal))
+      return AuthorizationResult.ALLOWED
+
+    val principalStr = principal.toString
+
+    val host = requestContext.clientAddress().getHostAddress
+    val action = new Action(op, new ResourcePattern(resourceType, "NONE", 
PatternType.UNKNOWN), 0, true, true)
+
+    val denyLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.LITERAL)
+
+    if (denyAll(denyLiterals)) {
+      logAuditMessage(requestContext, action, authorized = false)
+      return AuthorizationResult.DENIED
+    }
+
+    if (shouldAllowEveryoneIfNoAclIsFound) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    val denyPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.DENY, resourceType, 
PatternType.PREFIXED)
+
+    if (denyLiterals.isEmpty && denyPrefixes.isEmpty) {
+      if (hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.PREFIXED)
+          || hasMatchingResources(principalStr, host, op, 
AclPermissionType.ALLOW, resourceType, PatternType.LITERAL)) {
+        logAuditMessage(requestContext, action, authorized = true)
+        return AuthorizationResult.ALLOWED
+      } else {
+        logAuditMessage(requestContext, action, authorized = false)
+        return AuthorizationResult.DENIED
+      }
+    }
+
+    val allowLiterals = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.LITERAL)
+    val allowPrefixes = matchingResources(
+      principalStr, host, op, AclPermissionType.ALLOW, resourceType, 
PatternType.PREFIXED)
+
+    if (allowAny(allowLiterals, allowPrefixes, denyLiterals, denyPrefixes)) {
+      logAuditMessage(requestContext, action, authorized = true)
+      return AuthorizationResult.ALLOWED
+    }
+
+    logAuditMessage(requestContext, action, authorized = false)
+    AuthorizationResult.DENIED
+  }
+
+  def matchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                        resourceType: ResourceType, patternType: PatternType): 
List[immutable.HashSet[String]] = {
+    var matched = List[immutable.HashSet[String]]()
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(resources) => matched = matched :+ resources
+            case None =>
+          }
+        }
+      }
+    }
+    matched
+  }
+
+  def hasMatchingResources(principal: String, host: String, op: AclOperation, 
permission: AclPermissionType,
+                           resourceType: ResourceType, patternType: 
PatternType): Boolean = {
+    for (p <- Set(principal, AclEntry.WildcardPrincipalString)) {
+      for (h <- Set(host, AclEntry.WildcardHost)) {
+        for (o <- Set(op, AclOperation.ALL)) {
+          val resourceIndex = new ResourceTypeKey(
+            new AccessControlEntry(p, h, o, permission), resourceType, 
patternType)
+          resourceCache.get(resourceIndex) match {
+            case Some(_) => return true
+            case None =>
+          }
+        }
+      }
+    }
+    false
+  }
+
+  private def denyAll(denyLiterals: List[immutable.HashSet[String]]): Boolean =
+    denyLiterals.exists(r => r.contains(ResourcePattern.WILDCARD_RESOURCE))
+
+
+  private def allowAny(allowLiterals: List[immutable.Set[String]], 
allowPrefixes: List[immutable.Set[String]],
+                       denyLiterals: List[immutable.Set[String]], 
denyPrefixes: List[immutable.Set[String]]): Boolean = {
+    (allowPrefixes.exists(prefixes =>
+          prefixes.exists(prefix => allowPrefix(prefix, denyPrefixes)))
+      || allowLiterals.exists(literals =>
+            literals.exists(literal => allowLiteral(literal, denyLiterals, 
denyPrefixes))))
+  }
+
+  private def allowLiteral(literalName: String, denyLiterals: 
List[immutable.Set[String]],
+                           denyPrefixes: List[immutable.Set[String]]): Boolean 
= {
+    literalName match{

Review comment:
       nit: add a space before `{`, i.e. `literalName match {`

##########
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.util.concurrent.CompletionStage
+import java.{lang, util}
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer._
+import org.junit.{After, Before}
+
+class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with 
BaseAuthorizerTest {
+
+  private val interfaceDefaultAuthorizer = new DelegateAuthorizer
+
+  override def authorizer: Authorizer = interfaceDefaultAuthorizer
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+
+    val authorizers = Seq(interfaceDefaultAuthorizer.authorizer)
+
+    // Increase maxUpdateRetries to avoid transient failures
+    authorizers.foreach(a => a.maxUpdateRetries = Int.MaxValue)
+
+    val props = TestUtils.createBrokerConfig(0, zkConnect)
+    props.put(AclAuthorizer.SuperUsersProp, superUsers)
+
+    config = KafkaConfig.fromProps(props)
+    authorizers.foreach(a => a.configure(config.originals))
+
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, zkMaxInFlightRequests,

Review comment:
       It looks like there is an opportunity to move some of this setup into 
`BaseAuthorizerTest`, but we can do that in a follow-up.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to