TaiJuWu commented on code in PR #17579:
URL: https://github.com/apache/kafka/pull/17579#discussion_r1828924456


##########
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java:
##########
@@ -225,4 +235,56 @@ default void waitAcls(AclBindingFilter filter, 
Collection<AccessControlEntry> en
         }
     }
 
+    default <B extends KafkaBroker> void verifyTopicDeletion(String topic, int 
numPartitions) throws Exception {
+        verifyTopicDeletion(topic, numPartitions, brokers().values());
+    }
+
+    default <B extends KafkaBroker> void verifyTopicDeletion(String topic, int 
numPartitions, Collection<B> brokers) throws Exception {
+        List<TopicPartition> topicPartitions = IntStream.range(0, 
numPartitions)
+            .mapToObj(partition -> new TopicPartition(topic, partition))
+            .collect(Collectors.toList());
+
+        // Ensure that the topic-partition has been deleted from all brokers' 
replica managers
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->

Review Comment:
   Since `clusterInstance` support shutdown broker, using `aliveBroker` is 
better solution.
   You can refer https://github.com/apache/kafka/pull/17085



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to