[
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392914#comment-17392914
]
Alexis Josephides commented on KAFKA-12468:
-------------------------------------------
Thanks for the suggestions, and apologies for the delay in reporting back on how
we handled this issue in the end.
I should say from the outset that we did not completely eliminate this issue,
but we minimised the occurrences, fixed some, and lived with the remainder.
The first step was minimisation. We achieved this by phasing when we turned on
our connectors. The first connector we enabled was the `Source` connector. In
our setup we had a number of source connectors, some set to replicate from
`latest` and others from `earliest`. We let these connectors run and replicate
until we hit a steady state and all replication was confirmed to be at the head
of the relevant topics. This soak could take a few days depending on your data
volumes, throughput (client limits), etc.
Once the soak had completed we turned on the Checkpoint connector.
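
As a rough illustration of the "soak complete" check, here is a minimal Python sketch. It assumes you can obtain, per source partition, the log-end offset and the source offset up to which MM2 has replicated; how you collect those numbers is up to you. The names `lagging_partitions` and `soak_complete` are hypothetical helpers, not part of Kafka or MM2.

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]


def lagging_partitions(
    source_end_offsets: Dict[TopicPartition, int],
    replicated_offsets: Dict[TopicPartition, int],
    max_lag: int = 0,
) -> Dict[TopicPartition, int]:
    """Return {partition: lag} for partitions still behind the source head.

    replicated_offsets holds, per partition, the next source offset that
    has not yet been copied (hypothetical input). A partition missing from
    it is treated as never replicated, so supply an entry for every
    partition, including those replicated from `latest`.
    """
    lagging = {}
    for tp, end_offset in source_end_offsets.items():
        lag = end_offset - replicated_offsets.get(tp, 0)
        if lag > max_lag:
            lagging[tp] = lag
    return lagging


def soak_complete(
    source_end_offsets: Dict[TopicPartition, int],
    replicated_offsets: Dict[TopicPartition, int],
    max_lag: int = 0,
) -> bool:
    """True when no partition lags the source by more than max_lag records."""
    return not lagging_partitions(source_end_offsets, replicated_offsets, max_lag)
```

On a busy topic the lag rarely reaches exactly zero, so a small non-zero `max_lag` is probably the more practical threshold before turning on the Checkpoint connector.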
If there were negative offsets after this first step, we took steps to manage
them. There are two categories here: partitions that have data on them and
partitions that have no data on them.
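
The two categories can be sketched as below. This reads "negative offset" as the negative lag from the original report, i.e. a committed group offset beyond the log-end offset of the target partition; that reading is an assumption. `PartitionState` and `classify_negative_lag` are hypothetical names, and the offsets are assumed to have been collected separately from the target cluster.

```python
from typing import Dict, List, NamedTuple, Tuple

TopicPartition = Tuple[str, int]


class PartitionState(NamedTuple):
    committed: int  # consumer group's committed offset on the target cluster
    log_start: int  # earliest available offset on the target partition
    log_end: int    # log-end offset on the target partition


def classify_negative_lag(
    states: Dict[TopicPartition, PartitionState],
) -> Dict[str, List[TopicPartition]]:
    """Split partitions with negative lag into 'with_data' and 'empty'."""
    result: Dict[str, List[TopicPartition]] = {"with_data": [], "empty": []}
    for tp in sorted(states):
        state = states[tp]
        lag = state.log_end - state.committed
        if lag >= 0:
            continue  # healthy partition, nothing to manage
        has_data = state.log_end > state.log_start
        result["with_data" if has_data else "empty"].append(tp)
    return result
```

Partitions in `with_data` are candidates for the group delete described next; those in `empty` need data published on the source first, where the consumer can tolerate it.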
In the first case (data on partitions) the first thing we tried was to
`delete` the affected consumer group. This is absolutely fine to do as a) there
are no consumers on the target cluster yet, and b) the group is replicated again
by MM2. In 90% of instances this corrected the negative offset.
In the second case (no data on partitions) the first thing we examined was
whether we could publish data (on the source cluster) to the topic so that the
partition held some data. This was then followed by a refresh (delete) of the
affected consumer group. This was possible only if the downstream consumer
either handled dummy messages without problems or was fine with a small number
of duplicate messages.
What if a negative offset remained after the above?
Where there was zero data on a partition and no new data could be published to
it, we let the consumer migrate onto the target cluster without much worry. The
Kafka consumer at this point would see the negative offset and log a warning
that it was out of range. It would then reset its offset on the cluster to its
default setting, consuming from either `latest` or `earliest`. Since there is no
data on that partition, these are one and the same thing.
For instances (rare, but they did occur) where a negative offset remained and
there was data on the partition, we still migrated and relied on the consumer
behaviour to reset its offset to either `earliest` or `latest`. Depending on the
consumer and its use case, we picked whichever best suited the scenario.
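
To make the reset behaviour in both cases concrete, here is a simplified Python model of what happens to an out-of-range committed offset. It is a sketch, not the Kafka client's actual code: `resolve_position` is a hypothetical helper, while `earliest`, `latest` and `none` are the values of the consumer's `auto.offset.reset` setting.

```python
def resolve_position(
    committed: int, log_start: int, log_end: int, auto_offset_reset: str
) -> int:
    """Return the offset a consumer would continue from (simplified model).

    An in-range committed offset is used as-is. An out-of-range one, such
    as an untranslated source offset beyond the target log end, falls back
    to the reset policy.
    """
    if log_start <= committed <= log_end:
        return committed
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    # With "none" the real consumer raises an exception instead of resetting.
    raise ValueError(f"offset {committed} out of range and no reset policy")


# Empty partition: both policies land on the same offset.
empty_earliest = resolve_position(42, 0, 0, "earliest")
empty_latest = resolve_position(42, 0, 0, "latest")

# Partition with data: the policy decides between reprocessing and skipping.
replay_from = resolve_position(120, 10, 100, "earliest")
skip_to = resolve_position(120, 10, 100, "latest")
```

For a partition with data the choice matters: `earliest` reprocesses everything still retained on the target, while `latest` skips whatever is already there, which is why we picked per consumer.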
Hope this is helpful in some way to others that might be experiencing these
issues.
> Initial offsets are copied from source to target cluster
> --------------------------------------------------------
>
> Key: KAFKA-12468
> URL: https://issues.apache.org/jira/browse/KAFKA-12468
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.7.0
> Reporter: Bart De Neuter
> Priority: Major
>
> We have an active-passive setup where the 3 connectors from mirror maker 2
> (heartbeat, checkpoint and source) are running on a dedicated Kafka connect
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it
> seems the offsets from the source cluster are initially copied to the target
> cluster without translation. This causes a negative lag for all synced
> consumer groups. Only when we reset the offsets for each topic/partition on
> the target cluster and produce a record on the topic/partition in the source,
> the sync starts working correctly.
> I would expect that the consumer groups are synced but that the current
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>
> {code:xml}
> {
> "name": "mm2-mirror-heartbeat",
> "config": {
> "name": "mm2-mirror-heartbeat",
> "connector.class":
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
> "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "1",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter"
> }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
> "name": "mm2-mirror-checkpoint",
> "config": {
> "name": "mm2-mirror-checkpoint",
> "connector.class":
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
> "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter"
> }
> }
> {code}
> Source connector:
> {code:xml}
> {
> "name": "mm2-mirror-source",
> "config": {
> "name": "mm2-mirror-source",
> "connector.class":
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
> "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter"
> }
> }
> {code}
>