hachikuji commented on a change in pull request #11649:
URL: https://github.com/apache/kafka/pull/11649#discussion_r802061270



##########
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by 
StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+    /**
+     * The host or name string used in ACLs that match any host or name.
+     */
+    public static final String WILDCARD = "*";
+
+    /**
+     * The principal entry used in ACLs that match any principal.
+     */
+    public static final String WILDCARD_PRINCIPAL = "User:*";
+
+    /**
+     * The logger to use.
+     */
+    final Logger log;
+
+    /**
+     * The current AclMutator.
+     */
+    final AclMutator aclMutator;
+
+    /**
+     * A statically configured set of users that are authorized to do anything.
+     */
+    private final Set<String> superUsers;
+
+    /**
+     * The result to return if no ACLs match.
+     */
+    private final AuthorizationResult defaultResult;
+
+    /**
+     * Contains all of the current ACLs sorted by (resource type, pattern 
type, resource name).
+     */
+    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
+
+    /**
+     * Contains all of the current ACLs indexed by UUID.
+     */
+    private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
+
+    private static Logger createLogger(int nodeId) {
+        return new LogContext("[StandardAuthorizer " + nodeId + "] 
").logger(StandardAuthorizerData.class);
+    }
+
+    static StandardAuthorizerData createEmpty() {
+        return new StandardAuthorizerData(createLogger(-1),
+            null,
+            Collections.emptySet(),
+            DENIED,
+            new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+    }
+
+    private StandardAuthorizerData(Logger log,
+                                   AclMutator aclMutator,
+                                   Set<String> superUsers,
+                                   AuthorizationResult defaultResult,
+                                   ConcurrentSkipListSet<StandardAcl> 
aclsByResource,
+                                   ConcurrentHashMap<Uuid, StandardAcl> 
aclsById) {
+        this.log = log;
+        this.aclMutator = aclMutator;
+        this.superUsers = superUsers;
+        this.defaultResult = defaultResult;
+        this.aclsByResource = aclsByResource;
+        this.aclsById = aclsById;
+    }
+
+    StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
+        return new StandardAuthorizerData(log,
+            newAclMutator,
+            superUsers,
+            defaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewConfig(int nodeId,
+                                             Set<String> newSuperUsers,
+                                             AuthorizationResult 
newDefaultResult) {
+        return new StandardAuthorizerData(
+            createLogger(nodeId),
+            aclMutator,
+            newSuperUsers,
+            newDefaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, 
StandardAcl>> aclEntries) {
+        StandardAuthorizerData newData = new StandardAuthorizerData(
+            log,
+            aclMutator,
+            superUsers,
+            defaultResult,
+            new ConcurrentSkipListSet<>(),
+            new ConcurrentHashMap<>());
+        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
+            newData.addAcl(entry.getKey(), entry.getValue());
+        }
+        log.info("Applied " + aclEntries.size() + "acl(s) from image.");
+        return newData;
+    }
+
+    void addAcl(Uuid id, StandardAcl acl) {
+        try {
+            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
+            if (prevAcl != null) {
+                throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
+            }
+            if (!aclsByResource.add(acl)) {
+                aclsById.remove(id);
+                throw new RuntimeException("Unable to add the ACL with ID " + 
id +
+                    " to aclsByResource");
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Added ACL " + id + ": " + acl);
+            }
+        } catch (Throwable e) {
+            log.error("addAcl error", e);
+            throw e;
+        }
+    }
+
+    void removeAcl(Uuid id) {
+        try {
+            StandardAcl acl = aclsById.remove(id);
+            if (acl == null) {
+                throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+            }
+            if (!aclsByResource.remove(acl)) {
+                throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
+                    " from aclsByResource");
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Removed ACL " + id + ": " + acl);
+            }
+        } catch (Throwable e) {
+            log.error("removeAcl error", e);
+            throw e;
+        }
+    }
+
+    Set<String> superUsers() {
+        return superUsers;
+    }
+
+    AuthorizationResult defaultResult() {
+        return defaultResult;
+    }
+
+    int aclCount() {
+        return aclsById.size();
+    }
+
+    static class AuthorizationResultBuilder {
+        boolean foundDeny = false;
+        boolean foundAllow = false;
+    }
+
+    /**
+     * Authorize an action based on the current set of ACLs.
+     *
+     * In order to know whether to allow or deny the action, we need to 
examine the ACLs
+     * that apply to it. If any DENY ACLs match, the operation is denied, no 
matter how
+     * many ALLOW ACLs match. If neither ALLOW nor DENY ACLs match, we return 
the default
+     * result. In general it makes more sense to configure the default result 
to be
+     * DENY, but some people (and unit tests) configure it as ALLOW.
+     */
+    AuthorizationResult authorize(AuthorizableRequestContext requestContext,
+                                  Action action) {
+        // Superusers are authorized to do anything.
+        if (superUsers.contains(requestContext.principal().toString())) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", 
action=" + action +
+                    "): ALLOWED because " + 
requestContext.principal().toString() +
+                    " is a superuser");
+            }
+            return ALLOWED;
+        }
+
+        // This code relies on the ordering of StandardAcl within the 
NavigableMap.
+        // Entries are sorted by resource type first, then REVERSE resource 
name.
+        // Therefore, we can find all the applicable ACLs by starting at
+        // (resource_type, resource_name) and stepping forwards until we reach 
an ACL with
+        // a resource name which is not a prefix of the current one.
+        //
+        // For example, when trying to authorize a TOPIC resource named 
foobar, we would
+        // start at element 2, and continue on to 3 and 4 following map:
+        //
+        // 1. rs=TOPIC rn=gar pt=PREFIX
+        // 2. rs=TOPIC rn=foobar pt=PREFIX
+        // 3. rs=TOPIC rn=foob pt=LITERAL
+        // 4. rs=TOPIC rn=foo pt=PREFIX
+        // 5. rs=TOPIC rn=eeee pt=LITERAL
+        //
+        // Once we reached element 5, we would stop scanning.
+        AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
+        StandardAcl exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            action.resourcePattern().name(),
+            PatternType.UNKNOWN,
+            "",
+            "",
+            AclOperation.UNKNOWN,
+            AclPermissionType.UNKNOWN);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // In addition to ACLs for this specific resource name, there can also 
be wildcard
+        // ACLs that match any resource name. These are stored as type = 
LITERAL,
+        // name = "*". We search these next.
+        exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            WILDCARD,
+            LITERAL,
+            "",
+            "",
+            AclOperation.UNKNOWN,
+            AclPermissionType.UNKNOWN);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // If we found ALLOW ACLs, the action is allowed.
+        if (builder.foundAllow) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", 
action=" +
+                    action + "): ALLOWED");
+            }
+            return ALLOWED;
+        }
+
+        // If nothing matched, we return the default result.
+        if (log.isTraceEnabled()) {
+            log.trace("authorize(requestContext=" + requestContext + ", 
action=" +
+                action + "): returning default result " + defaultResult);
+        }
+        return defaultResult;
+    }
+
+    void checkSection(Action action,
+                      StandardAcl exemplar,
+                      AuthorizableRequestContext requestContext,
+                      AuthorizationResultBuilder builder) {
+        NavigableSet<StandardAcl> headSet = aclsByResource.tailSet(exemplar, 
true);
+        String resourceName = action.resourcePattern().name();
+        for (Iterator<StandardAcl> iterator = headSet.iterator();

Review comment:
       nit: I don't think we need the explicit iterator here; an enhanced for loop over the set would do.

##########
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by 
StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+    /**
+     * The host or name string used in ACLs that match any host or name.
+     */
+    public static final String WILDCARD = "*";
+
+    /**
+     * The principal entry used in ACLs that match any principal.
+     */
+    public static final String WILDCARD_PRINCIPAL = "User:*";
+
+    /**
+     * The logger to use.
+     */
+    final Logger log;
+
+    /**
+     * The current AclMutator.
+     */
+    final AclMutator aclMutator;
+
+    /**
+     * A statically configured set of users that are authorized to do anything.
+     */
+    private final Set<String> superUsers;
+
+    /**
+     * The result to return if no ACLs match.
+     */
+    private final AuthorizationResult defaultResult;
+
+    /**
+     * Contains all of the current ACLs sorted by (resource type, pattern 
type, resource name).
+     */
+    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
+
+    /**
+     * Contains all of the current ACLs indexed by UUID.
+     */
+    private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
+
+    private static Logger createLogger(int nodeId) {
+        return new LogContext("[StandardAuthorizer " + nodeId + "] 
").logger(StandardAuthorizerData.class);
+    }
+
+    static StandardAuthorizerData createEmpty() {
+        return new StandardAuthorizerData(createLogger(-1),
+            null,
+            Collections.emptySet(),
+            DENIED,
+            new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+    }
+
+    private StandardAuthorizerData(Logger log,
+                                   AclMutator aclMutator,
+                                   Set<String> superUsers,
+                                   AuthorizationResult defaultResult,
+                                   ConcurrentSkipListSet<StandardAcl> 
aclsByResource,
+                                   ConcurrentHashMap<Uuid, StandardAcl> 
aclsById) {
+        this.log = log;
+        this.aclMutator = aclMutator;
+        this.superUsers = superUsers;
+        this.defaultResult = defaultResult;
+        this.aclsByResource = aclsByResource;
+        this.aclsById = aclsById;
+    }
+
+    StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
+        return new StandardAuthorizerData(log,
+            newAclMutator,
+            superUsers,
+            defaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewConfig(int nodeId,
+                                             Set<String> newSuperUsers,
+                                             AuthorizationResult 
newDefaultResult) {
+        return new StandardAuthorizerData(
+            createLogger(nodeId),
+            aclMutator,
+            newSuperUsers,
+            newDefaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, 
StandardAcl>> aclEntries) {
+        StandardAuthorizerData newData = new StandardAuthorizerData(
+            log,
+            aclMutator,
+            superUsers,
+            defaultResult,
+            new ConcurrentSkipListSet<>(),
+            new ConcurrentHashMap<>());
+        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
+            newData.addAcl(entry.getKey(), entry.getValue());
+        }
+        log.info("Applied " + aclEntries.size() + "acl(s) from image.");
+        return newData;
+    }
+
+    void addAcl(Uuid id, StandardAcl acl) {
+        try {
+            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
+            if (prevAcl != null) {
+                throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
+            }
+            if (!aclsByResource.add(acl)) {
+                aclsById.remove(id);
+                throw new RuntimeException("Unable to add the ACL with ID " + 
id +
+                    " to aclsByResource");
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Added ACL " + id + ": " + acl);
+            }
+        } catch (Throwable e) {
+            log.error("addAcl error", e);
+            throw e;
+        }
+    }
+
+    void removeAcl(Uuid id) {
+        try {
+            StandardAcl acl = aclsById.remove(id);
+            if (acl == null) {
+                throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+            }
+            if (!aclsByResource.remove(acl)) {
+                throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
+                    " from aclsByResource");
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Removed ACL " + id + ": " + acl);
+            }
+        } catch (Throwable e) {
+            log.error("removeAcl error", e);
+            throw e;
+        }
+    }
+
+    Set<String> superUsers() {
+        return superUsers;
+    }
+
+    AuthorizationResult defaultResult() {
+        return defaultResult;
+    }
+
+    int aclCount() {
+        return aclsById.size();
+    }
+
+    static class AuthorizationResultBuilder {
+        boolean foundDeny = false;
+        boolean foundAllow = false;
+    }
+
+    /**
+     * Authorize an action based on the current set of ACLs.
+     *
+     * In order to know whether to allow or deny the action, we need to 
examine the ACLs
+     * that apply to it. If any DENY ACLs match, the operation is denied, no 
matter how
+     * many ALLOW ACLs match. If neither ALLOW nor DENY ACLs match, we return 
the default
+     * result. In general it makes more sense to configure the default result 
to be
+     * DENY, but some people (and unit tests) configure it as ALLOW.
+     */
+    AuthorizationResult authorize(AuthorizableRequestContext requestContext,
+                                  Action action) {
+        // Superusers are authorized to do anything.
+        if (superUsers.contains(requestContext.principal().toString())) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", 
action=" + action +
+                    "): ALLOWED because " + 
requestContext.principal().toString() +
+                    " is a superuser");
+            }
+            return ALLOWED;
+        }
+
+        // This code relies on the ordering of StandardAcl within the 
NavigableMap.
+        // Entries are sorted by resource type first, then REVERSE resource 
name.
+        // Therefore, we can find all the applicable ACLs by starting at
+        // (resource_type, resource_name) and stepping forwards until we reach 
an ACL with
+        // a resource name which is not a prefix of the current one.
+        //
+        // For example, when trying to authorize a TOPIC resource named 
foobar, we would
+        // start at element 2, and continue on to 3 and 4 following map:
+        //
+        // 1. rs=TOPIC rn=gar pt=PREFIX
+        // 2. rs=TOPIC rn=foobar pt=PREFIX
+        // 3. rs=TOPIC rn=foob pt=LITERAL
+        // 4. rs=TOPIC rn=foo pt=PREFIX
+        // 5. rs=TOPIC rn=eeee pt=LITERAL
+        //
+        // Once we reached element 5, we would stop scanning.
+        AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
+        StandardAcl exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            action.resourcePattern().name(),
+            PatternType.UNKNOWN,
+            "",
+            "",
+            AclOperation.UNKNOWN,
+            AclPermissionType.UNKNOWN);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // In addition to ACLs for this specific resource name, there can also 
be wildcard
+        // ACLs that match any resource name. These are stored as type = 
LITERAL,
+        // name = "*". We search these next.
+        exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            WILDCARD,
+            LITERAL,
+            "",
+            "",
+            AclOperation.UNKNOWN,
+            AclPermissionType.UNKNOWN);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // If we found ALLOW ACLs, the action is allowed.
+        if (builder.foundAllow) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", 
action=" +
+                    action + "): ALLOWED");
+            }
+            return ALLOWED;
+        }
+
+        // If nothing matched, we return the default result.
+        if (log.isTraceEnabled()) {
+            log.trace("authorize(requestContext=" + requestContext + ", 
action=" +
+                action + "): returning default result " + defaultResult);
+        }
+        return defaultResult;
+    }
+
+    void checkSection(Action action,
+                      StandardAcl exemplar,
+                      AuthorizableRequestContext requestContext,
+                      AuthorizationResultBuilder builder) {
+        NavigableSet<StandardAcl> headSet = aclsByResource.tailSet(exemplar, 
true);
+        String resourceName = action.resourcePattern().name();
+        for (Iterator<StandardAcl> iterator = headSet.iterator();
+             iterator.hasNext(); ) {
+            StandardAcl acl = iterator.next();
+            if 
(!acl.resourceType().equals(action.resourcePattern().resourceType())) {
+                // We've stepped outside the section for the resource type we 
care about and
+                // should stop scanning.
+                break;
+            }
+            if (resourceName.startsWith(acl.resourceName())) {
+                if (acl.patternType() == LITERAL && 
!resourceName.equals(acl.resourceName())) {
+                    // This is a literal ACL whose name is a prefix of the 
resource name, but
+                    // which doesn't match it exactly. We should skip over 
this ACL, but keep
+                    // scanning in case there are any relevant PREFIX ACLs.
+                    continue;
+                }
+            } else if (!(acl.resourceName().equals(WILDCARD) && 
acl.patternType() == LITERAL)) {
+                // If the ACL resource name is NOT a prefix of the current 
resource name,
+                // and we're not dealing with the special case of a wildcard 
ACL, we've
+                // stepped outside of the section we care about and should 
stop scanning.
+                break;
+            }
+            AuthorizationResult result = findResult(action, requestContext, 
acl);
+            if (ALLOWED == result) {
+                builder.foundAllow = true;
+            } else if (DENIED == result) {
+                if (log.isTraceEnabled()) {
+                    log.trace("authorize(requestContext=" + requestContext + 
", action=" +
+                        action + "): DENIED because of " + acl);
+                }
+                builder.foundDeny = true;
+                return;
+            }
+        }
+    }
+
+    /**
+     * The set of operations which imply DESCRIBE permission, when used in an 
ALLOW acl.
+     */
+    private static final Set<AclOperation> IMPLIES_DESCRIBE = 
Collections.unmodifiableSet(

Review comment:
       Not sure if it makes a big difference, but we could use `EnumSet` for 
these sets of `AclOperation` values.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.apache.kafka.metadata.authorizer.StandardAclRecordIterator;
+import org.apache.kafka.metadata.authorizer.StandardAclWithId;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import 
org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * The AclControlManager manages any ACLs that are stored in the 
__cluster_metadata topic.
+ * If the ACLs are stored externally (such as in ZooKeeper) then there will be 
nothing for
+ * this manager to do, and the authorizer field will always be Optional.empty.
+ *
+ * Because the Authorizer is being concurrently used by other threads, we need 
to be
+ * careful about snapshots. We don't want the Authorizer to act based on 
partial state
+ * during the loading process. Therefore, unlike most of the other managers,
+ * AclControlManager needs to receive callbacks when we start loading a 
snapshot and when
+ * we finish. The prepareForSnapshotLoad callback clears the authorizer field, 
preventing
+ * any changes from affecting the authorizer until completeSnapshotLoad is 
called.
+ * Note that the Authorizer's start() method will block until the first 
snapshot load has
+ * completed, which is another reason the prepare / complete callbacks are 
needed.
+ */
+public class AclControlManager {
+    private final TimelineHashMap<Uuid, StandardAcl> idToAcl;
+    private final TimelineHashSet<StandardAcl> existingAcls;
+    private final Optional<ClusterMetadataAuthorizer> authorizer;
+
+    AclControlManager(SnapshotRegistry snapshotRegistry,
+                      Optional<ClusterMetadataAuthorizer> authorizer) {
+        this.idToAcl = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.existingAcls = new TimelineHashSet<>(snapshotRegistry, 0);
+        this.authorizer = authorizer;
+    }
+
+    ControllerResult<List<AclCreateResult>> createAcls(List<AclBinding> acls) {
+        List<AclCreateResult> results = new ArrayList<>(acls.size());
+        List<ApiMessageAndVersion> records = new ArrayList<>(acls.size());
+        for (AclBinding acl : acls) {
+            try {
+                validateNewAcl(acl);

Review comment:
       These validations seem to make sense. Currently they are all implemented 
by `AclAuthorizer`, so I wanted to ask whether it makes sense to pull them up. 
Basically, are there any cases where a custom authorizer might depend on any 
of the cases this validation excludes?

##########
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by 
StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+    /**
+     * The host or name string used in ACLs that match any host or name.
+     */
+    public static final String WILDCARD = "*";
+
+    /**
+     * The principal entry used in ACLs that match any principal.
+     */
+    public static final String WILDCARD_PRINCIPAL = "User:*";
+
+    /**
+     * The logger to use.
+     */
+    final Logger log;
+
+    /**
+     * The current AclMutator.
+     */
+    final AclMutator aclMutator;
+
+    /**
+     * A statically configured set of users that are authorized to do anything.
+     */
+    private final Set<String> superUsers;
+
+    /**
+     * The result to return if no ACLs match.
+     */
+    private final AuthorizationResult defaultResult;
+
+    /**
+     * Contains all of the current ACLs sorted by (resource type, pattern 
type, resource name).

Review comment:
       nit: this comment needs to be updated, since resource name is now ordered before pattern type

##########
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by 
StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+    /**
+     * The host or name string used in ACLs that match any host or name.
+     */
+    public static final String WILDCARD = "*";
+
+    /**
+     * The principal entry used in ACLs that match any principal.
+     */
+    public static final String WILDCARD_PRINCIPAL = "User:*";
+
+    /**
+     * The logger to use.
+     */
+    final Logger log;
+
+    /**
+     * The current AclMutator.
+     */
+    final AclMutator aclMutator;
+
+    /**
+     * A statically configured set of users that are authorized to do anything.
+     */
+    private final Set<String> superUsers;
+
+    /**
+     * The result to return if no ACLs match.
+     */
+    private final AuthorizationResult defaultResult;
+
+    /**
+     * Contains all of the current ACLs sorted by (resource type, pattern 
type, resource name).
+     */
+    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
+
+    /**
+     * Contains all of the current ACLs indexed by UUID.
+     */
+    private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
+
+    private static Logger createLogger(int nodeId) {
+        return new LogContext("[StandardAuthorizer " + nodeId + "] 
").logger(StandardAuthorizerData.class);
+    }
+
+    static StandardAuthorizerData createEmpty() {
+        return new StandardAuthorizerData(createLogger(-1),
+            null,
+            Collections.emptySet(),
+            DENIED,
+            new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+    }
+
+    private StandardAuthorizerData(Logger log,
+                                   AclMutator aclMutator,
+                                   Set<String> superUsers,
+                                   AuthorizationResult defaultResult,
+                                   ConcurrentSkipListSet<StandardAcl> 
aclsByResource,
+                                   ConcurrentHashMap<Uuid, StandardAcl> 
aclsById) {
+        this.log = log;
+        this.aclMutator = aclMutator;
+        this.superUsers = superUsers;
+        this.defaultResult = defaultResult;
+        this.aclsByResource = aclsByResource;
+        this.aclsById = aclsById;
+    }
+
+    StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
+        return new StandardAuthorizerData(log,
+            newAclMutator,
+            superUsers,
+            defaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewConfig(int nodeId,
+                                             Set<String> newSuperUsers,
+                                             AuthorizationResult 
newDefaultResult) {
+        return new StandardAuthorizerData(
+            createLogger(nodeId),
+            aclMutator,
+            newSuperUsers,
+            newDefaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, 
StandardAcl>> aclEntries) {
+        StandardAuthorizerData newData = new StandardAuthorizerData(
+            log,
+            aclMutator,
+            superUsers,
+            defaultResult,
+            new ConcurrentSkipListSet<>(),
+            new ConcurrentHashMap<>());
+        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
+            newData.addAcl(entry.getKey(), entry.getValue());
+        }
+        log.info("Applied " + aclEntries.size() + "acl(s) from image.");
+        return newData;
+    }
+
+    void addAcl(Uuid id, StandardAcl acl) {
+        try {
+            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
+            if (prevAcl != null) {
+                throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
+            }
+            if (!aclsByResource.add(acl)) {
+                aclsById.remove(id);
+                throw new RuntimeException("Unable to add the ACL with ID " + 
id +
+                    " to aclsByResource");
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Added ACL " + id + ": " + acl);
+            }
+        } catch (Throwable e) {
+            log.error("addAcl error", e);
+            throw e;
+        }
+    }
+
+    void removeAcl(Uuid id) {
+        try {
+            StandardAcl acl = aclsById.remove(id);
+            if (acl == null) {
+                throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+            }
+            if (!aclsByResource.remove(acl)) {
+                throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
+                    " from aclsByResource");
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Removed ACL " + id + ": " + acl);
+            }
+        } catch (Throwable e) {
+            log.error("removeAcl error", e);
+            throw e;
+        }
+    }
+
+    Set<String> superUsers() {
+        return superUsers;
+    }
+
+    AuthorizationResult defaultResult() {
+        return defaultResult;
+    }
+
+    int aclCount() {
+        return aclsById.size();
+    }
+
+    static class AuthorizationResultBuilder {
+        boolean foundDeny = false;
+        boolean foundAllow = false;
+    }
+
+    /**
+     * Authorize an action based on the current set of ACLs.
+     *
+     * In order to know whether to allow or deny the action, we need to 
examine the ACLs
+     * that apply to it. If any DENY ACLs match, the operation is denied, no 
matter how
+     * many ALLOW ACLs match. If neither ALLOW nor DENY ACLs match, we return 
the default
+     * result. In general it makes more sense to configure the default result 
to be
+     * DENY, but some people (and unit tests) configure it as ALLOW.
+     */
+    AuthorizationResult authorize(AuthorizableRequestContext requestContext,
+                                  Action action) {
+        // Superusers are authorized to do anything.
+        if (superUsers.contains(requestContext.principal().toString())) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", 
action=" + action +
+                    "): ALLOWED because " + 
requestContext.principal().toString() +
+                    " is a superuser");
+            }
+            return ALLOWED;
+        }
+
+        // This code relies on the ordering of StandardAcl within the 
NavigableMap.
+        // Entries are sorted by resource type first, then REVERSE resource 
name.
+        // Therefore, we can find all the applicable ACLs by starting at
+        // (resource_type, resource_name) and stepping forwards until we reach 
an ACL with
+        // a resource name which is not a prefix of the current one.
+        //
+        // For example, when trying to authorize a TOPIC resource named 
foobar, we would
+        // start at element 2, and continue on to 3 and 4 following map:
+        //
+        // 1. rs=TOPIC rn=gar pt=PREFIX
+        // 2. rs=TOPIC rn=foobar pt=PREFIX
+        // 3. rs=TOPIC rn=foob pt=LITERAL
+        // 4. rs=TOPIC rn=foo pt=PREFIX
+        // 5. rs=TOPIC rn=eeee pt=LITERAL
+        //
+        // Once we reached element 5, we would stop scanning.
+        AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
+        StandardAcl exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            action.resourcePattern().name(),
+            PatternType.UNKNOWN,
+            "",
+            "",
+            AclOperation.UNKNOWN,
+            AclPermissionType.UNKNOWN);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // In addition to ACLs for this specific resource name, there can also 
be wildcard
+        // ACLs that match any resource name. These are stored as type = 
LITERAL,
+        // name = "*". We search these next.
+        exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            WILDCARD,
+            LITERAL,
+            "",
+            "",
+            AclOperation.UNKNOWN,
+            AclPermissionType.UNKNOWN);
+        checkSection(action, exemplar, requestContext, builder);
+        if (builder.foundDeny) return DENIED;
+
+        // If we found ALLOW ACLs, the action is allowed.
+        if (builder.foundAllow) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", 
action=" +
+                    action + "): ALLOWED");
+            }
+            return ALLOWED;
+        }
+
+        // If nothing matched, we return the default result.
+        if (log.isTraceEnabled()) {
+            log.trace("authorize(requestContext=" + requestContext + ", 
action=" +
+                action + "): returning default result " + defaultResult);
+        }
+        return defaultResult;
+    }
+
+    void checkSection(Action action,
+                      StandardAcl exemplar,
+                      AuthorizableRequestContext requestContext,
+                      AuthorizationResultBuilder builder) {
+        NavigableSet<StandardAcl> headSet = aclsByResource.tailSet(exemplar, 
true);

Review comment:
       Should we rename this variable (`headSet`) to `tailSet`, since it holds the result of `aclsByResource.tailSet(...)`?

##########
File path: 
metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAclTest.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.kafka.metadata.authorizer.StandardAuthorizerData.WILDCARD;
+import static 
org.apache.kafka.metadata.authorizer.StandardAuthorizerData.WILDCARD_PRINCIPAL;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+
+@Timeout(value = 40)
+public class StandardAclTest {
+    public final static List<StandardAcl> TEST_ACLS = new ArrayList<>();
+
+    static {
+        TEST_ACLS.add(new StandardAcl(
+            ResourceType.CLUSTER,
+            Resource.CLUSTER_NAME,
+            PatternType.LITERAL,
+            WILDCARD_PRINCIPAL,
+            WILDCARD,
+            AclOperation.ALTER,
+            AclPermissionType.ALLOW));
+        TEST_ACLS.add(new StandardAcl(
+            ResourceType.TOPIC,
+            "foo_",
+            PatternType.PREFIXED,
+            WILDCARD_PRINCIPAL,
+            WILDCARD,
+            AclOperation.READ,
+            AclPermissionType.ALLOW));
+        TEST_ACLS.add(new StandardAcl(
+            ResourceType.GROUP,
+            "mygroup",
+            PatternType.LITERAL,
+            "User:foo",
+            WILDCARD,
+            AclOperation.READ,
+            AclPermissionType.DENY));
+        TEST_ACLS.add(new StandardAcl(
+            ResourceType.GROUP,
+            "mygroup",
+            PatternType.PREFIXED,
+            "User:foo",
+            WILDCARD,
+            AclOperation.READ,
+            AclPermissionType.DENY));
+        TEST_ACLS.add(new StandardAcl(
+            ResourceType.GROUP,
+            "foo",
+            PatternType.PREFIXED,
+            "User:foo",
+            WILDCARD,
+            AclOperation.READ,
+            AclPermissionType.DENY));
+    }
+
+    private static final int signum(int input) {

Review comment:
       nit: `final` is redundant on a private static method

##########
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
+import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
+
+
+/**
+ * A class which encapsulates the configuration and the ACL data owned by 
StandardAuthorizer.
+ *
+ * The methods in this class support lockless concurrent access.
+ */
+public class StandardAuthorizerData {
+    /**
+     * The host or name string used in ACLs that match any host or name.
+     */
+    public static final String WILDCARD = "*";
+
+    /**
+     * The principal entry used in ACLs that match any principal.
+     */
+    public static final String WILDCARD_PRINCIPAL = "User:*";
+
+    /**
+     * The logger to use.
+     */
+    final Logger log;
+
+    /**
+     * The current AclMutator.
+     */
+    final AclMutator aclMutator;
+
+    /**
+     * A statically configured set of users that are authorized to do anything.
+     */
+    private final Set<String> superUsers;
+
+    /**
+     * The result to return if no ACLs match.
+     */
+    private final AuthorizationResult defaultResult;
+
+    /**
+     * Contains all of the current ACLs sorted by (resource type, pattern 
type, resource name).
+     */
+    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
+
+    /**
+     * Contains all of the current ACLs indexed by UUID.
+     */
+    private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;
+
+    private static Logger createLogger(int nodeId) {
+        return new LogContext("[StandardAuthorizer " + nodeId + "] 
").logger(StandardAuthorizerData.class);
+    }
+
+    static StandardAuthorizerData createEmpty() {
+        return new StandardAuthorizerData(createLogger(-1),
+            null,
+            Collections.emptySet(),
+            DENIED,
+            new ConcurrentSkipListSet<>(), new ConcurrentHashMap<>());
+    }
+
+    private StandardAuthorizerData(Logger log,
+                                   AclMutator aclMutator,
+                                   Set<String> superUsers,
+                                   AuthorizationResult defaultResult,
+                                   ConcurrentSkipListSet<StandardAcl> 
aclsByResource,
+                                   ConcurrentHashMap<Uuid, StandardAcl> 
aclsById) {
+        this.log = log;
+        this.aclMutator = aclMutator;
+        this.superUsers = superUsers;
+        this.defaultResult = defaultResult;
+        this.aclsByResource = aclsByResource;
+        this.aclsById = aclsById;
+    }
+
+    StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
+        return new StandardAuthorizerData(log,
+            newAclMutator,
+            superUsers,
+            defaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewConfig(int nodeId,
+                                             Set<String> newSuperUsers,
+                                             AuthorizationResult 
newDefaultResult) {
+        return new StandardAuthorizerData(
+            createLogger(nodeId),
+            aclMutator,
+            newSuperUsers,
+            newDefaultResult,
+            aclsByResource,
+            aclsById);
+    }
+
+    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, 
StandardAcl>> aclEntries) {
+        StandardAuthorizerData newData = new StandardAuthorizerData(
+            log,
+            aclMutator,
+            superUsers,
+            defaultResult,
+            new ConcurrentSkipListSet<>(),
+            new ConcurrentHashMap<>());
+        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
+            newData.addAcl(entry.getKey(), entry.getValue());
+        }
+        log.info("Applied " + aclEntries.size() + "acl(s) from image.");
+        return newData;
+    }
+
+    void addAcl(Uuid id, StandardAcl acl) {
+        try {
+            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
+            if (prevAcl != null) {
+                throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
+            }
+            if (!aclsByResource.add(acl)) {
+                aclsById.remove(id);
+                throw new RuntimeException("Unable to add the ACL with ID " + 
id +
+                    " to aclsByResource");
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Added ACL " + id + ": " + acl);
+            }
+        } catch (Throwable e) {
+            log.error("addAcl error", e);
+            throw e;
+        }
+    }
+
+    void removeAcl(Uuid id) {
+        try {
+            StandardAcl acl = aclsById.remove(id);
+            if (acl == null) {
+                throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+            }
+            if (!aclsByResource.remove(acl)) {
+                throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
+                    " from aclsByResource");
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Removed ACL " + id + ": " + acl);
+            }
+        } catch (Throwable e) {
+            log.error("removeAcl error", e);
+            throw e;
+        }
+    }
+
+    Set<String> superUsers() {
+        return superUsers;
+    }
+
+    AuthorizationResult defaultResult() {
+        return defaultResult;
+    }
+
+    int aclCount() {
+        return aclsById.size();
+    }
+
+    static class AuthorizationResultBuilder {
+        boolean foundDeny = false;
+        boolean foundAllow = false;
+    }
+
+    /**
+     * Authorize an action based on the current set of ACLs.
+     *
+     * In order to know whether to allow or deny the action, we need to 
examine the ACLs
+     * that apply to it. If any DENY ACLs match, the operation is denied, no 
matter how
+     * many ALLOW ACLs match. If neither ALLOW nor DENY ACLs match, we return 
the default
+     * result. In general it makes more sense to configure the default result 
to be
+     * DENY, but some people (and unit tests) configure it as ALLOW.
+     */
+    AuthorizationResult authorize(AuthorizableRequestContext requestContext,
+                                  Action action) {
+        // Superusers are authorized to do anything.
+        if (superUsers.contains(requestContext.principal().toString())) {
+            if (log.isTraceEnabled()) {
+                log.trace("authorize(requestContext=" + requestContext + ", 
action=" + action +
+                    "): ALLOWED because " + 
requestContext.principal().toString() +
+                    " is a superuser");
+            }
+            return ALLOWED;
+        }
+
+        // This code relies on the ordering of StandardAcl within the 
NavigableMap.
+        // Entries are sorted by resource type first, then REVERSE resource 
name.
+        // Therefore, we can find all the applicable ACLs by starting at
+        // (resource_type, resource_name) and stepping forwards until we reach 
an ACL with
+        // a resource name which is not a prefix of the current one.
+        //
+        // For example, when trying to authorize a TOPIC resource named 
foobar, we would
+        // start at element 2, and continue on to 3 and 4 following map:
+        //
+        // 1. rs=TOPIC rn=gar pt=PREFIX
+        // 2. rs=TOPIC rn=foobar pt=PREFIX
+        // 3. rs=TOPIC rn=foob pt=LITERAL
+        // 4. rs=TOPIC rn=foo pt=PREFIX
+        // 5. rs=TOPIC rn=eeee pt=LITERAL
+        //
+        // Once we reached element 5, we would stop scanning.
+        AuthorizationResultBuilder builder = new AuthorizationResultBuilder();
+        StandardAcl exemplar = new StandardAcl(
+            action.resourcePattern().resourceType(),
+            action.resourcePattern().name(),
+            PatternType.UNKNOWN,

Review comment:
       Maybe worth mentioning somewhere in here that the UNKNOWN pattern type 
is ordered before the others, since the initial search depends on that?

##########
File path: 
metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.authorizer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+
+/**
+ * An interface for Authorizers which store state in the __cluster_metadata 
log.
+ *
+ * These methods must all be thread-safe.
+ */
+public interface ClusterMetadataAuthorizer extends Authorizer {
+    /**
+     * Set the mutator object which should be used for creating and deleting 
ACLs.
+     */
+    void setAclMutator(AclMutator aclMutator);
+
+    /**
+     * Get the mutator object which should be used for creating and deleting 
ACLs.
+     *
+     * @throws org.apache.kafka.common.errors.NotControllerException
+     *              If the aclMutator was not set.
+     */
+    AclMutator aclMutatorOrException();
+
+    /**
+     * Load the ACLs in the given map. Anything not in the map will be removed.
+     * The authorizer will also wait for this initial snapshot load to 
complete when
+     * coming up.
+     */
+    void loadSnapshot(Map<Uuid, StandardAcl> acls);
+
+    /**
+     * Add a new ACL. Any ACL with the same ID will be replaced.
+     */
+    void addAcl(Uuid id, StandardAcl acl);
+
+    /**
+     * Remove the ACL with the given ID.
+     */
+    void removeAcl(Uuid id);
+
+    default List<? extends CompletionStage<AclCreateResult>> createAcls(

Review comment:
       Just to be explicit, are you saying that 
`ClusterMetadataAuthorizer.addAcl` will be applied before the future returns, 
or just that the record will be committed? I guess the latter is really all we 
care about since the change must get propagated to the whole cluster in any 
case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to