satishd commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r814031095



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -715,6 +738,87 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
  /**
   * Handle a partition whose fetch offset is out of range: truncate the local log fully
   * and restart fetching from the leader's log start offset.
   *
   * @param topicPartition     the partition whose fetch offset is out of range
   * @param topicId            the topic id, if known
   * @param currentLeaderEpoch the leader epoch to use when resolving the new offset
   * @return the new fetch state for the partition
   */
  private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
    fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch,
      // Discard the entire local log, then resume from the leader's log start offset.
      (_, leaderLogStartOffset) => {
        truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)
        leaderLogStartOffset
      },
      // Fetch from the leader's log-start-offset (as before) rather than the
      // local-log-start-offset; this covers both the tiered-storage-enabled and
      // -disabled cases. With tiered storage enabled, a subsequent fetch from the
      // log-start-offset may fail with OffsetMovedToTieredStorage, which then
      // triggers building the remote log auxiliary state.
      fetchFromLocalLogStartOffset = false)
  }
+
+  /**
+   * Handle the out of range error. Return false if
+   * 1) the request succeeded or
+   * 2) was fenced and this thread haven't received new epoch,
+   * which means we need not backoff and retry. True if there was a retriable 
error.
+   */
+  private def handleOutOfRangeError(topicPartition: TopicPartition,
+                                    fetchState: PartitionFetchState,
+                                    requestEpoch: Optional[Integer]): Boolean 
= {
+    try {
+      val newFetchState = fetchOffsetAndTruncate(topicPartition, 
fetchState.topicId, fetchState.currentLeaderEpoch)
+      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
+      info(s"Current offset ${fetchState.fetchOffset} for partition 
$topicPartition is " +
+        s"out of range, which typically implies a leader change. Reset fetch 
offset to ${newFetchState.fetchOffset}")
+      false
+    } catch {
+      case _: FencedLeaderEpochException =>
+        onPartitionFenced(topicPartition, requestEpoch)
+
+      case e@(_: UnknownTopicOrPartitionException |
+              _: UnknownLeaderEpochException |
+              _: NotLeaderOrFollowerException) =>
+        info(s"Could not fetch offset for $topicPartition due to error: 
${e.getMessage}")
+        true
+
+      case e: Throwable =>
+        error(s"Error getting offset for partition $topicPartition", e)
+        true
+    }
+  }
+
+  /**
+   * Handle the offset out of range error or offset moved to tiered storage 
error.
+   *
+   * Return false if
+   * 1) it is able to build the required remote log auxiliary state, or
+   * 2) the partition was fenced and this thread has not yet received a new epoch,
+   * which means we need not back off and retry. Return true if there was a retriable
+   * error.

Review comment:
       Sure, I also updated the method to return true if it was able to handle 
it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to