OmniaGM commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1706691975


##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -740,175 +843,181 @@ public void testDescribeUnavailablePartitions(String 
quorum) throws ExecutionExc
                     "Unexpected Topic " + rows[0] + " received. Expect " + 
String.format("Topic: %s", testTopicName));
             assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:"),
                     "Rows did not contain 'Leader: none\tReplicas: 0\tIsr:'");
-        } finally {
-            restartDeadBrokers(false);
+
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDescribeUnderReplicatedPartitions(String quorum) {
-        int partitions = 1;
-        short replicationFactor = 6;
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partitions, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        try {
-            killBroker(0);
-            if (isKRaftTest()) {
-                ensureConsistentKRaftMetadata();
-            } else {
-                TestUtils.waitForPartitionMetadata(aliveBrokers(), 
testTopicName, 0, org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS);
-            }
+    @ClusterTemplate("generate1")
+    public void testDescribeUnderReplicatedPartitions(TestInfo testInfo) 
throws InterruptedException {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            int partitions = 1;
+            short replicationFactor = 6;
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, partitions, replicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, partitions);
+
+            clusterInstance.shutdownBroker(0);
+            Assertions.assertEquals(clusterInstance.aliveBrokers().size(), 5);
+
+            TestUtils.waitForCondition(
+                    () -> 
clusterInstance.aliveBrokers().values().stream().allMatch(
+                            broker -> {
+                                
Optional<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionState 
=
+                                        
Optional.ofNullable(broker.metadataCache().getPartitionInfo(testTopicName, 
0).getOrElse(null));
+                                return partitionState.map(s -> 
FetchRequest.isValidBrokerId(s.leader())).orElse(false);
+                            }
+                    ), 60000, "Meta data propogation fail in 60000 ms");
+
             String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--under-replicated-partitions"));
             String[] rows = output.split(System.lineSeparator());
             assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)), String.format("Unexpected output: %s", rows[0]));
-        } finally {
-            restartDeadBrokers(false);
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDescribeUnderMinIsrPartitions(String quorum) {
-        Properties topicConfig = new Properties();
-        topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
-        int partitions = 1;
-        short replicationFactor = 6;
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partitions, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
-        );
-        try {
-            killBroker(0);
-            if (isKRaftTest()) {
-                ensureConsistentKRaftMetadata();
-            } else {
-                kafka.utils.TestUtils.waitUntilTrue(
-                    () -> aliveBrokers().forall(b -> 
b.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 5),
-                    () -> String.format("Timeout waiting for partition 
metadata propagating to brokers for %s topic", testTopicName),
-                    org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
-                );
-            }
-            String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--under-min-isr-partitions"));
+
+    @ClusterTemplate("generate1")
+    public void testDescribeUnderMinIsrPartitions(TestInfo testInfo) throws 
InterruptedException {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            Map<String, String> topicConfig = new HashMap<>();
+            topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
+            int partitions = 1;
+            short replicationFactor = 6;
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
+            clusterInstance.waitForTopic(testTopicName, partitions);
+
+            clusterInstance.shutdownBroker(0);
+            assertEquals(5, clusterInstance.aliveBrokers().size());
+
+            TestUtils.waitForCondition(
+                    () -> 
clusterInstance.aliveBrokers().values().stream().allMatch(broker -> 
broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 
5),
+                    60000, String.format("Timeout waiting for partition 
metadata propagating to brokers for %s topic", testTopicName)
+            );
+
+            String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--under-min-isr-partitions", "--exclude-internal"));
             String[] rows = output.split(System.lineSeparator());
             assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)),
                     "Unexpected topic: " + rows[0]);
-        } finally {
-            restartDeadBrokers(false);
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String 
quorum) throws ExecutionException, InterruptedException {
-        TopicPartition tp = new TopicPartition(testTopicName, 0);
+    @ClusterTemplate("generate1")
+    public void 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(TestInfo 
testInfo) throws ExecutionException, InterruptedException {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
 
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
+        try (Admin adminClient = clusterInstance.createAdminClient();
+            KafkaProducer<String, String> producer = createProducer()) {
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
 
-        // Produce multiple batches.
-        TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1);
-        TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1);
+            TopicPartition tp = new TopicPartition(testTopicName, 0);
 
-        // Enable throttling. Note the broker config sets the replica max 
fetch bytes to `1` upon to minimize replication
-        // throughput so the reassignment doesn't complete quickly.
-        List<Integer> brokerIds = 
JavaConverters.seqAsJavaList(brokers()).stream()
-            .map(broker -> 
broker.config().brokerId()).collect(Collectors.toList());
+            // Produce multiple batches.
+            sendProducerRecords(testTopicName, producer, 10);
+            sendProducerRecords(testTopicName, producer, 10);
 
-        ToolsTestUtils.setReplicationThrottleForPartitions(adminClient, 
brokerIds, Collections.singleton(tp), 1);
+            // Enable throttling. Note the broker config sets the replica max 
fetch bytes to `1` upon to minimize replication
+            // throughput so the reassignment doesn't complete quickly.
+            List<Integer> brokerIds = new 
ArrayList<>(clusterInstance.brokerIds());
 
-        TopicDescription testTopicDesc = 
adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName);
-        TopicPartitionInfo firstPartition = testTopicDesc.partitions().get(0);
+            ToolsTestUtils.setReplicationThrottleForPartitions(adminClient, 
brokerIds, Collections.singleton(tp), 1);
 
-        List<Integer> replicasOfFirstPartition = 
firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList());
-        List<Integer> replicasDiff = new ArrayList<>(brokerIds);
-        replicasDiff.removeAll(replicasOfFirstPartition);
-        Integer targetReplica = replicasDiff.get(0);
+            TopicDescription testTopicDesc = 
adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName);
+            TopicPartitionInfo firstPartition = 
testTopicDesc.partitions().get(0);
 
-        adminClient.alterPartitionReassignments(Collections.singletonMap(tp,
-            Optional.of(new 
NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get();
+            List<Integer> replicasOfFirstPartition = 
firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList());
+            List<Integer> replicasDiff = new ArrayList<>(brokerIds);
+            replicasDiff.removeAll(replicasOfFirstPartition);
+            Integer targetReplica = replicasDiff.get(0);
 
-        // let's wait until the LAIR is propagated
-        kafka.utils.TestUtils.waitUntilTrue(
-            () -> {
-                try {
-                    return 
!adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()
-                        .get(tp).addingReplicas().isEmpty();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RuntimeException(e);
-                }
-            },
-            () -> "Reassignment didn't add the second node",
-            org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
-
-        ensureConsistentKRaftMetadata();
-
-        // describe the topic and test if it's under-replicated
-        String simpleDescribeOutput = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--topic", testTopicName));
-        String[] simpleDescribeOutputRows = 
simpleDescribeOutput.split(System.lineSeparator());
-        
assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", 
testTopicName)),
-                "Unexpected describe output: " + simpleDescribeOutputRows[0]);
-        assertEquals(2, simpleDescribeOutputRows.length,
-                "Unexpected describe output length: " + 
simpleDescribeOutputRows.length);
-
-        String underReplicatedOutput = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--under-replicated-partitions"));
-        assertEquals("", underReplicatedOutput,
-            String.format("--under-replicated-partitions shouldn't return 
anything: '%s'", underReplicatedOutput));
-
-        int maxRetries = 20;
-        long pause = 100L;
-        long waitTimeMs = maxRetries * pause;
-        AtomicReference<PartitionReassignment> reassignmentsRef = new 
AtomicReference<>();
-
-        TestUtils.waitUntilTrue(() -> {
-            try {
-                PartitionReassignment tempReassignments = 
adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp);
-                reassignmentsRef.set(tempReassignments);
-            } catch (InterruptedException | ExecutionException e) {
-                throw new RuntimeException("Error while fetching 
reassignments", e);
-            }
-            return reassignmentsRef.get() != null;
-        }, () -> "Reassignments did not become non-null within the specified 
time", waitTimeMs, pause);
+            
adminClient.alterPartitionReassignments(Collections.singletonMap(tp,
+                    Optional.of(new 
NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get();
+
+            // let's wait until the LAIR is propagated
+            TestUtils.waitForCondition(
+                    () -> {
+                        try {
+                            return 
!adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()
+                                    .get(tp).addingReplicas().isEmpty();
+                        } catch (InterruptedException | ExecutionException e) {

Review Comment:
   This can be simplified to:
   ```
   TestUtils.waitForCondition(
                       () -> 
!adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()
                           .get(tp).addingReplicas().isEmpty(), 60000, 
"Reassignment didn't add the second node"
               );
   ```
   
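   There is no need to wrap the call in a try/catch block: the condition lambda passed to `TestUtils.waitForCondition` is allowed to throw checked exceptions.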



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to