[kafka] 01/01: fix test

2022-09-29 Thread cmccabe
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b9da249bdfa9df7a511284d815af8200ab0dfbd9
Author: Colin P. McCabe 
AuthorDate: Thu Sep 29 09:17:03 2022 -0700

fix test
---
 .../test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d6f736055b2..ff1b2f5934d 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -2628,8 +2628,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   new ResourcePattern(TOPIC, "fooa", PREFIXED))
 addAndVerifyAcls(Set(new AccessControlEntry("User:otherPrincipal", 
WildcardHost, CREATE, ALLOW)),
   new ResourcePattern(TOPIC, "foob", PREFIXED))
-val future = createAdminClient().createTopics(Collections.
-  singletonList(new NewTopic("foobar", 1, 1.toShort))).all()
-JTestUtils.assertFutureThrows(future, classOf[TopicAuthorizationException])
+createAdminClient().createTopics(Collections.
+  singletonList(new NewTopic("foobar", 1, 1.toShort))).all().get()
   }
 }



[kafka] 01/01: Fix test and minor tweaks

2019-12-08 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch hachikuji/KAFKA-9212
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit cfb88f4f116e0e2932c5dd2e416841824a915257
Author: Ismael Juma 
AuthorDate: Sun Dec 8 09:27:59 2019 -0800

Fix test and minor tweaks
---
 clients/src/test/java/org/apache/kafka/clients/MetadataTest.java | 7 ++-
 core/src/main/scala/kafka/controller/KafkaController.scala   | 8 
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java 
b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index fc51957..7067e88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -158,6 +158,11 @@ public class MetadataTest {
 assertEquals(0, metadata.timeToNextUpdate(now + 1));
 }
 
+/**
+ * Prior to Kafka version 2.4 (which coincides with Metadata version 9), 
the broker does not propagate leader epoch
+ * information accurately while a reassignment is in progress, so we 
cannot rely on it. This is explained in more
+ * detail in MetadataResponse's constructor.
+ */
 @Test
 public void testIgnoreLeaderEpochInOlderMetadataResponse() {
 TopicPartition tp = new TopicPartition("topic", 0);
@@ -196,7 +201,7 @@ public class MetadataTest {
 assertEquals(-1, info.epoch());
 }
 
-for (short version = 9; version <= ApiKeys.METADATA.oldestVersion(); 
version++) {
+for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); 
version++) {
 Struct struct = data.toStruct(version);
 MetadataResponse response = new MetadataResponse(struct, version);
 assertTrue(response.hasReliableLeaderEpochs());
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6133cc6..444e74d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1081,15 +1081,15 @@ class KafkaController(val config: KafkaConfig,
   val UpdateLeaderAndIsrResult(finishedUpdates, _) =
 zkClient.updateLeaderAndIsr(immutable.Map(partition -> 
newLeaderAndIsr), epoch, controllerContext.epochZkVersion)
 
-  finishedUpdates.get(partition).exists {
-case Right(leaderAndIsr) =>
+  finishedUpdates.get(partition) match {
+case Some(Right(leaderAndIsr)) =>
   val leaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)
   controllerContext.partitionLeadershipInfo.put(partition, 
leaderIsrAndControllerEpoch)
   finalLeaderIsrAndControllerEpoch = 
Some(leaderIsrAndControllerEpoch)
   info(s"Updated leader epoch for partition $partition to 
${leaderAndIsr.leaderEpoch}")
   true
-case Left(e) =>
-  throw e
+case Some(Left(e)) => throw e
+case None => false
   }
 case None =>
   throw new IllegalStateException(s"Cannot update leader epoch for 
partition $partition as " +