Lucas Brutschy created KAFKA-16248:
--------------------------------------

             Summary: Kafka consumer should cache leader offset ranges
                 Key: KAFKA-16248
                 URL: https://issues.apache.org/jira/browse/KAFKA-16248
             Project: Kafka
          Issue Type: Bug
            Reporter: Lucas Brutschy


We noticed that a Kafka Streams application received an OFFSET_OUT_OF_RANGE 
error following a network partition and a Streams task rebalance, and 
subsequently reset its offsets to the beginning.

Inspecting the logs, we saw multiple consumer log messages like: 
{code:java}
Setting offset for partition tp to the committed offset 
FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}
Inspecting the Streams code, it looks like Kafka Streams calls `commitSync` 
with an explicit OffsetAndMetadata object but does not populate the offset 
leader epoch.

The offset leader epoch is required in the offset commit to ensure that all 
consumers in the consumer group have coherent metadata before fetching. 
Otherwise, after a consumer group rebalance, a consumer may fetch with a leader 
epoch that is stale with respect to the committed offset and get an 
OFFSET_OUT_OF_RANGE error from a zombie partition leader.

An example of where this can cause issues:
1. We have a consumer group with consumer 1 and consumer 2. Partition P is 
assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has 
stale metadata for P.
2. Consumer 1 fetches partition P with offset 50, epoch 8, and then commits 
offset 50 without an epoch.
3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 
2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will 
now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due 
to a network partition, the zombie leader may accept consumer 2's fetch leader 
epoch and return an OFFSET_OUT_OF_RANGE to consumer 2.

If, in step 2, consumer 1 had committed the leader epoch along with the offset, 
then on being assigned P, consumer 2 would force a metadata refresh to discover 
a sufficiently new leader epoch for the committed offset.
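
As a rough illustration of that check, the sketch below models the decision in 
plain Java. The class and method names are hypothetical and are not part of the 
Kafka client; it only shows why a commit without an epoch gives the new owner of 
the partition nothing to compare against.

```java
import java.util.Optional;

// Hypothetical model of the decision; not the Kafka client's actual code.
final class CommittedEpochCheck {
    // Returns true when the committed offset carries a leader epoch that is
    // newer than the epoch in the consumer's locally cached metadata.
    static boolean needsMetadataRefresh(Optional<Integer> committedEpoch,
                                        int cachedLeaderEpoch) {
        // Without a committed epoch there is no way to detect staleness.
        return committedEpoch.isPresent() && committedEpoch.get() > cachedLeaderEpoch;
    }
}
```

With the numbers from the example, a committed epoch of 8 against a cached 
epoch of 7 would trigger a refresh, whereas a commit without an epoch would not.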

Kafka Streams cannot fully determine the leader epoch of the offsets it wants 
to commit: in EOS mode, Streams commits the offset after the last control 
records (to avoid always having a lag of >0), but the leader epoch of a 
control record is not known to Streams (since only non-control records are 
returned from Consumer.poll).

A fix discussed with [~hachikuji] is to have the consumer cache leader epoch 
ranges, similar to how the broker maintains a leader epoch cache.
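
A minimal sketch of what such a cache could look like, assuming the consumer 
records the leader epoch of every batch it fetches (including control batches 
that are never returned from poll). The class `LeaderEpochRangeCache` and its 
methods are hypothetical; this is not the broker's implementation nor an 
existing client API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch; this class does not exist in the Kafka client.
final class LeaderEpochRangeCache {
    // Maps the first offset seen for an epoch range to that leader epoch.
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();

    // Record the leader epoch of a fetched batch that starts at baseOffset.
    void observe(long baseOffset, int leaderEpoch) {
        Map.Entry<Long, Integer> floor = epochByStartOffset.floorEntry(baseOffset);
        // Only start a new range when the epoch increases at this offset.
        if (floor == null || floor.getValue() < leaderEpoch) {
            epochByStartOffset.put(baseOffset, leaderEpoch);
        }
    }

    // Look up the epoch of the range covering the given offset. For offsets
    // past the last observed batch this returns the latest known epoch,
    // which is only a lower bound on the true epoch.
    Optional<Integer> epochFor(long offset) {
        Map.Entry<Long, Integer> floor = epochByStartOffset.floorEntry(offset);
        return floor == null ? Optional.empty() : Optional.of(floor.getValue());
    }

    // Number of distinct epoch ranges currently cached.
    int rangeCount() {
        return epochByStartOffset.size();
    }
}
```

With such a cache, a commit of offset 50 could be paired with 
`epochFor(50)` even when the caller passes no epoch, so the committed offset 
would still carry a usable leader epoch.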



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
