This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dedfed06f7a KAFKA-15510: Fix follower's lastFetchedEpoch when fetch 
response has … (#14457)
dedfed06f7a is described below

commit dedfed06f7a472424080456c997f5200c6bef196
Author: chern <chern...@gmail.com>
AuthorDate: Thu Sep 28 06:14:42 2023 -0700

    KAFKA-15510: Fix follower's lastFetchedEpoch when fetch response has … 
(#14457)
    
    When a fetch response has no record for a partition, validBytes is 0. We 
shouldn't set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala 
since there is no record and it is Optional.empty. We should use 
currentFetchState.lastFetchedEpoch instead.
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Viktor Somogyi-Vass 
<viktorsomo...@gmail.com>, Kamal 
Chandraprakash<kamal.chandraprak...@gmail.com>, Rajini Sivaram 
<rajinisiva...@googlemail.com>
---
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala         | 5 +++--
 core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala | 4 +++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1dafb89ef0a..450fcfea461 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -364,10 +364,11 @@ abstract class AbstractFetcherThread(name: String,
 
                         // ReplicaDirAlterThread may have removed 
topicPartition from the partitionStates after processing the partition data
                         if ((validBytes > 0 || currentFetchState.lag.isEmpty) 
&& partitionStates.contains(topicPartition)) {
+                          val lastFetchedEpoch =
+                            if (logAppendInfo.lastLeaderEpoch.isPresent) 
logAppendInfo.lastLeaderEpoch.asScala else currentFetchState.lastFetchedEpoch
                           // Update partitionStates only if there is no 
exception during processPartitionData
                           val newFetchState = 
PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                            currentFetchState.currentLeaderEpoch, state = 
Fetching,
-                            logAppendInfo.lastLeaderEpoch.asScala)
+                            currentFetchState.currentLeaderEpoch, state = 
Fetching, lastFetchedEpoch)
                           partitionStates.updateAndMoveToEnd(topicPartition, 
newFetchState)
                           if (validBytes > 0) 
fetcherStats.byteRate.mark(validBytes)
                         }
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index e58532622e3..6a0feaa6456 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -749,9 +749,10 @@ class ReplicaFetcherThreadTest {
     val log: UnifiedLog = mock(classOf[UnifiedLog])
     val partition: Partition = mock(classOf[Partition])
     val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    val lastFetchedEpoch = 2
 
     when(log.highWatermark).thenReturn(0)
-    when(log.latestEpoch).thenReturn(Some(0))
+    when(log.latestEpoch).thenReturn(Some(lastFetchedEpoch))
     when(log.endOffsetForEpoch(0)).thenReturn(Some(new OffsetAndEpoch(0, 0)))
     when(log.logEndOffset).thenReturn(0)
     when(log.maybeUpdateHighWatermark(0)).thenReturn(None)
@@ -835,6 +836,7 @@ class ReplicaFetcherThreadTest {
 
     // Lag is set to Some(0).
     assertEquals(Some(0), thread.fetchState(t1p0).flatMap(_.lag))
+    assertEquals(Some(lastFetchedEpoch), 
thread.fetchState(t1p0).flatMap(_.lastFetchedEpoch))
   }
 
   @Test

Reply via email to