akhileshchg commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971088491


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
     private final DefaultRule defaultRule;
 
     /**
-     * Contains all of the current ACLs sorted by (resource type, resource 
name).
+     * An immutable array of all the current ACLs sorted by (resource type, 
resource name).
      */
-    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
   We're storing `Id` with the `StandardAcl`. Shouldn't that make it unique? I 
think since it is sorted, we can maybe have a conservative check to make sure 
there are no duplicate ids. 



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
     }
 
     @Override
-    public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
-    }
-
-    @Override
-    public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithAllNewAcls(acls.entrySet());
     }
 
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public synchronized void applyAclChanges(
+        Collection<Map.Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   I don't think `StandardAcl` has id with it. We have a different data 
structure for it`StandardAclWithId`



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
         return results;
     }

Review Comment:
   In `authorize` function we still do `StandardAuthorizerData curData = 
data;`. I don't think we need to do this anymore.



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, 
StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-                    " to aclsByResource");
-            }
-            log.trace("Added ACL {}: {}", id, acl);
-        } catch (Throwable e) {
-            log.error("addAcl error", e);
-            throw e;
-        }
+    StandardAuthorizerData copyWithAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        return copyWithNewAcls(acls, newAclEntries, removedAclIds);
     }
 
-    void removeAcl(Uuid id) {
-        try {
-            StandardAcl acl = aclsById.remove(id);
-            if (acl == null) {
-                throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+    StandardAuthorizerData copyWithNewAcls(
+        StandardAclWithId[] existingAcls,
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        StandardAclWithId[] newAcls = new StandardAclWithId[
+                existingAcls.length + newAclEntries.size() - 
removedAclIds.size()];
+        int numRemoved = 0, j = 0;
+        for (int i = 0; i < existingAcls.length; i++) {
+            StandardAclWithId aclWithId = existingAcls[i];
+            if (removedAclIds.contains(aclWithId.id())) {
+                numRemoved++;
+            } else {
+                newAcls[j++] = aclWithId;
             }
-            if (!aclsByResource.remove(acl)) {
-                throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
-                    " from aclsByResource");
+        }
+        if (numRemoved < removedAclIds.size()) {
+            throw new RuntimeException("Only located " + numRemoved + " out of 
" +
+                removedAclIds.size() + " removed ACL ID(s). removedAclIds = " +
+                removedAclIds.stream().map(a -> 
a.toString()).collect(Collectors.joining(", ")));
+        }
+        if (!newAclEntries.isEmpty()) {
+            int i = 0;
+            for (Entry<Uuid, StandardAcl> entry : newAclEntries) {
+                newAcls[existingAcls.length + i] = new 
StandardAclWithId(entry.getKey(), entry.getValue());
+                i++;
             }

Review Comment:
   Compared to the previous code, we're missing the check for duplicate ids. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to