ning2008wisc commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r416995009



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                if (consumerGroupsDesc.get(group) == null) {
+                    // if consumerGroupsDesc does not contain this group, it should be the new consumer
+                    // group created at source cluster and its offsets should be sync-ed to target
+                    newConsumerGroup.add(group);
+                    continue;
+                }
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                // sync offset to the target cluster only if the state of current consumer group is idle or dead
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet());
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Map.Entry<String, Set<Map.Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+            if (convertedUpstreamOffset == null) continue;
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {

Review comment:
       If I understand correctly, you are asking about this scenario:
consumer A consumes from topics `x` and `y`, and MM replicates consumer A's
offsets for `x` and `y`. If consumer A then starts consuming from `x`, `y`,
and `z`, where `z` is a new topic, why does MM not replicate consumer A's
offset for topic `z`?
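
To make the scenario concrete, here is a minimal sketch, not the PR's actual
code. It assumes the sync loop only visits partitions that already have a
committed offset on the target cluster, which is how the quoted
`for (... entry : group.getValue())` loop reads. The class
`NewTopicOffsetSketch`, the method `offsetsToSync`, and the use of plain
`String`/`Long` in place of `TopicPartition`/`OffsetAndMetadata` are all
hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;

public class NewTopicOffsetSketch {

    // Hypothetical stand-in for the sync step: it iterates over the offsets
    // the group has already committed on the target, and looks up the
    // converted upstream offset for each of those partitions only.
    static Map<String, Long> offsetsToSync(Map<String, Long> targetOffsets,
                                           Map<String, Long> convertedUpstream) {
        Map<String, Long> toSync = new HashMap<>();
        for (Map.Entry<String, Long> entry : targetOffsets.entrySet()) {
            Long upstream = convertedUpstream.get(entry.getKey());
            if (upstream != null) {
                toSync.put(entry.getKey(), upstream);
            }
        }
        return toSync;
    }

    public static void main(String[] args) {
        // Consumer A has committed offsets for x and y on the target.
        Map<String, Long> target = new HashMap<>();
        target.put("x-0", 10L);
        target.put("y-0", 20L);

        // Upstream checkpoints now also include the new topic z.
        Map<String, Long> upstream = new HashMap<>();
        upstream.put("x-0", 15L);
        upstream.put("y-0", 25L);
        upstream.put("z-0", 5L);

        Map<String, Long> result = offsetsToSync(target, upstream);
        System.out.println("synced partitions: " + result.keySet());
        System.out.println("z-0 synced: " + result.containsKey("z-0"));
    }
}
```

Under that assumption, `z-0` is never visited because the target has no
committed offset for it, so its checkpoint would be skipped until something
else seeds the target-side entry.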




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

