This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 164aded  KAFKA-7007:  Use JSON for /kafka-acl-extended-changes path 
(#5161)
164aded is described below

commit 164aded6300ce587a1a08e0939628ea2846a0698
Author: Andy Coates <8012398+big-andy-coa...@users.noreply.github.com>
AuthorDate: Tue Jun 12 23:07:10 2018 +0100

    KAFKA-7007:  Use JSON for /kafka-acl-extended-changes path (#5161)
    
    Keep Literal ACLs on the old paths, using the old formats, to maintain 
backwards compatibility.
    Have Prefixed, and any latter types, go on new paths, using JSON, (old 
brokers are not aware of them)
    Add checks to reject any adminClient requests to add prefixed acls before 
the cluster is fully upgraded.
    
    Colin Patrick McCabe <co...@cmccabe.xyz>, Jun Rao <jun...@gmail.com>
---
 .../kafka/common/resource/ResourceNameType.java    |  13 ++
 .../common/ZkNodeChangeNotificationListener.scala  |  21 +-
 .../main/scala/kafka/security/auth/Resource.scala  |  25 +--
 .../kafka/security/auth/SimpleAclAuthorizer.scala  |  41 ++--
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  31 +--
 core/src/main/scala/kafka/zk/ZkData.scala          | 211 +++++++++++++++++----
 .../scala/kafka/security/auth/ResourceTest.scala   |  15 +-
 .../test/scala/kafka/zk/ExtendedAclStoreTest.scala |  67 +++++++
 .../test/scala/kafka/zk/LiteralAclStoreTest.scala  |  62 ++++++
 .../ZkNodeChangeNotificationListenerTest.scala     |  74 ++++++--
 .../security/auth/SimpleAclAuthorizerTest.scala    |  97 +++++++++-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  24 ++-
 docs/upgrade.html                                  |   2 +-
 13 files changed, 551 insertions(+), 132 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java 
b/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java
index 7aa7217..0e4fc0f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java
@@ -62,6 +62,12 @@ public enum ResourceNameType {
                 .collect(Collectors.toMap(ResourceNameType::code, 
Function.identity()))
         );
 
+    private final static Map<String, ResourceNameType> NAME_TO_VALUE =
+        Collections.unmodifiableMap(
+            Arrays.stream(ResourceNameType.values())
+                .collect(Collectors.toMap(ResourceNameType::name, 
Function.identity()))
+        );
+
     private final byte code;
 
     ResourceNameType(byte code) {
@@ -88,4 +94,11 @@ public enum ResourceNameType {
     public static ResourceNameType fromCode(byte code) {
         return CODE_TO_VALUE.getOrDefault(code, UNKNOWN);
     }
+
+    /**
+     * Return the ResourceNameType with the provided name or {@link #UNKNOWN} 
if one cannot be found.
+     */
+    public static ResourceNameType fromString(String name) {
+        return NAME_TO_VALUE.getOrDefault(name, UNKNOWN);
+    }
 }
diff --git 
a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala 
b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 5179851..8ec7f95 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -24,6 +24,8 @@ import kafka.zk.{KafkaZkClient, StateChangeHandlers}
 import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler}
 import org.apache.kafka.common.utils.Time
 
+import scala.util.{Failure, Try}
+
 /**
  * Handle the notificationMessage.
  */
@@ -83,12 +85,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: 
KafkaZkClient,
         for (notification <- notifications) {
           val changeId = changeNumber(notification)
           if (changeId > lastExecutedChange) {
-            val changeZnode = seqNodeRoot + "/" + notification
-            val (data, _) = zkClient.getDataAndStat(changeZnode)
-            data match {
-              case Some(d) => notificationHandler.processNotification(d)
-              case None => warn(s"read null data from $changeZnode when 
processing notification $notification")
-            }
+            processNotification(notification)
             lastExecutedChange = changeId
           }
         }
@@ -100,6 +97,18 @@ class ZkNodeChangeNotificationListener(private val 
zkClient: KafkaZkClient,
     }
   }
 
+  private def processNotification(notification: String): Unit = {
+    val changeZnode = seqNodeRoot + "/" + notification
+    val (data, _) = zkClient.getDataAndStat(changeZnode)
+    data match {
+      case Some(d) => Try(notificationHandler.processNotification(d)) match {
+        case Failure(e) => error(s"error processing change notification from 
$changeZnode", e)
+        case _ =>
+      }
+      case None => warn(s"read null data from $changeZnode")
+    }
+  }
+
   private def addChangeNotification(): Unit = {
     if (!isClosed.get && queue.peek() == null)
       queue.put(new ChangeNotification)
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala 
b/core/src/main/scala/kafka/security/auth/Resource.scala
index 303c642..78f0483 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -16,6 +16,7 @@
  */
 package kafka.security.auth
 
+import kafka.common.KafkaException
 import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern}
 
 object Resource {
@@ -26,16 +27,18 @@ object Resource {
   val WildCardResource = "*"
 
   def fromString(str: String): Resource = {
-    ResourceNameType.values.find(nameType => str.startsWith(nameType.name)) 
match {
-      case Some(nameType) =>
-        str.split(Separator, 3) match {
-          case Array(_, resourceType, name, _*) => new 
Resource(ResourceType.fromString(resourceType), name, nameType)
-          case _ => throw new IllegalArgumentException("expected a string in 
format ResourceType:ResourceName but got " + str)
-        }
-      case _ =>
-        str.split(Separator, 2) match {
-          case Array(resourceType, name, _*) => new 
Resource(ResourceType.fromString(resourceType), name, ResourceNameType.LITERAL)
-          case _ => throw new IllegalArgumentException("expected a string in 
format ResourceType:ResourceName but got " + str)
+    ResourceType.values.find(resourceType => str.startsWith(resourceType.name 
+ Separator)) match {
+      case None => throw new KafkaException("Invalid resource string: '" + str 
+ "'")
+      case Some(resourceType) =>
+        val remaining = str.substring(resourceType.name.length + 1)
+
+        ResourceNameType.values.find(nameType => 
remaining.startsWith(nameType.name + Separator)) match {
+          case Some(nameType) =>
+            val name = remaining.substring(nameType.name.length + 1)
+            Resource(resourceType, name, nameType)
+
+          case None =>
+            Resource(resourceType, remaining, ResourceNameType.LITERAL)
         }
     }
   }
@@ -74,7 +77,7 @@ case class Resource(resourceType: ResourceType, name: String, 
nameType: Resource
   }
 
   override def toString: String = {
-    nameType + Resource.Separator + resourceType.name + Resource.Separator + 
name
+    resourceType.name + Resource.Separator + nameType + Resource.Separator + 
name
   }
 }
 
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 601b5be..cecad0e 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -20,13 +20,14 @@ import java.util
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import com.typesafe.scalalogging.Logger
-import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
+import kafka.api.KAFKA_2_0_IV1
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
-import kafka.zk.{AclChangeNotificationSequenceZNode, KafkaZkClient, ZkAclStore}
+import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription, 
KafkaZkClient, ZkAclChangeStore, ZkAclStore}
+import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.resource.ResourceNameType
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
@@ -55,7 +56,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   private var superUsers = Set.empty[KafkaPrincipal]
   private var shouldAllowEveryoneIfNoAclIsFound = false
   private var zkClient: KafkaZkClient = _
-  private var aclChangeListeners: Seq[ZkNodeChangeNotificationListener] = 
List()
+  private var aclChangeListeners: Iterable[AclChangeSubscription] = 
Iterable.empty
+  private var extendedAclSupport: Boolean = _
 
   @volatile
   private var aclCache = new scala.collection.immutable.TreeMap[Resource, 
VersionedAcls]()(ResourceOrdering)
@@ -96,6 +98,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer")
     zkClient.createAclPaths()
 
+    extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= 
KAFKA_2_0_IV1
+
     loadCache()
 
     startZkChangeListeners()
@@ -161,6 +165,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   override def addAcls(acls: Set[Acl], resource: Resource) {
     if (acls != null && acls.nonEmpty) {
+      if (!extendedAclSupport && resource.nameType == 
ResourceNameType.PREFIXED) {
+        throw new UnsupportedVersionException(s"Adding ACLs on prefixed 
resource patterns requires " +
+          s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or 
greater")
+      }
+
       inWriteLock(lock) {
         updateResourceAcls(resource) { currentAcls =>
           currentAcls ++ acls
@@ -238,13 +247,14 @@ class SimpleAclAuthorizer extends Authorizer with Logging 
{
   private def loadCache() {
     inWriteLock(lock) {
       ZkAclStore.stores.foreach(store => {
-        val resourceTypes = zkClient.getResourceTypes(store.nameType)
+        val resourceTypes = zkClient.getResourceTypes(store.patternType)
         for (rType <- resourceTypes) {
           val resourceType = ResourceType.fromString(rType)
-          val resourceNames = zkClient.getResourceNames(store.nameType, 
resourceType)
+          val resourceNames = zkClient.getResourceNames(store.patternType, 
resourceType)
           for (resourceName <- resourceNames) {
-            val versionedAcls = getAclsFromZk(new Resource(resourceType, 
resourceName, store.nameType))
-            updateCache(new Resource(resourceType, resourceName, 
store.nameType), versionedAcls)
+            val resource = new Resource(resourceType, resourceName, 
store.patternType)
+            val versionedAcls = getAclsFromZk(resource)
+            updateCache(resource, versionedAcls)
           }
         }
       })
@@ -252,13 +262,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   private def startZkChangeListeners(): Unit = {
-    aclChangeListeners = ZkAclStore.stores.map(store => {
-      val aclChangeListener = new ZkNodeChangeNotificationListener(
-        zkClient, store.aclChangePath, 
AclChangeNotificationSequenceZNode.SequenceNumberPrefix, new 
AclChangedNotificationHandler(store))
-
-      aclChangeListener.init()
-      aclChangeListener
-    })
+    aclChangeListeners = ZkAclChangeStore.stores
+      .map(store => store.createListener(AclChangedNotificationHandler, 
zkClient))
   }
 
   private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, 
operation: Operation, resource: Resource, host: String) {
@@ -343,17 +348,15 @@ class SimpleAclAuthorizer extends Authorizer with Logging 
{
   }
 
   private def updateAclChangedFlag(resource: Resource) {
-    zkClient.createAclChangeNotification(resource)
+      zkClient.createAclChangeNotification(resource)
   }
 
   private def backoffTime = {
     retryBackoffMs + Random.nextInt(retryBackoffJitterMs)
   }
 
-  class AclChangedNotificationHandler(store: ZkAclStore) extends 
NotificationHandler {
-    override def processNotification(notificationMessage: Array[Byte]) {
-      val resource: Resource = store.decode(notificationMessage)
-
+  object AclChangedNotificationHandler extends AclChangeNotificationHandler {
+    override def processNotification(resource: Resource) {
       inWriteLock(lock) {
         val versionedAcls = getAclsFromZk(resource)
         updateCache(resource, versionedAcls)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 6ec8e30..ad55a6f 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -941,14 +941,15 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
   //Acl management methods
 
   /**
-   * Creates the required zk nodes for Acl storage
+   * Creates the required zk nodes for Acl storage and Acl change storage.
    */
   def createAclPaths(): Unit = {
     ZkAclStore.stores.foreach(store => {
       createRecursive(store.aclPath, throwIfPathExists = false)
-      createRecursive(store.aclChangePath, throwIfPathExists = false)
       ResourceType.values.foreach(resourceType => 
createRecursive(store.path(resourceType), throwIfPathExists = false))
     })
+
+    ZkAclChangeStore.stores.foreach(store => 
createRecursive(store.aclChangePath, throwIfPathExists = false))
   }
 
   /**
@@ -1005,13 +1006,12 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
   }
 
   /**
-   * Creates Acl change notification message
-   * @param resource resource name
+   * Creates an Acl change notification message.
+   * @param resource resource pattern that has changed
    */
   def createAclChangeNotification(resource: Resource): Unit = {
-    val store = ZkAclStore(resource.nameType)
-    val path = store.changeSequenceZNode.createPath
-    val createRequest = CreateRequest(path, 
AclChangeNotificationSequenceZNode.encode(resource), acls(path), 
CreateMode.PERSISTENT_SEQUENTIAL)
+    val aclChange = 
ZkAclStore(resource.nameType).changeStore.createChangeNode(resource)
+    val createRequest = CreateRequest(aclChange.path, aclChange.bytes, 
acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow
   }
@@ -1034,10 +1034,10 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @throws KeeperException if there is an error while deleting Acl change 
notifications
    */
   def deleteAclChangeNotifications(): Unit = {
-    ZkAclStore.stores.foreach(store => {
+    ZkAclChangeStore.stores.foreach(store => {
       val getChildrenResponse = 
retryRequestUntilConnected(GetChildrenRequest(store.aclChangePath))
       if (getChildrenResponse.resultCode == Code.OK) {
-        deleteAclChangeNotifications(store, getChildrenResponse.children)
+        deleteAclChangeNotifications(store.aclChangePath, 
getChildrenResponse.children)
       } else if (getChildrenResponse.resultCode != Code.NONODE) {
         getChildrenResponse.maybeThrow
       }
@@ -1045,13 +1045,14 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
   }
 
   /**
-   * Deletes the Acl change notifications associated with the given sequence 
nodes
-   * @param sequenceNodes
-   */
-  private def deleteAclChangeNotifications(store: ZkAclStore, sequenceNodes: 
Seq[String]): Unit = {
-    val aclChangeNotificationSequenceZNode = store.changeSequenceZNode
+    * Deletes the Acl change notifications associated with the given sequence 
nodes
+    *
+    * @param aclChangePath the root path
+    * @param sequenceNodes the name of the node to delete.
+    */
+  private def deleteAclChangeNotifications(aclChangePath: String, 
sequenceNodes: Seq[String]): Unit = {
     val deleteRequests = sequenceNodes.map { sequenceNode =>
-      
DeleteRequest(aclChangeNotificationSequenceZNode.deletePath(sequenceNode), 
ZkVersion.NoVersion)
+      DeleteRequest(s"$aclChangePath/$sequenceNode", ZkVersion.NoVersion)
     }
 
     val deleteResponses = retryRequestsUntilConnected(deleteRequests)
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala 
b/core/src/main/scala/kafka/zk/ZkData.scala
index d4470ab..2cbdd80 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -23,13 +23,15 @@ import com.fasterxml.jackson.annotation.JsonProperty
 import com.fasterxml.jackson.core.JsonProcessingException
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
-import kafka.common.KafkaException
+import kafka.common.{KafkaException, NotificationHandler, 
ZkNodeChangeNotificationListener}
 import kafka.controller.{IsrChangeNotificationHandler, 
LeaderIsrAndControllerEpoch}
+import kafka.security.auth.Resource.Separator
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.{ConfigType, DelegationTokenManager}
 import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.resource.ResourceNameType
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -42,6 +44,7 @@ import scala.beans.BeanProperty
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Seq, breakOut}
+import scala.util.{Failure, Success, Try}
 
 // This file contains objects for encoding/decoding data stored in ZooKeeper 
nodes (znodes).
 
@@ -446,75 +449,205 @@ object StateChangeHandlers {
 }
 
 /**
-  * Acls for resources are stored in ZK under a root node that is determined 
by the [[ResourceNameType]].
-  * Under each [[ResourceNameType]] node there will be one child node per 
resource type (Topic, Cluster, Group, etc).
-  * Under each resourceType there will be a unique child for each resource 
path and the data for that child will contain
+  * Acls for resources are stored in ZK under two root paths:
+  * <ul>
+  *   <li>[[org.apache.kafka.common.resource.ResourceNameType#LITERAL 
Literal]] patterns are stored under '/kafka-acl'.
+  *   The format is JSON. See [[kafka.zk.ResourceZNode]] for details.</li>
+  *   <li>All other patterns are stored under 
'/kafka-acl-extended/<i>pattern-type</i>'.
+  *   The format is JSON. See [[kafka.zk.ResourceZNode]] for details.</li>
+  * </ul>
+  *
+  * Under each root node there will be one child node per resource type 
(Topic, Cluster, Group, etc).
+  * Under each resourceType there will be a unique child for each resource 
pattern and the data for that child will contain
   * list of its acls as a json object. Following gives an example:
   *
   * <pre>
+  * // Literal patterns:
   * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", 
"permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
   * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { 
"host":"host1", "permissionType": "Allow","operation": "Read","principal": 
"User:alice"}]}
-  * /kafka-prefixed-acl/Group/group-1 => {"version": 1, "acls": [ { 
"host":"host1", "permissionType": "Allow","operation": "Read","principal": 
"User:alice"}]}
+  *
+  * // Prefixed patterns:
+  * /kafka-acl-extended/PREFIXED/Group/group-1 => {"version": 1, "acls": [ { 
"host":"host1", "permissionType": "Allow","operation": "Read","principal": 
"User:alice"}]}
   * </pre>
+  *
+  * Acl change events are also stored under two paths:
+  * <ul>
+  *   <li>[[org.apache.kafka.common.resource.ResourceNameType#LITERAL 
Literal]] patterns are stored under '/kafka-acl-changes'.
+  *   The format is a UTF8 string in the form: 
&lt;resource-type&gt;:&lt;resource-name&gt;</li>
+  *   <li>All other patterns are stored under '/kafka-acl-extended-changes'
+  *   The format is JSON, as defined by 
[[kafka.zk.ExtendedAclChangeEvent]]</li>
+  * </ul>
   */
-case class ZkAclStore(nameType: ResourceNameType) {
-  val aclPath: String = nameType match {
-    case ResourceNameType.LITERAL => "/kafka-acl"
-    case ResourceNameType.PREFIXED => "/kafka-prefixed-acl"
-    case _ => throw new IllegalArgumentException("Unknown name type:" + 
nameType)
+sealed trait ZkAclStore {
+  val patternType: ResourceNameType
+  val aclPath: String
+
+  def path(resourceType: ResourceType): String = s"$aclPath/$resourceType"
+
+  def path(resourceType: ResourceType, resourceName: String): String = 
s"$aclPath/$resourceType/$resourceName"
+
+  def changeStore: ZkAclChangeStore
+}
+
+object ZkAclStore {
+  private val storesByType: Map[ResourceNameType, ZkAclStore] = 
ResourceNameType.values
+    .filter(nameType => nameType != ResourceNameType.ANY && nameType != 
ResourceNameType.UNKNOWN)
+    .map(nameType => (nameType, create(nameType)))
+    .toMap
+
+  val stores: Iterable[ZkAclStore] = storesByType.values
+
+  val securePaths: Iterable[String] = stores
+    .flatMap(store => Set(store.aclPath, store.changeStore.aclChangePath))
+
+  def apply(patternType: ResourceNameType): ZkAclStore = {
+    storesByType.get(patternType) match {
+      case Some(store) => store
+      case None => throw new KafkaException(s"Invalid pattern type: 
$patternType")
+    }
   }
 
-  val aclChangePath: String = nameType match {
-    case ResourceNameType.LITERAL => "/kafka-acl-changes"
-    case ResourceNameType.PREFIXED => "/kafka-prefixed-acl-changes"
-    case _ => throw new IllegalArgumentException("Unknown name type:" + 
nameType)
+  private def create(patternType: ResourceNameType) = {
+    patternType match {
+      case ResourceNameType.LITERAL => LiteralAclStore
+      case _ => new ExtendedAclStore(patternType)
+    }
   }
+}
 
-  def path(resourceType: ResourceType) = s"$aclPath/$resourceType"
+object LiteralAclStore extends ZkAclStore {
+  val patternType: ResourceNameType = ResourceNameType.LITERAL
+  val aclPath: String = "/kafka-acl"
 
-  def path(resourceType: ResourceType, resourceName: String): String = 
s"$aclPath/$resourceType/$resourceName"
+  def changeStore: ZkAclChangeStore = LiteralAclChangeStore
+}
+
+class ExtendedAclStore(val patternType: ResourceNameType) extends ZkAclStore {
+  if (patternType == ResourceNameType.LITERAL)
+    throw new IllegalArgumentException("Literal pattern types are not 
supported")
 
-  def changeSequenceZNode: AclChangeNotificationSequenceZNode = 
AclChangeNotificationSequenceZNode(this)
+  val aclPath: String = s"/kafka-acl-extended/${patternType.name.toLowerCase}"
 
-  def decode(notificationMessage: Array[Byte]): Resource = 
AclChangeNotificationSequenceZNode.decode(nameType, notificationMessage)
+  def changeStore: ZkAclChangeStore = ExtendedAclChangeStore
 }
 
-object ZkAclStore {
-  val stores: Seq[ZkAclStore] = ResourceNameType.values
-    .filter(nameType => nameType != ResourceNameType.ANY && nameType != 
ResourceNameType.UNKNOWN)
-    .map(nameType => ZkAclStore(nameType))
+trait AclChangeNotificationHandler {
+  def processNotification(resource: Resource): Unit
+}
 
-  val securePaths: Seq[String] = stores
-    .flatMap(store => List(store.aclPath, store.aclChangePath))
+trait AclChangeSubscription extends AutoCloseable {
+  def close(): Unit
 }
 
-object ResourceZNode {
-  def path(resource: Resource): String = 
ZkAclStore(resource.nameType).path(resource.resourceType, resource.name)
+case class AclChangeNode(path: String, bytes: Array[Byte])
 
-  def encode(acls: Set[Acl]): Array[Byte] = 
Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
-  def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = 
VersionedAcls(Acl.fromBytes(bytes), stat.getVersion)
+sealed trait ZkAclChangeStore {
+  val aclChangePath: String
+  def createPath: String = 
s"$aclChangePath/${ZkAclChangeStore.SequenceNumberPrefix}"
+
+  def decode(bytes: Array[Byte]): Resource
+
+  protected def encode(resource: Resource): Array[Byte]
+
+  def createChangeNode(resource: Resource): AclChangeNode = 
AclChangeNode(createPath, encode(resource))
+
+  def createListener(handler: AclChangeNotificationHandler, zkClient: 
KafkaZkClient): AclChangeSubscription = {
+    val rawHandler: NotificationHandler = new NotificationHandler {
+      def processNotification(bytes: Array[Byte]): Unit =
+        handler.processNotification(decode(bytes))
+    }
+
+    val aclChangeListener = new ZkNodeChangeNotificationListener(
+      zkClient, aclChangePath, ZkAclChangeStore.SequenceNumberPrefix, 
rawHandler)
+
+    aclChangeListener.init()
+
+    new AclChangeSubscription {
+      def close(): Unit = aclChangeListener.close()
+    }
+  }
 }
 
-object AclChangeNotificationSequenceZNode {
-  val Separator = ":"
+object ZkAclChangeStore {
+  val stores: Iterable[ZkAclChangeStore] = List(LiteralAclChangeStore, 
ExtendedAclChangeStore)
+
   def SequenceNumberPrefix = "acl_changes_"
+}
+
+case object LiteralAclChangeStore extends ZkAclChangeStore {
+  val name = "LiteralAclChangeStore"
+  val aclChangePath: String = "/kafka-acl-changes"
 
   def encode(resource: Resource): Array[Byte] = {
-    (resource.resourceType.name + Separator + resource.name).getBytes(UTF_8)
+    if (resource.nameType != ResourceNameType.LITERAL)
+      throw new IllegalArgumentException("Only literal resource patterns can 
be encoded")
+
+    val legacyName = resource.resourceType + Resource.Separator + resource.name
+    legacyName.getBytes(UTF_8)
+  }
+
+  def decode(bytes: Array[Byte]): Resource = {
+    val string = new String(bytes, UTF_8)
+    string.split(Separator, 2) match {
+        case Array(resourceType, resourceName, _*) => new 
Resource(ResourceType.fromString(resourceType), resourceName, 
ResourceNameType.LITERAL)
+        case _ => throw new IllegalArgumentException("expected a string in 
format ResourceType:ResourceName but got " + string)
+      }
   }
+}
 
-  def decode(nameType: ResourceNameType, bytes: Array[Byte]): Resource = {
-    val str = new String(bytes, UTF_8)
-    str.split(Separator, 2) match {
-      case Array(resourceType, name, _*) => 
Resource(ResourceType.fromString(resourceType), name, nameType)
-      case _ => throw new IllegalArgumentException("expected a string in 
format ResourceType:ResourceName but got " + str)
+case object ExtendedAclChangeStore extends ZkAclChangeStore {
+  val name = "ExtendedAclChangeStore"
+  val aclChangePath: String = "/kafka-acl-extended-changes"
+
+  def encode(resource: Resource): Array[Byte] = {
+    if (resource.nameType == ResourceNameType.LITERAL)
+      throw new IllegalArgumentException("Literal pattern types are not 
supported")
+
+    Json.encodeAsBytes(ExtendedAclChangeEvent(
+      ExtendedAclChangeEvent.currentVersion,
+      resource.resourceType.name,
+      resource.name,
+      resource.nameType.name))
+  }
+
+  def decode(bytes: Array[Byte]): Resource = {
+    val changeEvent = Json.parseBytesAs[ExtendedAclChangeEvent](bytes) match {
+      case Right(event) => event
+      case Left(e) => throw new IllegalArgumentException("Failed to parse ACL 
change event", e)
+    }
+
+    changeEvent.toResource match {
+      case Success(r) => r
+      case Failure(e) => throw new IllegalArgumentException("Failed to convert 
ACL change event to resource", e)
     }
   }
 }
 
-case class AclChangeNotificationSequenceZNode(store: ZkAclStore) {
-  def createPath = 
s"${store.aclChangePath}/${AclChangeNotificationSequenceZNode.SequenceNumberPrefix}"
-  def deletePath(sequenceNode: String) = 
s"${store.aclChangePath}/$sequenceNode"
+object ResourceZNode {
+  def path(resource: Resource): String = 
ZkAclStore(resource.nameType).path(resource.resourceType, resource.name)
+
+  def encode(acls: Set[Acl]): Array[Byte] = 
Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
+  def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = 
VersionedAcls(Acl.fromBytes(bytes), stat.getVersion)
+}
+
+object ExtendedAclChangeEvent {
+  val currentVersion: Int = 1
+}
+
+case class ExtendedAclChangeEvent(@BeanProperty @JsonProperty("version") 
version: Int,
+                                  @BeanProperty @JsonProperty("resourceType") 
resourceType: String,
+                                  @BeanProperty @JsonProperty("name") name: 
String,
+                                  @BeanProperty 
@JsonProperty("resourceNameType") resourceNameType: String) {
+  if (version > ExtendedAclChangeEvent.currentVersion)
+    throw new UnsupportedVersionException(s"Acl change event received for 
unsupported version: $version")
+
+  def toResource: Try[Resource] = {
+    for {
+      resType <- Try(ResourceType.fromString(resourceType))
+      nameType <- Try(ResourceNameType.fromString(resourceNameType))
+      resource = Resource(resType, name, nameType)
+    } yield resource
+  }
 }
 
 object ClusterZNode {
diff --git a/core/src/test/scala/kafka/security/auth/ResourceTest.scala 
b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
index 2924cff..c7ed949 100644
--- a/core/src/test/scala/kafka/security/auth/ResourceTest.scala
+++ b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
@@ -24,10 +24,15 @@ import org.junit.Assert._
 
 class ResourceTest {
   @Test(expected = classOf[KafkaException])
-  def shouldThrowTwoPartStringWithUnknownResourceType(): Unit = {
+  def shouldThrowOnTwoPartStringWithUnknownResourceType(): Unit = {
     Resource.fromString("Unknown:fred")
   }
 
+  @Test(expected = classOf[KafkaException])
+  def shouldThrowOnBadResourceTypeSeparator(): Unit = {
+    Resource.fromString("Topic-fred")
+  }
+
   @Test
   def shouldParseOldTwoPartString(): Unit = {
     assertEquals(Resource(Group, "fred", LITERAL), 
Resource.fromString("Group:fred"))
@@ -41,14 +46,14 @@ class ResourceTest {
 
   @Test
   def shouldParseThreePartString(): Unit = {
-    assertEquals(Resource(Group, "fred", PREFIXED), 
Resource.fromString("PREFIXED:Group:fred"))
-    assertEquals(Resource(Topic, "t", LITERAL), 
Resource.fromString("LITERAL:Topic:t"))
+    assertEquals(Resource(Group, "fred", PREFIXED), 
Resource.fromString("Group:PREFIXED:fred"))
+    assertEquals(Resource(Topic, "t", LITERAL), 
Resource.fromString("Topic:LITERAL:t"))
   }
 
   @Test
   def shouldParseThreePartWithEmbeddedSeparators(): Unit = {
-    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), 
Resource.fromString("PREFIXED:Group::This:is:a:weird:group:name:"))
-    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), 
Resource.fromString("LITERAL:Group::This:is:a:weird:group:name:"))
+    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), 
Resource.fromString("Group:PREFIXED::This:is:a:weird:group:name:"))
+    assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), 
Resource.fromString("Group:LITERAL::This:is:a:weird:group:name:"))
   }
 
   @Test
diff --git a/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala 
b/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala
new file mode 100644
index 0000000..4e8580b
--- /dev/null
+++ b/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import kafka.security.auth.{Resource, Topic}
+import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class ExtendedAclStoreTest {
+  private val literalResource = Resource(Topic, "some-topic", LITERAL)
+  private val prefixedResource = Resource(Topic, "some-topic", PREFIXED)
+  private val store = new ExtendedAclStore(PREFIXED)
+
+  @Test
+  def shouldHaveCorrectPaths(): Unit = {
+    assertEquals("/kafka-acl-extended/prefixed", store.aclPath)
+    assertEquals("/kafka-acl-extended/prefixed/Topic", store.path(Topic))
+    assertEquals("/kafka-acl-extended-changes", 
store.changeStore.aclChangePath)
+  }
+
+  @Test
+  def shouldHaveCorrectPatternType(): Unit = {
+    assertEquals(PREFIXED, store.patternType)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldThrowIfConstructedWithLiteral(): Unit = {
+    new ExtendedAclStore(LITERAL)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldThrowFromEncodeOnLiteral(): Unit = {
+    store.changeStore.createChangeNode(literalResource)
+  }
+
+  @Test
+  def shouldWriteChangesToTheWritePath(): Unit = {
+    val changeNode = store.changeStore.createChangeNode(prefixedResource)
+
+    assertEquals("/kafka-acl-extended-changes/acl_changes_", changeNode.path)
+  }
+
+  @Test
+  def shouldRoundTripChangeNode(): Unit = {
+    val changeNode = store.changeStore.createChangeNode(prefixedResource)
+
+    val actual = store.changeStore.decode(changeNode.bytes)
+
+    assertEquals(prefixedResource, actual)
+  }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala 
b/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala
new file mode 100644
index 0000000..22d6f23
--- /dev/null
+++ b/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import kafka.security.auth.{Resource, Topic}
+import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class LiteralAclStoreTest {
+  private val literalResource = Resource(Topic, "some-topic", LITERAL)
+  private val prefixedResource = Resource(Topic, "some-topic", PREFIXED)
+  private val store = LiteralAclStore
+
+  @Test
+  def shouldHaveCorrectPaths(): Unit = {
+    assertEquals("/kafka-acl", store.aclPath)
+    assertEquals("/kafka-acl/Topic", store.path(Topic))
+    assertEquals("/kafka-acl-changes", store.changeStore.aclChangePath)
+  }
+
+  @Test
+  def shouldHaveCorrectPatternType(): Unit = {
+    assertEquals(LITERAL, store.patternType)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldThrowFromEncodeOnNoneLiteral(): Unit = {
+    store.changeStore.createChangeNode(prefixedResource)
+  }
+
+  @Test
+  def shouldWriteChangesToTheWritePath(): Unit = {
+    val changeNode = store.changeStore.createChangeNode(literalResource)
+
+    assertEquals("/kafka-acl-changes/acl_changes_", changeNode.path)
+  }
+
+  @Test
+  def shouldRoundTripChangeNode(): Unit = {
+    val changeNode = store.changeStore.createChangeNode(literalResource)
+
+    val actual = store.changeStore.decode(changeNode.bytes)
+
+    assertEquals(literalResource, actual)
+  }
+}
\ No newline at end of file
diff --git 
a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
 
b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 02918d6..58f0962 100644
--- 
a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -18,43 +18,44 @@ package kafka.common
 
 import kafka.security.auth.{Group, Resource}
 import kafka.utils.TestUtils
-import kafka.zk.{AclChangeNotificationSequenceZNode, ZkAclStore, 
ZooKeeperTestHarness}
+import kafka.zk.{LiteralAclChangeStore, LiteralAclStore, ZkAclChangeStore, 
ZooKeeperTestHarness}
 import org.apache.kafka.common.resource.ResourceNameType.LITERAL
-import org.junit.{After, Test}
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable.ArrayBuffer
 
 class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
 
-  var notificationListener: ZkNodeChangeNotificationListener = _
+  private val changeExpirationMs = 1000
+  private var notificationListener: ZkNodeChangeNotificationListener = _
+  private var notificationHandler: TestNotificationHandler = _
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    zkClient.createAclPaths()
+    notificationHandler = new TestNotificationHandler()
+  }
 
   @After
   override def tearDown(): Unit = {
     if (notificationListener != null) {
       notificationListener.close()
     }
+    super.tearDown()
   }
 
   @Test
   def testProcessNotification() {
-    @volatile var notification: Resource = null
-    @volatile var invocationCount = 0
-    val notificationHandler = new NotificationHandler {
-      override def processNotification(notificationMessage: Array[Byte]): Unit 
= {
-        notification = AclChangeNotificationSequenceZNode.decode(LITERAL, 
notificationMessage)
-        invocationCount += 1
-      }
-    }
-
-    zkClient.createAclPaths()
     val notificationMessage1 = Resource(Group, "messageA", LITERAL)
     val notificationMessage2 = Resource(Group, "messageB", LITERAL)
-    val changeExpirationMs = 1000
 
-    notificationListener = new ZkNodeChangeNotificationListener(zkClient,  
ZkAclStore(LITERAL).aclChangePath,
-      AclChangeNotificationSequenceZNode.SequenceNumberPrefix, 
notificationHandler, changeExpirationMs)
+    notificationListener = new ZkNodeChangeNotificationListener(zkClient, 
LiteralAclChangeStore.aclChangePath,
+      ZkAclChangeStore.SequenceNumberPrefix, notificationHandler, 
changeExpirationMs)
     notificationListener.init()
 
     zkClient.createAclChangeNotification(notificationMessage1)
-    TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == 
notificationMessage1,
+    TestUtils.waitUntilTrue(() => notificationHandler.received().size == 1 && 
notificationHandler.received().last == notificationMessage1,
       "Failed to send/process notification message in the timeout period.")
 
     /*
@@ -66,12 +67,43 @@ class ZkNodeChangeNotificationListenerTest extends 
ZooKeeperTestHarness {
      */
 
     zkClient.createAclChangeNotification(notificationMessage2)
-    TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == 
notificationMessage2,
+    TestUtils.waitUntilTrue(() => notificationHandler.received().size == 2 && 
notificationHandler.received().last == notificationMessage2,
       "Failed to send/process notification message in the timeout period.")
 
     (3 to 10).foreach(i => 
zkClient.createAclChangeNotification(Resource(Group, "message" + i, LITERAL)))
 
-    TestUtils.waitUntilTrue(() => invocationCount == 10 ,
-      s"Expected 10 invocations of processNotifications, but there were 
$invocationCount")
+    TestUtils.waitUntilTrue(() => notificationHandler.received().size == 10,
+      s"Expected 10 invocations of processNotifications, but there were 
${notificationHandler.received()}")
+  }
+
+  @Test
+  def testSwallowsProcessorException() : Unit = {
+    notificationHandler.setThrowSize(2)
+    notificationListener = new ZkNodeChangeNotificationListener(zkClient, 
LiteralAclChangeStore.aclChangePath,
+      ZkAclChangeStore.SequenceNumberPrefix, notificationHandler, 
changeExpirationMs)
+    notificationListener.init()
+
+    zkClient.createAclChangeNotification(Resource(Group, "messageA", LITERAL))
+    zkClient.createAclChangeNotification(Resource(Group, "messageB", LITERAL))
+    zkClient.createAclChangeNotification(Resource(Group, "messageC", LITERAL))
+
+    TestUtils.waitUntilTrue(() => notificationHandler.received().size == 3,
+      s"Expected 2 invocations of processNotifications, but there were 
${notificationHandler.received()}")
+  }
+
+  private class TestNotificationHandler extends NotificationHandler {
+    private val messages = ArrayBuffer.empty[Resource]
+    @volatile private var throwSize = Option.empty[Int]
+
+    override def processNotification(notificationMessage: Array[Byte]): Unit = 
{
+      messages += LiteralAclStore.changeStore.decode(notificationMessage)
+
+      if (throwSize.contains(messages.size))
+        throw new RuntimeException("Oh no, my processing failed!")
+    }
+
+    def received(): Seq[Resource] = messages
+
+    def setThrowSize(index: Int): Unit = throwSize = Option(index)
   }
-}
+}
\ No newline at end of file
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 05a433c..b301271 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -17,15 +17,21 @@
 package kafka.security.auth
 
 import java.net.InetAddress
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.UUID
 
+import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{ZkAclStore, ZooKeeperTestHarness}
+import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
+import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.apache.kafka.common.resource.ResourceNameType
 import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.Time
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -47,7 +53,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   val username = "alice"
   val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
   val session = Session(principal, testHostName)
-  var config: KafkaConfig = null
+  var config: KafkaConfig = _
+  private var zooKeeperClient: ZooKeeperClient = _
 
   @Before
   override def setUp() {
@@ -64,12 +71,16 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     simpleAclAuthorizer.configure(config.originals)
     simpleAclAuthorizer2.configure(config.originals)
     resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL)
+
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, zkMaxInFlightRequests,
+      Time.SYSTEM, "kafka.test", "SimpleAclAuthorizerTest")
   }
 
   @After
   override def tearDown(): Unit = {
     simpleAclAuthorizer.close()
     simpleAclAuthorizer2.close()
+    zooKeeperClient.close()
     super.tearDown()
   }
 
@@ -553,6 +564,88 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness 
{
     assertEquals(4, simpleAclAuthorizer.getAcls(principal).size)
   }
 
+  @Test(expected = classOf[UnsupportedVersionException])
+  def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = {
+    givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, 
"z_other", PREFIXED))
+  }
+
+  @Test
+  def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = {
+    givenAuthorizerWithProtocolVersion(Option.empty)
+    val resource = Resource(Topic, "z_other", PREFIXED)
+    val expected = new 
String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource).bytes, UTF_8)
+
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
+
+    val actual = getAclChangeEventAsString(PREFIXED)
+
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2(): 
Unit = {
+    givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
+    val resource = Resource(Topic, "z_other", PREFIXED)
+    val expected = new 
String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource).bytes, UTF_8)
+
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
+
+    val actual = getAclChangeEventAsString(PREFIXED)
+
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def 
testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions():
 Unit = {
+    givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
+    val resource = Resource(Topic, "z_other", LITERAL)
+    val expected = new 
String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource).bytes, UTF_8)
+
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
+
+    val actual = getAclChangeEventAsString(LITERAL)
+
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit 
= {
+    givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
+    val resource = Resource(Topic, "z_other", LITERAL)
+    val expected = new 
String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource).bytes, UTF_8)
+
+    simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
+
+    val actual = getAclChangeEventAsString(LITERAL)
+
+    assertEquals(expected, actual)
+  }
+
+  private def givenAuthorizerWithProtocolVersion(protocolVersion: 
Option[ApiVersion]) {
+    simpleAclAuthorizer.close()
+
+    val props = TestUtils.createBrokerConfig(0, zkConnect)
+    props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
+    protocolVersion.foreach(version => 
props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString))
+
+    config = KafkaConfig.fromProps(props)
+
+    simpleAclAuthorizer.configure(config.originals)
+  }
+
+  private def getAclChangeEventAsString(patternType: ResourceNameType) = {
+    val store = ZkAclStore(patternType)
+    val children = 
zooKeeperClient.handleRequest(GetChildrenRequest(store.changeStore.aclChangePath))
+    children.maybeThrow()
+    assertEquals("Expecting 1 change event", 1, children.children.size)
+
+    val data = 
zooKeeperClient.handleRequest(GetDataRequest(s"${store.changeStore.aclChangePath}/${children.children.head}"))
+    data.maybeThrow()
+
+    new String(data.data, UTF_8)
+  }
+
   private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], 
removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
     var acls = originalAcls
 
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index cfaf731..cc67a01 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -34,11 +34,11 @@ import org.apache.kafka.common.utils.{SecurityUtils, Time}
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException, 
NodeExistsException}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Seq, mutable}
 import scala.util.Random
-
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zookeeper._
@@ -426,10 +426,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testAclManagementMethods() {
-
     ZkAclStore.stores.foreach(store => {
       assertFalse(zkClient.pathExists(store.aclPath))
-      assertFalse(zkClient.pathExists(store.aclChangePath))
+      assertFalse(zkClient.pathExists(store.changeStore.aclChangePath))
       ResourceType.values.foreach(resource => 
assertFalse(zkClient.pathExists(store.path(resource))))
     })
 
@@ -438,11 +437,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     ZkAclStore.stores.foreach(store => {
       assertTrue(zkClient.pathExists(store.aclPath))
-      assertTrue(zkClient.pathExists(store.aclChangePath))
+      assertTrue(zkClient.pathExists(store.changeStore.aclChangePath))
       ResourceType.values.foreach(resource => 
assertTrue(zkClient.pathExists(store.path(resource))))
 
-      val resource1 = new Resource(Topic, UUID.randomUUID().toString, 
store.nameType)
-      val resource2 = new Resource(Topic, UUID.randomUUID().toString, 
store.nameType)
+      val resource1 = new Resource(Topic, UUID.randomUUID().toString, 
store.patternType)
+      val resource2 = new Resource(Topic, UUID.randomUUID().toString, 
store.patternType)
 
       // try getting acls for non-existing resource
       var versionedAcls = zkClient.getVersionedAclsForResource(resource1)
@@ -472,10 +471,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       assertEquals(1, versionedAcls.zkVersion)
 
       //get resource Types
-      assertTrue(ResourceType.values.map( rt => rt.name).toSet == 
zkClient.getResourceTypes(store.nameType).toSet)
+      assertTrue(ResourceType.values.map( rt => rt.name).toSet == 
zkClient.getResourceTypes(store.patternType).toSet)
 
       //get resource name
-      val resourceNames = zkClient.getResourceNames(store.nameType, Topic)
+      val resourceNames = zkClient.getResourceNames(store.patternType, Topic)
       assertEquals(2, resourceNames.size)
       assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet)
 
@@ -488,14 +487,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       //delete with valid expected zk version
       assertTrue(zkClient.conditionalDelete(resource2, 0))
 
+      zkClient.createAclChangeNotification(Resource(Group, "resource1", 
store.patternType))
+      zkClient.createAclChangeNotification(Resource(Topic, "resource2", 
store.patternType))
 
-      zkClient.createAclChangeNotification(Resource(Group, "resource1", 
store.nameType))
-      zkClient.createAclChangeNotification(Resource(Topic, "resource2", 
store.nameType))
-
-      assertEquals(2, zkClient.getChildren(store.aclChangePath).size)
+      assertEquals(2, 
zkClient.getChildren(store.changeStore.aclChangePath).size)
 
       zkClient.deleteAclChangeNotifications()
-      assertTrue(zkClient.getChildren(store.aclChangePath).isEmpty)
+      assertTrue(zkClient.getChildren(store.changeStore.aclChangePath).isEmpty)
     })
   }
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index c92e8af..3c75bad 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -64,7 +64,7 @@
     <li>ACLs should not be added to prefixed resources,
         (added in <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs";>KIP-290</a>),
         until all brokers in the cluster have been updated.
-        <p><b>NOTE:</b> any prefixed ACLs added to a cluster will be ignored 
should the cluster be downgraded again.
+        <p><b>NOTE:</b> any prefixed ACLs added to a cluster, even after the 
cluster is fully upgraded, will be ignored should the cluster be downgraded 
again.
     </li>
 </ol>
 

-- 
To stop receiving notification emails like this one, please contact
jun...@apache.org.

Reply via email to