chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1562683822
########## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ########## @@ -391,4 +428,30 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + + private void retryUntilEqual(List<Row> expected, Supplier<List<Row>> outputSupplier) { + try { + TestUtils.waitForCondition( + () -> expected.equals(outputSupplier.get()), + "TopicOffsets did not match. Expected: " + expectedTestTopicOffsets() + ", but was: " + + outputSupplier.get() + ". Final offsets: " + getFinalOffsets() + ); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private Set<Row> getFinalOffsets() throws ExecutionException, InterruptedException { Review Comment: we can remove this function now. ########## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ########## @@ -94,15 +109,38 @@ private void setUp() { } } + private void createConsumerAndPoll() { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { + List<String> topics = new ArrayList<>(); + for (int i = 0; i < topicCount + 1; i++) { + topics.add(getTopicName(i)); + } + consumer.subscribe(topics); + consumer.poll(consumerTimeout); + } + } + static class Row { private String name; private int partition; - private Long timestamp; + private Long offset; Review Comment: Could you add `final` to those fields? ########## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ########## @@ -391,4 +428,30 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + + private void retryUntilEqual(List<Row> expected, Supplier<List<Row>> outputSupplier) { + try { + TestUtils.waitForCondition( Review Comment: we don't need this retry now ########## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ########## @@ -48,13 +62,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ZK) +@ClusterTestDefaults(clusterType = Type.ALL) @Tag("integration") public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; private final String topicName = "topic"; + private final Duration consumerTimeout = Duration.ofMillis(1000); Review Comment: this can be a local variable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org