[ 
https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879025#comment-17879025
 ] 

Matthias J. Sax commented on KAFKA-15344:
-----------------------------------------

Should we close this one in favor of KAFKA-16248?

> Kafka Streams should include the message leader epoch when committing offsets
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-15344
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15344
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: David Mao
>            Assignee: Alieh Saeedi
>            Priority: Major
>
> We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
> network partition and streams task rebalance and subsequently reset its 
> offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like: 
> {code:java}
> Setting offset for partition tp to the committed offset 
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like Kafka Streams calls `commitSync`, 
> passing through an explicit OffsetAndMetadata object, but does not populate 
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all 
> consumers in the consumer group have coherent metadata before fetching. 
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
> leader epoch with respect to the committed offset and get an offset out of 
> range error from a zombie partition leader.
> An example of where this can cause issues:
> 1. We have a consumer group with consumer 1 and consumer 2. Partition P is 
> assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has 
> stale metadata for P.
> 2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits the 
> offset 50 without an epoch.
> 3. The consumer group rebalances and P is now assigned to consumer 2. 
> Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). 
> Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a 
> zombie leader due to a network partition, the zombie leader may accept 
> consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer 
> 2.
> If in step 2, consumer 1 had committed the leader epoch for the message, then 
> when consumer 2 receives assignment P it would force a metadata refresh to 
> discover a sufficiently new leader epoch for the committed offset.
> The aim of this ticket is to let Kafka Streams commit offsets with leader 
> epochs wherever possible. However, note that Kafka Streams cannot fully 
> determine the leader epoch of the offsets it wants to commit - in EOS mode, 
> streams commits the offset after the last control records (to avoid always 
> having a lag of >0), but the leader epoch of the control record is not known 
> to streams (since only non-control records are returned from Consumer.poll). 
> Therefore, for the EOS case, the above problem cannot be solved with a change 
> in streams alone. Instead, https://issues.apache.org/jira/browse/KAFKA-16248 
> should be implemented.
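
The scenario in the quoted description can be sketched as a small, self-contained model. This is only an illustration under stated assumptions: `LeaderEpochSketch` and `mustRefreshMetadata` are hypothetical names, not part of the Kafka client, and the real consumer logic is more involved. The sketch shows only why a commit without an epoch gives consumer 2 no reason to refresh its stale metadata, while a commit carrying epoch 8 does.

```java
import java.util.Optional;

public class LeaderEpochSketch {

    // Simplified model (an assumption, not Kafka's actual code): a metadata
    // refresh is forced only when the committed offset carries a leader epoch
    // newer than the epoch the consumer currently has cached.
    static boolean mustRefreshMetadata(Optional<Integer> committedLeaderEpoch,
                                       int cachedLeaderEpoch) {
        return committedLeaderEpoch
                .map(epoch -> epoch > cachedLeaderEpoch)
                .orElse(false);
    }

    public static void main(String[] args) {
        int staleEpoch = 7; // consumer 2's cached leader epoch for partition P

        // Offset 50 committed without an epoch: nothing signals that the
        // cached metadata is stale, so consumer 2 fetches with epoch 7.
        System.out.println(mustRefreshMetadata(Optional.empty(), staleEpoch));

        // Offset 50 committed with epoch 8: the newer epoch forces a refresh
        // before consumer 2 fetches.
        System.out.println(mustRefreshMetadata(Optional.of(8), staleEpoch));
    }
}
```

Running `main` prints `false` for the commit without an epoch and `true` for the commit with epoch 8, matching steps 2 and 3 of the example.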



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
