OmniaGM commented on code in PR #16127: URL: https://github.com/apache/kafka/pull/16127#discussion_r1706691975
########## tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java: ########## @@ -740,175 +843,181 @@ public void testDescribeUnavailablePartitions(String quorum) throws ExecutionExc "Unexpected Topic " + rows[0] + " received. Expect " + String.format("Topic: %s", testTopicName)); assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:"), "Rows did not contain 'Leader: none\tReplicas: 0\tIsr:'"); - } finally { - restartDeadBrokers(false); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeUnderReplicatedPartitions(String quorum) { - int partitions = 1; - short replicationFactor = 6; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - try { - killBroker(0); - if (isKRaftTest()) { - ensureConsistentKRaftMetadata(); - } else { - TestUtils.waitForPartitionMetadata(aliveBrokers(), testTopicName, 0, org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS); - } + @ClusterTemplate("generate1") + public void testDescribeUnderReplicatedPartitions(TestInfo testInfo) throws InterruptedException { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + int partitions = 1; + short replicationFactor = 6; + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor))); + clusterInstance.waitForTopic(testTopicName, partitions); + + clusterInstance.shutdownBroker(0); + Assertions.assertEquals(clusterInstance.aliveBrokers().size(), 5); + + TestUtils.waitForCondition( + () -> clusterInstance.aliveBrokers().values().stream().allMatch( + broker -> { + Optional<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionState = + Optional.ofNullable(broker.metadataCache().getPartitionInfo(testTopicName, 0).getOrElse(null)); + return partitionState.map(s -> FetchRequest.isValidBrokerId(s.leader())).orElse(false); + } + ), 60000, "Meta data propogation fail in 60000 ms"); + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions")); String[] rows = output.split(System.lineSeparator()); assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), String.format("Unexpected output: %s", rows[0])); - } finally { - restartDeadBrokers(false); } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeUnderMinIsrPartitions(String quorum) { - Properties topicConfig = new Properties(); - topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); - int partitions = 1; - short replicationFactor = 6; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), topicConfig - ); - try { - killBroker(0); - if (isKRaftTest()) { - ensureConsistentKRaftMetadata(); - } else { - kafka.utils.TestUtils.waitUntilTrue( - () -> aliveBrokers().forall(b -> b.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 5), - () -> String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName), - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L - ); - } - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions")); + + @ClusterTemplate("generate1") + public void testDescribeUnderMinIsrPartitions(TestInfo testInfo) throws InterruptedException { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + Map<String, String> topicConfig = new HashMap<>(); + topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); + int partitions = 1; + short replicationFactor = 6; + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig))); + clusterInstance.waitForTopic(testTopicName, partitions); + + clusterInstance.shutdownBroker(0); + assertEquals(5, clusterInstance.aliveBrokers().size()); + + TestUtils.waitForCondition( + () -> clusterInstance.aliveBrokers().values().stream().allMatch(broker -> broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 5), + 60000, String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName) + ); + + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions", "--exclude-internal")); String[] rows = output.split(System.lineSeparator()); assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Unexpected topic: " + rows[0]); - } finally { - restartDeadBrokers(false); } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String quorum) throws ExecutionException, InterruptedException { - TopicPartition tp = new TopicPartition(testTopicName, 0); + @ClusterTemplate("generate1") + public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(TestInfo testInfo) throws ExecutionException, InterruptedException { + String testTopicName = testInfo.getTestMethod().get().getName() + "-" + + TestUtils.randomString(10); - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); + try (Admin adminClient = clusterInstance.createAdminClient(); + KafkaProducer<String, String> producer = createProducer()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); - // Produce multiple batches. - TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1); - TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1); + TopicPartition tp = new TopicPartition(testTopicName, 0); - // Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication - // throughput so the reassignment doesn't complete quickly. - List<Integer> brokerIds = JavaConverters.seqAsJavaList(brokers()).stream() - .map(broker -> broker.config().brokerId()).collect(Collectors.toList()); + // Produce multiple batches. + sendProducerRecords(testTopicName, producer, 10); + sendProducerRecords(testTopicName, producer, 10); - ToolsTestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp), 1); + // Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication + // throughput so the reassignment doesn't complete quickly. + List<Integer> brokerIds = new ArrayList<>(clusterInstance.brokerIds()); - TopicDescription testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName); - TopicPartitionInfo firstPartition = testTopicDesc.partitions().get(0); + ToolsTestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp), 1); - List<Integer> replicasOfFirstPartition = firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList()); - List<Integer> replicasDiff = new ArrayList<>(brokerIds); - replicasDiff.removeAll(replicasOfFirstPartition); - Integer targetReplica = replicasDiff.get(0); + TopicDescription testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName); + TopicPartitionInfo firstPartition = testTopicDesc.partitions().get(0); - adminClient.alterPartitionReassignments(Collections.singletonMap(tp, - Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get(); + List<Integer> replicasOfFirstPartition = firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList()); + List<Integer> replicasDiff = new ArrayList<>(brokerIds); + replicasDiff.removeAll(replicasOfFirstPartition); + Integer targetReplica = replicasDiff.get(0); - // let's wait until the LAIR is propagated - kafka.utils.TestUtils.waitUntilTrue( - () -> { - try { - return !adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get() - .get(tp).addingReplicas().isEmpty(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }, - () -> "Reassignment didn't add the second node", - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); - - ensureConsistentKRaftMetadata(); - - // describe the topic and test if it's under-replicated - String simpleDescribeOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)); - String[] simpleDescribeOutputRows = simpleDescribeOutput.split(System.lineSeparator()); - assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", testTopicName)), - "Unexpected describe output: " + simpleDescribeOutputRows[0]); - assertEquals(2, simpleDescribeOutputRows.length, - "Unexpected describe output length: " + simpleDescribeOutputRows.length); - - String underReplicatedOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions")); - assertEquals("", underReplicatedOutput, - String.format("--under-replicated-partitions shouldn't return anything: '%s'", underReplicatedOutput)); - - int maxRetries = 20; - long pause = 100L; - long waitTimeMs = maxRetries * pause; - AtomicReference<PartitionReassignment> reassignmentsRef = new AtomicReference<>(); - - TestUtils.waitUntilTrue(() -> { - try { - PartitionReassignment tempReassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp); - reassignmentsRef.set(tempReassignments); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Error while fetching reassignments", e); - } - return reassignmentsRef.get() != null; - }, () -> "Reassignments did not become non-null within the specified time", waitTimeMs, pause); + adminClient.alterPartitionReassignments(Collections.singletonMap(tp, + Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get(); + + // let's wait until the LAIR is propagated + TestUtils.waitForCondition( + () -> { + try { + return !adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get() + .get(tp).addingReplicas().isEmpty(); + } catch (InterruptedException | ExecutionException e) { Review Comment: this can be ``` TestUtils.waitForCondition( () -> !adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get() .get(tp).addingReplicas().isEmpty(), 60000, "Reassignment didn't add the second node" ); ``` no need to wrap it in try and catch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org