This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch hachikuji/KAFKA-9212
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit cfb88f4f116e0e2932c5dd2e416841824a915257
Author: Ismael Juma
AuthorDate: Sun Dec 8 09:27:59 2019 -0800
Fix test and minor tweaks
---
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java | 7 ++++++-
core/src/main/scala/kafka/controller/KafkaController.scala | 8 ++++----
2 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index fc51957..7067e88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -158,6 +158,11 @@ public class MetadataTest {
assertEquals(0, metadata.timeToNextUpdate(now + 1));
}
+/**
+ * Prior to Kafka version 2.4 (which coincides with Metadata version 9), the broker does not propagate leader epoch
+ * information accurately while a reassignment is in progress, so we cannot rely on it. This is explained in more
+ * detail in MetadataResponse's constructor.
+ */
@Test
public void testIgnoreLeaderEpochInOlderMetadataResponse() {
TopicPartition tp = new TopicPartition("topic", 0);
@@ -196,7 +201,7 @@ public class MetadataTest {
assertEquals(-1, info.epoch());
}
-for (short version = 9; version <= ApiKeys.METADATA.oldestVersion(); version++) {
+for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
Struct struct = data.toStruct(version);
MetadataResponse response = new MetadataResponse(struct, version);
assertTrue(response.hasReliableLeaderEpochs());
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6133cc6..444e74d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1081,15 +1081,15 @@ class KafkaController(val config: KafkaConfig,
val UpdateLeaderAndIsrResult(finishedUpdates, _) =
zkClient.updateLeaderAndIsr(immutable.Map(partition ->
newLeaderAndIsr), epoch, controllerContext.epochZkVersion)
- finishedUpdates.get(partition).exists {
-case Right(leaderAndIsr) =>
+ finishedUpdates.get(partition) match {
+case Some(Right(leaderAndIsr)) =>
val leaderIsrAndControllerEpoch =
LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)
controllerContext.partitionLeadershipInfo.put(partition,
leaderIsrAndControllerEpoch)
finalLeaderIsrAndControllerEpoch =
Some(leaderIsrAndControllerEpoch)
info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}")
true
-case Left(e) =>
- throw e
+case Some(Left(e)) => throw e
+case None => false
}
case None =>
throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " +