This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push: new 047608f KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136) 047608f is described below commit 047608fe5e6631139d890ce1ca045052daa0e43c Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Wed Jul 28 17:27:26 2021 +0100 KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136) Reviewers: Jason Gustafson <ja...@confluent.io> --- .../main/scala/kafka/server/ReplicaManager.scala | 7 ++++++- .../unit/kafka/server/ReplicaManagerTest.scala | 22 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6837d81..f03571c 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1750,7 +1750,8 @@ class ReplicaManager(val config: KafkaConfig, * records in fetch response. Log start/end offset and high watermark may change not only due to * this fetch request, e.g., rolling new log segment and removing old log segment may move log * start offset further than the last offset in the fetched records. The followers will get the - * updated leader's state in the next fetch response. + * updated leader's state in the next fetch response. If follower has a diverging epoch or if read + * fails with any error, follower fetch state is not updated. */ private def updateFollowerFetchState(followerId: Int, readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = { @@ -1759,6 +1760,10 @@ class ReplicaManager(val config: KafkaConfig, debug(s"Skipping update of fetch state for follower $followerId since the " + s"log read returned error ${readResult.error}") readResult + } else if (readResult.divergingEpoch.nonEmpty) { + debug(s"Skipping update of fetch state for follower $followerId since the " + + s"log read returned diverging epoch ${readResult.divergingEpoch}") + readResult } else { onlinePartition(topicPartition) match { case Some(partition) => diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 5e2563e..fd6ba75 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -698,6 +698,28 @@ class ReplicaManagerTest { assertEquals(0L, followerReplica.logStartOffset) assertEquals(0L, followerReplica.logEndOffset) + // Next we receive an invalid request with a higher fetch offset, but a diverging epoch. + // We expect that the replica state does not get updated. + val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, maxFetchBytes, + Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1)) + + replicaManager.fetchMessages( + timeout = 0L, + replicaId = 1, + fetchMinBytes = 1, + fetchMaxBytes = maxFetchBytes, + hardMaxBytesLimit = false, + fetchInfos = Seq(tp -> divergingFetchPartitionData), + quota = UnboundedQuota, + isolationLevel = IsolationLevel.READ_UNCOMMITTED, + responseCallback = callback, + clientMetadata = None + ) + + assertTrue(successfulFetch.isDefined) + assertEquals(0L, followerReplica.logStartOffset) + assertEquals(0L, followerReplica.logEndOffset) + } finally { replicaManager.shutdown(checkpointHW = false) }