[ https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879025#comment-17879025 ]
Matthias J. Sax commented on KAFKA-15344:
-----------------------------------------

Should we close this one, in favor of KAFKA-16248?

> Kafka Streams should include the message leader epoch when committing offsets
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-15344
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15344
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: David Mao
>            Assignee: Alieh Saeedi
>            Priority: Major
>
> We noticed an application received an OFFSET_OUT_OF_RANGE error following a
> network partition and streams task rebalance and subsequently reset its
> offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like:
> {code:java}
> Setting offset for partition tp to the committed offset
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like Kafka Streams calls `commitSync`
> passing through an explicit OffsetAndMetadata object but does not populate
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all
> consumers in the consumer group have coherent metadata before fetching.
> Otherwise, after a consumer group rebalance, a consumer may fetch with a stale
> leader epoch with respect to the committed offset and get an offset out of
> range error from a zombie partition leader.
> An example of where this can cause issues:
> 1. We have a consumer group with consumer 1 and consumer 2. Partition P is
> assigned to consumer 1, which has up-to-date metadata for P. Consumer 2 has
> stale metadata for P.
> 2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset
> 50 without an epoch.
> 3. The consumer group rebalances and P is now assigned to consumer 2.
> Consumer 2 has a stale leader epoch for P (let's say leader epoch 7).
> Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a
> zombie leader due to a network partition, the zombie leader may accept
> consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer
> 2.
> If in step 2, consumer 1 had committed the leader epoch for the message, then
> when consumer 2 receives assignment P it would force a metadata refresh to
> discover a sufficiently new leader epoch for the committed offset.
> The aim of this ticket is to let Kafka Streams commit offsets with leader
> epochs wherever possible. However, note that Kafka Streams cannot fully
> determine the leader epoch of the offsets it wants to commit - in EOS mode,
> streams commits the offset after the last control records (to avoid always
> having a lag of >0), but the leader epoch of the control record is not known
> to streams (since only non-control records are returned from Consumer.poll).
> Therefore, for the EOS case, the above problem cannot be solved with a change
> in streams alone. Instead, https://issues.apache.org/jira/browse/KAFKA-16248
> should be implemented.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
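
The three-step scenario in the issue description can be illustrated with a small, self-contained Java model. This is only a sketch of the reasoning, not Kafka client or Kafka Streams code: `LeaderEpochSketch`, `CommittedOffset`, and `mustRefreshMetadata` are hypothetical names, and the rule "refresh when the committed epoch is newer than the cached one" is an assumption drawn from the description above.

```java
import java.util.Optional;

public class LeaderEpochSketch {

    /** Hypothetical stand-in for a committed offset; not the Kafka client's own class. */
    public static final class CommittedOffset {
        public final long offset;
        public final Optional<Integer> leaderEpoch;

        public CommittedOffset(long offset, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    /**
     * Assumed rule: the new owner of a partition refreshes its metadata only when
     * the commit carries a leader epoch newer than the one it has cached.
     * A commit without an epoch gives it no reason to refresh.
     */
    public static boolean mustRefreshMetadata(CommittedOffset committed, int cachedLeaderEpoch) {
        return committed.leaderEpoch
                .map(epoch -> epoch > cachedLeaderEpoch)
                .orElse(false);
    }

    public static void main(String[] args) {
        int staleEpochOnConsumer2 = 7; // consumer 2's cached epoch for partition P

        // Step 2 as described: offset 50 committed without an epoch.
        CommittedOffset withoutEpoch = new CommittedOffset(50L, Optional.empty());
        // The proposed behaviour: offset 50 committed together with epoch 8.
        CommittedOffset withEpoch = new CommittedOffset(50L, Optional.of(8));

        System.out.println(mustRefreshMetadata(withoutEpoch, staleEpochOnConsumer2));
        System.out.println(mustRefreshMetadata(withEpoch, staleEpochOnConsumer2));
    }
}
```

In this model, the commit without an epoch lets consumer 2 proceed with its stale epoch 7, which is the case where a zombie leader may answer the fetch. The commit that carries epoch 8 signals that consumer 2's metadata is too old to fetch safely.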