mimaison commented on a change in pull request #9029: URL: https://github.com/apache/kafka/pull/9029#discussion_r462989546
########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ########## @@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } + private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException { + final AtomicInteger totalConsumedRecords = new AtomicInteger(0); + waitForCondition(() -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + consumer.commitSync(); + return NUM_RECORDS_PRODUCED == totalConsumedRecords.addAndGet(records.count()); + }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the records in time"); + } + @Test public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException { // create consumers before starting the connectors so we don't need to wait for discovery - Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( - "group.id", "consumer-group-1"), "test-topic-1"); - consumer1.poll(Duration.ofMillis(500)); - consumer1.commitSync(); - consumer1.close(); + try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( + "group.id", "consumer-group-1"), "test-topic-1")) { + // we need to wait for consuming all the records for MM2 replicaing the expected offsets Review comment: `replicaing` -> `replicating` ########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ########## @@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } + private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException { + final AtomicInteger totalConsumedRecords = new AtomicInteger(0); + waitForCondition(() -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); Review comment: Can we add the types `<byte[], byte[]>` to `ConsumerRecords`? ########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ########## @@ -387,11 +393,11 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio } // create a consumer at primary cluster to consume the new topic - consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( - "group.id", "consumer-group-1"), "test-topic-2"); - consumer1.poll(Duration.ofMillis(500)); - consumer1.commitSync(); - consumer1.close(); + try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( + "group.id", "consumer-group-1"), "test-topic-2")) { + // we need to wait for consuming all the records for MM2 replicaing the expected offsets Review comment: `replicaing` -> `replicating` ########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ########## @@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } + private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException { + final AtomicInteger totalConsumedRecords = new AtomicInteger(0); + waitForCondition(() -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + consumer.commitSync(); Review comment: We can move that line after the `waitForCondition()` block to just commit once all records have been consumed. ########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ########## @@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); } + private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException { + final AtomicInteger totalConsumedRecords = new AtomicInteger(0); + waitForCondition(() -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + consumer.commitSync(); + return NUM_RECORDS_PRODUCED == totalConsumedRecords.addAndGet(records.count()); + }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the records in time"); Review comment: nit: The sentence sounds slightly better if you remove `the` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org