mimaison commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r462989546



##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void 
waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
         }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
     }
 
+    private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) 
throws InterruptedException {
+        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+        waitForCondition(() -> {
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+            consumer.commitSync();
+            return NUM_RECORDS_PRODUCED == 
totalConsumedRecords.addAndGet(records.count());
+        }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the 
records in time");
+    }
+
     @Test
     public void testOneWayReplicationWithAutoOffsetSync() throws 
InterruptedException {
 
         // create consumers before starting the connectors so we don't need to 
wait for discovery
-        Consumer<byte[], byte[]> consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-1");
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
-        consumer1.close();
+        try (Consumer<byte[], byte[]> consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-1")) {
+            // we need to wait for consuming all the records for MM2 
replicaing the expected offsets

Review comment:
       `replicaing` -> `replicating`

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void 
waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
         }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
     }
 
+    private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) 
throws InterruptedException {
+        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+        waitForCondition(() -> {
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));

Review comment:
       Can we add the types `<byte[], byte[]>` to `ConsumerRecords`?
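
To illustrate why the type parameters matter, here is a minimal, self-contained sketch. `Records<K, V>` is a hypothetical stand-in for `ConsumerRecords<K, V>` (not the Kafka class), and `poll()` and `totalValueBytes()` are made up for the example. With a raw declaration, `count()` still compiles, which is why the current code works, but typed access to the values does not.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class TypedRecordsSketch {

    // Hypothetical stand-in for ConsumerRecords<K, V>; not the Kafka class.
    static final class Records<K, V> {
        private final List<V> values;

        Records(List<V> values) {
            this.values = values;
        }

        int count() {
            return values.size();
        }

        List<V> values() {
            return values;
        }
    }

    // Hypothetical stand-in for consumer.poll(...): returns two fixed values.
    static Records<byte[], byte[]> poll() {
        return new Records<>(Arrays.asList(
            "ab".getBytes(StandardCharsets.UTF_8),
            "cde".getBytes(StandardCharsets.UTF_8)));
    }

    static int totalValueBytes() {
        // Parameterized declaration: values() is a List<byte[]>, so no cast is needed.
        // With the raw type `Records records = poll();` the loop below would not
        // compile, because values() would return a raw List of Object.
        Records<byte[], byte[]> records = poll();
        int total = 0;
        for (byte[] value : records.values()) {
            total += value.length;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalValueBytes()); // prints 5
    }
}
```

In the helper itself only `records.count()` is called, so the change is mostly about avoiding a raw-type warning and keeping the declaration consistent with `Consumer<byte[], byte[]>`.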

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -387,11 +393,11 @@ public void testOneWayReplicationWithAutoOffsetSync() 
throws InterruptedExceptio
         }
 
         // create a consumer at primary cluster to consume the new topic
-        consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-2");
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
-        consumer1.close();
+        try (Consumer<byte[], byte[]> consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-2")) {
+            // we need to wait for consuming all the records for MM2 
replicaing the expected offsets

Review comment:
       `replicaing` -> `replicating`

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void 
waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
         }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
     }
 
+    private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) 
throws InterruptedException {
+        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+        waitForCondition(() -> {
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+            consumer.commitSync();

Review comment:
       We can move `consumer.commitSync()` after the `waitForCondition()` block 
so that we commit only once, after all records have been consumed.
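
A rough sketch of the suggested shape, written so it runs without Kafka on the classpath. `PollingConsumer`, `FakeConsumer`, and the simplified `waitForCondition` below are hypothetical stand-ins for the real `Consumer` and the `waitForCondition` used in the test; only the ordering of the poll loop and the single commit is the point.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class CommitAfterWaitSketch {

    // Hypothetical stand-in for the two Consumer calls the helper relies on.
    interface PollingConsumer {
        int pollCount();   // stands in for consumer.poll(...).count()
        void commitSync();
    }

    // Simplified stand-in for the test's waitForCondition: retry until the
    // condition holds, or fail once the timeout has elapsed.
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            Thread.sleep(10);
        }
    }

    static void waitForConsumingAllRecords(PollingConsumer consumer, int expected, long maxWaitMs)
            throws InterruptedException {
        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
        waitForCondition(
            () -> expected == totalConsumedRecords.addAndGet(consumer.pollCount()),
            maxWaitMs, "Consumer cannot consume all records in time");
        // Commit once, only after every expected record has been consumed.
        consumer.commitSync();
    }

    // Fake consumer that hands out fixed batch sizes and counts commits.
    static final class FakeConsumer implements PollingConsumer {
        private final Deque<Integer> batches = new ArrayDeque<>(Arrays.asList(3, 0, 4, 3));
        int commits = 0;

        @Override
        public int pollCount() {
            return batches.isEmpty() ? 0 : batches.poll();
        }

        @Override
        public void commitSync() {
            commits++;
        }
    }

    // Runs the helper against the fake and reports how many commits happened.
    static int commitsAfterConsuming(int expected) {
        FakeConsumer consumer = new FakeConsumer();
        try {
            waitForConsumingAllRecords(consumer, expected, 5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumer.commits;
    }

    public static void main(String[] args) {
        System.out.println(commitsAfterConsuming(10)); // prints 1
    }
}
```

With the commit inside the lambda, the same run would commit four times, once per poll, including the empty one.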

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void 
waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
         }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not 
complete in time");
     }
 
+    private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) 
throws InterruptedException {
+        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+        waitForCondition(() -> {
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+            consumer.commitSync();
+            return NUM_RECORDS_PRODUCED == 
totalConsumedRecords.addAndGet(records.count());
+        }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the 
records in time");

Review comment:
       nit: The message reads slightly better without `the`: "Consumer cannot 
consume all records in time".



