ning2008wisc commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r435705249



##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -316,6 +316,69 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * 
RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+
+    @Test
+    public void testOneWayReplicationWithAutorOffsetSync1() throws 
InterruptedException {
+
+        // create consumers before starting the connectors so we don't need to 
wait for discovery
+        Consumer<byte[], byte[]> consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-1");
+        consumer1.poll(Duration.ofMillis(500));
+        consumer1.commitSync();
+        consumer1.close();
+
+        // enable automated consumer group offset sync
+        mm2Props.put("sync.group.offsets.enabled", "true");
+        mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        // one way replication from primary to backup
+        mm2Props.put("backup->primary.enabled", "false");
+
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
+
+        // sleep 5 seconds to ensure the automated group offset sync is 
complete
+        time.sleep(5000);
+
+        // create a consumer at backup cluster with same consumer group Id to 
consume 1 topic
+        Consumer<byte[], byte[]> consumer = 
backup.kafka().createConsumerAndSubscribeTo(
+            Collections.singletonMap("group.id", "consumer-group-1"), 
"primary.test-topic-1");
+        ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+        // the size of consumer record should be zero, because the offsets of 
the same consumer group
+        // have been automatically synchronized from primary to backup by the 
background job, so no
+        // more records to consume from the replicated topic by the same 
consumer group at backup cluster
+        assertTrue("consumer record size is not zero", records.count() == 0);
+
+        // now create a new topic in primary cluster
+        primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
+        backup.kafka().createTopic("primary.test-topic-2", 1);
+        // produce some records to the new topic in primary cluster
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            primary.kafka().produce("test-topic-2", i % NUM_PARTITIONS, "key", 
"message-1-" + i);
+        }
+
+        // create a consumer at primary cluster to consume the new topic
+        consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-2");
+        consumer1.poll(Duration.ofMillis(500));
+        consumer1.commitSync();
+        consumer1.close();
+
+        // sleep 5 seconds to ensure the automated group offset sync is 
complete
+        time.sleep(5000);
+
+        // create a consumer at backup cluster with same consumer group Id to 
consume old and new topic
+        consumer = 
backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "primary.test-topic-1", 
"primary.test-topic-2");
+
+        records = consumer.poll(Duration.ofMillis(500));
+        // similar reasoning as above, no more records to consume by the same 
consumer group at backup cluster
+        assertTrue("consumer record size is not zero", records.count() == 0);

Review comment:
       updated

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -207,8 +185,30 @@ public void close() {
         backup.stop();
     }
 
+
+
     @Test
     public void testReplication() throws InterruptedException {
+
+        // create consumers before starting the connectors so we don't need to 
wait for discovery
+        Consumer<byte[], byte[]> consumer3 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(

Review comment:
       updated to `consumer1` and `consumer2`

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -64,4 +70,58 @@ public void testCheckpoint() {
         assertEquals(13, checkpoint2.downstreamOffset());
         assertEquals(234L, sourceRecord2.timestamp().longValue());
     }
+
+    @Test
+    public void testSyncOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> 
idleConsumerGroupsOffset = new HashMap<>();
+        Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new 
HashMap<>();
+
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        // 'c1t1' denotes consumer offsets of all partitions of topic1 for 
consumer1
+        Map<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<>();
+        // 't1p0' denotes topic1, partition 0
+        TopicPartition t1p0 = new TopicPartition(topic1, 0);
+        // 'c1t1p0' denotes the consumer offsets for topic1, partition 0 for 
consumer1
+        Entry<TopicPartition, OffsetAndMetadata> c1t1p0 = new 
SimpleEntry<>(t1p0, new OffsetAndMetadata(100, ""));
+
+        c1t1.put(t1p0, new OffsetAndMetadata(100, ""));
+
+        Map<TopicPartition, OffsetAndMetadata> c2t2 = new HashMap<>();
+        TopicPartition t2p0 = new TopicPartition(topic2, 0);
+        Entry<TopicPartition, OffsetAndMetadata> c2t2p0 = new 
SimpleEntry<>(t2p0, new OffsetAndMetadata(50, ""));

Review comment:
       updated

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -64,4 +70,58 @@ public void testCheckpoint() {
         assertEquals(13, checkpoint2.downstreamOffset());
         assertEquals(234L, sourceRecord2.timestamp().longValue());
     }
+
+    @Test
+    public void testSyncOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> 
idleConsumerGroupsOffset = new HashMap<>();
+        Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new 
HashMap<>();
+
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+
+        // 'c1t1' denotes consumer offsets of all partitions of topic1 for 
consumer1
+        Map<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<>();
+        // 't1p0' denotes topic1, partition 0
+        TopicPartition t1p0 = new TopicPartition(topic1, 0);
+        // 'c1t1p0' denotes the consumer offsets for topic1, partition 0 for 
consumer1
+        Entry<TopicPartition, OffsetAndMetadata> c1t1p0 = new 
SimpleEntry<>(t1p0, new OffsetAndMetadata(100, ""));

Review comment:
       removed the unused variable

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##########
@@ -29,7 +35,7 @@
     @Test
     public void testDownstreamTopicRenaming() {
         MirrorCheckpointTask mirrorCheckpointTask = new 
MirrorCheckpointTask("source1", "target2",
-            new DefaultReplicationPolicy(), null);
+            new DefaultReplicationPolicy(), null, new HashMap<>(), new 
HashMap<>());

Review comment:
       updated

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -234,6 +243,17 @@ Duration adminTimeout() {
         return props;
     }
 
+
+    Map<String, Object> targetConsumerConfig() {
+        Map<String, Object> props = new HashMap<>();
+        props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
+        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
+        props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
+        props.put("enable.auto.commit", "false");

Review comment:
       done

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -188,4 +219,101 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc 
= targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                ConsumerGroupDescription consumerGroupDesc = 
consumerGroupsDesc.get(group).get();
+                ConsumerGroupState consumerGroupState = 
consumerGroupDesc.state();
+                // sync offset to the target cluster only if the state of 
current consumer group is:
+                // (1) idle: because the consumer at target is not actively 
consuming the mirrored topic
+                // (2) dead: the new consumer that is recently created at 
source and never exist at target
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                    idleConsumerGroupsOffset.put(group, 
targetAdminClient.listConsumerGroupOffsets(group)
+                        
.partitionsToOffsetAndMetadata().get().entrySet().stream().collect(
+                            Collectors.toMap(e -> e.getKey(), e -> 
e.getValue())));
+                }
+                // new consumer upstream has state "DEAD" and will be 
identified during the offset sync-up
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster 
{}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = 
new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Entry<String, Map<TopicPartition, OffsetAndMetadata>> group : 
getConvertedUpstreamOffset().entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints 
(converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = 
group.getValue();
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new 
HashMap<>();
+            Map<TopicPartition, OffsetAndMetadata> targetConsumerOffset = 
idleConsumerGroupsOffset.get(consumerGroupId);
+            if (targetConsumerOffset == null) {
+                // this is a new consumer, just sync the offset to target
+                syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
+                offsetToSyncAll.put(consumerGroupId, convertedUpstreamOffset);
+                continue;
+            }
+
+            for (Entry<TopicPartition, OffsetAndMetadata> convertedEntry : 
convertedUpstreamOffset.entrySet()) {
+
+                TopicPartition topicPartition = convertedEntry.getKey();
+                OffsetAndMetadata convertedOffset = 
convertedUpstreamOffset.get(topicPartition);
+                if (!targetConsumerOffset.containsKey(topicPartition)) {
+                    // if is a new topicPartition from upstream, just sync the 
offset to target
+                    offsetToSync.put(topicPartition, convertedOffset);
+                    continue;
+                }
+
+                // if translated offset from upstream is smaller than the 
current consumer offset
+                // in the target, skip updating the offset for that partition
+                long latestDownstreamOffset = 
targetConsumerOffset.get(topicPartition).offset();
+                if (latestDownstreamOffset >= convertedOffset.offset()) {
+                    log.trace("latestDownstreamOffset {} is larger than 
convertedUpstreamOffset {} for "

Review comment:
       updated to `latestDownstreamOffset {} is larger than or equal to 
convertedUpstreamOffset {} for....`

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -188,4 +219,101 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc 
= targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();

Review comment:
       Great to know about that KIP; in that case I will keep using 
`describeConsumerGroups()` here.

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -79,7 +93,15 @@ public void start(Map<String, String> props) {
         pollTimeout = config.consumerPollTimeout();
         offsetSyncStore = new OffsetSyncStore(config);
         sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
+        targetAdminClient = (KafkaAdminClient) 
Admin.create(config.targetAdminConfig());

Review comment:
       updated

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -16,6 +16,13 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import java.util.HashMap;

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to