This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.2 by this push:
     new ada1ab8  MINOR: Close timing window in SimpleAclAuthorizer startup 
(#5318)
ada1ab8 is described below

commit ada1ab83551c7ff73945385820702f3ab9b24390
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Mon Jul 2 22:11:05 2018 +0100

    MINOR: Close timing window in SimpleAclAuthorizer startup (#5318)
    
    The ZooKeeper listener for change notifications should be created before
    loading the ACL cache, to avoid a timing window if ACLs are modified while
    the broker is starting up.
    
    Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@confluent.io>
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 12 +++++--
 .../security/auth/SimpleAclAuthorizerTest.scala    | 39 ++++++++++++++++++++--
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 28f910a..751894a 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -112,11 +112,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging 
{
                       JaasUtils.isZkSecurityEnabled())
     zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
 
+    // Start change listeners first and then populate the cache so that there is no timing window
+    // between loading cache and processing change notifications.
+    startZkChangeListeners()
     loadCache()
 
-    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
-    aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, 
SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, 
AclChangedNotificationHandler)
-    aclChangeListener.init()
   }
 
   override def authorize(session: Session, operation: Operation, resource: 
Resource): Boolean = {
@@ -245,6 +245,12 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     SimpleAclAuthorizer.AclZkPath + "/" + resource.resourceType + "/" + 
resource.name
   }
 
+  private[auth] def startZkChangeListeners(): Unit = {
+    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
+    aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, 
SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, 
AclChangedNotificationHandler)
+    aclChangeListener.init()
+  }
+
   private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, 
operation: Operation, resource: Resource, host: String) {
     val permissionType = if (authorized) "Allowed" else "Denied"
     authorizerLogger.debug(s"Principal = $principal is $permissionType 
Operation = $operation from host = $host on resource = $resource")
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 0765992..d862bc0 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -17,12 +17,13 @@
 package kafka.security.auth
 
 import java.net.InetAddress
-import java.util.{UUID}
+import java.util.UUID
+import java.util.concurrent.{Executors, Semaphore, TimeUnit}
 
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Acl.WildCardHost
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert._
@@ -261,6 +262,40 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness 
{
     assertEquals(acls1, authorizer.getAcls(resource1))
   }
 
+  /**
+   * Verify that there is no timing window between loading ACL cache and setting
+   * up ZK change listener. The change listener must be created before loading the
+   * cache in the authorizer to avoid the timing window.
+   */
+  @Test
+  def testChangeListenerTiming() {
+    val configureSemaphore = new Semaphore(0)
+    val listenerSemaphore = new Semaphore(0)
+    val executor = Executors.newSingleThreadExecutor
+    val simpleAclAuthorizer3 = new SimpleAclAuthorizer {
+      override private[auth] def startZkChangeListeners(): Unit = {
+        configureSemaphore.release()
+        listenerSemaphore.acquireUninterruptibly()
+        super.startZkChangeListeners()
+      }
+    }
+    try {
+      val future = 
executor.submit(CoreUtils.runnable(simpleAclAuthorizer3.configure(config.originals)))
+      configureSemaphore.acquire()
+      val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+      val acls = Set(new Acl(user1, Deny, "host-1", Read))
+      simpleAclAuthorizer.addAcls(acls, resource)
+
+      listenerSemaphore.release()
+      future.get(10, TimeUnit.SECONDS)
+
+      assertEquals(acls, simpleAclAuthorizer3.getAcls(resource))
+    } finally {
+      simpleAclAuthorizer3.close()
+      executor.shutdownNow()
+    }
+  }
+
   @Test
   def testLocalConcurrentModificationOfResourceAcls() {
     val commonResource = new Resource(Topic, "test")

Reply via email to