kowshik commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r814640948
##########
File path: core/src/main/scala/kafka/log/BaseIndex.scala
##########
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{Closeable, File}
+import java.nio.file.Path
+
+import org.apache.kafka.common.utils.Utils
+
+/**
+ * This class represents a common abstraction for operations like delete and rename of the index files.

Review comment:
   > This class represents a common abstraction for operations like delete and rename of the index files.

   This class is slim in functionality, and I don't see any real benefit in introducing it. It is not clear to me, going forward, which operations can be included in this class and which ones can't. Are you planning to add new functionality to this class in the future, such that you want to introduce it in this PR? If not, I feel the earlier design without this base class was simpler.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -623,40 +660,25 @@
   }
 
   /**
-   * Handle the out of range error. Return false if
-   * 1) the request succeeded or
-   * 2) was fenced and this thread haven't received new epoch,
-   * which means we need not backoff and retry. True if there was a retriable error.
-   */
-  private def handleOutOfRangeError(topicPartition: TopicPartition,
-                                    fetchState: PartitionFetchState,
-                                    requestEpoch: Optional[Integer]): Boolean = {
-    try {
-      val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch)
-      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-      info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
-        s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
-      false
-    } catch {
-      case _: FencedLeaderEpochException =>
-        onPartitionFenced(topicPartition, requestEpoch)
-
-      case e @ (_ : UnknownTopicOrPartitionException |
-                _ : UnknownLeaderEpochException |
-                _ : NotLeaderOrFollowerException) =>
-        info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
-        true
-
-      case e: Throwable =>
-        error(s"Error getting offset for partition $topicPartition", e)
-        true
-    }
-  }
-
-  /**
-   * Handle a partition whose offset is out of range and return a new fetch offset.
+   * It returns the next fetch state. It fetches the log-start-offset or local-log-start-offset based on
+   * `fetchFromLocalLogStartOffset` flag. This is used in truncation by passing it to the given `truncateAndBuild`
+   * function.
+   *
+   * @param topicPartition topic partition
+   * @param topicId topic id
+   * @param currentLeaderEpoch current leader epoch maintained by this follower replica.
+   * @param truncateAndBuild Function to truncate for the given epoch and offset. It returns the next fetch offset value.
+   * @param fetchFromLocalLogStartOffset Whether to fetch from local-log-start-offset or log-start-offset. If true, it
+   *                                     requests the local-log-start-offset from the leader, else it requests
+   *                                     log-start-offset from the leader. This is used in sending the value to the
+   *                                     given `truncateAndBuild` function.
+   * @return
    */
-  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+  private def fetchOffsetAndApplyTruncateAndBuild(topicPartition: TopicPartition,

Review comment:
   This method `fetchOffsetAndApplyTruncateAndBuild` is currently doing a number of things, as is clear from the method name. It will be hard to cover all the cases in unit tests, so it would be better to simplify it.

##########
File path: core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
##########
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.log._
+import kafka.log.remote.RemoteIndexCache.DirName
+import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
+import org.apache.kafka.common.errors.CorruptRecordException
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
+import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{File, InputStream}
+import java.nio.file.{Files, Path}
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+
+object RemoteIndexCache {
+  val DirName = "remote-log-index-cache"
+  val TmpFileSuffix = ".tmp"
+}
+
+class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) {
+  private val markedForCleanup = new AtomicBoolean(false)
+
+  def lookupOffset(targetOffset: Long): OffsetPosition = {
+    if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup")
+    else offsetIndex.lookup(targetOffset)
+  }
+
+  def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = {
+    if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup")
+
+    val timestampOffset = timeIndex.lookup(timestamp)
+    offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset))
+  }
+
+  def markForCleanup(): Unit = {
+    if (!markedForCleanup.getAndSet(true)) {
+      Array(offsetIndex, timeIndex, txnIndex).foreach(x =>

Review comment:
   Let's rename `x` to something more readable, e.g. `index`.
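   As an aside, the `Entry` class above relies on a single `AtomicBoolean` both to reject lookups once the entry is marked for cleanup and to make `markForCleanup` idempotent via `getAndSet`. Below is a minimal, self-contained sketch of that pattern; the names (`CleanableResource`, `read`) are illustrative only and are not part of the PR.

   ```scala
   import java.util.concurrent.atomic.AtomicBoolean

   // Illustrative stand-in for an index entry; not the PR's Entry class.
   class CleanableResource(name: String) {
     private val markedForCleanup = new AtomicBoolean(false)

     def read(): String = {
       // Reads are rejected once the resource has been marked for cleanup.
       if (markedForCleanup.get()) throw new IllegalStateException(s"$name is marked for cleanup")
       s"data from $name"
     }

     def markForCleanup(): Unit = {
       // getAndSet returns the previous value, so the cleanup body runs at most once
       // even if markForCleanup is called from multiple threads.
       if (!markedForCleanup.getAndSet(true))
         println(s"cleaning up $name")
     }
   }

   object CleanableResourceExample extends App {
     val resource = new CleanableResource("00000000000000000000.index")
     println(resource.read())   // succeeds
     resource.markForCleanup()  // performs cleanup once
     resource.markForCleanup()  // no-op on the second call
     // resource.read()         // would now throw IllegalStateException
   }
   ```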
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -715,6 +745,102 @@
     }
   }
 
+  /**
+   * Handle a partition whose offset is out of range and return a new fetch offset.
+   */
+  private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+    fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch,
+      (_, leaderLogStartOffset) => {
+        truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)
+        leaderLogStartOffset
+      },
+      // In this case, it will fetch from leader's log-start-offset like earlier instead of fetching from
+      // local-log-start-offset. This handles both the scenarios of whether tiered storage is enabled or not.
+      // If tiered storage is enabled, the next fetch result of fetching from log-start-offset may result in
+      // OffsetMovedToTieredStorage error and it will handle building the remote log state.
+      fetchFromLocalLogStartOffset = false)
+  }
+
+  /**
+   * Handles the out of range error for the given topic partition.
+   *
+   * Returns true if
+   *   - the request succeeded or
+   *   - it was fenced and this thread haven't received new epoch, which means we need not backoff and retry as the
+   *     partition is moved to failed state.
+   *
+   * Returns false if there was a retriable error.
+   *
+   * @param topicPartition topic partition
+   * @param fetchState current fetch state
+   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
+   */
+  private def handleOutOfRangeError(topicPartition: TopicPartition,
+                                    fetchState: PartitionFetchState,
+                                    leaderEpochInRequest: Optional[Integer]): Boolean = {
+    try {
+      val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch)
+      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
+      info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
+        s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
+      true
+    } catch {
+      case _: FencedLeaderEpochException =>
+        onPartitionFenced(topicPartition, leaderEpochInRequest)
+
+      case e@(_: UnknownTopicOrPartitionException |
+              _: UnknownLeaderEpochException |
+              _: NotLeaderOrFollowerException) =>
+        info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
+        false
+
+      case e: Throwable =>
+        error(s"Error getting offset for partition $topicPartition", e)
+        false
+    }
+  }
+
+  /**
+   * Handles the offset moved to tiered storage error for the given topic partition.
+   *
+   * Returns true if
+   *   - the request succeeded or
+   *   - it was fenced and this thread haven't received new epoch, which means we need not backoff and retry as the
+   *     partition is moved to failed state.
+   *
+   * Returns false if there was a retriable error.
+   *
+   * @param topicPartition topic partition
+   * @param fetchState current partition fetch state.
+   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
+   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   */
+  private def handleOffsetsMovedToTieredStorage(topicPartition: TopicPartition,
+                                                fetchState: PartitionFetchState,
+                                                leaderEpochInRequest: Optional[Integer],
+                                                leaderLogStartOffset: Long): Boolean = {
+    try {
+      val newFetchState = fetchOffsetAndApplyTruncateAndBuild(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch,
+        (offsetEpoch, leaderLocalLogStartOffset) => buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset))

Review comment:
   Here, to build the remote log aux state, we only need the leader's local-log-start-offset, right? In that case, I think it gets complicated if we repurpose `fetchOffsetAndApplyTruncateAndBuild` here. Can we instead introduce a separate method that just fetches the leader's local-log-start-offset and passes it into `buildRemoteLogAuxState`?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -394,9 +427,13 @@ abstract class AbstractFetcherThread(name: String,
                 markPartitionFailed(topicPartition)
               }
             case Errors.OFFSET_OUT_OF_RANGE =>
-              if (handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
+              if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
+                partitionsWithError += topicPartition
+            case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
+              debug(s"Received error related to offset moved to tiered storage, fetch offset: ${currentFetchState.fetchOffset}")

Review comment:
   s/related to offset moved to tiered storage/OFFSET_MOVED_TO_TIERED_STORAGE/

   Can we also log the `topicPartition`? (One possible phrasing is sketched at the end of this message.)

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +397,143 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-  * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-  * the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,

Review comment:
   This method is doing a lot of things, and it is worth thinking about how to simplify it. In its current form, it could be hard to test.
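Regarding the OFFSET_MOVED_TO_TIERED_STORAGE logging comment above: one possible phrasing of the suggested debug line, reusing only the identifiers already present in that hunk. This is an illustrative fragment of the `case` clause, not necessarily the exact wording to adopt.

```scala
case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
  // Name the error explicitly and include the partition so the log line is easier to correlate.
  debug(s"Received OFFSET_MOVED_TO_TIERED_STORAGE error for partition $topicPartition, " +
    s"fetch offset: ${currentFetchState.fetchOffset}")
```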