wilmerdooley opened a new pull request, #22558:
URL: https://github.com/apache/kafka/pull/22558

   When the partition leader changes, the consumer's `FetchPosition.currentLeader` can become stale, because it is inherited from the previous position and refreshed only when a leader-change error is observed. As a result, `SubscriptionState.allConsumed()` committed `position.offsetEpoch` (the epoch of the last consumed batch) instead of a more recent leader epoch. That older epoch can reference log segments that retention has since deleted, so the next session hits `OFFSET_OUT_OF_RANGE` and the consumer jumps to the position given by `auto.offset.reset`.
   
   This change keeps `currentLeader` fresh by looking it up in metadata every time `FetchCollector` advances the fetch position. It also makes `SubscriptionState.allConsumed()` commit the maximum of `offsetEpoch` and `currentLeader.epoch` (falling back to whichever one is present), so that the committed offset epoch matches the most recent leader epoch known to the client.
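   The epoch-selection rule for `allConsumed()` can be sketched as below. This is a simplified, standalone model, not the code from the PR: `epochToCommit` is a hypothetical helper, and the two epochs are modeled as `Optional<Integer>` values to represent the present/absent cases.

```java
import java.util.Optional;

final class CommittedEpochSketch {

    // Hypothetical helper modeling the rule described above: if both epochs
    // are present, commit the larger one; if only one is present, commit it;
    // if neither is present, commit no epoch.
    static Optional<Integer> epochToCommit(Optional<Integer> offsetEpoch,
                                           Optional<Integer> currentLeaderEpoch) {
        if (offsetEpoch.isPresent() && currentLeaderEpoch.isPresent()) {
            return Optional.of(Math.max(offsetEpoch.get(), currentLeaderEpoch.get()));
        }
        return offsetEpoch.isPresent() ? offsetEpoch : currentLeaderEpoch;
    }

    public static void main(String[] args) {
        // The four present/absent combinations.
        System.out.println(epochToCommit(Optional.of(3), Optional.of(7)));     // Optional[7]
        System.out.println(epochToCommit(Optional.of(5), Optional.empty()));   // Optional[5]
        System.out.println(epochToCommit(Optional.empty(), Optional.of(7)));   // Optional[7]
        System.out.println(epochToCommit(Optional.empty(), Optional.empty())); // Optional.empty
    }
}
```

   Taking the maximum rather than always preferring `currentLeader.epoch` guards against the case where the cached leader epoch lags behind the epoch of a batch that has already been consumed.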
   
   
   ### Testing strategy
   
   - Updated `SubscriptionStateTest.testAllConsumedUsesLatestAvailableLeaderEpoch` to cover the new `allConsumed()` epoch selection across the four combinations of present/absent `offsetEpoch` and `currentLeader.epoch`.
   - Added `FetchCollectorTest.testPositionUpdateUsesCurrentLeaderEpoch` to verify that advancing the fetch position refreshes `currentLeader.epoch` from the metadata cache.
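   The behavior checked by the `FetchCollectorTest` case can be modeled roughly as follows. This is a hypothetical sketch, not Kafka's `FetchCollector` or `FetchPosition`: the `Position` record, the map standing in for the metadata cache, and the choice to keep the previous leader epoch when metadata has no entry are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class PositionRefreshSketch {

    // Hypothetical stand-in for a fetch position: next offset to fetch, epoch
    // of the last consumed batch, and the leader epoch the client believes is current.
    record Position(long offset, Optional<Integer> offsetEpoch, Optional<Integer> leaderEpoch) {}

    // Hypothetical metadata cache: partition name -> latest known leader epoch.
    private final Map<String, Integer> leaderEpochs = new HashMap<>();

    void updateMetadata(String partition, int leaderEpoch) {
        leaderEpochs.put(partition, leaderEpoch);
    }

    // Advance the position after consuming a batch. Rather than copying the
    // previous leader epoch forward, look it up in the metadata cache each time.
    // Assumption: if the cache has no entry, keep the previous value.
    Position advance(String partition, Position current, long nextOffset, int batchEpoch) {
        Optional<Integer> leaderEpoch = Optional.ofNullable(leaderEpochs.get(partition))
                .or(current::leaderEpoch);
        return new Position(nextOffset, Optional.of(batchEpoch), leaderEpoch);
    }

    public static void main(String[] args) {
        PositionRefreshSketch sketch = new PositionRefreshSketch();
        sketch.updateMetadata("topic-0", 4);
        Position p = new Position(0L, Optional.empty(), Optional.of(4));
        p = sketch.advance("topic-0", p, 100L, 4);

        // Leader changes: metadata is updated, but no fetch error is observed.
        sketch.updateMetadata("topic-0", 6);
        p = sketch.advance("topic-0", p, 200L, 4);

        System.out.println(p.leaderEpoch()); // Optional[6]
    }
}
```

   In this model the leader epoch picks up the new value (6) on the next advance even though the consumed batches still carry epoch 4, which is the stale-epoch situation the PR description targets.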
   
   - [x] Unit tests added/updated
   
   JIRA: https://issues.apache.org/jira/browse/KAFKA-19902
