[ https://issues.apache.org/jira/browse/KAFKA-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525759#comment-16525759 ]

ASF GitHub Bot commented on KAFKA-7104:
---------------------------------------

apovzner closed pull request #5302: KAFKA-7104: Handle leader's log start 
offset beyond last fetched offset
URL: https://github.com/apache/kafka/pull/5302
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index e46473b69e9..55bc384fa53 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -100,8 +100,9 @@ class ReplicaAlterLogDirsThread(name: String,
 
     partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
     val futureReplicaHighWatermark = futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
+    val newLogStartOffset = futureReplica.logEndOffset.messageOffset.min(partitionData.logStartOffset)
     futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
-    futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
+    futureReplica.maybeIncrementLogStartOffset(newLogStartOffset)
 
     if (partition.maybeReplaceCurrentWithFutureReplica())
       removePartitions(Set(topicPartition))
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ce6e350d0bd..eb906c1f133 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -118,12 +118,15 @@ class ReplicaFetcherThread(name: String,
       trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
         .format(replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
     val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
-    val leaderLogStartOffset = partitionData.logStartOffset
+    // the fetch response from the leader may contain log start offset beyond end offset of the
+    // fetched records, because log start offset may change on the leader while the fetch
+    // response is being built. We don't move log start offset beyond LEO on the follower,
+    val newLogStartOffset = replica.logEndOffset.messageOffset.min(partitionData.logStartOffset)
     // for the follower replica, we do not need to keep
     // its segment base offset the physical position,
     // these values will be computed upon making the leader
     replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
-    replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
+    replica.maybeIncrementLogStartOffset(newLogStartOffset)
     if (isTraceEnabled)
       trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReplicaFetcher thread may die because of inconsistent log start offset in 
> fetch response
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7104
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7104
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.0.0, 1.1.0
>            Reporter: Anna Povzner
>            Assignee: Anna Povzner
>            Priority: Major
>
> What we saw:
> The follower fetched offset 116617, which it was able to append successfully.
> However, the leader's log start offset in the fetch response was 116753, which was
> higher than the fetched offset 116617. When the replica fetcher thread tried to
> increment the log start offset to the leader's log start offset, it failed with an
> OffsetOutOfRangeException:
> [2018-06-23 00:45:37,409] ERROR  Error due to 
> (kafka.server.ReplicaFetcherThread) 
>  kafka.common.KafkaException: Error processing data for partition X-N offset 
> 116617 
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 116753 of partition X-N since it is larger 
> than the high watermark 116619
>  
> In the leader's log, we see that the log start offset was incremented at almost the
> same time (within 100 ms or so):
> [2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N 
> to 116753
>  
> In the leader's logic: ReplicaManager first calls readFromLocalLog(), which reads from
> the local log and returns a LogReadResult containing the fetched data and the leader's
> log start offset and HW. However, it then calls
> ReplicaManager#updateFollowerLogReadResults(), which may move the leader's log start
> offset and updates the leader's log start offset and HW in the fetch response. If
> deleteRecords() happens in between, the log start offset may move beyond the fetched
> offset. Alternatively, the leader may move the log start offset because it deleted old
> log segments. Basically, the issue is that the log start offset can move between the
> time the records are read from the log and the time the LogReadResult is updated with
> the new log start offset. As a result, the fetch response may contain fetched data,
> but the leader's log start offset in the response could be set beyond the fetched
> offset (indicating a state on the leader in which the fetched data no longer exists).
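> A heavily simplified, hypothetical Scala sketch of this race (the names and types are
> placeholders, not the actual ReplicaManager API): the log start offset is read again
> after the records have been read, so a concurrent deleteRecords() can push it past the
> last fetched offset.
>
> object LeaderRaceSketch {
>   // Placeholder for the leader-side read result: last fetched offset plus the
>   // log start offset that ends up in the fetch response.
>   final case class LogReadResult(lastFetchedOffset: Long, logStartOffset: Long)
>
>   // Hypothetical leader log state; 116000 is an arbitrary starting value.
>   @volatile var leaderLogStartOffset: Long = 116000L
>
>   // Step 1: read records from the local log (analogous to readFromLocalLog()).
>   def readFromLocalLog(fetchOffset: Long): Long = fetchOffset
>
>   def main(args: Array[String]): Unit = {
>     val lastFetched = readFromLocalLog(fetchOffset = 116617L)
>     // Step 2: deleteRecords() or old-segment deletion runs concurrently on the leader.
>     leaderLogStartOffset = 116753L
>     // Step 3: the response is stamped with the current log start offset, which is
>     // now beyond the last fetched offset.
>     val response = LogReadResult(lastFetched, leaderLogStartOffset)
>     println(s"fetched up to ${response.lastFetchedOffset}, " +
>       s"logStartOffset in response = ${response.logStartOffset}")
>   }
> }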
> When a follower receives such a fetch response, it first appends the data, then moves
> its HW no further than its LEO, which may be less than the leader's log start offset
> in the fetch response, and then calls
> `replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which throws an
> OffsetOutOfRangeException and causes the fetcher thread to stop.
> Note that this can only happen if the follower is not in the ISR; otherwise the leader
> will not move its log start offset beyond the follower's HW.
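> A minimal, dependency-free sketch of that follower-side check (the Replica class here
> is a hypothetical stand-in; the real code throws
> org.apache.kafka.common.errors.OffsetOutOfRangeException):
>
> object FollowerFailureSketch {
>   // Only the fields needed to show the failing check are modeled.
>   final class Replica(val highWatermark: Long, var logStartOffset: Long) {
>     def maybeIncrementLogStartOffset(newLogStartOffset: Long): Unit = {
>       if (newLogStartOffset > highWatermark)
>         // The real code raises OffsetOutOfRangeException with a message like this one.
>         throw new RuntimeException(
>           s"Cannot increment the log start offset to $newLogStartOffset " +
>             s"since it is larger than the high watermark $highWatermark")
>       logStartOffset = math.max(logStartOffset, newLogStartOffset)
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     // Offsets from the report: follower HW 116619, leader log start offset 116753
>     // (the initial logStartOffset of 116000 is a made-up value).
>     val replica = new Replica(highWatermark = 116619L, logStartOffset = 116000L)
>     // Before the fix, the leader's value was passed through unbounded, so the
>     // fetcher thread dies here:
>     replica.maybeIncrementLogStartOffset(116753L)
>   }
> }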
>  
> *Suggested fix:*
> 1) Since the ReplicaFetcher bounds the follower's HW to the follower's LEO, we should
> also bound the follower's log start offset to its LEO. In this situation, the
> follower's log start offset will be updated to its LEO (see the sketch after this
> list).
> 2) In addition to #1, we could try to make sure that the leader builds the fetch
> response based on the state of the log as of the time of reading data from the replica
> (while still moving the leader's HW based on the follower's fetch). That could
> potentially be another JIRA, since the fix could be more involved.
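> A minimal sketch of fix #1, assuming a hypothetical standalone helper (the real change
> is the .min(...) bound applied in ReplicaFetcherThread and ReplicaAlterLogDirsThread,
> as shown in the diff above):
>
> object BoundLogStartOffsetSketch {
>   // Bound the log start offset taken from the fetch response by the follower's LEO,
>   // mirroring how the follower already bounds its HW by its LEO.
>   def boundedLogStartOffset(followerLogEndOffset: Long, leaderLogStartOffset: Long): Long =
>     followerLogEndOffset.min(leaderLogStartOffset)
>
>   def main(args: Array[String]): Unit = {
>     // Hypothetical follower LEO of 116618 (one past the appended offset 116617) and
>     // the leader log start offset 116753 from the report: the bounded value is 116618.
>     println(boundedLogStartOffset(followerLogEndOffset = 116618L, leaderLogStartOffset = 116753L))
>   }
> }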
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
