[ https://issues.apache.org/jira/browse/KAFKA-13932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bale updated KAFKA-13932:
-------------------------
    Description: 
*Kafka version: 2.8.0*
*Replication flow: A -> B*
*Topic: DL2 -> A.DL2*

I use mm2 to replicate data from cluster A to cluster B. After stopping and restarting mm2 (the data in the source cluster expired before the restart), I found that the consumer group offset in the target cluster is larger than the topic partition offset, which causes data loss in the mirror topic.

*[Steps]*
1. Start mm2 with the attached connect-mirror-maker.properties.
2. Create topic DL2, push 5 messages, and consume them with group g1. Offset info of DL2 and A.DL2: see attachment offset1.png.
3. Stop mm2 and *push 5 messages* to topic DL2. Offset info of DL2 and A.DL2: see attachment offset2.png.
4. After 1 hour, the data in topic DL2 has expired.
5. Restart mm2 and sync the consumer group offsets. The consumer group offset is now larger than the topic partition offset. Offset info of DL2 and A.DL2: see attachment offset3.png.

*[Comments]*
The translateDownstream method of the OffsetSyncStore class calculates the downstream offset from the upstream consumer group offset and the downstream offset of the latest record in the offset-sync topic:

    long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
    return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);

If data is consumed and deleted in the source cluster but has not been replicated to the target cluster, the translated downstream offset will be larger than the topic partition offset once the consumer group offset is synced to the target cluster.

The syncGroupOffset method of the MirrorCheckpointTask class does not check the consumer group offset against the topic partition offset.

  was:
Kafka version: 2.8.0
Replication flow: A -> B
Topic: DL2 -> A.DL2

I use mm2 to replicate data from cluster A to cluster B. In some cases, I found that the consumer group offset is larger than the partition offset in cluster B, which causes data loss in the mirror topic.

[Steps]
1. Start mm2 with the attached connect-mirror-maker.properties.
2. Create topic DL2, push 5 messages, and consume them with group g1. Offset info of DL2 and A.DL2: see attachment offset1.png.
3. Stop mm2 and push 5 messages to topic DL2. Offset info of DL2 and A.DL2: see attachment offset2.png.
4. After 1 hour, the data in topic DL2 has been deleted.
5. Restart mm2 and sync the consumer group offsets. The consumer group offset is now larger than the partition offset. Offset info of DL2 and A.DL2: see attachment offset3.png.


> Replication data loss in some cases
> -----------------------------------
>
>                 Key: KAFKA-13932
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13932
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.8.0
>            Reporter: Bale
>            Priority: Minor
>         Attachments: connect-mirror-maker.properties, offset1.png,
>                      offset2.png, offset3.png
>
>
> *Kafka version: 2.8.0*
> *Replication flow: A -> B*
> *Topic: DL2 -> A.DL2*
>
> I use mm2 to replicate data from cluster A to cluster B. After stopping and
> restarting mm2 (the data in the source cluster expired before the restart),
> I found that the consumer group offset in the target cluster is larger than
> the topic partition offset, which causes data loss in the mirror topic.
>
> *[Steps]*
> 1. Start mm2 with the attached connect-mirror-maker.properties.
> 2. Create topic DL2, push 5 messages, and consume them with group g1.
> Offset info of DL2 and A.DL2: see attachment offset1.png.
> 3. Stop mm2 and *push 5 messages* to topic DL2. Offset info of DL2 and
> A.DL2: see attachment offset2.png.
> 4. After 1 hour, the data in topic DL2 has expired.
> 5. Restart mm2 and sync the consumer group offsets. The consumer group
> offset is now larger than the topic partition offset.
> Offset info of DL2 and A.DL2: see attachment offset3.png.
>
> *[Comments]*
> The translateDownstream method of the OffsetSyncStore class calculates the
> downstream offset from the upstream consumer group offset and the downstream
> offset of the latest record in the offset-sync topic:
>
>     long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
>     return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
>
> If data is consumed and deleted in the source cluster but has not been
> replicated to the target cluster, the translated downstream offset will be
> larger than the topic partition offset once the consumer group offset is
> synced to the target cluster.
>
> In the syncGroupOffset method of the MirrorCheckpointTask class, the
> consumer group offset is not checked against the topic partition offset.
>



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
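
To make the arithmetic in the [Comments] section concrete, the following is a minimal, self-contained Java sketch. It does not use the Kafka classes; it only reproduces the two quoted lines from translateDownstream as a plain function. The class name OffsetTranslationSketch, the helper clampToLogEnd, and all numeric values are assumptions for illustration: the sketch assumes the latest offset sync maps upstream offset 0 to downstream offset 0, that group g1 has committed upstream offset 10, and that A.DL2 still ends at offset 5 because the last 5 messages were never replicated. The attachments may show different numbers. The clamp is one possible guard, not a description of how Kafka fixes or should fix this.

```java
import java.util.OptionalLong;

public class OffsetTranslationSketch {

    // Same arithmetic as the snippet quoted from OffsetSyncStore.translateDownstream:
    // downstream = sync.downstreamOffset + (upstreamOffset - sync.upstreamOffset).
    public static OptionalLong translateDownstream(long syncUpstreamOffset,
                                                   long syncDownstreamOffset,
                                                   long upstreamOffset) {
        long upstreamStep = upstreamOffset - syncUpstreamOffset;
        return OptionalLong.of(syncDownstreamOffset + upstreamStep);
    }

    // Hypothetical guard (not part of Kafka): never hand a consumer group an
    // offset beyond the end of the downstream partition.
    public static long clampToLogEnd(long translatedOffset, long downstreamLogEndOffset) {
        return Math.min(translatedOffset, downstreamLogEndOffset);
    }

    public static void main(String[] args) {
        // Assumed values, see the paragraph above.
        long syncUpstreamOffset = 0L;      // latest offset-sync record, upstream side
        long syncDownstreamOffset = 0L;    // latest offset-sync record, downstream side
        long upstreamCommitted = 10L;      // g1's committed offset on DL2
        long downstreamLogEndOffset = 5L;  // A.DL2 holds only the first 5 messages

        long translated = translateDownstream(
                syncUpstreamOffset, syncDownstreamOffset, upstreamCommitted).getAsLong();
        long clamped = clampToLogEnd(translated, downstreamLogEndOffset);

        System.out.println("translated=" + translated); // prints translated=10
        System.out.println("clamped=" + clamped);       // prints clamped=5
    }
}
```

Under these assumed values the translated offset (10) lies past the end of A.DL2 (5), so any records later written to A.DL2 at offsets 5 through 9 would be skipped by a consumer resuming from the synced offset. That is the data loss described in the report.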