Ryanne,

> > b.a.replicate-me-0
> That's actually impossible with MM2.

Thanks, I see the isCycle check in MirrorSourceConnector. That makes
me even more curious how the renameTopicPartition method triggers
without a change such as the one that Jeroen has prototyped, since the
only thing that emits offset syncs is the MirrorSourceTask, and it is
disallowed from sending topics back in a cycle.

Greg

On Fri, Jan 12, 2024 at 6:13 AM Jeroen Schutrup
<jer...@cloudflare.com.invalid> wrote:
>
> Hey Greg,
> There are no offset collisions as the offset-syncs albeit being stored on
> the same cluster, offsets from A->B are stored
> in mm2-offset-syncs.b.internal whereas offsets from B->A are stored
> in mm2-offset-syncs.a.internal.
> What's curious though is the B->A checkpoint connector (which has
> offset-syncs.topic.location: target) actually uses the offsets stored in
> mm2-offset-syncs.b.internal (which contains the downstream offsets) while I
> expected it to only read offsets stored in mm2-offset-syncs.a.internal, as
> cluster A is its target.
>
> I'm positive on driving a KIP for this feature to see whether we can get it
> implemented. I'm hoping to submit a draft in the upcoming weeks, though I'd
> need a bit of time to get a better grasp on the mirror connector codebase.
>
> Thank you both for your valuable insights so far!
>
> Jeroen
>
> On Thu, Jan 11, 2024 at 8:06 PM Greg Harris <greg.har...@aiven.io.invalid>
> wrote:
>
> > Hey Jeroen,
> >
> > Thanks for sharing your prototype! It is very interesting!
> >
> > > I couldn't reproduce your hypothesis.
> >
> > I think my hypothesis was for another setup which didn't involve code
> > changes, and instead relied on A->B->A round trip replication to
> > produce the "backwards" offset syncs.
> > I believe this would replicate data from "replicate-me-0" to
> > "b.a.replicate-me-0", and then possibly take the offsets intended for
> > "b.a.replicate-me-0" and apply them to "replicate-me-0" creating the
> > infinite cycle.
> > I would not expect your implementation to suffer from this failure
> > mode, because it's using the offset in "replicate-me-0" as the
> > downstream offset, not the offset of "b.a.replicate-me-0".
> >
> > With your prototype, do you experience "collisions" in the
> > offset-syncs topic? Since you're sharing a single offset-syncs topic
> > between both replication flows, I would expect offsets for topics with
> > the same names on both clusters to conflict, and cause the translation
> > to happen using the opposite topic's offsets.
> > It would also be visible in the state of the OffsetSyncStore here:
> > [1], you can compare the normal A->B behavior before and after
> > starting the B -> A source connector to see if the concurrent flows
> > causes more syncs to be cleared, or the wrong syncs to be present.
> >
> > I think it is normal for every MM2 connector to have the same
> > offset-syncs.topic.location to avoid these sorts of conflicts, so that
> > each syncs topic is only used by one of the MM2 replication flows.
> > I think that turning on bidirectional offset syncs will probably
> > require a second producer in the MirrorSourceTask to contact the
> > opposite cluster, or a second admin client in the
> > MirrorCheckpointTask.
> >
> > > Do you think it'd be worthwhile proceeding with this?
> >
> > This is certainly a capability that MM2 is missing right now, and
> > seems like it would be a natural component of failing consumers back
> > and forth. If you see value in it, and are interested in driving the
> > feature, you can open a KIP [2] to discuss the interface and design
> > with the rest of the community.
> >
> > Thanks!
> > Greg
> >
> > [1]
> > https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L194
> > [2]
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >
> > On Thu, Jan 11, 2024 at 9:27 AM Jeroen Schutrup
> > <jer...@cloudflare.com.invalid> wrote:
> > >
> > > I see, makes complete sense to me. I built a custom version [1] based off
> > > of Kafka 3.5.1 with bidirectional offset replication enabled so I could
> > do
> > > some more testing. Offset translation back upstream works well; I think
> > > because of the reason Ryanne pointed out, both topics contain identical
> > > data. Tested this by truncating the upstream topic before starting
> > > replication (so the downstream/upstream topics have different offsets).
> > > Truncating the upstream topic while replication is running neither
> > results
> > > in any weirdness.
> > >
> > > > Before starting the replication, insert a few records into
> > > `a.replicate-me` to force replicate-me-0's offset n to replicate to
> > > a.replicate-me-0's offset n+k.
> > > I couldn't reproduce your hypothesis. After doing the above and then
> > > starting replication I didn't see any offset replication loops. Once I
> > > started producing data into the upstream topic and subscribing a
> > > console-consumer on the downstream topic, offsets were translated and
> > > replicated correctly back upstream. My guess is the CheckpointConnector
> > can
> > > offset these surplus of messages as the actual log offsets of the
> > > downstream topic are written to the offset-sync topic.
> > >
> > > As this kind of active/active replication would be very beneficial to us
> > > for reasons stated in my previous message, we'd love to help out building
> > > this kind of offset replication into the Mirror connectors. I understand
> > > this is not something that should be enabled by default, but having it
> > > behind configuration toggle could help out users desiring a similar kind
> > of
> > > active/active setup and who understand the restrictions. Do you think
> > it'd
> > > be worthwhile proceeding with this?
> > >
> > > [1]
> > >
> > https://github.com/jeroen92/kafka/commit/1a27696ec6777c230f100cf9887368c431ebe0f8
> > >
> > > On Thu, Jan 11, 2024 at 1:06 AM Greg Harris <greg.har...@aiven.io.invalid
> > >
> > > wrote:
> > >
> > > > Hi Jeroen,
> > > >
> > > > I'm glad you're experimenting with MM2, and I hope we can give you
> > > > some more context to explain what you're seeing.
> > > >
> > > > > I wrote a small program to produce these offset syncs for the
> > prefixed
> > > > > topic, and this successfully triggers the Checkpoint connector to
> > start
> > > > > replicating the consumer offsets back to the primary cluster.
> > > >
> > > > This is interesting, and I wouldn't have expected it to work.
> > > >
> > > > To rewind, each flow Source->Target has a MirrorSourceConnector, an
> > > > Offset Syncs Topic, and a MirrorCheckpointConnector. With both
> > > > directions enabled, there are two separate flows each with Source,
> > > > Syncs topic, and Checkpoint.
> > > > With offset-syncs.topic.location=source, the
> > > > mm2-offset-syncs.b.internal on the A cluster is used for the A -> B
> > > > replication flow. It contains topic names from cluster A, and the
> > > > corresponding offsets those records were written to on the B cluster.
> > > > When translation is performed, the consumer groups from A are
> > > > replicated to the B cluster, and the replication mapping (cluster
> > > > prefix) is added.
> > > > Using your syncs topic as an example,
> > > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> > > > downstreamOffset=28} will be used to write offsets for
> > > > "a.replicate-me-0" for the equivalent group on the B cluster.
> > > >
> > > > When your artificial sync OffsetSync{topicPartition=a.replicate-me-0,
> > > > upstreamOffset=29, downstreamOffset=29} is processed, it should be
> > > > used to write offsets for "a.a.replicate-me-0" but it actually writes
> > > > offsets to "replicate-me-0" due to this function that I've never
> > > > encountered before: [1].
> > > > I think you could get those sorts of syncs into the syncs-topic if you
> > > > had A->B configured with offset-syncs.topic.location=source, and B->A
> > > > with offset-syncs-topic.location=target, and configured the topic
> > > > filter to do A -> B -> A round trip replication.
> > > >
> > > > This appears to work as expected if there are no failures or restarts,
> > > > but as soon as a record is re-delivered in either flow, I think the
> > > > offsets should end up constantly advancing in an infinite loop. Maybe
> > > > you can try that: Before starting the replication, insert a few
> > > > records into `a.replicate-me` to force replicate-me-0's offset n to
> > > > replicate to a.replicate-me-0's offset n+k.
> > > >
> > > > Ryanne, do you recall the purpose of the renameTopicPartition
> > > > function? To me it looks like it could only be harmful, as it renames
> > > > checkpoints to target topics that MirrorMaker2 isn't writing. It also
> > > > looks like it isn't active in a typical MM2 setup.
> > > >
> > > > Thanks!
> > > > Greg
> > > >
> > > > [1]:
> > > >
> > https://github.com/apache/kafka/blob/13a83d58f897de2f55d8d3342ffb058b230a9183/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L257-L267
> > > >
> > > > On Tue, Jan 9, 2024 at 5:54 AM Jeroen Schutrup
> > > > <jer...@cloudflare.com.invalid> wrote:
> > > > >
> > > > > Thank you both for your swift responses!
> > > > >
> > > > > Ryanne, the MirrorConnectorsIntegrationBaseTest only tests offset
> > > > > replication in cases where the producer migrated to the secondary
> > cluster
> > > > > as well, starts feeding messages into the non-prefixed topic which
> > are
> > > > > subsequently consumed by the consumer on the secondary cluster.
> > After the
> > > > > fallback, it asserts the consumer offsets on the non-prefixed topic
> > in
> > > > the
> > > > > secondary cluster are translated and replicated to the consumer
> > offsets
> > > > of
> > > > > the prefixed topic in the primary cluster.
> > > > > In my example, the producer keeps producing in the primary cluster
> > > > whereas
> > > > > only the consumer fails over to the secondary cluster and, after some
> > > > time
> > > > > fails back to the primary cluster. This consumer will then consume
> > > > messages
> > > > > from the prefixed topic in the secondary cluster, and I'd like to
> > have
> > > > > those offsets replicated back to the non-prefixed topic in the
> > primary
> > > > > cluster. If you like I can provide an illustration if that helps to
> > > > clarify
> > > > > this use case.
> > > > >
> > > > > To add some context on why I'd like to have this is to retain loose
> > > > > coupling between producers and consumers so we're able to test
> > failovers
> > > > > for individual applications without the need for all
> > producers/consumers
> > > > to
> > > > > failover and failback at once.
> > > > >
> > > > > Digging through the Connect debug logs I found the offset-syncs of
> > the
> > > > > prefixed topic not being pushed to mm2-offset-syncs.b.internal is
> > likely
> > > > > the reason the checkpoint connector doesn't replicate consumer
> > offsets:
> > > > > DEBUG translateDownstream(replication,a.replicate-me-0,25): Skipped
> > > > (offset
> > > > > sync not found) (org.apache.kafka.connect.mirror.OffsetSyncStore)
> > > > >
> > > > > I wrote a small program to produce these offset syncs for the
> > prefixed
> > > > > topic, and this successfully triggers the Checkpoint connector to
> > start
> > > > > replicating the consumer offsets back to the primary cluster.
> > > > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> > > > > downstreamOffset=28}
> > > > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=29,
> > > > > downstreamOffset=29}
> > > > > OffsetSync{topicPartition=a.replicate-me-0, upstreamOffset=29,
> > > > > downstreamOffset=29} <-- the artificially generated offset-sync
> > > > >
> > > > > At this point it goes a bit beyond my understanding of the MM2
> > internals
> > > > of
> > > > > whether this is a wise thing to do and if it would have any negative
> > side
> > > > > effects. I'd need to spend some more time in the MM2 source, though I
> > > > > welcome any feedback on this hack :-)
> > > > >
> > > > > On the two complications you're mentioning Greg, the second one is
> > > > > something we should figure out regardless, as any given consumer
> > group
> > > > may
> > > > > not be active on both the primary and secondary cluster as it would
> > block
> > > > > MM2 from replicating its offsets from primary to the cluster-prefixed
> > > > topic
> > > > > on the secondary cluster already. On the first point, I think it
> > would
> > > > be a
> > > > > good practice to only allow MM2 to produce to any cluster-prefixed
> > topic
> > > > by
> > > > > using topic ACLs. In other words, the only application producing to a
> > > > > cluster-prefixed (or downstream) topic would be mirror maker and I
> > think
> > > > > that prevents this kind of message 'drift'. In case a producer has to
> > > > > failover, it starts producing to the non-prefixed topic on the
> > secondary
> > > > > cluster whose offsets are subject to a different Source/Checkpoint
> > > > > connector replication stream.
> > > > >
> > > > > On Mon, Jan 8, 2024 at 9:12 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Jeroen, MirrorClient will correctly translate offsets for both
> > > > failover and
> > > > > > failback, exactly as you describe. It's possible to automate
> > failover
> > > > and
> > > > > > failback using that logic. The integration tests automatically fail
> > > > over
> > > > > > and fail back, for example. I've seen it done two ways: during
> > startup
> > > > > > within the consumer itself, or in an external tool which writes
> > offsets
> > > > > > directly. In either case MirrorClient will give you the correct
> > > > offsets to
> > > > > > resume from.
> > > > > >
> > > > > > MirrorCheckpointConnector will automatically write offsets, but
> > only
> > > > under
> > > > > > certain conditions, to avoid accidentally overwriting offsets. I'm
> > not
> > > > sure
> > > > > > whether you can failover and failback using just the automatic
> > > > behavior. My
> > > > > > guess is it works, but you are tripping over one of the safety
> > checks.
> > > > You
> > > > > > might try deleting the consumer group on the source cluster prior
> > to
> > > > > > failback.
> > > > > >
> > > > > > Ryanne
> > > > > >
> > > > > > On Mon, Jan 8, 2024, 9:10 AM Jeroen Schutrup
> > > > <jer...@cloudflare.com.invalid
> > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > > I'm exploring using the MirrorSourceConnector and
> > > > > > MirrorCheckpointConnector
> > > > > > > on Kafka Connect to setup active/active replication between two
> > Kafka
> > > > > > > clusters. Using the DefaultReplicationPolicy replication policy
> > > > class,
> > > > > > > messages originating from the source cluster get replicated as
> > > > expected
> > > > > > to
> > > > > > > the cluster-prefixed topic in the target cluster. Consumergroup
> > > > offsets
> > > > > > > from the source to target cluster are replicated likewise.
> > However,
> > > > once
> > > > > > > the consumer group migrates from the source to the target
> > cluster,
> > > > its
> > > > > > > offsets are not replicated from the target back to the source
> > > > cluster.
> > > > > > > For an active/active setup I'd want consumer group offsets for
> > topic
> > > > > > > <source-cluster-alias>.<topic-name> in the target cluster to be
> > > > > > replicated
> > > > > > > back to <topic-name> in the source cluster. This would allow
> > > > consumers to
> > > > > > > failover & failback between clusters with minimal duplicate
> > message
> > > > > > > consumption.
> > > > > > >
> > > > > > > To clarify my setup a bit; I'm running two single-broker Kafka
> > > > clusters
> > > > > > in
> > > > > > > Docker (cluster A & B), along with a single Connect instance on
> > which
> > > > > > I've
> > > > > > > provisioned four source connectors:
> > > > > > > - A MirrorSourceConnector replicating topics from cluster A to
> > > > cluster B
> > > > > > > - A MirrorSourceConnector replicating topics from cluster B to
> > > > cluster A
> > > > > > > - A MirrorCheckpointConnector translating & replicating offsets
> > from
> > > > > > > cluster A to cluster B
> > > > > > > - A MirrorCheckpointConnector translating & replicating offsets
> > from
> > > > > > > cluster B to cluster A
> > > > > > >
> > > > > > > I'm not sure whether this is by design, or maybe I'm missing
> > > > something.
> > > > > > > I've seen a similar question posted to KAFKA-9076 [1] without a
> > > > > > resolution.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Jeroen
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > https://issues.apache.org/jira/browse/KAFKA-9076?focusedCommentId=17268908&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17268908
> > > > > > >
> > > > > >
> > > >
> >

Reply via email to