This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f6ba10ef9c MINOR: Fix flaky test 
TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft 
(#12189)
f6ba10ef9c is described below

commit f6ba10ef9c2c2d94473efd2fd596b172fcff494a
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Sat May 21 19:33:44 2022 +0200

    MINOR: Fix flaky test 
TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft 
(#12189)
    
    Flaky test as failed in CI 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12184/1/tests/
    
    The test fails because it does not wait for metadata to be propagated 
across brokers before killing a broker which may lead to it getting stale 
information. Note that a similar test was done in #12104 for a different test.
    
    Reviewers: Kvicii Y, Ziming Deng, Jason Gustafson <ja...@confluent.io>, 
Guozhang Wang <wangg...@gmail.com>
---
 .../kafka/admin/TopicCommandIntegrationTest.scala  | 36 +++++++++++++---------
 .../kafka/integration/KafkaServerTestHarness.scala |  6 +++-
 .../kafka/server/DeleteTopicsRequestTest.scala     |  8 ++---
 3 files changed, 29 insertions(+), 21 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index 26c60e1c3e..3082babd06 100644
--- 
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -586,16 +586,10 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
     try {
       killBroker(0)
-      val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-
       if (isKRaftTest()) {
-        TestUtils.ensureConsistentKRaftMetadata(
-          aliveServers,
-          controllerServer,
-          "Timeout waiting for partition metadata propagating to brokers"
-        )
+        ensureConsistentKRaftMetadata()
       } else {
-        TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
+        TestUtils.waitForPartitionMetadata(aliveBrokers, testTopicName, 0)
       }
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new 
TopicCommandOptions(Array("--under-replicated-partitions"))))
@@ -618,8 +612,14 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
     try {
       killBroker(0)
-      val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-      TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
+      if (isKRaftTest()) {
+        ensureConsistentKRaftMetadata()
+      } else {
+        TestUtils.waitUntilTrue(
+          () => 
aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 
0).get.isr().size() == 5),
+          s"Timeout waiting for partition metadata propagating to brokers for 
$testTopicName topic"
+        )
+      }
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new 
TopicCommandOptions(Array("--under-min-isr-partitions"))))
       val rows = output.split("\n")
@@ -697,6 +697,16 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     try {
       killBroker(0)
       killBroker(1)
+
+      if (isKRaftTest()) {
+        ensureConsistentKRaftMetadata()
+      } else {
+        TestUtils.waitUntilTrue(
+          () => 
aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 
0).get.isr().size() == 4),
+          s"Timeout waiting for partition metadata propagating to brokers for 
$testTopicName topic"
+        )
+      }
+
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new 
TopicCommandOptions(Array("--at-min-isr-partitions"))))
       val rows = output.split("\n")
@@ -741,13 +751,11 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
     try {
       killBroker(0)
-      val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-
       if (isKRaftTest()) {
-        TestUtils.ensureConsistentKRaftMetadata(aliveServers, 
controllerServer, "Timeout waiting for topic configs propagating to brokers")
+        ensureConsistentKRaftMetadata()
       } else {
         TestUtils.waitUntilTrue(
-          () => aliveServers.forall(
+          () => aliveBrokers.forall(
             broker =>
               broker.metadataCache.getPartitionInfo(underMinIsrTopic, 
0).get.isr().size() < 6 &&
                 broker.metadataCache.getPartitionInfo(offlineTopic, 
0).get.leader() == MetadataResponse.NO_LEADER_ID),
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index f46713337a..26f4c9d4c2 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -353,10 +353,14 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
     }
   }
 
+  def aliveBrokers: Seq[KafkaBroker] = {
+    _brokers.filter(broker => alive(broker.config.brokerId)).toSeq
+  }
+
   def ensureConsistentKRaftMetadata(): Unit = {
     if (isKRaftTest()) {
       TestUtils.ensureConsistentKRaftMetadata(
-        brokers,
+        aliveBrokers,
         controllerServer
       )
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 9137558437..644f21ff3f 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -71,9 +71,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with 
Logging {
     val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
     assertTrue(error.isEmpty, s"There should be no errors, found 
${response.data.responses.asScala}")
 
-    if (isKRaftTest()) {
-      TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
-    }
+    ensureConsistentKRaftMetadata()
 
     request.data.topicNames.forEach { topic =>
       validateTopicIsDeleted(topic)
@@ -85,9 +83,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with 
Logging {
     val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
     assertTrue(error.isEmpty, s"There should be no errors, found 
${response.data.responses.asScala}")
 
-    if (isKRaftTest()) {
-      TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
-    }
+    ensureConsistentKRaftMetadata()
 
     response.data.responses.forEach { response =>
       validateTopicIsDeleted(response.name())

Reply via email to