[ 
https://issues.apache.org/jira/browse/KAFKA-19794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravindranath Kakarla updated KAFKA-19794:
-----------------------------------------
    Description: 
h2. *Description*

The MirrorCheckpointTask in Mirror Maker 2 commits offsets for active consumer 
groups on the target cluster, causing consumers to rewind their offsets or skip 
messages. Normally, brokers prevent offset commits for consumer groups that are 
not in the {{EMPTY}} state by throwing {{UnknownMemberIdException}}. In 
addition, {{MirrorCheckpointTask}} has logic in place to avoid committing 
offsets older than the target's for {{EMPTY}} consumer groups. However, due to 
a bug in the {{MirrorCheckpointTask}} code, this prevention check is not 
enforced and the task attempts to commit offsets even for {{STABLE}} groups. 
These commits can go through if the consumers are momentarily disconnected, 
moving the group state to {{EMPTY}}. As a result, consumer offsets get reset to 
older values. If the offset is no longer available on the target broker (due to 
retention), the consumers get reset to {{earliest}} or {{latest}}, reading 
duplicates or skipping messages.
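
For reference, the broker-side protection can be observed outside of Mirror Maker 2 with the Admin client. The snippet below is a minimal standalone sketch (the bootstrap address, group id, topic and offset are placeholders) showing that the commit is rejected while the group has active members and only slips through during a window in which the group is momentarily {{EMPTY}}:
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownMemberIdException;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CommitToActiveGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "target-bootstrap:9092");
        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(100L));
            try {
                // Accepted only while the group has no active members (EMPTY or DEAD);
                // for a STABLE group the broker fails the request.
                admin.alterConsumerGroupOffsets("my-group", offsets).all().get();
                System.out.println("Commit accepted - the group was not active at this moment");
            } catch (ExecutionException e) {
                if (e.getCause() instanceof UnknownMemberIdException) {
                    System.out.println("Commit rejected - the group has active members");
                } else {
                    throw e;
                }
            }
        }
    }
}
{code}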
h2. *Bug location*

1. In [MirrorCheckpointTask|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L305], 
we only update the latest target cluster offsets ({{idleConsumerGroupsOffset}}) 
if the target consumer group state is {{EMPTY}}.

2. When {{syncGroupOffset}} is called, we check whether the target consumer 
group is present in {{idleConsumerGroupsOffset}}. An active group won't be 
present, so the code assumes it is a new group and starts syncing its offsets 
to the target. These calls normally fail with {{Unable to sync offsets for 
consumer group XYZ. This is likely caused by consumers currently using this 
group in the target cluster. 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask)}}, and when consumers 
have failed over the logs typically contain many of these messages. However, 
the calls can succeed if the consumer is momentarily disconnected, e.g. during 
restarts. The code should not treat the absence of a consumer group from the 
{{idleConsumerGroupsOffset}} map as proof that it is a new group (see the 
sketch after this list).

3. The same erroneous behavior can also be triggered when the calls to 
{{describeConsumerGroups}} or {{listConsumerGroupOffsets}} in the 
{{refreshIdleConsumerGroupOffset}} method fail due to transient timeouts.
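
Putting points 1-3 together, the overall flow looks roughly like the sketch below. This is an illustrative paraphrase rather than the exact trunk source; the field and method names ({{idleConsumerGroupsOffset}}, {{refreshIdleConsumerGroupOffset}}, {{syncGroupOffset}}) follow the ones referenced above.
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

// Illustrative paraphrase of the flow described in points 1-3 above; not the trunk source.
class CheckpointFlowSketch {
    private final Admin targetAdminClient;
    private final List<String> consumerGroups;
    private final Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();

    CheckpointFlowSketch(Admin targetAdminClient, List<String> consumerGroups) {
        this.targetAdminClient = targetAdminClient;
        this.consumerGroups = consumerGroups;
    }

    // Points 1 and 3: target offsets are recorded only for EMPTY groups, and any
    // describe/list failure is swallowed, so active groups (and groups whose refresh
    // failed) never get an entry in idleConsumerGroupsOffset.
    void refreshIdleConsumerGroupOffset() {
        try {
            Map<String, ConsumerGroupDescription> described =
                    targetAdminClient.describeConsumerGroups(consumerGroups).all().get();
            for (String group : consumerGroups) {
                if (described.get(group).state() == ConsumerGroupState.EMPTY) {
                    idleConsumerGroupsOffset.put(group,
                            targetAdminClient.listConsumerGroupOffsets(group)
                                    .partitionsToOffsetAndMetadata().get());
                }
                // STABLE / rebalancing groups are silently skipped and get no entry.
            }
        } catch (InterruptedException | ExecutionException e) {
            // Transient timeouts end up here, leaving the map stale.
        }
    }

    // Point 2: a group with no entry in the map is assumed to be new and its upstream
    // offsets are synced unconditionally -- but "no entry" also covers active groups
    // and groups whose refresh failed above.
    boolean shouldSyncUnconditionally(String consumerGroupId) {
        return idleConsumerGroupsOffset.get(consumerGroupId) == null;
    }
}
{code}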
h2. *Fix*

A potential fix would be to add an explicit check so that offsets are only 
synced for {{EMPTY}} consumer groups. We should also skip offset syncing for 
consumer groups whose target offsets could not be refreshed.

 
{code:java}
// Fixed code adds state checking:
ConsumerGroupState groupStateOnTarget = targetConsumerGroupStates.get(consumerGroupId);
if (!isGroupPresentOnTarget || groupStateOnTarget == ConsumerGroupState.DEAD) {
    // Safe to sync - new or dead group
    syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
} else if (groupStateOnTarget == ConsumerGroupState.EMPTY) {
    // Safe to sync - idle group
    // ... existing offset comparison logic
} else {
    // Skip active groups (STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE)
    log.info("Consumer group: {} with state: {} is being actively consumed on the target, skipping sync.",
            consumerGroupId, groupStateOnTarget);
}
{code}
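
Note that {{targetConsumerGroupStates}} and {{isGroupPresentOnTarget}} above are not existing fields in {{MirrorCheckpointTask}}; they stand for state the fix would need to track. One hypothetical way to populate them alongside the existing refresh logic is sketched below (a fragment, assuming the same {{targetAdminClient}}, {{consumerGroups}} and {{consumerGroupId}} variables as above):
{code:java}
// Hypothetical bookkeeping for the fix: record the state of every group that can be
// described on the target, not only the EMPTY ones. Groups whose describe call failed
// stay absent from the map and would then be skipped rather than treated as new.
Map<String, ConsumerGroupState> targetConsumerGroupStates = new HashMap<>();
for (Map.Entry<String, ConsumerGroupDescription> entry
        : targetAdminClient.describeConsumerGroups(consumerGroups).all().get().entrySet()) {
    targetConsumerGroupStates.put(entry.getKey(), entry.getValue().state());
}

// Later, per consumer group:
boolean isGroupPresentOnTarget = targetConsumerGroupStates.containsKey(consumerGroupId);
{code}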
 

 

> MirrorCheckpointTask causes consumers on target cluster to rewind offsets or 
> skip messages due to unsafe offset commits
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19794
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19794
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.5.2
>            Reporter: Ravindranath Kakarla
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
