[jira] [Commented] (KAFKA-16291) Mirrormaker2 wrong checkpoints

2024-03-11 Thread Greg Harris (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825467#comment-17825467 ]

Greg Harris commented on KAFKA-16291:
-

[~claudio.benfatto] That's a good idea, I agree that the default behavior isn't 
good enough in all scenarios and a configuration is needed.

Since it includes a user configuration, and needs significant design work, this 
will need a KIP. I've opened KAFKA-16364 to track the work there, and you're 
welcome to assign yourself and draft a KIP.

But just to temper your expectations here:

> Offset translation guarantees zero-redelivery

This is not possible given the asynchronous pattern used for offset 
translation. I think it can be true in an eventual-consistency sense: if the 
upstream consumer group is inactive for sufficiently long (and its lag is < N), 
then translation could be exact.

We can also use this as an opportunity to design an alternative to 
offset.lag.max=0 and the mirror source sync send semaphore, because even with 
a 100% retention solution on the MirrorCheckpointTask side, the 
MirrorSourceTask still drops syncs occasionally.

> Mirrormaker2 wrong checkpoints
> --
>
> Key: KAFKA-16291
> URL: https://issues.apache.org/jira/browse/KAFKA-16291
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.6.1
> Environment: Mirrormaker2 version 3.6.1 running on docker containers
>Reporter: Claudio Benfatto
>Priority: Major
>
> I am running Mirrormaker2 with the following configuration:
> {noformat}
> clusters = fallingwaterfall, weatheredbase
> sync.group.offsets.interval.seconds=30
> emit.checkpoints.interval.seconds=30
> offset.lag.max=0
> fallingwaterfall->weatheredbase.enabled = true
> weatheredbase->fallingwaterfall.enabled = false
> sync.group.offsets.enabled=true
> emit.heartbeats.enabled=true
> emit.checkpoints.enabled=true
> emit.checkpoints.interval.seconds=30
> refresh.groups.enabled=true
> refresh.groups.interval.seconds=30
> refresh.topics.enabled=true
> sync.topic.configs.enabled=true
> refresh.topics.interval.seconds=30
> sync.topic.acls.enabled = false
> fallingwaterfall->weatheredbase.topics = storage-demo-.*
> fallingwaterfall->weatheredbase.groups = storage-demo-.*
> group.id=mirror-maker-fallingwaterfall-weatheredbase
> consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase
> fallingwaterfall.consumer.isolation.level = read_committed
> weatheredbase.producer.enable.idempotence = true
> weatheredbase.producer.acks=all
> weatheredbase.exactly.once.source.support = enabled
> replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> {noformat}
> I am experiencing issues with the consumer group offset synchronisation.
> I have a setup with a 12-partition topic, named *storage-demo-test,* a single 
> transactional producer to this topic and a consumer group, named 
> *storage-demo-test-cg,* consuming from it.
> The consumer configuration is:
> {code:java}
> 'auto.offset.reset': 'earliest',
> 'isolation.level': 'read_committed',
> 'enable.auto.commit': False, {code}
> and I'm committing the offsets explicitly and synchronously after each poll.
> What I observed is that the synchronised offsets between the upstream and 
> downstream cluster for the *storage-demo-test-cg* are often wrong.
> For example in the case of this checkpoint:
> {code:java}
> (1, 1708505669764) - 6252 - 
> CheckpointKey(consumer_group='storage-demo-test-cg', 
> topic='storage-demo-test', partition=5) - 
> CheckpointValue(upstream_offset=197532, downstream_offset=196300) {code}
> We have a mismatch in the replicated messages:
> {code:java}
> [fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1
> Test message 1027-0 {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1
> Test message 1015-9 {code}
> In the Mirrormaker2 logs I see many of these messages:
> {code:java}
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0] 
> latestDownstreamOffset 196300 is larger than or equal to 
> convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] 
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): 
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, 
> upstreamOffset=196913, downstreamOffset=195683}) 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointCo

[jira] [Commented] (KAFKA-16291) Mirrormaker2 wrong checkpoints

2024-02-22 Thread Claudio Benfatto (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819522#comment-17819522 ]

Claudio Benfatto commented on KAFKA-16291:
--

In terms of Mirrormaker features, what would be great to have is more 
customisable behaviour for the *OffsetSyncStore.*
What I'm thinking of is the possibility of keeping, in particular when 
*offset.lag.max=0,* the last *N* OffsetSync records (configurable) in a FIFO 
data structure, and of emitting a metric with the last offset translated.

This way we could always guarantee exact offset translation within certain 
boundaries. The contract with the consumers in this case would be:
 * Offset translation guarantees zero-redelivery when *offset.lag.max=0* and 
within a consumer lag of less than *N*
 * No offset translation happens when we cannot guarantee that it is exact 
(maps 1:1 with an upstream offset)
 * We can check the last offset for which we had an exact translation via the 
value of a metric
 * Memory is still bounded, and its size depends on the value of *N* and the 
number of topics being replicated
 * I'm not sure about the performance of the index lookup, to be honest

This way we could have more predictable behaviour when performing topic 
migration from an upstream to a downstream cluster, which I believe is a very 
common use case.
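
To make the idea a bit more concrete, here is a rough sketch of such a bounded FIFO store for one partition. The class and method names are made up for illustration and are not the existing *OffsetSyncStore* API:
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

// Hypothetical sketch of the proposal: keep the last N upstream->downstream
// offset pairs for one partition and only translate offsets that are present
// exactly, instead of falling back to an older, approximate sync.
public class ExactOffsetSyncWindow {
    private final int maxEntries;                             // the configurable N
    private final Deque<long[]> window = new ArrayDeque<>();  // entries are [upstreamOffset, downstreamOffset]
    private volatile long lastExactUpstreamOffset = -1L;      // could back the proposed metric

    public ExactOffsetSyncWindow(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    // Called for every OffsetSync record; with offset.lag.max=0 that is one per replicated record.
    public synchronized void record(long upstreamOffset, long downstreamOffset) {
        window.addLast(new long[] {upstreamOffset, downstreamOffset});
        if (window.size() > maxEntries) {
            window.removeFirst();                             // memory stays bounded by N
        }
    }

    // Returns a downstream offset only when the upstream offset is still in the window;
    // otherwise the caller should skip translation rather than emit an inexact checkpoint.
    public synchronized OptionalLong translateExact(long upstreamOffset) {
        for (long[] sync : window) {
            if (sync[0] == upstreamOffset) {
                lastExactUpstreamOffset = upstreamOffset;
                return OptionalLong.of(sync[1]);
            }
        }
        return OptionalLong.empty();
    }

    // Value for the "last offset translated exactly" metric.
    public long lastExactUpstreamOffset() {
        return lastExactUpstreamOffset;
    }
}
{code}
The linear scan here matches the performance caveat in the list above; a ring buffer plus a hash index keyed by upstream offset would make the lookup O(1) at the cost of a little extra memory.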


[jira] [Commented] (KAFKA-16291) Mirrormaker2 wrong checkpoints

2024-02-21 Thread Claudio Benfatto (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819397#comment-17819397 ]

Claudio Benfatto commented on KAFKA-16291:
--

Hi [~gharris1727], thanks a lot for your clear explanation of Mirrormaker's 
behaviour. Now I understand much better how everything ties together.

My use case is a cluster migration, where I plan to move the consumers ahead of 
the producers. And yes, you are right: my original idea was to have a 
zero-redelivery migration for the consumers without stopping the producers.
Now I see better why this is hard to achieve, especially when the consumers are 
lagging a bit behind.

I appreciate how Mirrormaker, being a general-purpose tool, needs to accommodate 
different use cases and prioritise some guarantees over others (in an ideal 
world, perhaps, that behaviour could be tuned and customised and some of the 
guarantees swapped for others).

This said, I think I will be testing a different strategy based on the 
*OffsetSync* records (as you observed, with {*}offset.lag.max=0{*}, they offer 
complete and accurate data for offset translation). What I plan on doing, 
roughly sketched below, is:
 # Stop the consumers on the upstream cluster
 # Find the group's current committed offsets for each partition
 # Consume the offset-syncs topic, from a recent enough timestamp, looking for 
the records whose upstream offsets are closest to the upstream consumer's 
current offsets (usually 1 or 2 offsets behind, depending on the transaction 
markers)
 # Use the downstream offset (+1) of the record found in step 3 to alter the 
offsets of the downstream consumer group
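
Here is a rough sketch of steps 2–4. The offset-syncs topic name and the parseOffsetSync helper are assumptions for illustration only; the key/value format of that topic is internal to MM2, so the decoding part is left out:
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetSyncSeek {

    // Hypothetical holder for one decoded offset-sync record.
    record Sync(TopicPartition partition, long upstreamOffset, long downstreamOffset) {}

    // Hypothetical: decoding the internal key/value format of the offset-syncs topic is out of scope here.
    static Sync parseOffsetSync(ConsumerRecord<byte[], byte[]> rec) { throw new UnsupportedOperationException(); }

    public static void main(String[] args) throws Exception {
        TopicPartition dataPartition = new TopicPartition("storage-demo-test", 5);
        long upstreamCommitted = 197532L;   // step 2: committed offset of the (stopped) upstream group
        long sinceMs = System.currentTimeMillis() - Duration.ofMinutes(10).toMillis();

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "fallingwaterfall:9092");  // offset-syncs live on the source cluster by default
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());

        Sync best = null;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            // Single-partition internal topic; name assumed, check your MM2 configuration.
            TopicPartition syncs = new TopicPartition("mm2-offset-syncs.weatheredbase.internal", 0);
            consumer.assign(Collections.singletonList(syncs));
            OffsetAndTimestamp start = consumer.offsetsForTimes(Map.of(syncs, sinceMs)).get(syncs);
            consumer.seek(syncs, start == null ? 0L : start.offset());   // step 3: replay only recent syncs
            long end = consumer.endOffsets(Collections.singletonList(syncs)).get(syncs);

            while (consumer.position(syncs) < end) {
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(1))) {
                    Sync s = parseOffsetSync(rec);
                    // Keep the closest sync at or below the upstream committed offset.
                    if (s.partition().equals(dataPartition)
                            && s.upstreamOffset() <= upstreamCommitted
                            && (best == null || s.upstreamOffset() > best.upstreamOffset())) {
                        best = s;
                    }
                }
            }
        }

        if (best != null) {
            Properties adminProps = new Properties();
            adminProps.put("bootstrap.servers", "weatheredbase:9092");
            try (Admin admin = Admin.create(adminProps)) {
                // Step 4: point the downstream group just past the last replicated record.
                admin.alterConsumerGroupOffsets("storage-demo-test-cg",
                        Map.of(dataPartition, new OffsetAndMetadata(best.downstreamOffset() + 1)))
                    .all().get();
            }
        }
    }
}
{code}
Bounding the scan with offsetsForTimes, as above, avoids reading the offset-syncs topic from the beginning.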

My main concern is the efficiency of consuming the offset-syncs topic. Even 
though I would process only, say, the last 10 minutes of it, that could still 
be a lot of records to go through.

Let me know if this plan sounds reasonable to you; I would really appreciate 
your advice ;)

Many thanks!


[jira] [Commented] (KAFKA-16291) Mirrormaker2 wrong checkpoints

2024-02-21 Thread Greg Harris (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819368#comment-17819368 ]

Greg Harris commented on KAFKA-16291:
-

Hi [~claudio.benfatto] Thanks for the bug report and the very relevant logs! 
I'm sorry that offset translation isn't working how you expect.

These two lines show what is happening:
{noformat}
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
[2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync 
OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765, 
downstreamOffset=197535} applied, new state is 
[198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868]
 (org.apache.kafka.connect.mirror.OffsetSyncStore:193) 
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
[2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync 
OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532, 
downstreamOffset=196302} applied, new state is 
[197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868]
 (org.apache.kafka.connect.mirror.OffsetSyncStore:193){noformat}
These log lines print out the whole in-memory state of the offset 
translation cache. If an offset isn't in this list, then it isn't available for 
translation after that point in time.
You can see the critical sync 197532:196302 get added to the front of the 
cache, and by the time the next line is printed 4 minutes later, that sync is 
no longer present. It has syncs on either side (196913:195683 and 
197617:196387), separated by some gap, which is expected.
The earlier/lower of these two syncs is the one that is being used for 
translation later:
{noformat}
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
[2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] 
translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): 
Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, 
upstreamOffset=196913, downstreamOffset=195683}) 
(org.apache.kafka.connect.mirror.OffsetSyncStore:160){noformat}
The idea here is that MirrorMaker2 keeps only a finite number of offset syncs 
in memory, and expires syncs as they become older. After a sync is expired, it 
is still available in the offset-syncs topic (as you've shown), but it won't be 
in memory, and instead the closest earlier sync will be used for translation.
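
A simplified sketch of that fallback (not the actual OffsetSyncStore code, but consistent with the DEBUG line above: translating 197532 against the 196913:195683 sync yields 195684):
{code:java}
import java.util.Map;
import java.util.NavigableMap;

class TranslationSketch {
    // syncs maps upstreamOffset -> downstreamOffset for the syncs currently held in memory.
    static long translateDownstream(NavigableMap<Long, Long> syncs, long upstreamOffset) {
        Map.Entry<Long, Long> nearest = syncs.floorEntry(upstreamOffset);  // closest earlier (or equal) sync
        if (nearest == null) {
            return -1L;                        // nothing translatable yet
        }
        if (nearest.getKey() == upstreamOffset) {
            return nearest.getValue();         // exact sync: no re-delivery for this offset
        }
        return nearest.getValue() + 1;         // otherwise: resume just past the last known replicated record
    }
}
{code}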

At the cost of increased re-delivery/re-processing on the downstream side and 
changed offset commit boundaries, this design prioritizes the following 
guarantees:
 # If the downstream commits offsets for some record, then the upstream must 
have committed offsets including that record (no data loss when resuming from 
downstream offsets)
 # If you watch the progress of the downstream offsets, they increase 
monotonically (no rewinding downstream offsets)
 # The in-memory cache is bounded in size and won't cause an OOM

The translation cache is optimized for offsets near the end of the topic, and 
you can see how the later offsets are closer together and the earlier 
offsets are farther apart (198765 and 198764 are 1 apart, 196913 and 194098 are 
2815 apart). The closer your offsets are to the end of the topic, the less 
redelivery that consumer group will experience, and at the very end of the 
topic you can get a "perfect sync".

So if you're looking for a zero-re-delivery failover, I would make sure that 
you stop the producers, let the consumers commit at the end of the topic, wait 
for MM2 to translate the offsets, and then perform the failover. Unplanned 
failovers will nearly always include data re-delivery because MM2 does 
asynchronous replication, so your application should be tolerant of that.
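
For illustration only (placeholder bootstrap address; this is not something MM2 does for you), a check like the following against the upstream cluster can confirm the "consumers committed at the end of the topic" precondition before you fail over. With transactional producers the committed position may sit a marker or two short of the log end, so treat the comparison as approximate:
{code:java}
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch of a pre-failover check on the upstream cluster: only proceed once the
// group's committed offsets have caught up with the log end offsets, i.e. the
// consumers are at the end of the topic and a "perfect sync" is possible.
public class FailoverReadyCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "fallingwaterfall:9092");   // upstream cluster (placeholder address)
        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets("storage-demo-test-cg")
                     .partitionsToOffsetAndMetadata().get();

            Map<TopicPartition, OffsetSpec> latestSpec = committed.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
                admin.listOffsets(latestSpec).all().get();

            boolean ready = committed.entrySet().stream().allMatch(e ->
                e.getValue().offset() >= endOffsets.get(e.getKey()).offset());
            System.out.println(ready
                ? "Consumers are at the end of the topic; wait one checkpoint interval, then fail over."
                : "Consumers are still lagging; failing over now would re-deliver records downstream.");
        }
    }
}
{code}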

This has been changing since KAFKA-12468, so if you're only seeing this after 
an upgrade, that would explain why.
