[jira] [Updated] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets
[ https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Brutschy updated KAFKA-15344:
---
Description:
We noticed an application received an OFFSET_OUT_OF_RANGE error following a network partition and a Streams task rebalance, and subsequently reset its offsets to the beginning.

Inspecting the logs, we saw multiple consumer log messages like:
{code:java}
Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}

Inspecting the Streams code, it looks like Kafka Streams calls `commitSync`, passing an explicit OffsetAndMetadata object, but does not populate the offset leader epoch.

The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise, after a consumer group rebalance, a consumer may fetch with a leader epoch that is stale with respect to the committed offset and get an offset out of range error from a zombie partition leader.

An example of where this can cause issues:
1. We have a consumer group with consumer 1 and consumer 2. Partition P is assigned to consumer 1, which has up-to-date metadata for P. Consumer 2 has stale metadata for P.
2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset 50 without an epoch.
3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due to a network partition, the zombie leader may accept consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE error to consumer 2.

If in step 2 consumer 1 had committed the leader epoch for the message, then when consumer 2 received the assignment for P, it would force a metadata refresh to discover a sufficiently new leader epoch for the committed offset.
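To make step 3 concrete, here is a small, self-contained Java model of the example. It assumes hypothetical numbers: the zombie leader is still on epoch 7 with a log end offset of 40, and the current leader is on epoch 8 with a log end offset of 60. The types `Leader` and `Committed` and the method `firstFetch` are illustrative only, not Kafka client APIs. `Committed` merely mirrors the shape of `OffsetAndMetadata(offset, Optional<Integer> leaderEpoch, metadata)`. This is a sketch of the reasoning in the example, not of the consumer's actual metadata-refresh code.

```java
import java.util.Optional;

public class StaleEpochScenario {
    // Hypothetical view of a broker that believes it leads partition P.
    record Leader(String name, int epoch, long logEndOffset) {}

    // Hypothetical committed offset, mirroring the shape of
    // OffsetAndMetadata(offset, Optional<Integer> leaderEpoch, metadata).
    record Committed(long offset, Optional<Integer> leaderEpoch) {}

    // Assumed numbers: the zombie was cut off before offset 50 was written.
    static final Leader ZOMBIE = new Leader("zombie", 7, 40);
    static final Leader CURRENT = new Leader("current", 8, 60);

    /** Outcome of consumer 2's first fetch after P is assigned to it. */
    static String firstFetch(Committed committed, int cachedEpoch) {
        int fetchEpoch = cachedEpoch;
        // A committed epoch newer than the cached one forces a metadata
        // refresh before fetching; in this model the refresh reveals epoch 8.
        if (committed.leaderEpoch().isPresent()
                && committed.leaderEpoch().get() > cachedEpoch) {
            fetchEpoch = CURRENT.epoch();
        }
        // With epoch-7 metadata the consumer still sends the fetch to the
        // zombie, which accepts it because its own epoch is also 7.
        Leader target = fetchEpoch >= CURRENT.epoch() ? CURRENT : ZOMBIE;
        if (committed.offset() > target.logEndOffset()) {
            return "OFFSET_OUT_OF_RANGE from " + target.name();
        }
        return "fetched offset " + committed.offset() + " from " + target.name()
                + " at epoch " + fetchEpoch;
    }

    public static void main(String[] args) {
        // Step 2 as described: offset 50 committed without an epoch.
        System.out.println(firstFetch(new Committed(50, Optional.empty()), 7));
        // Step 2 with the message's leader epoch (8) included in the commit.
        System.out.println(firstFetch(new Committed(50, Optional.of(8)), 7));
    }
}
```

In this model, the commit without an epoch sends consumer 2 to the zombie with epoch 7 and yields OFFSET_OUT_OF_RANGE. The commit carrying epoch 8 makes consumer 2 refresh first and fetch offset 50 from the current leader.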
The aim of this ticket is to let Kafka Streams commit offsets with leader epochs wherever possible. Note, however, that Kafka Streams cannot fully determine the leader epoch of the offsets it wants to commit. In EOS mode, Streams commits the offset after the last control record (to avoid always having a lag of >0), but the leader epoch of the control record is not known to Streams, since only non-control records are returned from Consumer.poll. Therefore, for the EOS case, the above problem cannot be solved with a change in Streams alone. Instead, https://issues.apache.org/jira/browse/KAFKA-16248 should be implemented.

was:
We noticed an application received an OFFSET_OUT_OF_RANGE error following a network partition and a Streams task rebalance, and subsequently reset its offsets to the beginning.

Inspecting the logs, we saw multiple consumer log messages like:
{code:java}
Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}

Inspecting the Streams code, it looks like Kafka Streams calls `commitSync`, passing an explicit OffsetAndMetadata object, but does not populate the offset leader epoch.

The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise, after a consumer group rebalance, a consumer may fetch with a leader epoch that is stale with respect to the committed offset and get an offset out of range error from a zombie partition leader.

An example of where this can cause issues:
1. We have a consumer group with consumer 1 and consumer 2. Partition P is assigned to consumer 1, which has up-to-date metadata for P. Consumer 2 has stale metadata for P.
2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset 50 without an epoch.
3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due to a network partition, the zombie leader may accept consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE error to consumer 2.

If in step 2 consumer 1 had committed the leader epoch for the message, then when consumer 2 received the assignment for P, it would force a metadata refresh to discover a sufficiently new leader epoch for the committed offset.

The low-hanging fruit fix would be to have Streams pass in the message epoch for each commit. Another fix discussed with [~hachikuji] is to have the consumer cache leader epoch ranges, similar to how the broker maintains a leader epoch cache.

> Kafka Streams should include the message leader epoch when committing offsets
> -
>
> Key: KAFKA-15344
> URL: https://issues.apache.org/ji
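The low-hanging fruit fix, together with the EOS limitation described above, can be sketched as a small policy for picking the epoch to commit. This is a hypothetical illustration, not Kafka Streams' implementation: `LastRecord`, `CommitCandidate` and `choose` are made-up names. In a real client, the two fields would come from `ConsumerRecord.leaderEpoch()` of the last polled record and be passed to the `OffsetAndMetadata(long, Optional<Integer>, String)` constructor. The sketch assumes an epoch is committed only when the commit offset directly follows a record Streams actually saw. If the position has moved past records that poll never returned (for example, a transaction marker in EOS mode), the epoch is treated as unknown.

```java
import java.util.Optional;

public class CommitEpochChooser {
    // Hypothetical: offset and leader epoch of the last record returned by poll.
    record LastRecord(long offset, Optional<Integer> leaderEpoch) {}

    // Hypothetical: the offset/epoch pair one would hand to OffsetAndMetadata.
    record CommitCandidate(long offset, Optional<Integer> leaderEpoch) {}

    /**
     * Chooses the epoch to commit with {@code position}, the next offset to consume.
     * If position directly follows the last polled record, that record's epoch
     * is used. Otherwise the gap holds records poll never returned (e.g. an EOS
     * transaction marker) whose epoch is unknown, so no epoch is committed.
     */
    static CommitCandidate choose(long position, Optional<LastRecord> last) {
        if (last.isPresent() && position == last.get().offset() + 1) {
            return new CommitCandidate(position, last.get().leaderEpoch());
        }
        return new CommitCandidate(position, Optional.empty());
    }

    public static void main(String[] args) {
        Optional<LastRecord> last = Optional.of(new LastRecord(49, Optional.of(8)));
        // Position right after the last record: commit offset 50 with epoch 8.
        System.out.println(choose(50, last));
        // A transaction marker at offset 50 moved the position to 51: no epoch.
        System.out.println(choose(51, last));
        // Nothing polled yet for this partition: no epoch either.
        System.out.println(choose(0, Optional.empty()));
    }
}
```

Under this assumption, committing `Optional.empty()` in the second case is the conservative choice. It leaves the EOS case exactly as exposed as before, which is why the description points to KAFKA-16248 for that part.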
[jira] [Updated] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets
[ https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Mao updated KAFKA-15344:
--
Description:
We noticed an application received an OFFSET_OUT_OF_RANGE error following a network partition and a Streams task rebalance, and subsequently reset its offsets to the beginning.

Inspecting the logs, we saw multiple consumer log messages like:
{code:java}
Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}

Inspecting the Streams code, it looks like Kafka Streams calls `commitSync`, passing an explicit OffsetAndMetadata object, but does not populate the offset leader epoch.

The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise, after a consumer group rebalance, a consumer may fetch with a leader epoch that is stale with respect to the committed offset and get an offset out of range error from a zombie partition leader.

An example of where this can cause issues:
1. We have a consumer group with consumer 1 and consumer 2. Partition P is assigned to consumer 1, which has up-to-date metadata for P. Consumer 2 has stale metadata for P.
2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset 50 without an epoch.
3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due to a network partition, the zombie leader may accept consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE error to consumer 2.

If in step 2 consumer 1 had committed the leader epoch for the message, then when consumer 2 received the assignment for P, it would force a metadata refresh to discover a sufficiently new leader epoch for the committed offset.
The low-hanging fruit fix would be to have Streams pass in the message epoch for each commit. Another fix discussed with [~hachikuji] is to have the consumer cache leader epoch ranges, similar to how the broker maintains a leader epoch cache.

was:
We noticed an application received an OFFSET_OUT_OF_RANGE error following a network partition and a Streams task rebalance, and subsequently reset its offsets to the beginning.

Inspecting the logs, we saw multiple consumer log messages like:
{code:java}
Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}

Inspecting the Streams code, it looks like Kafka Streams calls `commitSync`, passing an explicit OffsetAndMetadata object, but does not populate the offset leader epoch.

The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise, after a consumer group rebalance, a consumer may fetch with a leader epoch that is stale with respect to the committed offset and get an offset out of range error from a zombie partition leader.

The low-hanging fruit fix would be to have Streams pass in the message epoch for each commit. Another fix discussed with [~hachikuji] is to have the consumer cache leader epoch ranges, similar to how the broker maintains a leader epoch cache.

> Kafka Streams should include the message leader epoch when committing offsets
> -
>
> Key: KAFKA-15344
> URL: https://issues.apache.org/jira/browse/KAFKA-15344
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: David Mao
> Priority: Major
>
> We noticed an application received an OFFSET_OUT_OF_RANGE error following a
> network partition and streams task rebalance and subsequently reset its
> offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like:
> {code:java}
> Setting offset for partition tp to the committed offset
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like kafka streams calls `commitSync`
> passing through an explicit OffsetAndMetadata object but does not populate
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all
> consumers in the consumer group have coherent metadata before fetching.
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale
> leader epoch with respect to the committed offset and get an offset out of
> range error from a zombie partition leader.
> An example of where this can cause issues:
> 1. We have a consumer group with consumer 1 and consumer 2. Partition P is
> assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has
[jira] [Updated] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets
[ https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-15344:

Component/s: streams

> Kafka Streams should include the message leader epoch when committing offsets
> -
>
> Key: KAFKA-15344
> URL: https://issues.apache.org/jira/browse/KAFKA-15344
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: David Mao
> Priority: Major
>
> We noticed an application received an OFFSET_OUT_OF_RANGE error following a
> network partition and streams task rebalance and subsequently reset its
> offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like:
> {code:java}
> Setting offset for partition tp to the committed offset
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like kafka streams calls `commitSync`
> passing through an explicit OffsetAndMetadata object but does not populate
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all
> consumers in the consumer group have coherent metadata before fetching.
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale
> leader epoch with respect to the committed offset and get an offset out of
> range error from a zombie partition leader.
> The low-hanging fruit fix would be to have streams pass in the message epoch
> for each commit. Another fix discussed with [~hachikuji] is to have the
> consumer cache leader epoch ranges, similar to how the broker maintains a
> leader epoch cache.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
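The alternative fix in the description, having the consumer cache leader epoch ranges like the broker's leader epoch cache, is not specified further in the ticket. One possible shape is shown below as a hypothetical sketch. `ConsumerEpochRangeCache`, `observe` and `epochFor` are invented names, and the cache assumes offsets for a partition are observed in increasing order. Each epoch is keyed by a start offset, and the cache answers "which epoch covers offset X?" with a floor lookup in a `TreeMap`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/**
 * Hypothetical consumer-side cache of leader epoch ranges for one partition,
 * loosely modelled on the broker's leader epoch cache. Each entry maps the
 * first offset observed under an epoch to that epoch.
 * Assumes offsets are observed in increasing order.
 */
public class ConsumerEpochRangeCache {
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();

    /** Records that a batch at {@code offset} was written under {@code epoch}. */
    public void observe(long offset, int epoch) {
        Map.Entry<Long, Integer> floor = epochByStartOffset.floorEntry(offset);
        // Only a strictly newer epoch opens a new range.
        if (floor == null || epoch > floor.getValue()) {
            epochByStartOffset.put(offset, epoch);
        }
    }

    /**
     * Epoch of the range containing {@code offset}. For offsets past the last
     * observed record this is the newest epoch seen, i.e. only a lower bound.
     */
    public Optional<Integer> epochFor(long offset) {
        Map.Entry<Long, Integer> floor = epochByStartOffset.floorEntry(offset);
        return floor == null ? Optional.empty() : Optional.of(floor.getValue());
    }

    public static void main(String[] args) {
        ConsumerEpochRangeCache cache = new ConsumerEpochRangeCache();
        cache.observe(0, 7);
        cache.observe(30, 7);  // same epoch: no new entry, 30 falls in the range from 0
        cache.observe(45, 8);  // newer epoch: a range starting at 45 is opened
        System.out.println(cache.epochFor(30)); // Optional[7]
        System.out.println(cache.epochFor(50)); // Optional[8]
        System.out.println(cache.epochFor(-1)); // Optional.empty
    }
}
```

Two limitations follow directly from this sketch. The recorded start offset is the first offset the consumer happened to observe under an epoch, not necessarily the true start of that epoch. Also, for offsets past the last observed record, `epochFor` returns the newest epoch seen so far, which is only a lower bound. That second point is the same gap that makes the EOS case hard.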