This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 4102073  MINOR: Close timing window in SimpleAclAuthorizer startup 
(#5318)
4102073 is described below

commit 4102073add48787e7caf6a2afdc3e6278014c243
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Mon Jul 2 22:11:05 2018 +0100

    MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
    
    ZooKeeper listener for change notifications should be created before 
loading the ACL cache to avoid timing window if acls are modified when broker 
is starting up.
    
    Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@confluent.io>
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 11 +++++--
 .../security/auth/SimpleAclAuthorizerTest.scala    | 37 +++++++++++++++++++++-
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index c439f5e..588cbdc 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -95,10 +95,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer")
     zkClient.createAclPaths()
 
+    // Start change listeners first and then populate the cache so that there 
is no timing window
+    // between loading cache and processing change notifications.
+    startZkChangeListeners()
     loadCache()
-
-    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, 
AclChangeNotificationZNode.path, 
AclChangeNotificationSequenceZNode.SequenceNumberPrefix, 
AclChangedNotificationHandler)
-    aclChangeListener.init()
   }
 
   override def authorize(session: Session, operation: Operation, resource: 
Resource): Boolean = {
@@ -223,6 +223,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     }
   }
 
+  private[auth] def startZkChangeListeners(): Unit = {
+    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, 
AclChangeNotificationZNode.path, 
AclChangeNotificationSequenceZNode.SequenceNumberPrefix, 
AclChangedNotificationHandler)
+    aclChangeListener.init()
+  }
+
   private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, 
operation: Operation, resource: Resource, host: String) {
     def logMessage: String = {
       val authResult = if (authorized) "Allowed" else "Denied"
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 1e18f1d..ac68f70 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -18,11 +18,12 @@ package kafka.security.auth
 
 import java.net.InetAddress
 import java.util.UUID
+import java.util.concurrent.{Executors, Semaphore, TimeUnit}
 
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Acl.WildCardHost
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert._
@@ -270,6 +271,40 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness 
{
     }
   }
 
+  /**
+   * Verify that there is no timing window between loading ACL cache and 
setting
+   * up ZK change listener. Cache must be loaded before creating change 
listener
+   * in the authorizer to avoid the timing window.
+   */
+  @Test
+  def testChangeListenerTiming() {
+    val configureSemaphore = new Semaphore(0)
+    val listenerSemaphore = new Semaphore(0)
+    val executor = Executors.newSingleThreadExecutor
+    val simpleAclAuthorizer3 = new SimpleAclAuthorizer {
+      override private[auth] def startZkChangeListeners(): Unit = {
+        configureSemaphore.release()
+        listenerSemaphore.acquireUninterruptibly()
+        super.startZkChangeListeners()
+      }
+    }
+    try {
+      val future = 
executor.submit(CoreUtils.runnable(simpleAclAuthorizer3.configure(config.originals)))
+      configureSemaphore.acquire()
+      val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+      val acls = Set(new Acl(user1, Deny, "host-1", Read))
+      simpleAclAuthorizer.addAcls(acls, resource)
+
+      listenerSemaphore.release()
+      future.get(10, TimeUnit.SECONDS)
+
+      assertEquals(acls, simpleAclAuthorizer3.getAcls(resource))
+    } finally {
+      simpleAclAuthorizer3.close()
+      executor.shutdownNow()
+    }
+  }
+
   @Test
   def testLocalConcurrentModificationOfResourceAcls() {
     val commonResource = new Resource(Topic, "test")

Reply via email to