This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bc95aa21169 KAFKA-14248; Fix flaky test 
PlaintextAdminIntegrationTest.testCreateTopicsReturnsConfigs (#12669)
bc95aa21169 is described below

commit bc95aa21169b8b5b9b8a4b609e88cc125157234b
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Fri Sep 30 01:24:03 2022 +0200

    KAFKA-14248; Fix flaky test 
PlaintextAdminIntegrationTest.testCreateTopicsReturnsConfigs (#12669)
    
    The test is failing intermittently because we do not wait for propagation 
of the altered config (LogRetentionTimeMillisProp) across all brokers before 
proceeding ahead with the test.
    
    This PR makes the following changes:
    1. Wait for propagation of altered configuration to propagate to all 
brokers.
    2. Use the existing `killBroker` utility method which waits for shutdown 
using `awaitshutdown`.
    3. Improve code readability by using `TestUtils.incrementalAlterConfigs` to 
send alter config requests.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 82 ++++++++++------------
 1 file changed, 38 insertions(+), 44 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 7121f98bb9c..1656af08bc1 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -33,7 +33,6 @@ import kafka.server.{Defaults, DynamicConfig, KafkaConfig}
 import kafka.utils.TestUtils._
 import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.HostResolver
-import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
@@ -159,22 +158,6 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     waitForTopics(client, List(), topics)
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk")) // KRaft mode will be supported in 
KAFKA-13910
-  def testMetadataRefresh(quorum: String): Unit = {
-    client = Admin.create(createConfig)
-    val topics = Seq("mytopic")
-    val newTopics = Seq(new NewTopic("mytopic", 3, 3.toShort))
-    client.createTopics(newTopics.asJava).all.get()
-    waitForTopics(client, expectedPresent = topics, expectedMissing = List())
-
-    val controller = brokers.find(_.config.brokerId == 
brokers.flatMap(_.metadataCache.getControllerId).head).get
-    controller.shutdown()
-    controller.awaitShutdown()
-    val topicDesc = client.describeTopics(topics.asJava).allTopicNames.get()
-    assertEquals(topics.toSet, topicDesc.keySet.asScala)
-  }
-
   /**
     * describe should not auto create topics
     */
@@ -821,10 +804,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   @ValueSource(strings = Array("zk", "kraft"))
   def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(quorum: String): 
Unit = {
     val leaders = createTopic(topic, replicationFactor = brokerCount)
-    val followerIndex = if (leaders(0) != brokers(0).config.brokerId) 0 else 1
+    val followerIndex = if (leaders(0) != brokers.head.config.brokerId) 0 else 
1
 
     def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: 
Long): Unit = {
-      TestUtils.waitUntilTrue(() => 
brokers(followerIndex).replicaManager.localLog(topicPartition) != None,
+      TestUtils.waitUntilTrue(() => 
brokers(followerIndex).replicaManager.localLog(topicPartition).isDefined,
                               "Expected follower to create replica for 
partition")
 
       // wait until the follower discovers that log start offset moved beyond 
its HW
@@ -862,6 +845,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val result1 = client.deleteRecords(Map(topicPartition -> 
RecordsToDelete.beforeOffset(117L)).asJava)
     result1.all().get()
     restartDeadBrokers()
+    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(followerIndex))
     waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
   }
 
@@ -1522,7 +1506,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
     // but shut it down...
-    brokers(1).shutdown()
+    killBroker(1)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), 
Set(1))
 
     def assertPreferredLeaderNotAvailable(
@@ -1576,9 +1560,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    brokers(broker2).shutdown()
+    killBroker(broker2)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertNoLeader(client, partition1)
     brokers(broker2).startup()
 
@@ -1610,9 +1594,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    brokers(broker2).shutdown()
+    killBroker(broker2)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), 
Set(broker2))
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertNoLeader(client, partition2)
     brokers(broker2).startup()
@@ -1648,9 +1632,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    brokers(broker2).shutdown()
+    killBroker(broker2)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertLeader(client, partition2, broker3)
     brokers(broker2).startup()
@@ -1708,9 +1692,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    brokers(broker2).shutdown()
+    killBroker(broker2)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertNoLeader(client, partition1)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, 
Set(partition1).asJava)
@@ -1737,7 +1721,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertLeader(client, partition1, broker2)
     brokers(broker1).startup()
 
@@ -1769,9 +1753,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    brokers(broker2).shutdown()
+    killBroker(broker2)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertLeader(client, partition2, broker3)
     brokers(broker2).startup()
@@ -2505,7 +2489,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val alterResult = client.incrementalAlterConfigs(Map(
       topicResource -> topicAlterConfigs
     ).asJava)
-    alterResult.all().get()
+    alterResult.all().get(15, TimeUnit.SECONDS)
 
     ensureConsistentKRaftMetadata()
     val config = 
client.describeConfigs(List(topicResource).asJava).all().get().get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp)
@@ -2523,19 +2507,29 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   def testCreateTopicsReturnsConfigs(quorum: String): Unit = {
     client = Admin.create(super.createConfig)
 
-    val alterMap = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]
-    alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, ""), 
util.Arrays.asList(
-      new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "10800000"), OpType.SET)))
-    (brokers.map(_.config) ++ controllerServers.map(_.config)).foreach { case 
config =>
-      alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, 
config.nodeId.toString()),
-        util.Arrays.asList(new AlterConfigOp(new ConfigEntry(
-          KafkaConfig.LogCleanerDeleteRetentionMsProp, "34"), OpType.SET)))
+    val newLogRetentionProperties = new Properties
+    newLogRetentionProperties.put(KafkaConfig.LogRetentionTimeMillisProp, 
"10800000")
+    TestUtils.incrementalAlterConfigs(null, client, newLogRetentionProperties, 
perBrokerConfig = false)
+      .all().get(15, TimeUnit.SECONDS)
+
+    val newLogCleanerDeleteRetention = new Properties
+    
newLogCleanerDeleteRetention.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, 
"34")
+    TestUtils.incrementalAlterConfigs(brokers, client, 
newLogCleanerDeleteRetention, perBrokerConfig = true)
+      .all().get(15, TimeUnit.SECONDS)
+
+    if (isKRaftTest()) {
+      ensureConsistentKRaftMetadata()
+    } else {
+      waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
+        KafkaConfig.LogCleanerDeleteRetentionMsProp, 
"").toString.equals("34")),
+        s"Timed out waiting for change to 
${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
+        waitTimeMs = 60000L)
+
+      waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
+        KafkaConfig.LogRetentionTimeMillisProp, 
"").toString.equals("10800000")),
+        s"Timed out waiting for change to 
${KafkaConfig.LogRetentionTimeMillisProp}",
+        waitTimeMs = 60000L)
     }
-    client.incrementalAlterConfigs(alterMap).all().get()
-    waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
-      KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")),
-      s"Timed out waiting for change to 
${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
-      waitTimeMs = 60000L)
 
     val newTopics = Seq(new NewTopic("foo", Map((0: Integer) -> 
Seq[Integer](1, 2).asJava,
       (1: Integer) -> Seq[Integer](2, 0).asJava).asJava).

Reply via email to