This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push: new 4102073 MINOR: Close timing window in SimpleAclAuthorizer startup (#5318) 4102073 is described below commit 4102073add48787e7caf6a2afdc3e6278014c243 Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Mon Jul 2 22:11:05 2018 +0100 MINOR: Close timing window in SimpleAclAuthorizer startup (#5318) ZooKeeper listener for change notifications should be created before loading the ACL cache to avoid timing window if acls are modified when broker is starting up. Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@confluent.io> --- .../kafka/security/auth/SimpleAclAuthorizer.scala | 11 +++++-- .../security/auth/SimpleAclAuthorizerTest.scala | 37 +++++++++++++++++++++- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index c439f5e..588cbdc 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -95,10 +95,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging { zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer") zkClient.createAclPaths() + // Start change listeners first and then populate the cache so that there is no timing window + // between loading cache and processing change notifications. 
+ startZkChangeListeners() loadCache() - - aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, AclChangedNotificationHandler) - aclChangeListener.init() } override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { @@ -223,6 +223,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } } + private[auth] def startZkChangeListeners(): Unit = { + aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, AclChangedNotificationHandler) + aclChangeListener.init() + } + private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) { def logMessage: String = { val authResult = if (authorized) "Allowed" else "Denied" diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 1e18f1d..ac68f70 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -18,11 +18,12 @@ package kafka.security.auth import java.net.InetAddress import java.util.UUID +import java.util.concurrent.{Executors, Semaphore, TimeUnit} import kafka.network.RequestChannel.Session import kafka.security.auth.Acl.WildCardHost import kafka.server.KafkaConfig -import kafka.utils.TestUtils +import kafka.utils.{CoreUtils, TestUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.Assert._ @@ -270,6 +271,40 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { } } + /** + * Verify that there is no timing window between loading ACL cache and setting + * up ZK change listener. 
The change listener must be started before the cache is loaded + * in the authorizer to avoid the timing window. + */ + @Test + def testChangeListenerTiming() { + val configureSemaphore = new Semaphore(0) + val listenerSemaphore = new Semaphore(0) + val executor = Executors.newSingleThreadExecutor + val simpleAclAuthorizer3 = new SimpleAclAuthorizer { + override private[auth] def startZkChangeListeners(): Unit = { + configureSemaphore.release() + listenerSemaphore.acquireUninterruptibly() + super.startZkChangeListeners() + } + } + try { + val future = executor.submit(CoreUtils.runnable(simpleAclAuthorizer3.configure(config.originals))) + configureSemaphore.acquire() + val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + val acls = Set(new Acl(user1, Deny, "host-1", Read)) + simpleAclAuthorizer.addAcls(acls, resource) + + listenerSemaphore.release() + future.get(10, TimeUnit.SECONDS) + + assertEquals(acls, simpleAclAuthorizer3.getAcls(resource)) + } finally { + simpleAclAuthorizer3.close() + executor.shutdownNow() + } + } + @Test def testLocalConcurrentModificationOfResourceAcls() { val commonResource = new Resource(Topic, "test")