ning2008wisc commented on a change in pull request #7577: URL: https://github.com/apache/kafka/pull/7577#discussion_r416995009
########## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java ########## @@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) { Checkpoint.unwrapGroup(record.sourcePartition()), System.currentTimeMillis() - record.timestamp()); } + + private void refreshIdleConsumerGroupOffset() { + Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient + .describeConsumerGroups(consumerGroups).describedGroups(); + + for (String group : consumerGroups) { + try { + if (consumerGroupsDesc.get(group) == null) { + // if consumerGroupsDesc does not contain this group, it should be the new consumer + // group created at source cluster and its offsets should be sync-ed to target + newConsumerGroup.add(group); + continue; + } + ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get(); + // sync offset to the target cluster only if the state of current consumer group is idle or dead + ConsumerGroupState consumerGroupState = consumerGroupDesc.state(); + if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) { + idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group) + .partitionsToOffsetAndMetadata().get().entrySet()); + } + } catch (InterruptedException | ExecutionException e) { + log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e); + } + } + } + + Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() { + Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>(); + + // first, sync offsets for the idle consumers at target + for (Map.Entry<String, Set<Map.Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) { + String consumerGroupId = group.getKey(); + // for each idle consumer at target, read the checkpoints (converted upstream offset) + // from the pre-populated map + Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId); + + if (convertedUpstreamOffset == null) continue; + + Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>(); + for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) { Review comment: If I am understanding right, are you asking about this scenario: consumer A is consuming from Topic `x` and ` y` and MM is replicating the offset of consumer A for Topic `x` and `y`. What if consumer A starts consume from Topic `x`, `y` and `z` where `z` is a new topic, why MM does not replicate the offset of consumer A for Topic `z`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org