This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 9868976747fb4a7a4a9b18e1f4050633c226be7d Author: Sandor Murakozi <smurak...@gmail.com> AuthorDate: Thu Mar 1 18:12:52 2018 -0800 KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests; --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 5 +- .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 576 ++++++++++++++++++++- 2 files changed, 555 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 6545fde..d61b281 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = { val brokerIdPath = brokerInfo.path val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion) - retryRequestUntilConnected(setDataRequest) + val response = retryRequestUntilConnected(setDataRequest) + response.maybeThrow() info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints)) } @@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def deleteLogDirEventNotifications(): Unit = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) if (getChildrenResponse.resultCode == Code.OK) { - deleteLogDirEventNotifications(getChildrenResponse.children) + deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)) } else if (getChildrenResponse.resultCode != Code.NONODE) { getChildrenResponse.maybeThrow } diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index d3726c2..e44c2c9 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -16,10 +16,11 @@ */ package kafka.zk -import java.util.{Properties, UUID} +import java.util.{Collections, Properties, UUID} import java.nio.charset.StandardCharsets.UTF_8 +import java.util.concurrent.{CountDownLatch, TimeUnit} -import kafka.api.ApiVersion +import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} import kafka.log.LogConfig import kafka.security.auth._ @@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.TokenInformation -import org.apache.kafka.common.utils.SecurityUtils -import org.apache.zookeeper.KeeperException.NodeExistsException +import org.apache.kafka.common.utils.{SecurityUtils, Time} +import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException} import org.junit.Assert._ -import org.junit.Test - +import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.collection.{Seq, mutable} import scala.util.Random +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper._ +import org.apache.kafka.common.security.JaasUtils +import org.apache.zookeeper.data.Stat + class KafkaZkClientTest extends ZooKeeperTestHarness { private val group = "my-group" + private val topic1 = "topic1" + private val topic2 = "topic2" + + val topicPartition10 = new TopicPartition(topic1, 0) + val topicPartition11 = new TopicPartition(topic1, 1) + val topicPartition20 = new TopicPartition(topic2, 0) + val topicPartitions10_11 = Seq(topicPartition10, topicPartition11) + + var otherZkClient: KafkaZkClient = _ + + @Before + override def setUp(): Unit = { + super.setUp() + otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout, + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) + } + + @After + override def tearDown(): Unit = { + if (otherZkClient != null) + otherZkClient.close() + super.tearDown() + } + private val topicPartition = new TopicPartition("topic", 0) @Test @@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testTopicAssignmentMethods() { - val topic1 = "topic1" - val topic2 = "topic2" + assertTrue(zkClient.getAllTopicsInCluster.isEmpty) // test with non-existing topic + assertFalse(zkClient.topicExists(topic1)) assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty) assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty) assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty) @@ -108,6 +140,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { // create a topic assignment zkClient.createTopicAssignment(topic1, assignment) + assertTrue(zkClient.topicExists(topic1)) + val expectedAssignment = assignment map { topicAssignment => val partition = topicAssignment._1.partition val assignment = topicAssignment._2 @@ -215,6 +249,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { } @Test + def testIsrChangeNotificationGetters(): Unit = { + assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications) + assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000"))) + + zkClient.createRecursive("/isr_change_notification") + + zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11)) + zkClient.propagateIsrChanges(Set(topicPartition10)) + + assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet) + + // A partition can have multiple notifications + assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10), + zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001"))) + } + + @Test + def testIsrChangeNotificationsDeletion(): Unit = { + // Should not fail even if parent node does not exist + zkClient.deleteIsrChangeNotifications(Seq("0000000000")) + + zkClient.createRecursive("/isr_change_notification") + + zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11)) + zkClient.propagateIsrChanges(Set(topicPartition10)) + zkClient.propagateIsrChanges(Set(topicPartition11)) + + zkClient.deleteIsrChangeNotifications(Seq("0000000001")) + // Should not fail if called on a non-existent notification + zkClient.deleteIsrChangeNotifications(Seq("0000000001")) + + assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet) + zkClient.deleteIsrChangeNotifications() + assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications) + } + + @Test def testPropagateLogDir(): Unit = { zkClient.createRecursive("/log_dir_event_notification") @@ -238,6 +309,54 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { } @Test + def testLogDirGetters(): Unit = { + assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node", + Seq.empty, zkClient.getAllLogDirEventNotifications) + assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node", + Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000"))) + + zkClient.createRecursive("/log_dir_event_notification") + + val brokerId = 3 + zkClient.propagateLogDirEvent(brokerId) + + assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000"))) + + zkClient.propagateLogDirEvent(brokerId) + + val anotherBrokerId = 4 + zkClient.propagateLogDirEvent(anotherBrokerId) + + val notifications012 = Seq("0000000000", "0000000001", "0000000002") + assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet) + assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012)) + } + + @Test + def testLogDirEventNotificationsDeletion(): Unit = { + // Should not fail even if parent node does not exist + zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002")) + + zkClient.createRecursive("/log_dir_event_notification") + + val brokerId = 3 + val anotherBrokerId = 4 + + zkClient.propagateLogDirEvent(brokerId) + zkClient.propagateLogDirEvent(brokerId) + zkClient.propagateLogDirEvent(anotherBrokerId) + + zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002")) + + assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications) + + zkClient.propagateLogDirEvent(anotherBrokerId) + + zkClient.deleteLogDirEventNotifications() + assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications) + } + + @Test def testSetGetAndDeletePartitionReassignment() { zkClient.createRecursive(AdminZNode.path) @@ -377,10 +496,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { } @Test - def testDeleteTopicPathMethods() { - val topic1 = "topic1" - val topic2 = "topic2" + def testDeletePath(): Unit = { + val path = "/a/b/c" + zkClient.createRecursive(path) + zkClient.deletePath(path) + assertFalse(zkClient.pathExists(path)) + } + + @Test + def testDeleteTopicZNode(): Unit = { + zkClient.deleteTopicZNode(topic1) + zkClient.createRecursive(TopicZNode.path(topic1)) + zkClient.deleteTopicZNode(topic1) + assertFalse(zkClient.pathExists(TopicZNode.path(topic1))) + } + @Test + def testDeleteTopicPathMethods() { assertFalse(zkClient.isTopicMarkedForDeletion(topic1)) assertTrue(zkClient.getTopicDeletions.isEmpty) @@ -394,18 +526,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertTrue(zkClient.getTopicDeletions.isEmpty) } + private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = { + assertTrue(zkClient.pathExists(expectedPath)) + assertEquals(Some(data), dataAsString(expectedPath)) + } + + @Test + def testCreateTokenChangeNotification(): Unit = { + intercept[NoNodeException] { + zkClient.createTokenChangeNotification("delegationToken") + } + zkClient.createDelegationTokenPaths() + + zkClient.createTokenChangeNotification("delegationToken") + assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken") + } + @Test def testEntityConfigManagementMethods() { - val topic1 = "topic1" - val topic2 = "topic2" - assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, "1024") - logProps.put(LogConfig.SegmentIndexBytesProp, "1024") - logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps) assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1)) @@ -421,15 +561,399 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { } @Test - def testBrokerRegistrationMethods() { + def testCreateConfigChangeNotification(): Unit = { + intercept[NoNodeException] { + zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1)) + } + + zkClient.createTopLevelPaths() + zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1)) + + assertPathExistenceAndData( + "/config/changes/config_change_0000000000", + """{"version":2,"entity_path":"/config/topics/topic1"}""") + } + + private def createLogProps(bytesProp: Int): Properties = { + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString) + logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString) + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + logProps + } + + private val logProps = createLogProps(1024) + + @Test + def testGetLogConfigs(): Unit = { + val emptyConfig = LogConfig(Collections.emptyMap()) + assertEquals("Non existent config, no defaults", + (Map(topic1 -> emptyConfig), Map.empty), + zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap())) + + val logProps2 = createLogProps(2048) + + zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps) + assertEquals("One existing and one non-existent topic", + (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty), + zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap())) + + zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2) + assertEquals("Two existing topics", + (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty), + zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap())) + + val logProps1WithMoreValues = createLogProps(1024) + logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100") + logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024") + + assertEquals("Config with defaults", + (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty), + zkClient.getLogConfigs(Seq(topic1), + Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava)) + } + + private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol, + rack: Option[String] = None): BrokerInfo = + BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol + (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10) + + @Test + def testRegisterBrokerInfo(): Unit = { zkClient.createTopLevelPaths() - val brokerInfo = BrokerInfo(Broker(1, - Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)), - rack = None), ApiVersion.latestVersion, jmxPort = 9998) + val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT) + val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL) zkClient.registerBrokerInZk(brokerInfo) assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1)) + assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1)) + + // Node exists, owned by current session - no error, no update + zkClient.registerBrokerInZk(differentBrokerInfoWithSameId) + assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1)) + + // Other client tries to register broker with same id causes failure, info is not changed in ZK + intercept[NodeExistsException] { + otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId) + } + assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1)) + } + + @Test + def testGetBrokerMethods(): Unit = { + zkClient.createTopLevelPaths() + + assertEquals(Seq.empty,zkClient.getAllBrokersInCluster) + assertEquals(Seq.empty, zkClient.getSortedBrokerList()) + assertEquals(None, zkClient.getBroker(0)) + + val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT) + val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL) + + zkClient.registerBrokerInZk(brokerInfo1) + otherZkClient.registerBrokerInZk(brokerInfo0) + + assertEquals(Seq(0, 1), zkClient.getSortedBrokerList()) + assertEquals( + Seq(brokerInfo0.broker, brokerInfo1.broker), + zkClient.getAllBrokersInCluster + ) + assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0)) + } + + @Test + def testUpdateBrokerInfo(): Unit = { + zkClient.createTopLevelPaths() + + // Updating info of a broker not existing in ZK fails + val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT) + intercept[NoNodeException]{ + zkClient.updateBrokerInfoInZk(originalBrokerInfo) + } + + zkClient.registerBrokerInZk(originalBrokerInfo) + + val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL) + zkClient.updateBrokerInfoInZk(updatedBrokerInfo) + assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1)) + + // Other ZK clients can update info + otherZkClient.updateBrokerInfoInZk(originalBrokerInfo) + assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1)) + } + + private def statWithVersion(version: Int): Stat = { + val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + stat.setVersion(version) + stat + } + + private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] = + Map( + topicPartition10 -> LeaderIsrAndControllerEpoch( + LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion), + controllerEpoch = 4), + topicPartition11 -> LeaderIsrAndControllerEpoch( + LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion), + controllerEpoch = 4)) + + val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] = + leaderIsrAndControllerEpochs(0, 0) + + val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr) + private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] = + leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr) + + private def checkUpdateLeaderAndIsrResult( + expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr], + expectedPartitionsToRetry: Seq[TopicPartition], + expectedFailedPartitions: Map[TopicPartition, (Class[_], String)], + actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = { + val failedPartitionsExcerpt = + actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage)) + assertEquals("Permanently failed updates do not match expected", + expectedFailedPartitions, failedPartitionsExcerpt) + assertEquals("Retriable updates (due to BADVERSION) do not match expected", + expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry) + assertEquals("Successful updates do not match expected", + expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions) + } + + @Test + def testUpdateLeaderAndIsr(): Unit = { + zkClient.createRecursive(TopicZNode.path(topic1)) + + // Non-existing topicPartitions + checkUpdateLeaderAndIsrResult( + Map.empty, + mutable.ArrayBuffer.empty, + Map( + topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"), + topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")), + zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4)) + + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs) + + checkUpdateLeaderAndIsrResult( + leaderIsrs(state = 1, zkVersion = 1), + mutable.ArrayBuffer.empty, + Map.empty, + zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4)) + + // Try to update with wrong ZK version + checkUpdateLeaderAndIsrResult( + Map.empty, + ArrayBuffer(topicPartition10, topicPartition11), + Map.empty, + zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4)) + + // Trigger successful, to be retried and failed partitions in same call + val mixedState = Map( + topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1), + topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0), + topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0)) + + checkUpdateLeaderAndIsrResult( + leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10}, + ArrayBuffer(topicPartition11), + Map( + topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")), + zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4)) + } + + private def checkGetDataResponse( + leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch], + topicPartition: TopicPartition, + response: GetDataResponse): Unit = { + val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion + assertEquals(Code.OK, response.resultCode) + assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path) + assertEquals(Some(topicPartition), response.ctx) + assertEquals( + Some(leaderIsrAndControllerEpochs(topicPartition)), + TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion))) + } + + private def eraseMetadata(response: CreateResponse): CreateResponse = + response.copy(metadata = ResponseMetadata(0, 0)) + + @Test + def testGetTopicsAndPartitions(): Unit = { + assertTrue(zkClient.getAllTopicsInCluster.isEmpty) + assertTrue(zkClient.getAllPartitions.isEmpty) + + zkClient.createRecursive(TopicZNode.path(topic1)) + zkClient.createRecursive(TopicZNode.path(topic2)) + assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet) + + assertTrue(zkClient.getAllPartitions.isEmpty) + + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs) + assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions) + } + + @Test + def testCreateAndGetTopicPartitionStatesRaw(): Unit = { + zkClient.createRecursive(TopicZNode.path(topic1)) + + assertEquals( + Seq( + CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10), + TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)), + CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11), + TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))), + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs) + .map(eraseMetadata).toList) + + val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11) + assertEquals(2, getResponses.size) + topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)} + + // Trying to create existing topicPartition states fails + assertEquals( + Seq( + CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10), + null, ResponseMetadata(0, 0)), + CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11), + null, ResponseMetadata(0, 0))), + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList) + } + + @Test + def testSetTopicPartitionStatesRaw(): Unit = { + + def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) = + topicPartitions.map { topicPartition => + SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition), + Some(topicPartition), stat, ResponseMetadata(0, 0)) + } + + zkClient.createRecursive(TopicZNode.path(topic1)) + + // Trying to set non-existing topicPartition's data results in NONODE responses + assertEquals( + expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null), + zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map { + _.copy(metadata = ResponseMetadata(0, 0))}.toList) + + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs) + + assertEquals( + expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)), + zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map { + eraseMetadataAndStat}.toList) + + + val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11) + assertEquals(2, getResponses.size) + topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)} + + // Other ZK client can also write the state of a partition + assertEquals( + expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)), + otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map { + eraseMetadataAndStat}.toList) + } + + @Test + def testReassignPartitionsInProgress(): Unit = { + assertFalse(zkClient.reassignPartitionsInProgress) + zkClient.createRecursive(ReassignPartitionsZNode.path) + assertTrue(zkClient.reassignPartitionsInProgress) + } + + @Test + def testGetTopicPartitionStates(): Unit = { + assertEquals(None, zkClient.getTopicPartitionState(topicPartition10)) + assertEquals(None, zkClient.getLeaderForPartition(topicPartition10)) + + zkClient.createRecursive(TopicZNode.path(topic1)) + + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs) + assertEquals( + initialLeaderIsrAndControllerEpochs, + zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11)) + ) + + assertEquals( + Some(initialLeaderIsrAndControllerEpochs(topicPartition10)), + zkClient.getTopicPartitionState(topicPartition10) + ) + + assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10)) + + val notExistingPartition = new TopicPartition(topic1, 2) + assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty) + assertEquals( + Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)), + zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition)) + ) + + assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition)) + assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition)) + + } + + private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = { + val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null + response.copy(metadata = ResponseMetadata(0, 0), stat = stat) + } + + @Test + def testControllerEpochMethods(): Unit = { + assertEquals(None, zkClient.getControllerEpoch) + + assertEquals("Setting non existing nodes should return NONODE results", + SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)), + eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0))) + + assertEquals("Creating non existing nodes is OK", + CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)), + eraseMetadata(zkClient.createControllerEpochRaw(0))) + assertEquals(0, zkClient.getControllerEpoch.get._1) + + assertEquals("Attemt to create existing nodes should return NODEEXISTS", + CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)), + eraseMetadata(zkClient.createControllerEpochRaw(0))) + + assertEquals("Updating existing nodes is OK", + SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)), + eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0))) + assertEquals(1, zkClient.getControllerEpoch.get._1) + + assertEquals("Updating with wrong ZK version returns BADVERSION", + SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)), + eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0))) + } + + @Test + def testControllerManagementMethods(): Unit = { + // No controller + assertEquals(None, zkClient.getControllerId) + // Create controller + zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456)) + assertEquals(Some(1), zkClient.getControllerId) + zkClient.deleteController() + assertEquals(None, zkClient.getControllerId) + } + + @Test + def testZNodeChangeHandlerForDataChange(): Unit = { + val mockPath = "/foo" + + val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) + val zNodeChangeHandler = new ZNodeChangeHandler { + override def handleCreation(): Unit = { + znodeChangeHandlerCountDownLatch.countDown() + } + + override val path: String = mockPath + } + + zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler) + zkClient.createRecursive(mockPath) + assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) } @Test @@ -458,7 +982,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertTrue(zkClient.getPreferredReplicaElection.isEmpty) - val topic1 = "topic1" val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1)) zkClient.createPreferredReplicaElection(electionPartitions) @@ -498,6 +1021,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { // test non-existent token assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty) + assertFalse(zkClient.deleteDelegationToken(tokenId)) // create a token zkClient.setOrCreateDelegationToken(token) @@ -511,5 +1035,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { //test updated token assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get) + + //test deleting token + assertTrue(zkClient.deleteDelegationToken(tokenId)) + assertEquals(None, zkClient.getDelegationTokenInfo(tokenId)) } } -- To stop receiving notification emails like this one, please contact ij...@apache.org.