[ https://issues.apache.org/jira/browse/KAFKA-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525893#comment-16525893 ]
ASF GitHub Bot commented on KAFKA-7104:
---------------------------------------

ijuma closed pull request #5305: KAFKA-7104: Consistent leader's state in fetch response
URL: https://github.com/apache/kafka/pull/5305

This is a PR merged from a forked repository. As GitHub hides the original diff on
merge, it is displayed below for the sake of provenance.

As this is a foreign pull request (from a fork), the diff is supplied below (as it
won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ed9559f856a..9658f1a33a5 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -90,11 +90,6 @@ case class LogReadResult(info: FetchDataInfo,
       case Some(e) => Errors.forException(e)
     }
-  def updateLeaderReplicaInfo(leaderReplica: Replica): LogReadResult =
-    copy(highWatermark = leaderReplica.highWatermark.messageOffset,
-      leaderLogStartOffset = leaderReplica.logStartOffset,
-      leaderLogEndOffset = leaderReplica.logEndOffset.messageOffset)
-
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
@@ -1340,7 +1335,12 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Update the follower's fetch state in the leader based on the last fetch request and update `readResult`,
-   * if necessary.
+   * if the follower replica is not recognized to be one of the assigned replicas. Do not update
+   * `readResult` otherwise, so that log start/end offset and high watermark is consistent with
+   * records in fetch response. Log start/end offset and high watermark may change not only due to
+   * this fetch request, e.g., rolling new log segment and removing old log segment may move log
+   * start offset further than the last offset in the fetched records. The followers will get the
+   * updated leader's state in the next fetch response.
   */
  private def updateFollowerLogReadResults(replicaId: Int,
                                           readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1351,10 +1351,7 @@ class ReplicaManager(val config: KafkaConfig,
       case Some(partition) =>
         partition.getReplica(replicaId) match {
           case Some(replica) =>
-            if (partition.updateReplicaLogReadResult(replica, readResult))
-              partition.leaderReplicaIfLocal.foreach { leaderReplica =>
-                updatedReadResult = readResult.updateLeaderReplicaInfo(leaderReplica)
-              }
+            partition.updateReplicaLogReadResult(replica, readResult)
           case None =>
             warn(s"Leader $localBrokerId failed to record follower $replicaId's position " +
               s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ce8868861f7..56d4b7919e4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -463,7 +463,9 @@ class ReplicaManagerTest {
     val tp0Status = responseStatusMap.get(tp0)
     assertTrue(tp0Status.isDefined)
-    assertEquals(1, tp0Status.get.highWatermark)
+    // the response contains high watermark on the leader before it is updated based
+    // on this fetch request
+    assertEquals(0, tp0Status.get.highWatermark)
     assertEquals(None, tp0Status.get.lastStableOffset)
     assertEquals(Errors.NONE, tp0Status.get.error)
     assertTrue(tp0Status.get.records.batches.iterator.hasNext)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
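To make the intent of the patch above concrete, here is a small, self-contained Scala sketch. The FakeLeaderLog and ReadSnapshot types and the offsets are hypothetical, not Kafka code: the point is only that the leader captures its log start offset and high watermark at the moment the records are read and builds the fetch response from that snapshot, because re-reading the leader state after updating the follower's fetch state would let a concurrent deleteRecords() or segment deletion report a log start offset beyond the last fetched offset.

// Self-contained sketch (hypothetical FakeLeaderLog/ReadSnapshot types, not Kafka code)
// of the invariant the patch enforces: the offsets returned in a fetch response are the
// ones observed when the records were read, not re-read after a concurrent log cleanup.
import java.util.concurrent.atomic.AtomicLong

final case class ReadSnapshot(records: Seq[Long], logStartOffset: Long, highWatermark: Long)

final class FakeLeaderLog {
  private val logStartOffset = new AtomicLong(0L)
  private val highWatermark = new AtomicLong(5L)

  // Capture log start offset and HW at read time, together with the records.
  def read(fetchOffset: Long, maxRecords: Int): ReadSnapshot = {
    val start = logStartOffset.get()
    val hw = highWatermark.get()
    val records = (fetchOffset until math.min(fetchOffset + maxRecords, hw)).toSeq
    ReadSnapshot(records, start, hw)
  }

  // Simulates deleteRecords() or old-segment deletion racing with the fetch.
  def deleteRecordsUpTo(offset: Long): Unit = logStartOffset.set(offset)
}

object ConsistentFetchResponseSketch extends App {
  val log = new FakeLeaderLog
  val snapshot = log.read(fetchOffset = 0L, maxRecords = 3) // records 0..2, logStartOffset = 0
  log.deleteRecordsUpTo(4L)                                 // log start offset races ahead

  // Building the response from the snapshot keeps it consistent with the fetched records.
  // Re-reading the leader state here would report log start offset 4, which lies beyond
  // the last fetched offset and is exactly the inconsistency that killed the fetcher thread.
  println(s"records=${snapshot.records.mkString(",")} " +
    s"logStartOffset=${snapshot.logStartOffset} highWatermark=${snapshot.highWatermark}")
}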
> ReplicaFetcher thread may die because of inconsistent log start offset in
> fetch response
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7104
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7104
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.0.0, 1.1.0
>            Reporter: Anna Povzner
>            Assignee: Anna Povzner
>            Priority: Major
>
> What we saw:
> The follower fetches offset 116617, which it was able to append successfully.
> However, the leader's log start offset in the fetch response was 116753, which was
> higher than the fetched offset 116617. When the replica fetcher thread tried to
> increment the log start offset to the leader's log start offset, it failed with
> OffsetOutOfRangeException:
>
> [2018-06-23 00:45:37,409] ERROR Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition X-N offset 116617
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot
> increment the log start offset to 116753 of partition X-N since it is larger
> than the high watermark 116619
>
> In the leader's log, we see that the log start offset was incremented at almost the
> same time (within 100 ms or so):
> [2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N
> to 116753
>
> In the leader's logic: ReplicaManager first calls readFromLocalLog(), which reads from
> the local log and returns a LogReadResult that contains the fetched data and the
> leader's log start offset and HW. However, it then calls
> ReplicaManager#updateFollowerLogReadResults(), which may move the leader's HW and
> updates the leader's log start offset and HW in the fetch response. If deleteRecords()
> happens in between, the log start offset may move beyond the fetched offset. The
> leader may also move the log start offset because of deleting old log segments.
> Basically, the issue is that the log start offset can move between the time records
> are read from the log and the time the LogReadResult is updated with the new log start
> offset. As a result, the fetch response may contain fetched data, but the leader's log
> start offset in the response could be set beyond the fetched offset (indicating a
> leader state in which the fetched data no longer exists on the leader).
> When a follower receives such a fetch response, it will first append the data, then
> move its HW no further than its LEO, which may be less than the leader's log start
> offset in the fetch response, and then call
> `replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which will throw an
> OffsetOutOfRangeException, causing the fetcher thread to stop.
> Note that this can happen if the follower is not in the ISR; otherwise the leader will
> not move its log start offset beyond the follower's HW.
>
> *Suggested fix:*
> 1) Since the ReplicaFetcher bounds the follower's HW to the follower's LEO, we should
> also bound the follower's log start offset to its LEO. In this situation, the
> follower's log start offset will be updated to its LEO. (See the sketch after this
> message.)
> 2) In addition to #1, we could try to make sure that the leader builds the fetch
> response based on the state of the log at the time the data is read from the replica
> (but including moving the leader's HW based on the follower's fetch). That could
> potentially be another JIRA, since the fix could be more involved.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
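As a companion to suggested fix #1 above, a minimal Scala sketch of the clamping idea, assuming a hypothetical boundedLogStartOffset helper rather than the actual Replica/ReplicaFetcherThread API:

// Sketch of suggested fix #1 above (hypothetical helper, not the actual Replica/Log API):
// the follower bounds the leader-reported log start offset by its own log end offset, so
// a log start offset that raced ahead of the fetched data cannot kill the fetcher thread.
object FollowerLogStartOffsetBoundSketch {

  // Returns the log start offset the follower should apply: never beyond its own LEO,
  // even if the leader has already deleted records past what this fetch returned.
  def boundedLogStartOffset(leaderLogStartOffset: Long, followerLogEndOffset: Long): Long =
    math.min(leaderLogStartOffset, followerLogEndOffset)

  def main(args: Array[String]): Unit = {
    // Offsets loosely taken from the report above: the leader's log start offset in the
    // response had moved to 116753 while the follower had only appended up to ~116619.
    val leaderLogStartOffset = 116753L
    val followerLogEndOffset = 116619L
    // Instead of throwing OffsetOutOfRangeException, the follower advances its log start
    // offset only as far as its LEO and catches up on a later fetch.
    assert(boundedLogStartOffset(leaderLogStartOffset, followerLogEndOffset) == 116619L)
    println(s"bounded log start offset = " +
      s"${boundedLogStartOffset(leaderLogStartOffset, followerLogEndOffset)}")
  }
}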