This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 9868976747fb4a7a4a9b18e1f4050633c226be7d
Author: Sandor Murakozi <smurak...@gmail.com>
AuthorDate: Thu Mar 1 18:12:52 2018 -0800

    KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   5 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 576 ++++++++++++++++++++-
 2 files changed, 555 insertions(+), 26 deletions(-)
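
The KafkaZkClient.scala hunk below fixes a silent-failure bug: updateBrokerInfoInZk built a
SetDataRequest but discarded the response, so ZooKeeper errors were never surfaced. A minimal
sketch of the pattern the fix applies (setDataOrThrow is a hypothetical helper name;
SetDataRequest, ZkVersion.NoVersion, retryRequestUntilConnected and maybeThrow are the
KafkaZkClient internals visible in the diff):

    // Sketch only: send the request, then propagate any non-OK result as an exception
    // instead of dropping the response.
    private def setDataOrThrow(path: String, data: Array[Byte]): Unit = {
      val setDataRequest = SetDataRequest(path, data, ZkVersion.NoVersion)
      val response = retryRequestUntilConnected(setDataRequest)
      response.maybeThrow()  // before the fix, the response was ignored in updateBrokerInfoInZk
    }

The second hunk fixes deleteLogDirEventNotifications, which passed raw child znode names where
sequence numbers were expected; the fix maps the children through
LogDirEventNotificationSequenceZNode.sequenceNumber before deleting.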

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 6545fde..d61b281 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
-    retryRequestUntilConnected(setDataRequest)
+    val response = retryRequestUntilConnected(setDataRequest)
+    response.maybeThrow()
     info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }
 
@@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def deleteLogDirEventNotifications(): Unit = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
     if (getChildrenResponse.resultCode == Code.OK) {
-      deleteLogDirEventNotifications(getChildrenResponse.children)
+      deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
       getChildrenResponse.maybeThrow
     }
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c2..e44c2c9 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
 */
 package kafka.zk
 
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
 import org.junit.Assert._
-import org.junit.Test
-
+import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
 import scala.util.Random
 
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.data.Stat
+
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
+  private val topic1 = "topic1"
+  private val topic2 = "topic2"
+
+  val topicPartition10 = new TopicPartition(topic1, 0)
+  val topicPartition11 = new TopicPartition(topic1, 1)
+  val topicPartition20 = new TopicPartition(topic2, 0)
+  val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
+  var otherZkClient: KafkaZkClient = _
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (otherZkClient != null)
+      otherZkClient.close()
+    super.tearDown()
+  }
+
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
@@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testTopicAssignmentMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
 
     // test with non-existing topic
+    assertFalse(zkClient.topicExists(topic1))
     assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
     assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
@@ -108,6 +140,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // create a topic assignment
     zkClient.createTopicAssignment(topic1, assignment)
 
+    assertTrue(zkClient.topicExists(topic1))
+
     val expectedAssignment = assignment map { topicAssignment =>
       val partition = topicAssignment._1.partition
       val assignment = topicAssignment._2
@@ -215,6 +249,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testIsrChangeNotificationGetters(): Unit = {
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
+
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+    zkClient.propagateIsrChanges(Set(topicPartition10))
+
+    assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
+
+    // A partition can have multiple notifications
+    assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
+      zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
+  }
+
+  @Test
+  def testIsrChangeNotificationsDeletion(): Unit = {
+    // Should not fail even if parent node does not exist
+    zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+    zkClient.propagateIsrChanges(Set(topicPartition10))
+    zkClient.propagateIsrChanges(Set(topicPartition11))
+
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+    // Should not fail if called on a non-existent notification
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+
+    assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
+    zkClient.deleteIsrChangeNotifications()
+    assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+  }
+
+  @Test
   def testPropagateLogDir(): Unit = {
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -238,6 +309,54 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testLogDirGetters(): Unit = {
+    assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
+      Seq.empty, zkClient.getAllLogDirEventNotifications)
+    assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
+      Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+    zkClient.propagateLogDirEvent(brokerId)
+
+    assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+    zkClient.propagateLogDirEvent(brokerId)
+
+    val anotherBrokerId = 4
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    val notifications012 = Seq("0000000000", "0000000001", "0000000002")
+    assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
+    assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
+  }
+
+  @Test
+  def testLogDirEventNotificationsDeletion(): Unit = {
+    // Should not fail even if parent node does not exist
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+    val anotherBrokerId = 4
+
+    zkClient.propagateLogDirEvent(brokerId)
+    zkClient.propagateLogDirEvent(brokerId)
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+    assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
+
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    zkClient.deleteLogDirEventNotifications()
+    assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
+  }
+
+  @Test
   def testSetGetAndDeletePartitionReassignment() {
     zkClient.createRecursive(AdminZNode.path)
 
@@ -377,10 +496,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeleteTopicPathMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+  def testDeletePath(): Unit = {
+    val path = "/a/b/c"
+    zkClient.createRecursive(path)
+    zkClient.deletePath(path)
+    assertFalse(zkClient.pathExists(path))
+  }
+
+  @Test
+  def testDeleteTopicZNode(): Unit = {
+    zkClient.deleteTopicZNode(topic1)
+    zkClient.createRecursive(TopicZNode.path(topic1))
+    zkClient.deleteTopicZNode(topic1)
+    assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
+  }
 
+  @Test
+  def testDeleteTopicPathMethods() {
     assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
     assertTrue(zkClient.getTopicDeletions.isEmpty)
 
@@ -394,18 +526,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
+  private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
+    assertTrue(zkClient.pathExists(expectedPath))
+    assertEquals(Some(data), dataAsString(expectedPath))
+   }
+
+  @Test
+  def testCreateTokenChangeNotification(): Unit = {
+    intercept[NoNodeException] {
+      zkClient.createTokenChangeNotification("delegationToken")
+    }
+    zkClient.createDelegationTokenPaths()
+
+    zkClient.createTokenChangeNotification("delegationToken")
+    assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+  }
+
   @Test
   def testEntityConfigManagementMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
-
     assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
 
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, "1024")
-    logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
     assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
 
@@ -421,15 +561,399 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testBrokerRegistrationMethods() {
+  def testCreateConfigChangeNotification(): Unit = {
+    intercept[NoNodeException] {
+      zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+    }
+
+    zkClient.createTopLevelPaths()
+    zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+
+    assertPathExistenceAndData(
+      "/config/changes/config_change_0000000000",
+      """{"version":2,"entity_path":"/config/topics/topic1"}""")
+  }
+
+  private def createLogProps(bytesProp: Int): Properties = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
+    logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    logProps
+  }
+
+  private val logProps = createLogProps(1024)
+
+  @Test
+  def testGetLogConfigs(): Unit = {
+    val emptyConfig = LogConfig(Collections.emptyMap())
+    assertEquals("Non existent config, no defaults",
+      (Map(topic1 -> emptyConfig), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
+
+    val logProps2 = createLogProps(2048)
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+    assertEquals("One existing and one non-existent topic",
+      (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
+    assertEquals("Two existing topics",
+      (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+    val logProps1WithMoreValues = createLogProps(1024)
+    logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
+    logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
+
+    assertEquals("Config with defaults",
+      (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1),
+        Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
+  }
+
+  private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+                               rack: Option[String] = None): BrokerInfo =
+    BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+    (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
+  @Test
+  def testRegisterBrokerInfo(): Unit = {
     zkClient.createTopLevelPaths()
 
-    val brokerInfo = BrokerInfo(Broker(1,
-      Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
-      rack = None), ApiVersion.latestVersion, jmxPort = 9998)
+    val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+    val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
 
     zkClient.registerBrokerInZk(brokerInfo)
     assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+    assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
+
+    // Node exists, owned by current session - no error, no update
+    zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+
+    // Other client tries to register broker with same id causes failure, info is not changed in ZK
+    intercept[NodeExistsException] {
+      otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    }
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+  }
+
+  @Test
+  def testGetBrokerMethods(): Unit = {
+    zkClient.createTopLevelPaths()
+
+    assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
+    assertEquals(Seq.empty, zkClient.getSortedBrokerList())
+    assertEquals(None, zkClient.getBroker(0))
+
+    val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
+    val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
+
+    zkClient.registerBrokerInZk(brokerInfo1)
+    otherZkClient.registerBrokerInZk(brokerInfo0)
+
+    assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
+    assertEquals(
+      Seq(brokerInfo0.broker, brokerInfo1.broker),
+      zkClient.getAllBrokersInCluster
+    )
+    assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
+  }
+
+  @Test
+  def testUpdateBrokerInfo(): Unit = {
+    zkClient.createTopLevelPaths()
+
+    // Updating info of a broker not existing in ZK fails
+    val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+    intercept[NoNodeException]{
+      zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    }
+
+    zkClient.registerBrokerInZk(originalBrokerInfo)
+
+    val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+    zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+    assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+    // Other ZK clients can update info
+    otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
+  }
+
+  private def statWithVersion(version: Int): Stat = {
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    stat.setVersion(version)
+    stat
+  }
+
+  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+    Map(
+      topicPartition10 -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+        controllerEpoch = 4),
+      topicPartition11 -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+        controllerEpoch = 4))
+
+  val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+    leaderIsrAndControllerEpochs(0, 0)
+
+  val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+  private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
+    leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
+
+  private def checkUpdateLeaderAndIsrResult(
+                  expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
+                  expectedPartitionsToRetry: Seq[TopicPartition],
+                  expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
+                  actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
+    val failedPartitionsExcerpt =
+      actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
+    assertEquals("Permanently failed updates do not match expected",
+      expectedFailedPartitions, failedPartitionsExcerpt)
+    assertEquals("Retriable updates (due to BADVERSION) do not match expected",
+      expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
+    assertEquals("Successful updates do not match expected",
+      expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
+  }
+
+  @Test
+  def testUpdateLeaderAndIsr(): Unit = {
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    // Non-existing topicPartitions
+    checkUpdateLeaderAndIsrResult(
+        Map.empty,
+        mutable.ArrayBuffer.empty,
+      Map(
+        topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
+        topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
+      zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    checkUpdateLeaderAndIsrResult(
+      leaderIsrs(state = 1, zkVersion = 1),
+      mutable.ArrayBuffer.empty,
+      Map.empty,
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+
+    // Try to update with wrong ZK version
+    checkUpdateLeaderAndIsrResult(
+      Map.empty,
+      ArrayBuffer(topicPartition10, topicPartition11),
+      Map.empty,
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+
+    // Trigger successful, to be retried and failed partitions in same call
+    val mixedState = Map(
+      topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
+      topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
+      topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
+
+    checkUpdateLeaderAndIsrResult(
+      leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
+      ArrayBuffer(topicPartition11),
+      Map(
+        topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
+      zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+  }
+
+  private def checkGetDataResponse(
+      leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
+      topicPartition: TopicPartition,
+      response: GetDataResponse): Unit = {
+    val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
+    assertEquals(Code.OK, response.resultCode)
+    assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
+    assertEquals(Some(topicPartition), response.ctx)
+    assertEquals(
+      Some(leaderIsrAndControllerEpochs(topicPartition)),
+      TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
+  }
+
+  private def eraseMetadata(response: CreateResponse): CreateResponse =
+    response.copy(metadata = ResponseMetadata(0, 0))
+
+  @Test
+  def testGetTopicsAndPartitions(): Unit = {
+    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+    assertTrue(zkClient.getAllPartitions.isEmpty)
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+    zkClient.createRecursive(TopicZNode.path(topic2))
+    assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+
+    assertTrue(zkClient.getAllPartitions.isEmpty)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    assertEquals(Set(topicPartition10, topicPartition11), 
zkClient.getAllPartitions)
+  }
+
+  @Test
+  def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    assertEquals(
+      Seq(
+        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+          TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
+        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+          TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
+      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+        .map(eraseMetadata).toList)
+
+    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+    assertEquals(2, getResponses.size)
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
+
+    // Trying to create existing topicPartition states fails
+    assertEquals(
+      Seq(
+        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+          null, ResponseMetadata(0, 0)),
+        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+          null, ResponseMetadata(0, 0))),
+      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+  }
+
+  @Test
+  def testSetTopicPartitionStatesRaw(): Unit = {
+
+    def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
+      topicPartitions.map { topicPartition =>
+        SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
+          Some(topicPartition), stat, ResponseMetadata(0, 0))
+      }
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    // Trying to set non-existing topicPartition's data results in NONODE responses
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
+      zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
+        _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
+      zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map {
+        eraseMetadataAndStat}.toList)
+
+
+    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+    assertEquals(2, getResponses.size)
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)}
+
+    // Other ZK client can also write the state of a partition
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
+      otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map {
+        eraseMetadataAndStat}.toList)
+  }
+
+  @Test
+  def testReassignPartitionsInProgress(): Unit = {
+    assertFalse(zkClient.reassignPartitionsInProgress)
+    zkClient.createRecursive(ReassignPartitionsZNode.path)
+    assertTrue(zkClient.reassignPartitionsInProgress)
+  }
+
+  @Test
+  def testGetTopicPartitionStates(): Unit = {
+    assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
+    assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    assertEquals(
+      initialLeaderIsrAndControllerEpochs,
+      zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
+    )
+
+    assertEquals(
+      Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
+      zkClient.getTopicPartitionState(topicPartition10)
+    )
+
+    assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
+
+    val notExistingPartition = new TopicPartition(topic1, 2)
+    assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
+    assertEquals(
+      Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
+      zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
+    )
+
+    assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
+    assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
+
+  }
+
+  private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
+    val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
+    response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+  }
+
+  @Test
+  def testControllerEpochMethods(): Unit = {
+    assertEquals(None, zkClient.getControllerEpoch)
+
+    assertEquals("Setting non existing nodes should return NONODE results",
+      SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+
+    assertEquals("Creating non existing nodes is OK",
+      CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
+      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+    assertEquals(0, zkClient.getControllerEpoch.get._1)
+
+    assertEquals("Attempt to create existing nodes should return NODEEXISTS",
+      CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+
+    assertEquals("Updating existing nodes is OK",
+      SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+    assertEquals(1, zkClient.getControllerEpoch.get._1)
+
+    assertEquals("Updating with wrong ZK version returns BADVERSION",
+      SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+  }
+
+  @Test
+  def testControllerManagementMethods(): Unit = {
+    // No controller
+    assertEquals(None, zkClient.getControllerId)
+    // Create controller
+    zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+    assertEquals(Some(1), zkClient.getControllerId)
+    zkClient.deleteController()
+    assertEquals(None, zkClient.getControllerId)
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDataChange(): Unit = {
+    val mockPath = "/foo"
+
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+
+      override val path: String = mockPath
+    }
+
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
+    zkClient.createRecursive(mockPath)
+    assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
   @Test
@@ -458,7 +982,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
 
-    val topic1 = "topic1"
     val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
 
     zkClient.createPreferredReplicaElection(electionPartitions)
@@ -498,6 +1021,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     // test non-existent token
     assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+    assertFalse(zkClient.deleteDelegationToken(tokenId))
 
     // create a token
     zkClient.setOrCreateDelegationToken(token)
@@ -511,5 +1035,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     //test updated token
     assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+    //test deleting token
+    assertTrue(zkClient.deleteDelegationToken(tokenId))
+    assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
   }
 }
