This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new dedfed06f7a KAFKA-15510: Fix follower's lastFetchedEpoch when fetch response has … (#14457) dedfed06f7a is described below commit dedfed06f7a472424080456c997f5200c6bef196 Author: chern <chern...@gmail.com> AuthorDate: Thu Sep 28 06:14:42 2023 -0700 KAFKA-15510: Fix follower's lastFetchedEpoch when fetch response has … (#14457) When a fetch response has no record for a partition, validBytes is 0. We shouldn't set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala since there is no record and it is Optional.empty. We should use currentFetchState.lastFetchedEpoch instead. Reviewers: Divij Vaidya <di...@amazon.com>, Viktor Somogyi-Vass <viktorsomo...@gmail.com>, Kamal Chandraprakash<kamal.chandraprak...@gmail.com>, Rajini Sivaram <rajinisiva...@googlemail.com> --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 5 +++-- core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 1dafb89ef0a..450fcfea461 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -364,10 +364,11 @@ abstract class AbstractFetcherThread(name: String, // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data if ((validBytes > 0 || currentFetchState.lag.isEmpty) && partitionStates.contains(topicPartition)) { + val lastFetchedEpoch = + if (logAppendInfo.lastLeaderEpoch.isPresent) logAppendInfo.lastLeaderEpoch.asScala else currentFetchState.lastFetchedEpoch // Update partitionStates only if there is no exception during processPartitionData val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag), - currentFetchState.currentLeaderEpoch, state = Fetching, - logAppendInfo.lastLeaderEpoch.asScala) + currentFetchState.currentLeaderEpoch, state = Fetching, lastFetchedEpoch) partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) if (validBytes > 0) fetcherStats.byteRate.mark(validBytes) } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index e58532622e3..6a0feaa6456 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -749,9 +749,10 @@ class ReplicaFetcherThreadTest { val log: UnifiedLog = mock(classOf[UnifiedLog]) val partition: Partition = mock(classOf[Partition]) val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + val lastFetchedEpoch = 2 when(log.highWatermark).thenReturn(0) - when(log.latestEpoch).thenReturn(Some(0)) + when(log.latestEpoch).thenReturn(Some(lastFetchedEpoch)) when(log.endOffsetForEpoch(0)).thenReturn(Some(new OffsetAndEpoch(0, 0))) when(log.logEndOffset).thenReturn(0) when(log.maybeUpdateHighWatermark(0)).thenReturn(None) @@ -835,6 +836,7 @@ class ReplicaFetcherThreadTest { // Lag is set to Some(0). assertEquals(Some(0), thread.fetchState(t1p0).flatMap(_.lag)) + assertEquals(Some(lastFetchedEpoch), thread.fetchState(t1p0).flatMap(_.lastFetchedEpoch)) } @Test