[
https://issues.apache.org/jira/browse/KAFKA-19902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18044617#comment-18044617
]
Nicolae Marasoiu edited comment on KAFKA-19902 at 12/12/25 8:47 AM:
--------------------------------------------------------------------
Hi RivenSun,
Thanks for sharing your ConsumerRebalanceListener code. This confirms the root
cause lies in how the offsets are constructed in your application, rather than
a bug in Kafka.
The Fix
Your code currently initializes OffsetAndMetadata using only the offset, which
leaves the leader epoch as Optional.empty(). You must explicitly preserve the
epoch so the committed offset can still be validated after a leader change.
Update your code as follows:
{code:java}
// Current (causes the issue):
offsets.put(new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1));

// Recommended fix:
offsets.put(new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(
        record.offset() + 1,
        record.leaderEpoch(), // Critical: preserve the epoch
        ""));
{code}
Explanation
The Root Cause: By committing with Optional.empty(), you effectively delete the
epoch information. When the consumer later fetches this position, the
OffsetsForLeaderEpoch validation is skipped because the epoch is missing.
Why Production Failed: In your production logs, a log truncation event (due to
the leader change/ISR shrink) made that specific offset invalid. Without the
epoch to protect it during validation, the fetch failed with
OFFSET_OUT_OF_RANGE.
Why Local Reproduction Failed: To reproduce this, you need both the epoch-less
commit AND a log truncation that invalidates that offset. Your local test
likely had the bad commit but did not trigger the specific truncation scenario.
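As a quick verification step, you can also inspect what the group actually has
committed (offset and leader epoch) with the Admin client. A minimal sketch -
the bootstrap servers and group id are placeholders:
{code:java}
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedEpochCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets("my-group") // placeholder group id
                     .partitionsToOffsetAndMetadata()
                     .get();
            // leaderEpoch() == Optional.empty() indicates an epoch-less commit
            committed.forEach((tp, oam) ->
                System.out.printf("%s offset=%d leaderEpoch=%s%n",
                    tp, oam.offset(), oam.leaderEpoch()));
        }
    }
}
{code}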
was (Author: nmarasoiu):
This is excellent - RivenSun's reply reveals the actual root cause. Let me
break down the facts and analysis carefully.
---
Facts from RivenSun's Reply
The smoking gun - their onPartitionsRevoked code:
{code:java}
// How lastOffsets is populated:
offsets.put(new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1)); // <-- No epoch!

// In onPartitionsRevoked:
lastOffsets.forEach((k, v) -> kafkaConsumer.seek(k, v));
kafkaConsumer.commitSync(); // <-- re-commits (offset, epoch=Optional.empty), effectively deleting
                            // the stored epoch - so it is not a stale epoch but a missing epoch
                            // entirely, which leads to a full reset
{code}
RivenSun confirms:
"This causes the leaderEpoch in OffsetAndMetadata to be Optional.empty()"
And critically (comment 18044362):
"I re-simulated manually committing offsets locally... None of these methods
reproduced the problem... I believe the real cause is log truncation triggering
the edge cases... Perhaps a combination of
both is the true root cause."
---
The Complete Causal Chain
1. Normal operation: Auto-commit (enabled) stores offsets with correct epochs
2. Rebalance triggers onPartitionsRevoked
3. User code executes: seek(tp, OffsetAndMetadata(offset+1)) ← epoch is
Optional.empty
4. User code commits: commitSync() overwrites the good auto-committed offset
with an epoch-less one
5. After rebalance: Consumer fetches committed offset (now without epoch)
6. Without epoch: Validation via OffsetsForLeaderEpoch is skipped entirely
(the check requires offsetEpoch to be present)
7. Consumer fetches from committed offset blindly
8. If log truncation happened and offset exceeds new log end →
OFFSET_OUT_OF_RANGE
9. Reset per auto.offset.reset=earliest
---
Why They Couldn't Reproduce Locally
The problem requires both conditions:
| Condition | Production | Local Test |
|--------------------------------------|------------|------------|
| Committed offset without epoch | ✓ | ✓ |
| Log truncation making offset invalid | ✓ | ✗ |
The production incident had:
- ISR=1 (from broker logs: "Partition ISR shrinking from 10,11,30 to 10")
- Potential unclean leader election
- New leader's log end was below the committed offset (30795645 < 30795753)
Locally, even with leader switch, there was no truncation that made the
offset invalid.
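For completeness: one way to approximate the second condition in a local test is
to invalidate the committed offset from the other side, by advancing the log
start offset past it with the Admin API. This is not the tail truncation seen in
production (log end below the committed offset after the leader change), but it
exercises the same OFFSET_OUT_OF_RANGE handling and auto.offset.reset path on
the consumer. A hedged sketch - topic, partition, offset and bootstrap servers
are placeholders:
{code:java}
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class ForceOffsetOutOfRange {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("test-topic", 0); // placeholder
            // Advance the log start offset beyond the group's committed offset, so the
            // next fetch from that committed position returns OFFSET_OUT_OF_RANGE.
            long beyondCommitted = 1_000L; // pick a value above the committed offset
            admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(beyondCommitted)))
                 .all()
                 .get();
        }
    }
}
{code}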
---
Assessment: User Error vs. Kafka Bug
This is primarily user error, with API usability concerns
User's mistake:
{code:java}
// They construct OffsetAndMetadata without epoch:
new OffsetAndMetadata(record.offset() + 1)
// Should be:
new OffsetAndMetadata(record.offset() + 1, record.leaderEpoch(), "")
{code}
Better pattern - they don't need seek() at all:
{code:java}
// Instead of:
lastOffsets.forEach((k, v) -> kafkaConsumer.seek(k, v));
kafkaConsumer.commitSync();
// Just do:
kafkaConsumer.commitSync(lastOffsets); // Direct commit with specific offsets
{code}
But even this requires fixing how lastOffsets is populated.
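Putting both fixes together, a minimal sketch of the listener pattern being
suggested (class, field and method names are illustrative, not the reporter's
actual code):
{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: tracks per-partition offsets with their leader epochs and
// commits them directly on revocation, without calling seek().
public class EpochPreservingListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> lastOffsets = new ConcurrentHashMap<>();

    public EpochPreservingListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Call from the poll loop for every processed record
    public void track(ConsumerRecord<String, String> record) {
        lastOffsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, record.leaderEpoch(), ""));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit only the revoked partitions' tracked offsets; the epochs travel with them
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata oam = lastOffsets.remove(tp);
            if (oam != null) {
                toCommit.put(tp, oam);
            }
        }
        if (!toCommit.isEmpty()) {
            consumer.commitSync(toCommit);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do here; tracking resumes from the poll loop
    }
}
{code}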
---
API Usability Concerns (for Kafka maintainers)
1. Easy to lose epoch: OffsetAndMetadata(long offset) constructor makes it
trivial to create epoch-less metadata
2. No warning: No log when committing without epoch
3. seek() clears epoch: ClassicKafkaConsumer.seek() line 789 explicitly
passes Optional.empty()
Potential Kafka improvements (not bugs, but hardening):
1. Log a warning when committing an offset without an epoch in Kafka 3.x+,
where epochs are expected (see the sketch after this list)
2. Defensive commit path: If committing without epoch, could attempt to use
current leader epoch from metadata instead of empty
3. Documentation: Better docs on preserving epochs in manual commit scenarios
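For improvement 1, a purely hypothetical sketch of what such a warning could
look like; OffsetCommitGuard is an illustrative application-side helper, not an
existing Kafka class (the real hardening would live in the client's commit path):
{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper: warn when an offset is about to be committed without a leader epoch.
public final class OffsetCommitGuard {
    private static final Logger log = LoggerFactory.getLogger(OffsetCommitGuard.class);

    private OffsetCommitGuard() { }

    public static void warnIfEpochMissing(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, oam) -> {
            if (!oam.leaderEpoch().isPresent()) {
                log.warn("Committing offset {} for {} without a leader epoch; "
                        + "OffsetsForLeaderEpoch validation will be skipped for this position",
                    oam.offset(), tp);
            }
        });
    }
}
{code}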
---
Response to RivenSun
I would suggest answering:
Short answer: Your code is the proximate cause. You should preserve the
leader epoch when constructing OffsetAndMetadata.
The fix for your code:
{code:java}
offsets.put(new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1,
        record.leaderEpoch(), // ← Add this!
        ""));
{code}
Why you couldn't reproduce: You need both (a) epoch-less commit AND (b) log
truncation that makes the offset invalid. Your local test had (a) but not (b).
Is this a Kafka bug?: No, but the API makes it easy to lose epochs. A
possible Kafka improvement would be to warn when committing without epoch, or
to defensively use the current leader epoch if none is
provided.
> Consumer triggers OFFSET_OUT_OF_RANGE when committed offset uses stale epoch
> after leader change
> ------------------------------------------------------------------------------------------------
>
> Key: KAFKA-19902
> URL: https://issues.apache.org/jira/browse/KAFKA-19902
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.9.0
> Reporter: RivenSun
> Priority: Major
> Attachments: analysis.md, image-2025-11-21-18-03-24-592.png,
> image-2025-11-21-18-11-24-316.png, image-2025-11-21-20-00-10-862.png
>
>
> h2. Summary
> When a partition leader changes and the consumer commits offsets with a stale
> epoch, if the log segments containing that epoch are subsequently deleted due
> to retention policy, the consumer will encounter an OFFSET_OUT_OF_RANGE error
> and reset to earliest (if auto.offset.reset=earliest), causing massive
> message reprocessing. The root cause is that SubscriptionState.allConsumed()
> uses position.offsetEpoch instead of position.currentLeader.epoch when
> constructing OffsetAndMetadata for commit, which can become stale when leader
> changes occur.
> If `auto.offset.reset=latest`, *the consequences will be more severe,
> resulting in message loss.*
> ----
> h2. Environment
> Cluster Configuration:
> * Kafka Server Version: 3.9.0
> * Kafka Client Version: 3.9.0
> * Topic: 200 partitions, 7-day retention, no tiered storage
> * Consumer Group: 45 consumers (1 KafkaConsumer thread per machine)
> * No broker/controller restarts occurred
> * High throughput producer continuously writing messages
> Consumer Configuration:
> {code:java}
> auto.offset.reset=earliest
> enable.auto.commit=true {code}
>
> Consumer Code:
> * Registered ConsumerRebalanceListener
> * Calls kafkaConsumer.commitSync() in onPartitionsRevoked() method
> ----
> h2. Problem Description
> In a scenario where the consumer group has no lag, consumers suddenly
> consumed a massive amount of messages, far exceeding the recent few minutes
> of producer writes. Investigation revealed that multiple partitions reset to
> the earliest offset and reprocessed up to 7 days of historical data.
> ----
> h2. Observed Symptoms (Timeline)
> # Consumer group rebalance occurred (triggered by normal consumer group
> management)
> # Consumer logged OFFSET_OUT_OF_RANGE errors immediately after rebalance
> # Consumer reset to earliest offset due to auto.offset.reset=earliest
> configuration
> # Kafka broker logged NotLeaderOrFollowerException around the same
> timeframe, indicating partition leader changes
> # Consumer did not log any NOT_LEADER_OR_FOLLOWER errors (these are DEBUG
> level and not visible in production logs)
> The image below uses the partition
> asyncmq_local_us_us_marketplace-ssl-a_aws_us-east-1_2a7e053c-9d90-4efd-af2d-3a8bf9564715-153
> as an example to trace the problem log chain. {*}See attached log screenshot
> at the bottom{*}.
> ----
> h2. Root Cause Analysis
> h3. The Problem Chain
> 1. Leader change occurs (epoch changes from N to N+1)
> ↓
> 2. Consumer continues processing old batches (epoch=N)
> ↓
> 3. Consumer commits offset during/after rebalance
> ├─ Committed offset: 1000
> └─ Committed epoch: N (using position.offsetEpoch from old batch)
> ↓
> 4. High throughput + retention policy causes old segments (epoch=N) to be
> deleted
> ↓
> 5. Consumer restarts/rebalances and fetches committed offset
> ├─ Tries to validate offset 1000 with epoch=N
> └─ Broker cannot find epoch=N ({*}segments deleted or tp leader change{*})
> ↓
> 6. Broker returns OFFSET_OUT_OF_RANGE
> ↓
> 7. Consumer resets to earliest offset
> h3. Code Analysis
> The problematic code in SubscriptionState.allConsumed():
> {code:java}
> //
> kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition())
> allConsumed.put(topicPartition, new OffsetAndMetadata(
> partitionState.position.offset,
>             partitionState.position.offsetEpoch, // Problem: uses offsetEpoch from consumed batch
> ""));
> });
> return allConsumed;
> } {code}
>
> Why this is problematic: The FetchPosition class contains two different epoch
> values:
> * offsetEpoch: The epoch from the last consumed record's batch
> * currentLeader.epoch: The current partition leader's epoch from metadata
> When committing offsets, we should use currentLeader.epoch instead of
> offsetEpoch because:
> # offsetEpoch represents the epoch of already consumed data (historical)
> # currentLeader.epoch represents the current partition leader (up-to-date)
> h3. Scenarios Where These Epochs Diverge
> Scenario A: Leader changes while consumer is processing old batches
> * T1: Consumer fetches batch with epoch=5
> * T2: Leader changes to epoch=6
> * T3: Metadata updates with new leader epoch=6
> * T4: Consumer commits offset
> * offsetEpoch = 5 (from batch being processed)
> * currentLeader.epoch = 6 (from updated metadata)
> * Problem: Commits epoch=5, which may soon be deleted
> Scenario B: Recovery from committed offset after leader change
> * Consumer commits offset with old epoch=N
> * Leader changes to epoch=N+1
> * Old segments (epoch=N) are deleted by retention policy
> * Consumer rebalances and tries to restore from committed offset
> * offsetEpoch = N (from committed offset)
> * currentLeader.epoch = N+1 (from current metadata)
> * Problem: Validation fails because epoch=N no longer exists
> ----
> h2. Steps to Reproduce
> This is a timing-sensitive edge case. The following conditions increase the
> likelihood:
> # Setup:
> * High-throughput topic (to trigger faster log rotation)
> * Relatively short retention period (e.g., 7 days)
> * Consumer group with rebalance listener calling commitSync()
> * enable.auto.commit=true (or any manual commit)
> # Trigger:
> * Trigger a partition leader change (broker restart, controller election,
> etc.; see the Admin API sketch after this list)
> * Simultaneously or shortly after, trigger a consumer group rebalance
> * Wait for retention policy to delete old log segments
> # Expected Result:
> Consumer should resume from committed offset
> # Actual Result:
> Consumer encounters OFFSET_OUT_OF_RANGE and resets to earliest
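> For illustration, the leader change in the trigger step above could be scripted
> with the Admin API; this only moves leadership when the preferred replica is not
> already the leader, and a broker restart works just as well. Topic, partition and
> bootstrap servers are placeholders:
> {code:java}
> import java.util.Properties;
> import java.util.Set;
>
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.common.ElectionType;
> import org.apache.kafka.common.TopicPartition;
>
> public class TriggerLeaderChange {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>         try (Admin admin = Admin.create(props)) {
>             // Ask the controller to re-elect the preferred leader for one partition
>             admin.electLeaders(ElectionType.PREFERRED,
>                     Set.of(new TopicPartition("test-topic", 0))) // placeholder
>                  .partitions()
>                  .get();
>         }
>     }
> }
> {code}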
> ----
> h2. Impact
> * Data Reprocessing: Consumers may reprocess up to retention.ms worth of data
> * Service Degradation: Sudden spike in consumer throughput can overwhelm
> downstream systems
> * Resource Waste: Unnecessary CPU, memory, and network usage
> * Potential Duplicates: If using auto.offset.reset=earliest, duplicate
> message processing is guaranteed
> * If `auto.offset.reset=latest`, *the consequences will be more severe,
> resulting in message loss.*
> ----
> h2. Proposed Fix
> h3. Root Cause Analysis
> The issue is more fundamental than a simple field selection problem. The core
> issue is that both epoch values in FetchPosition can be stale at commit time:
> # offsetEpoch: Contains the epoch from the last consumed record's batch. If
> a leader change occurs after consumption but before commit, this epoch
> becomes stale and may reference log segments that have been deleted.
> # currentLeader.epoch: Inherited from the previous position during normal
> consumption and only updated when:
> * NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH errors are detected
> * Position is restored from committed offsets (fetches from metadata)
> * Explicit validation is triggered via
> maybeValidatePositionForCurrentLeader()
> During normal, error-free consumption, currentLeader is never updated and can
> also become stale.
> h3. Problem with Current Code
> Location: org.apache.kafka.clients.consumer.internals.FetchCollector
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
>     SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
>         nextInLineFetch.nextFetchOffset(),
>         nextInLineFetch.lastEpoch(),   // offsetEpoch: from consumed batch
>         position.currentLeader);       // ❌ currentLeader: inherited, NOT updated!
> log.trace("Updating fetch position from {} to {} for partition {} and
> returning {} records from `poll()`",
> position, nextPosition, tp, partRecords.size());
> subscriptions.position(tp, nextPosition);
> positionAdvanced = true;
> }
> {code}
> The inherited currentLeader means it can be as stale as offsetEpoch in
> certain scenarios.
> ----
> h3. Recommended Solution: Proactively Update currentLeader During Position
> Updates
> Option 1: Update currentLeader when advancing position (Primary Recommendation).
> Modify FetchCollector to fetch the latest leader information from metadata
> every time the position is updated:
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
> // Fetch the latest leader information from metadata
> Metadata.LeaderAndEpoch currentLeaderAndEpoch =
> metadata.currentLeader(tp);
>
> SubscriptionState.FetchPosition nextPosition = new
> SubscriptionState.FetchPosition(
> nextInLineFetch.nextFetchOffset(),
> nextInLineFetch.lastEpoch(),
> currentLeaderAndEpoch); // ✅ Use fresh leader info from metadata
>
> log.trace("Updating fetch position from {} to {} for partition {} and
> returning {} records from `poll()`",
> position, nextPosition, tp, partRecords.size());
> subscriptions.position(tp, nextPosition);
> positionAdvanced = true;
> } {code}
> Advantages:
> * Ensures currentLeader is always up-to-date
> * Makes allConsumed() safe to use *currentLeader.epoch* for commits
> Modify SubscriptionState.allConsumed() to {color:#de350b}use
> currentLeader.epoch instead of offsetEpoch{color}:
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition())
> allConsumed.put(topicPartition, new OffsetAndMetadata(
> partitionState.position.offset,
>                 partitionState.position.currentLeader.epoch, // ✅ Use current leader epoch
> ""));
> });
> return allConsumed;
> } {code}
>
> * Minimal performance impact (metadata lookup is O(1) from local cache)
> * Aligns with the existing pattern in refreshCommittedOffsets()
> Potential Concerns:
> * Adds one metadata lookup per position update
> * If metadata is stale, currentLeader.epoch could still lag slightly, but
> this is the same risk as today
> ----
> h3. Alternative Solutions
> Option 2: Fetch fresh leader info during commit.
> Modify allConsumed() to fetch the latest leader information at commit time:
> {code:java}
> // Note: This would require passing metadata reference to allConsumed()
> public synchronized Map<TopicPartition, OffsetAndMetadata>
> allConsumed(ConsumerMetadata metadata) {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition()) {
> // Fetch the latest leader epoch from metadata at commit time
> Metadata.LeaderAndEpoch latestLeader =
> metadata.currentLeader(topicPartition);
> Optional<Integer> epochToCommit = latestLeader.epoch.isPresent()
> ? latestLeader.epoch
>                 : partitionState.position.offsetEpoch; // Fallback to offsetEpoch
>
> allConsumed.put(topicPartition, new OffsetAndMetadata(
> partitionState.position.offset,
> epochToCommit,
> ""));
> }
> });
> return allConsumed;
> } {code}
> Advantages:
> * Only impacts commit path, not consumption hot path
> * Directly addresses the commit-time staleness issue
> Disadvantages:
> * Requires changing the signature of allConsumed() (API change)
> * May still have a race condition if leader changes between metadata fetch
> and commit
> * Metadata could be stale if update hasn't been processed yet
> ----
> Option 3: Use the maximum epoch value.
> Use the larger of the two epoch values, assuming newer epochs have higher values:
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition()) {
> Optional<Integer> epochToCommit;
>
> if (partitionState.position.offsetEpoch.isPresent() &&
> partitionState.position.currentLeader.epoch.isPresent()) {
> // Use the maximum of the two epochs
> int maxEpoch = Math.max(
> partitionState.position.offsetEpoch.get(),
> partitionState.position.currentLeader.epoch.get());
> epochToCommit = Optional.of(maxEpoch);
> } else {
> // Fallback to whichever is present
> epochToCommit =
> partitionState.position.currentLeader.epoch.isPresent()
> ? partitionState.position.currentLeader.epoch
> : partitionState.position.offsetEpoch;
> }
>
> allConsumed.put(topicPartition, new OffsetAndMetadata(
> partitionState.position.offset,
> epochToCommit,
> ""));
> }
> });
> return allConsumed;
> } {code}
> Advantages:
> * No API changes required
> * Simple to implement
> * Provides better protection than using only one epoch
> Disadvantages:
> * Heuristic-based; assumes epochs are monotonically increasing
> * Could still use a stale epoch if both values are old
> * Doesn't solve the root cause of stale currentLeader
> ----
> h3. Recommendation
> Primary recommendation: Implement Option 1 (Update currentLeader during
> position updates). This is the most robust solution because:
> # It ensures currentLeader is always fresh
> # It fixes the root cause rather than working around symptoms
> # It has minimal performance impact
> # It makes the codebase more consistent and maintainable
> Secondary recommendation: Implement Option 3 as a defense-in-depth measure.
> Even with Option 1, using max(offsetEpoch, currentLeader.epoch) in
> allConsumed() provides additional safety against any edge cases where one
> epoch might be more up-to-date than the other.
> Combined approach (strongest protection):
> {code:java}
> // In FetchCollector.java
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
> Metadata.LeaderAndEpoch currentLeaderAndEpoch =
> metadata.currentLeader(tp);
> SubscriptionState.FetchPosition nextPosition = new
> SubscriptionState.FetchPosition(
> nextInLineFetch.nextFetchOffset(),
> nextInLineFetch.lastEpoch(),
> currentLeaderAndEpoch); // ✅ Keep currentLeader fresh
> subscriptions.position(tp, nextPosition);
> positionAdvanced = true;
> }
>
> // In SubscriptionState.java
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition()) {
> // Use the maximum epoch as defense-in-depth
> Optional<Integer> epochToCommit;
> if (partitionState.position.offsetEpoch.isPresent() &&
> partitionState.position.currentLeader.epoch.isPresent()) {
> epochToCommit = Optional.of(Math.max(
> partitionState.position.offsetEpoch.get(),
> partitionState.position.currentLeader.epoch.get()));
> } else {
> epochToCommit =
> partitionState.position.currentLeader.epoch.isPresent()
> ? partitionState.position.currentLeader.epoch
> : partitionState.position.offsetEpoch;
> }
>
> allConsumed.put(topicPartition, new OffsetAndMetadata(
> partitionState.position.offset,
> epochToCommit,
> ""));
> }
> });
> return allConsumed;
> } {code}
> This combined approach provides:
> * Prevention: Keep currentLeader fresh during normal operation
> * Defense: Use the best available epoch value at commit time
> * Resilience: Minimize the window where a stale epoch can cause issues
> ----
> h2. Additional Notes
> Why consumers don't log NOT_LEADER_OR_FOLLOWER errors: All consumer-side
> handling of NOT_LEADER_OR_FOLLOWER errors uses DEBUG-level logging:
> {code:java}
> // FetchCollector.java line 325
> log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
>
> // AbstractFetch.java line 207
> log.debug("For {}, received error {}, with leaderIdAndEpoch {}", partition,
> partitionError, ...);
>
> // OffsetsForLeaderEpochUtils.java line 102
> LOG.debug("Attempt to fetch offsets for partition {} failed due to {},
> retrying.", ...); {code}
>
> This makes the issue difficult to diagnose in production environments.
> ----
> h2. Workarounds (Until Fixed)
> # Increase retention period to reduce likelihood of epoch deletion
> # Monitor consumer lag to ensure it stays low
> # Reduce rebalance frequency (increase max.poll.interval.ms,
> session.timeout.ms); an illustrative config sketch follows this list
> # Use cooperative rebalance strategy to minimize rebalance impact
> # Consider using auto.offset.reset=latest if reprocessing is more costly
> than data loss (application-dependent)
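> An illustrative set of consumer overrides for items 3-5; the values are
> placeholders, not tuned recommendations:
> {code:java}
> # Illustrative values only; tune for the actual workload
> max.poll.interval.ms=600000
> session.timeout.ms=45000
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
> # Only if data loss is preferable to reprocessing (item 5):
> # auto.offset.reset=latest
> {code}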
> ----
> h2. Related Code References
> h3. 1. The problematic method: SubscriptionState.allConsumed()
> Location: org.apache.kafka.clients.consumer.internals.SubscriptionState
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
> Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
> assignment.forEach((topicPartition, partitionState) -> {
> if (partitionState.hasValidPosition())
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 partitionState.position.offsetEpoch, "")); // Uses offsetEpoch instead of currentLeader.epoch
> });
> return allConsumed;
> } {code}
> h3. 2. How FetchPosition is updated during normal consumption
> Location: org.apache.kafka.clients.consumer.internals.FetchCollector
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
>     SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
>         nextInLineFetch.nextFetchOffset(),
>         nextInLineFetch.lastEpoch(),   // offsetEpoch: from consumed batch
>         position.currentLeader);       // currentLeader: inherited from old position, NOT updated!
> log.trace("Updating fetch position from {} to {} for partition {} and
> returning {} records from `poll()`",
> position, nextPosition, tp, partRecords.size());
> subscriptions.position(tp, nextPosition);
> positionAdvanced = true;
> } {code}
> Key Issue: The currentLeader field is inherited from the previous position
> and not automatically updated during normal consumption. It only gets updated
> when leader change errors are detected.
> h3. 3. How committed offsets are restored after rebalance
> Location:
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets()
> {code:java}
> public static void refreshCommittedOffsets(final Map<TopicPartition,
> OffsetAndMetadata> offsetsAndMetadata,
> final ConsumerMetadata metadata,
> final SubscriptionState
> subscriptions) {
> for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry :
> offsetsAndMetadata.entrySet()) {
> final TopicPartition tp = entry.getKey();
> final OffsetAndMetadata offsetAndMetadata = entry.getValue();
> if (offsetAndMetadata != null) {
> // first update the epoch if necessary
> entry.getValue().leaderEpoch().ifPresent(epoch ->
> metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
>
> // it's possible that the partition is no longer assigned when
> the response is received,
> // so we need to ignore seeking if that's the case
> if (subscriptions.isAssigned(tp)) {
> final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
> metadata.currentLeader(tp);
> final SubscriptionState.FetchPosition position = new
> SubscriptionState.FetchPosition(
> offsetAndMetadata.offset(),
>                     offsetAndMetadata.leaderEpoch(), // offsetEpoch from committed offset (may be old)
>                     leaderAndEpoch);                 // currentLeader from current metadata (may be new)
>
> subscriptions.seekUnvalidated(tp, position);
>
> log.info("Setting offset for partition {} to the committed
> offset {}", tp, position);
> }
> }
> }
> } {code}
> The Divergence Point: When restoring from committed offsets, offsetEpoch
> comes from the stored offset (potentially old), while currentLeader comes
> from fresh metadata (potentially new after leader change).
> h3. 4. How OffsetsForLeaderEpoch validation request is constructed
> Location:
> org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils.prepareRequest()
> {code:java}
> static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
> Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
> OffsetForLeaderTopicCollection topics = new
> OffsetForLeaderTopicCollection(requestData.size());
> requestData.forEach((topicPartition, fetchPosition) ->
> fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
> OffsetForLeaderTopic topic =
> topics.find(topicPartition.topic());
> if (topic == null) {
> topic = new
> OffsetForLeaderTopic().setTopic(topicPartition.topic());
> topics.add(topic);
> }
> topic.partitions().add(new OffsetForLeaderPartition()
> .setPartition(topicPartition.partition())
>                     .setLeaderEpoch(fetchEpoch) // Uses offsetEpoch for validation
>                     .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
>                         .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
> );
> })
> );
> return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
> } {code}
> The Validation Problem: The validation request uses fetchEpoch (which is
> offsetEpoch) to validate against the broker. If this epoch no longer exists
> in the broker's log, validation fails and triggers OFFSET_OUT_OF_RANGE.
> h3. 5. FetchPosition class definition
> Location:
> org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition
>
> {code:java}
> /**
> * Represents the position of a partition subscription.
> *
> * This includes the offset and epoch from the last record in
> * the batch from a FetchResponse. It also includes the leader epoch at the
> time the batch was consumed.
> */
> public static class FetchPosition {
> public final long offset;
> final Optional<Integer> offsetEpoch; // Epoch from last consumed
> record's batch
> final Metadata.LeaderAndEpoch currentLeader; // Current partition leader
> info from metadata
>
> FetchPosition(long offset) {
> this(offset, Optional.empty(),
> Metadata.LeaderAndEpoch.noLeaderOrEpoch());
> }
>
> public FetchPosition(long offset, Optional<Integer> offsetEpoch,
> Metadata.LeaderAndEpoch currentLeader) {
> this.offset = offset;
> this.offsetEpoch = Objects.requireNonNull(offsetEpoch);
> this.currentLeader = Objects.requireNonNull(currentLeader);
> }
>
> @Override
> public String toString() {
> return "FetchPosition{" +
> "offset=" + offset +
> ", offsetEpoch=" + offsetEpoch +
> ", currentLeader=" + currentLeader +
> '}';
> }
> }{code}
> Class Design: The class contains both offsetEpoch (historical data epoch) and
> currentLeader.epoch (current metadata epoch), but allConsumed() only uses the
> former when committing.
> h1. Attachment logs:
> h2. group rebalance log
> !image-2025-11-21-18-03-24-592.png!
> h2. consumer client log
> !image-2025-11-21-20-00-10-862.png!
> h2. broker server log
> !image-2025-11-21-18-11-24-316.png!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)