chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1562683822


##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -391,4 +428,30 @@ private String[] addBootstrapServer(String... args) {
 
         return newArgs.toArray(new String[0]);
     }
+
+    private void retryUntilEqual(List<Row> expected, Supplier<List<Row>> 
outputSupplier) {
+        try {
+            TestUtils.waitForCondition(
+                    () -> expected.equals(outputSupplier.get()),
+                    "TopicOffsets did not match. Expected: " + 
expectedTestTopicOffsets() + ", but was: "
+                            + outputSupplier.get() + ". Final offsets: " + 
getFinalOffsets()
+            );
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Set<Row> getFinalOffsets() throws ExecutionException, 
InterruptedException {

Review Comment:
   we can remove this function now.



##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -94,15 +109,38 @@ private void setUp() {
         }
     }
 
+    private void createConsumerAndPoll() {
+        Properties props = new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(props)) {
+            List<String> topics = new ArrayList<>();
+            for (int i = 0; i < topicCount + 1; i++) {
+                topics.add(getTopicName(i));
+            }
+            consumer.subscribe(topics);
+            consumer.poll(consumerTimeout);
+        }
+    }
+
     static class Row {
         private String name;
         private int partition;
-        private Long timestamp;
+        private Long offset;

Review Comment:
   Could you add `final` to those fields?



##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -391,4 +428,30 @@ private String[] addBootstrapServer(String... args) {
 
         return newArgs.toArray(new String[0]);
     }
+
+    private void retryUntilEqual(List<Row> expected, Supplier<List<Row>> 
outputSupplier) {
+        try {
+            TestUtils.waitForCondition(

Review Comment:
   we don't need this retry now



##########
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##########
@@ -48,13 +62,14 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ZK)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @Tag("integration")
 public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
     private final String topicName = "topic";
+    private final Duration consumerTimeout = Duration.ofMillis(1000);

Review Comment:
   this can be a local variable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to