satishd commented on a change in pull request #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r814031095
########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -715,6 +738,87 @@ abstract class AbstractFetcherThread(name: String, } } + /** + * Handle a partition whose offset is out of range and return a new fetch offset. + */ + private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = { + fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch, + (_, leaderLogStartOffset) => { + truncateFullyAndStartAt(topicPartition, leaderLogStartOffset) + leaderLogStartOffset + }, + // In this case, it will fetch from leader's log-start-offset like earlier instead of fetching from + // local-log-start-offset. This handles both the scenarios of whether tiered storage is enabled or not. + // If tiered storage is enabled, the next fetch result of fetching from log-start-offset may result in + // OffsetMovedToTieredStorage error and it will handle building the remote log state. + fetchFromLocalLogStartOffset = false) + } + + /** + * Handle the out of range error. Return false if + * 1) the request succeeded or + * 2) was fenced and this thread haven't received new epoch, + * which means we need not backoff and retry. True if there was a retriable error. + */ + private def handleOutOfRangeError(topicPartition: TopicPartition, + fetchState: PartitionFetchState, + requestEpoch: Optional[Integer]): Boolean = { + try { + val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch) + partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) + info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " + + s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}") + false + } catch { + case _: FencedLeaderEpochException => + onPartitionFenced(topicPartition, requestEpoch) + + case e@(_: UnknownTopicOrPartitionException | + _: UnknownLeaderEpochException | + _: NotLeaderOrFollowerException) => + info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}") + true + + case e: Throwable => + error(s"Error getting offset for partition $topicPartition", e) + true + } + } + + /** + * Handle the offset out of range error or offset moved to tiered storage error. + * + * Return false if + * 1) it is able to build the required remote log auxiliary state or + * 2) was fenced and this thread haven't received new epoch, + * which means we need not backoff and retry. True if there was a retriable error. Review comment: Sure, I also updated the method to return true if it was able to handle it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org