[ https://issues.apache.org/jira/browse/KAFKA-13932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bale updated KAFKA-13932:
-------------------------
    Description: 
*Kafka version: 2.8.0*
*Replication flow: A -> B*
*Topic: DL2 -> A.DL2*
 
I use mm2 to replicate data from cluster A to cluster B. After stopping and
restarting mm2 (before the restart, the data in the source cluster had expired),
I found that the consumer group offset is bigger than the topic partition's log
end offset in the target cluster, which causes data loss in the mirror topic.

 
*[Steps]*
1. Start mm2 with the attachment: connect-mirror-maker.properties.
2. Create topic DL2, push 5 messages and consume them with group g1. Offset info
of DL2 and A.DL2: please see attachment offset1.png
3. Stop mm2 and *push 5 messages* to topic DL2. Offset info of DL2 and A.DL2:
please see attachment offset2.png
4. After 1 hour, the data in topic DL2 has expired.
5. Restart mm2 and sync the consumer group offset: the consumer group offset is
now bigger than the topic partition's log end offset. Offset info of DL2 and
A.DL2: please see attachment offset3.png

 
*[Comments]*

In the translateDownstream method of the OffsetSyncStore class, the downstream
offset is calculated from the upstream consumer group offset and the downstream
offset of the latest record in the offset-sync topic:
  long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
  return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
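For reference, the two quoted lines can be wrapped into a small self-contained Java sketch. `SimpleOffsetSync` and `TranslateDownstreamSketch` are hypothetical stand-ins (the real OffsetSync class in MirrorMaker 2 also tracks the topic partition), and returning `OptionalLong.empty()` when no sync exists is an assumption, not a quote of the 2.8.0 source.

```java
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical, trimmed-down stand-in for MirrorMaker 2's OffsetSync:
// one (upstreamOffset, downstreamOffset) pair recorded for a partition.
final class SimpleOffsetSync {
    final long upstreamOffset;
    final long downstreamOffset;

    SimpleOffsetSync(long upstreamOffset, long downstreamOffset) {
        this.upstreamOffset = upstreamOffset;
        this.downstreamOffset = downstreamOffset;
    }
}

final class TranslateDownstreamSketch {
    // Mirrors the two quoted lines: shift the synced downstream offset by the
    // distance between the group's upstream offset and the synced upstream offset.
    static OptionalLong translateDownstream(Optional<SimpleOffsetSync> offsetSync, long upstreamOffset) {
        if (!offsetSync.isPresent()) {
            return OptionalLong.empty(); // assumption: no sync yet means "cannot translate"
        }
        long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset;
        return OptionalLong.of(offsetSync.get().downstreamOffset + upstreamStep);
    }

    public static void main(String[] args) {
        // Hypothetical sync: upstream offset 3 was written to downstream offset 3.
        Optional<SimpleOffsetSync> sync = Optional.of(new SimpleOffsetSync(3, 3));
        System.out.println(translateDownstream(sync, 5)); // prints OptionalLong[5]
    }
}
```

The translation only holds if upstream and downstream offsets advance one-to-one after the sync point, which is the assumption that breaks when upstream records expire before they are replicated.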
 
If data is consumed and then deleted in the source cluster without ever being
replicated to the target cluster, the calculation counts downstream offsets that
were never written, so after the consumer group offset is synced to the target
cluster it is bigger than the topic partition's log end offset.
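A worked sketch of the overshoot, using hypothetical numbers that loosely follow steps 2-5. They are not taken from offset1.png to offset3.png, and they assume g1 also consumed the five records pushed in step 3 before those records expired. The class and method names (`OverTranslationScenario`, `translate`, `skippedOffsets`) are illustrative only.

```java
final class OverTranslationScenario {
    // Same arithmetic as the two quoted lines from translateDownstream.
    static long translate(long syncUpstream, long syncDownstream, long groupUpstream) {
        return syncDownstream + (groupUpstream - syncUpstream);
    }

    // Number of downstream offsets in [targetEndOffset, translatedOffset) that a
    // consumer resuming from translatedOffset would jump over. Records replicated
    // after the restart are written into exactly these offsets.
    static long skippedOffsets(long translatedOffset, long targetEndOffset) {
        return Math.max(0, translatedOffset - targetEndOffset);
    }

    public static void main(String[] args) {
        long syncUpstream = 4;    // hypothetical last offset sync before mm2 stopped: DL2 offset 4 ...
        long syncDownstream = 4;  // ... was written to A.DL2 at offset 4
        long targetEndOffset = 5; // A.DL2 holds offsets 0..4; DL2 offsets 5..9 expired unreplicated
        long groupUpstream = 10;  // g1 committed offset 10 on DL2 after reading all ten records

        long translated = translate(syncUpstream, syncDownstream, groupUpstream);
        System.out.println("translated=" + translated + ", targetEnd=" + targetEndOffset);
        // prints: translated=10, targetEnd=5
        System.out.println("skipped=" + skippedOffsets(translated, targetEndOffset));
        // prints: skipped=5
    }
}
```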
 
In the syncGroupOffset method of the MirrorCheckpointTask class, there is no
check comparing the consumer group offset against the topic partition's log end
offset.
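One possible shape for the missing check, sketched under assumptions: before committing a translated offset downstream, cap it at the downstream partition's log end offset. `ClampedGroupOffsetSync`, `clampToLogEnd`, the string partition keys, and the in-memory maps are hypothetical; in a real task the end offsets could be fetched from the target cluster, for example with `Admin#listOffsets` and `OffsetSpec.latest()`. This is not necessarily how MirrorMaker 2 addresses the problem.

```java
import java.util.HashMap;
import java.util.Map;

final class ClampedGroupOffsetSync {
    // Hypothetical guard for syncGroupOffset: never commit a downstream group
    // offset beyond the end of the downstream partition.
    static long clampToLogEnd(long translatedOffset, long downstreamLogEndOffset) {
        return Math.min(translatedOffset, downstreamLogEndOffset);
    }

    public static void main(String[] args) {
        // Partition keys are plain strings here instead of Kafka's TopicPartition.
        Map<String, Long> translated = new HashMap<>();
        translated.put("A.DL2-0", 10L);
        Map<String, Long> logEnd = new HashMap<>();
        logEnd.put("A.DL2-0", 5L);

        for (Map.Entry<String, Long> e : translated.entrySet()) {
            long safe = clampToLogEnd(e.getValue(), logEnd.get(e.getKey()));
            System.out.println(e.getKey() + " -> " + safe); // prints A.DL2-0 -> 5
        }
    }
}
```

Capping places the group exactly where newly replicated records will be written, so none of them are skipped; skipping the sync for that partition instead would leave the group at its previous downstream offset.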


 

  was:
Kafka version: 2.8.0
Replication flow: A -> B
Topic: DL2 -> A.DL2
 
I use mm2 to replicate data from cluster A to cluster B. In some cases, I found
that the consumer group offset is bigger than the partition offset in cluster B,
which causes data loss in the mirror topic.

 
[Steps]
1. Start mm2 with the attachment: connect-mirror-maker.properties.
2. Create topic DL2, push 5 messages and consume them with group g1. Offset info
of DL2 and A.DL2: please see attachment offset1.png
3. Stop mm2 and push 5 messages to topic DL2. Offset info of DL2 and A.DL2:
please see attachment offset2.png
4. After 1 hour, the data in topic DL2 is deleted.
5. Restart mm2 and sync the consumer group offset: the consumer group offset is
now bigger than the partition offset. Offset info of DL2 and A.DL2: please see
attachment offset3.png

 

 


> Replication data loss in some cases
> -----------------------------------
>
>                 Key: KAFKA-13932
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13932
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.8.0
>            Reporter: Bale
>            Priority: Minor
>         Attachments: connect-mirror-maker.properties, offset1.png, 
> offset2.png, offset3.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
