[
https://issues.apache.org/jira/browse/KAFKA-19794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ravindranath Kakarla updated KAFKA-19794:
-----------------------------------------
Description:
h2. *Description*
The {{MirrorCheckpointTask}} in Mirror Maker 2 commits offsets for active
consumer groups on the target cluster, causing consumers to rewind their
offsets or skip messages. Typically, brokers prevent committing offsets for
consumer groups that are not in the {{EMPTY}} state by throwing
{{UnknownMemberIdException}}. In addition, {{MirrorCheckpointTask}} has logic
in place to prevent committing offsets older than the target's for {{EMPTY}}
consumer groups. However, due to a bug in the {{MirrorCheckpointTask}} code,
this check is not enforced and the task attempts to commit offsets for
{{STABLE}} consumer groups. These calls can go through if the consumers are
momentarily disconnected, which moves the group state to {{EMPTY}}. As a
result, the consumers' offsets are reset to older values. If the offset is no
longer available on the target broker (due to retention), the consumers can be
reset to {{earliest}} or {{latest}}, and thus read duplicates or skip messages.
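
The guard mentioned above (do not commit offsets older than what the target already has) can be sketched roughly as follows. This is only an illustration, not the actual {{MirrorCheckpointTask}} code: the class {{OffsetGuard}}, the method {{shouldCommit}}, and its parameters are hypothetical names, and treating a missing target offset as "safe to commit" is an assumption.

```java
public class OffsetGuard {
    /**
     * Hypothetical guard: commit only when the translated upstream offset is
     * strictly ahead of the offset already committed on the target.
     * A null target offset is assumed to mean "nothing committed yet".
     */
    static boolean shouldCommit(long translatedUpstreamOffset, Long targetCommittedOffset) {
        if (targetCommittedOffset == null) {
            return true;
        }
        return translatedUpstreamOffset > targetCommittedOffset;
    }

    public static void main(String[] args) {
        // Target is already at 500; committing 300 would rewind the consumers.
        System.out.println(shouldCommit(300L, 500L));
        // Target is behind the translated offset; committing moves it forward.
        System.out.println(shouldCommit(800L, 500L));
    }
}
```

When this comparison is bypassed, a commit of 300 against a target at 500 is exactly the kind of rewind described in this issue.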
h2. *Bug location*
1. In [MirrorCheckpointTask|#L305], we only update the latest target cluster
offsets ({{idleConsumerGroupsOffset}}) if the target consumer group state is
{{EMPTY}}.
2. When {{syncGroupOffset}} is called, we check whether the target consumer
group is present in {{idleConsumerGroupsOffset}}. An active group won't be
present there, so we assume that it is a new group and start syncing its
consumer group offsets to the target. These calls fail with _{{Unable to sync
offsets for consumer group XYZ. This is likely caused by consumers currently
using this group in the target cluster.
(org.apache.kafka.connect.mirror.MirrorCheckpointTask}}_. When consumers
have failed over, the logs typically contain a lot of these messages. These
calls can succeed if a consumer is momentarily disconnected due to a restart.
The code should not treat the absence of a consumer group from the
{{idleConsumerGroupsOffset}} map as evidence that the group is new.
3. This erroneous behavior can also be triggered when calls to
{{describeConsumerGroups}} or {{listConsumerGroupOffsets}} fail in the
{{refreshIdleConsumerGroupOffset}} method due to transient timeouts.
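
The flawed inference in points 1 and 2 can be reproduced with plain Java maps. This is a simplified sketch rather than the real task code: {{IdleMapInference}}, {{State}}, {{refreshIdleOffsets}}, and {{looksNew}} are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class IdleMapInference {
    // Hypothetical stand-in for the target-side consumer group state.
    enum State { EMPTY, STABLE }

    /** Mimics the refresh step: only EMPTY groups are recorded in the idle map. */
    static Map<String, Long> refreshIdleOffsets(Map<String, State> states,
                                                Map<String, Long> committed) {
        Map<String, Long> idle = new HashMap<>();
        for (Map.Entry<String, State> e : states.entrySet()) {
            if (e.getValue() == State.EMPTY) {
                idle.put(e.getKey(), committed.get(e.getKey()));
            }
        }
        return idle;
    }

    /** The flawed inference: any group missing from the idle map is treated as new. */
    static boolean looksNew(Map<String, Long> idle, String group) {
        return !idle.containsKey(group);
    }

    public static void main(String[] args) {
        Map<String, State> states = Map.of("idle-group", State.EMPTY,
                                           "active-group", State.STABLE);
        Map<String, Long> committed = Map.of("idle-group", 100L, "active-group", 500L);
        Map<String, Long> idle = refreshIdleOffsets(states, committed);

        // Both an active group and a group that never existed look "new".
        System.out.println("active-group looks new: " + looksNew(idle, "active-group"));
        System.out.println("never-seen looks new: " + looksNew(idle, "never-seen"));
    }
}
```

Because the idle map cannot distinguish "active" from "absent", the lookup alone is not enough to decide whether a sync is safe.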
h2. *Fix*
A potential fix is to add an explicit state check so that, for groups that
already exist on the target, offsets are synced only when the group is
{{EMPTY}} (or {{DEAD}}). We should also skip offset syncing for consumer
groups whose offsets we could not refresh.
{code:java}
// Fixed code adds state checking:
ConsumerGroupState groupStateOnTarget = targetConsumerGroupStates.get(consumerGroupId);
if (!isGroupPresentOnTarget || groupStateOnTarget == ConsumerGroupState.DEAD) {
    // Safe to sync - new or dead group
    syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
} else if (groupStateOnTarget == ConsumerGroupState.EMPTY) {
    // Safe to sync - idle group
    // ... existing offset comparison logic
} else {
    // Skip active groups (STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE)
    log.info("Consumer group: {} with state: {} is being actively consumed on the target, skipping sync.",
            consumerGroupId, groupStateOnTarget);
}
{code}
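
For experimenting with the proposed rule outside of Kafka, the decision can be isolated in a small standalone sketch. Everything here is hypothetical and chosen for illustration: the {{SyncDecision}} class, its {{GroupState}} and {{Action}} enums, and the {{refreshFailed}} set, which is one assumed way to represent the groups whose offsets could not be refreshed. {{SYNC_IF_AHEAD}} stands in for the existing offset comparison logic, and a group missing from the state map is assumed to be absent on the target.

```java
import java.util.Map;
import java.util.Set;

public class SyncDecision {
    // Hypothetical mirror of the group states named in the fix above.
    enum GroupState { DEAD, EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    // SYNC_IF_AHEAD stands in for the existing offset comparison logic.
    enum Action { SYNC, SYNC_IF_AHEAD, SKIP }

    static Action decide(String group,
                         Map<String, GroupState> targetStates,
                         Set<String> refreshFailed) {
        // Assumption: if the refresh failed, the state is unknown, so do nothing.
        if (refreshFailed.contains(group)) {
            return Action.SKIP;
        }
        GroupState state = targetStates.get(group);
        if (state == null || state == GroupState.DEAD) {
            return Action.SYNC;          // new or dead group
        }
        if (state == GroupState.EMPTY) {
            return Action.SYNC_IF_AHEAD; // idle group, compare offsets first
        }
        return Action.SKIP;              // actively consumed on the target
    }

    public static void main(String[] args) {
        Map<String, GroupState> states = Map.of(
                "idle", GroupState.EMPTY,
                "active", GroupState.STABLE,
                "dead", GroupState.DEAD);
        Set<String> refreshFailed = Set.of("unknown");
        for (String group : new String[] {"new", "dead", "idle", "active", "unknown"}) {
            System.out.println(group + " -> " + decide(group, states, refreshFailed));
        }
    }
}
```

Checking the refresh failure first means a transient timeout leads to a skipped sync instead of a group being mistaken for a new one.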
> MirrorCheckpointTask causes consumers on target cluster to rewind offsets or
> skip messages due to unsafe offset commits
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-19794
> URL: https://issues.apache.org/jira/browse/KAFKA-19794
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.5.2
> Reporter: Ravindranath Kakarla
> Priority: Major
>