[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819368#comment-17819368 ]

Greg Harris commented on KAFKA-16291:
-------------------------------------

Hi [~claudio.benfatto], thanks for the bug report and the very relevant logs! 
I'm sorry that offset translation isn't working the way you expect.

These two lines show what is happening:
{noformat}
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
[2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync 
OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765, 
downstreamOffset=197535} applied, new state is 
[198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868]
 (org.apache.kafka.connect.mirror.OffsetSyncStore:193) 
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
[2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync 
OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532, 
downstreamOffset=196302} applied, new state is 
[197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868]
 (org.apache.kafka.connect.mirror.OffsetSyncStore:193){noformat}
These log lines print out the whole in-memory state of the offset translation 
cache. If an offset isn't in this list, then it isn't available for translation 
after that point in time.
You can see the critical sync 197532:196302 get added to the front of the 
cache, and by the time the next state is printed about 4 minutes later (the 
excerpt above is newest-first), that sync is no longer present. It has syncs on 
either side (196913:195683 and 197617:196387), separated by a gap, which is 
expected.
The earlier/lower of these two syncs is the one used for translation later:
{noformat}
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
[2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] 
translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): 
Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, 
upstreamOffset=196913, downstreamOffset=195683})
{noformat}
The idea here is that MirrorMaker2 keeps only a finite number of offset syncs 
in memory, and expires syncs as they age. After a sync is expired it is still 
available in the offset syncs topic (as you've shown), but not in memory, so 
the closest earlier sync is used for translation instead.
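
To make that concrete, here is a minimal sketch of the translation semantics 
visible in your logs (an illustration only, not the actual OffsetSyncStore 
code; the names are mine):
{code:java}
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

// Illustrative model of the translation cache: upstream offset -> downstream
// offset, with translation falling back to the closest earlier sync.
class OffsetSyncCacheSketch {
    private final NavigableMap<Long, Long> syncs = new TreeMap<>();

    void applySync(long upstream, long downstream) {
        syncs.put(upstream, downstream);
        // The real store also expires older entries here, keeping a bounded
        // list at roughly exponentially increasing distances from the end.
    }

    OptionalLong translateDownstream(long upstreamGroupOffset) {
        // Closest sync at or below the group's upstream offset.
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamGroupOffset);
        if (sync == null) {
            return OptionalLong.empty();             // nothing at or below: untranslatable
        }
        if (sync.getKey() == upstreamGroupOffset) {
            return OptionalLong.of(sync.getValue()); // exact match: a "perfect sync"
        }
        // Group is past the sync: only downstream + 1 is provably safe to resume from.
        return OptionalLong.of(sync.getValue() + 1);
    }
}
{code}
Once 197532:196302 has expired, the closest sync at or below 197532 is 
196913:195683, so translateDownstream(197532) returns 195684, exactly as in the 
DEBUG line above.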

At the cost of increased re-delivery/re-processing on the downstream side and 
changed offset commit boundaries, this design prioritizes the following 
guarantees:
 # If the downstream commits offsets for some record, then the upstream must 
have committed offsets including that record (no data loss when resuming from 
downstream offsets)
 # If you watch the progress of the downstream offsets, they increase 
monotonically (no rewinding downstream offsets; see the sketch after this list)
 # The in-memory cache is bounded in size and won't cause an OOM
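
Guarantee 2 is what produces the "Skipping Checkpoint ... (preventing 
downstream rewind)" lines in your logs. A hypothetical sketch of that guard, 
using the numbers from your logs (not the actual MirrorCheckpointTask code):
{code:java}
// From the logs: the downstream group is already at 196300, but translating
// upstream offset 197532 through the older sync yields only 195684.
long latestDownstreamOffset = 196300; // downstream group's committed offset
long translatedDownstream = 195684;   // translateDownstream(..., 197532)

if (translatedDownstream <= latestDownstreamOffset) {
    // Emitting this checkpoint would move the downstream group backwards,
    // so it is skipped instead (guarantee 2).
    System.out.println("Skipping checkpoint (preventing downstream rewind)");
}
{code}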

The translation cache is optimized for offsets near the end of the topic, and 
you can see how the later offsets are closer together while the earlier offsets 
are farther apart (198765 and 198764 are 1 apart, 196913 and 194098 are 2815 
apart). The closer a group's offsets are to the end of the topic, the less 
redelivery that group will experience, and at the end of the topic you can get 
a "perfect sync".

So if you're looking for a zero-re-delivery failover, I would make sure that 
you stop the producers, let the consumers commit at the end of the topic, wait 
for MM2 to translate the offsets, and then perform the failover. Unplanned 
failovers will nearly always include data re-delivery because MM2 does 
asynchronous replication, so your application should be tolerant of that.
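
As a concrete pre-failover check, you can compare the group's committed 
offsets against the partition end offsets on the source cluster, and only fail 
over once the lag is 0 everywhere and a checkpoint has been emitted. A hedged 
sketch using the standard AdminClient (the bootstrap address is an assumption; 
the group name is from your report):
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class PreFailoverCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed bootstrap address for the source (fallingwaterfall) cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "fallingwaterfall:9092");
        try (Admin admin = Admin.create(props)) {
            // Committed offsets for the consumer group from this report.
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets("storage-demo-test-cg")
                    .partitionsToOffsetAndMetadata().get();
            // End offsets for the same partitions.
            Map<TopicPartition, OffsetSpec> latest = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                    admin.listOffsets(latest).all().get();
            // Lag 0 on every partition means the group has committed at the end
            // of the topic, where MM2 can reach a "perfect sync".
            committed.forEach((tp, om) -> System.out.printf(
                    "%s committed=%d end=%d lag=%d%n",
                    tp, om.offset(), ends.get(tp).offset(),
                    ends.get(tp).offset() - om.offset()));
        }
    }
}
{code}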

This behavior has been changing since KAFKA-12468, so if you're only seeing it 
after an upgrade, that would explain why.

> Mirrormaker2 wrong checkpoints
> ------------------------------
>
>                 Key: KAFKA-16291
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16291
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.6.1
>         Environment: Mirrormaker2 version 3.6.1 running on docker containers
>            Reporter: Claudio Benfatto
>            Priority: Major
>
> I am running Mirrormaker2 with the following configuration:
> {noformat}
> clusters = fallingwaterfall, weatheredbase
> sync.group.offsets.interval.seconds=30
> emit.checkpoints.interval.seconds=30
> offset.lag.max=0
> fallingwaterfall->weatheredbase.enabled = true
> weatheredbase->fallingwaterfall.enabled = false
> sync.group.offsets.enabled=true
> emit.heartbeats.enabled=true
> emit.checkpoints.enabled=true
> emit.checkpoints.interval.seconds=30
> refresh.groups.enabled=true
> refresh.groups.interval.seconds=30
> refresh.topics.enabled=true
> sync.topic.configs.enabled=true
> refresh.topics.interval.seconds=30
> sync.topic.acls.enabled = false
> fallingwaterfall->weatheredbase.topics = storage-demo-.*
> fallingwaterfall->weatheredbase.groups = storage-demo-.*
> group.id=mirror-maker-fallingwaterfall-weatheredbase
> consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase
> fallingwaterfall.consumer.isolation.level = read_committed
> weatheredbase.producer.enable.idempotence = true
> weatheredbase.producer.acks=all
> weatheredbase.exactly.once.source.support = enabled
> replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> {noformat}
> I am experiencing issues with the consumer group offset synchronisation.
> I have a setup with a 12-partition topic, named *storage-demo-test,* a single 
> transactional producer to this topic and a consumer group, named 
> *storage-demo-test-cg,* consuming from it.
> The consumer configuration is:
> {code:java}
> 'auto.offset.reset': 'earliest',
> 'isolation.level': 'read_committed',
> 'enable.auto.commit': False, {code}
> and I'm committing the offsets explicitly and synchronously after each poll.
> What I observed is that the synchronised offsets between the upstream and 
> downstream cluster for the *storage-demo-test-cg* are often wrong.
> For example in the case of this checkpoint:
> {code:java}
> (1, 1708505669764) - 6252 - 
> CheckpointKey(consumer_group='storage-demo-test-cg', 
> topic='storage-demo-test', partition=5) - 
> CheckpointValue(upstream_offset=197532, downstream_offset=196300) {code}
> We have a mismatch in the replicated messages:
> {code:java}
> [fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1
> Test message 1027-0 {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1
> Test message 1015-9 {code}
> In the Mirrormaker2 logs I see many of these messages:
> {code:java}
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0] 
> latestDownstreamOffset 196300 is larger than or equal to 
> convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] 
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): 
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, 
> upstreamOffset=196913, downstreamOffset=195683}) 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointConnector|task-0] Skipping 
> Checkpoint{consumerGroupId=storage-demo-test-cg, 
> topicPartition=storage-demo-test-5, upstreamOffset=197532, 
> downstreamOffset=195684, metadata=} (preventing downstream rewind) 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:59:00,859] TRACE [MirrorCheckpointConnector|task-0] Skipping 
> Checkpoint{consumerGroupId=storage-demo-test-cg, 
> topicPartition=storage-demo-test-5, upstreamOffset=197532, 
> downstreamOffset=195684, metadata=} (preventing downstream rewind) 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:59:00,859] DEBUG [MirrorCheckpointConnector|task-0] 
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): 
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, 
> upstreamOffset=196913, downstreamOffset=195683}) 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync 
> OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765, 
> downstreamOffset=197535} applied, new state is 
> [198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868]
>  (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync 
> OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532, 
> downstreamOffset=196302} applied, new state is 
> [197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868]
>  (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
> mirrormaker2-fallingwaterfall-weatheredbase-0 - mirrormaker2-server - 
> [2024-02-21 08:54:05,030] TRACE [MirrorSourceConnector|task-0] Sync'd offsets 
> for storage-demo-test-5: 197532==196302 
> (org.apache.kafka.connect.mirror.MirrorSourceTask:251){code}
> And looking in the OffsetSync topic, I see the correct value for the offset 
> sync:
> {code:java}
> (1, 1708505645010) - 3945070 - OffsetSyncKey(topic='storage-demo-test', 
> partition=5) - OffsetSyncValue(upstream_offset=197532, 
> downstream_offset=196302)
>  {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196302 -c 1
> Test message 1027-0 {code}
> So it seems that the offset conversions and checkpoints produced in the 
> *MirrorCheckpointTask* are not matching the information committed to the 
> OffsetSync topic by the *MirrorSourceTask.*
> Please let me know if you need additional info about the setup I'm running or 
> collecting more logs.
> Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
