[ 
https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819397#comment-17819397
 ] 

Claudio Benfatto edited comment on KAFKA-16291 at 2/21/24 9:48 PM:
-------------------------------------------------------------------

Hi [~gharris1727] , thanks a lot for your clear explanation of Mirrormaker's 
behaviour. Now I understand much better how everything ties together.

My use case is a cluster migration, planning to move the consumers ahead of the 
producers. And yes, you are right, my original idea was to have a 
zero-redelivery migration for the consumers without stopping the producers.
Now I see better how this is hard to achieve, and especially when the consumers 
are lagging a bit behind.

I appreciate how Mirrormaker, being a general purpose tool, needs to 
accommodate different use cases and prioritise some guarantees over others (in 
an ideal world, perhaps, that behaviour could be tuned and customised and some 
of the guarantees swapped for others).

This said, I think I will be testing a different strategy based on the 
*OffsetSync* records (as you observed, with {*}offset.lag.max=0{*}, they offer 
complete and accurate data on which to base the offset translation). What I 
plan on doing is:
 # Stop the consumer in upstream
 # Find its current offsets for each partition
 # Consume the offset sync topic, from a recent enough timestamp, to seek for 
the records with the upstream offsets which are the closest to the upstream 
consumer current offsets (usually that should be 1 or 2 offsets behind, 
depending on the transaction markers)
 # Use the downstream offset (+1) for the record in 3. to alter the offsets for 
the downstream consumer groups

My main concern is with the efficiency of consuming the offset sync topic. Even 
though I would process it only for, let's say, the last 10 minutes it could 
still be a lot of records to go through.

Let me know if this plan sounds reasonable to you, I would really appreciate 
your advice ;)

Many thanks!


was (Author: JIRAUSER304342):
Hi [~gharris1727] , thanks a lot for your clear explanation of Mirrormaker's 
behaviour. Now I understand much better how everything ties together.

My use case is a cluster migration, planning to move the consumers ahead of the 
producers. And yes, you are right, my original idea was to have a 
zero-redelivery migration for the consumers without stopping the producers.
Now I see better how this is hard to achieve, and especially when the consumers 
are lagging a bit behind.

I appreciate how Mirrormaker, being a general purpose tool, needs to 
accommodate different use cases and prioritise some guarantees over others (in 
an ideal world, perhaps, that behaviour could be tuned and customised and some 
of the guarantees swapped for others).

This said, I think I will be testing a different strategy based on the 
*OffsetSync* records (as you observed, with {*}offset.lag.max=0{*}, they offer 
complete and accurate data for the offset translation). What I plan on doing is:
 # Stop the consumer in upstream
 # Find its current offsets for each partition
 # Consume the offset sync topic, from a recent enough timestamp, to seek for 
the records with the upstream offsets which are the closest to the upstream 
consumer current offsets (usually that should be 1 or 2 offsets behind, 
depending on the transaction markers)
 # Use the downstream offset (+1) for the record in 3. to alter the offsets for 
the downstream consumer groups

My main concern is with the efficiency of consuming the offset sync topic. Even 
though I would process it only for, let's say, the last 10 minutes it could 
still be a lot of records to go through.

Let me know if this plan sounds reasonable to you, I would really appreciate 
your advice ;)

Many thanks!

> Mirrormaker2 wrong checkpoints
> ------------------------------
>
>                 Key: KAFKA-16291
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16291
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.6.1
>         Environment: Mirrormaker2 version 3.6.1 running on docker containers
>            Reporter: Claudio Benfatto
>            Priority: Major
>
> I am running Mirrormaker2 with the following configuration:
> {noformat}
> clusters = fallingwaterfall, weatheredbase
> sync.group.offsets.interval.seconds=30
> emit.checkpoints.interval.seconds=30
> offset.lag.max=0
> fallingwaterfall->weatheredbase.enabled = true
> weatheredbase->fallingwaterfall.enabled = false
> sync.group.offsets.enabled=true
> emit.heartbeats.enabled=true
> emit.checkpoints.enabled=true
> emit.checkpoints.interval.seconds=30
> refresh.groups.enabled=true
> refresh.groups.interval.seconds=30
> refresh.topics.enabled=true
> sync.topic.configs.enabled=true
> refresh.topics.interval.seconds=30
> sync.topic.acls.enabled = false
> fallingwaterfall->weatheredbase.topics = storage-demo-.*
> fallingwaterfall->weatheredbase.groups = storage-demo-.*
> group.id=mirror-maker-fallingwaterfall-weatheredbase
> consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase
> fallingwaterfall.consumer.isolation.level = read_committed
> weatheredbase.producer.enable.idempotence = true
> weatheredbase.producer.acks=all
> weatheredbase.exactly.once.source.support = enabled
> replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> {noformat}
> I am experiencing issues with the consumer group offset synchronisation.
> I have a setup with a 12-partition topic, named *storage-demo-test,* a single 
> transactional producer to this topic and a consumer group, named 
> *storage-demo-test-cg,* consuming from it.
> The consumer configuration is:
> {code:java}
> 'auto.offset.reset': 'earliest',
> 'isolation.level': 'read_committed',
> 'enable.auto.commit': False, {code}
> and I'm committing the offsets explicitly and synchronously after each poll.
> What I observed is that the synchronised offsets between the upstream and 
> downstream cluster for the *storage-demo-test-cg* are often wrong.
> For example in the case of this checkpoint:
> {code:java}
> (1, 1708505669764) - 6252 - 
> CheckpointKey(consumer_group='storage-demo-test-cg', 
> topic='storage-demo-test', partition=5) - 
> CheckpointValue(upstream_offset=197532, downstream_offset=196300) {code}
> We have a mismatch in the replicated messages:
> {code:java}
> [fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1
> Test message 1027-0 {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1
> Test message 1015-9 {code}
> In the Mirrormaker2 logs I see many of these messages:
> {code:java}
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0] 
> latestDownstreamOffset 196300 is larger than or equal to 
> convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] 
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): 
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, 
> upstreamOffset=196913, downstreamOffset=195683}) 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointConnector|task-0] Skipping 
> Checkpoint{consumerGroupId=storage-demo-test-cg, 
> topicPartition=storage-demo-test-5, upstreamOffset=197532, 
> downstreamOffset=195684, metadata=} (preventing downstream rewind) 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:59:00,859] TRACE [MirrorCheckpointConnector|task-0] Skipping 
> Checkpoint{consumerGroupId=storage-demo-test-cg, 
> topicPartition=storage-demo-test-5, upstreamOffset=197532, 
> downstreamOffset=195684, metadata=} (preventing downstream rewind) 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:59:00,859] DEBUG [MirrorCheckpointConnector|task-0] 
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): 
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, 
> upstreamOffset=196913, downstreamOffset=195683}) 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync 
> OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765, 
> downstreamOffset=197535} applied, new state is 
> [198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868]
>  (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync 
> OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532, 
> downstreamOffset=196302} applied, new state is 
> [197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868]
>  (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
> mirrormaker2-fallingwaterfall-weatheredbase-0 - mirrormaker2-server - 
> [2024-02-21 08:54:05,030] TRACE [MirrorSourceConnector|task-0] Sync'd offsets 
> for storage-demo-test-5: 197532==196302 
> (org.apache.kafka.connect.mirror.MirrorSourceTask:251){code}
> And looking in the OffsetSync topic, I see the correct value for the offset 
> sync:
> {code:java}
> (1, 1708505645010) - 3945070 - OffsetSyncKey(topic='storage-demo-test', 
> partition=5) - OffsetSyncValue(upstream_offset=197532, 
> downstream_offset=196302)
>  {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196302 -c 1
> Test message 1027-0 {code}
> So it seems that the offset conversions and checkpoints produced in the 
> *MirrorCheckpointTask* are not matching the information committed to the 
> OffsetSync topic by the *MirrorSourceTask.*
> Please let me know if you need additional info about the setup I'm running or 
> collecting more logs.
> Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to