This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 0.10.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.10.2 by this push: new ada1ab8 MINOR: Close timing window in SimpleAclAuthorizer startup (#5318) ada1ab8 is described below commit ada1ab83551c7ff73945385820702f3ab9b24390 Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Mon Jul 2 22:11:05 2018 +0100 MINOR: Close timing window in SimpleAclAuthorizer startup (#5318) ZooKeeper listener for change notifications should be created before loading the ACL cache to avoid timing window if acls are modified when broker is starting up. Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@confluent.io> --- .../kafka/security/auth/SimpleAclAuthorizer.scala | 12 +++++-- .../security/auth/SimpleAclAuthorizerTest.scala | 39 ++++++++++++++++++++-- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 28f910a..751894a 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -112,11 +112,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging { JaasUtils.isZkSecurityEnabled()) zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath) + // Start change listeners first and then populate the cache so that there is no timing window + // between loading cache and processing change notifications. 
+ startZkChangeListeners() loadCache() - zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath) - aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler) - aclChangeListener.init() } override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { @@ -245,6 +245,12 @@ class SimpleAclAuthorizer extends Authorizer with Logging { SimpleAclAuthorizer.AclZkPath + "/" + resource.resourceType + "/" + resource.name } + private[auth] def startZkChangeListeners(): Unit = { + zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath) + aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler) + aclChangeListener.init() + } + private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) { val permissionType = if (authorized) "Allowed" else "Denied" authorizerLogger.debug(s"Principal = $principal is $permissionType Operation = $operation from host = $host on resource = $resource") diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 0765992..d862bc0 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -17,12 +17,13 @@ package kafka.security.auth import java.net.InetAddress -import java.util.{UUID} +import java.util.UUID +import java.util.concurrent.{Executors, Semaphore, TimeUnit} import kafka.network.RequestChannel.Session import kafka.security.auth.Acl.WildCardHost import kafka.server.KafkaConfig -import kafka.utils.TestUtils +import kafka.utils.{CoreUtils, TestUtils} import 
kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.Assert._ @@ -261,6 +262,40 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { assertEquals(acls1, authorizer.getAcls(resource1)) } + /** + * Verify that there is no timing window between loading ACL cache and setting + * up ZK change listener. The change listener must be created before loading the cache + * in the authorizer to avoid the timing window. + */ + @Test + def testChangeListenerTiming() { + val configureSemaphore = new Semaphore(0) + val listenerSemaphore = new Semaphore(0) + val executor = Executors.newSingleThreadExecutor + val simpleAclAuthorizer3 = new SimpleAclAuthorizer { + override private[auth] def startZkChangeListeners(): Unit = { + configureSemaphore.release() + listenerSemaphore.acquireUninterruptibly() + super.startZkChangeListeners() + } + } + try { + val future = executor.submit(CoreUtils.runnable(simpleAclAuthorizer3.configure(config.originals))) + configureSemaphore.acquire() + val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + val acls = Set(new Acl(user1, Deny, "host-1", Read)) + simpleAclAuthorizer.addAcls(acls, resource) + + listenerSemaphore.release() + future.get(10, TimeUnit.SECONDS) + + assertEquals(acls, simpleAclAuthorizer3.getAcls(resource)) + } finally { + simpleAclAuthorizer3.close() + executor.shutdownNow() + } + } + @Test def testLocalConcurrentModificationOfResourceAcls() { val commonResource = new Resource(Topic, "test")