junrao commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r772110800
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -715,6 +726,57 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  /**
+   * Handle a partition whose offset is out of range and return a new fetch offset.
+   */
+  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+    fetchOffsetAndApplyFun(topicPartition, topicId, currentLeaderEpoch,
+      leaderLogStartOffset => truncateFullyAndStartAt(topicPartition, leaderLogStartOffset))
+  }
+
+  /**
+   * Handle a partition whose offset has moved to tiered storage and return a new fetch offset.
+   */
+  protected def fetchOffsetAndBuildRemoteLogAuxState(topicPartition: TopicPartition, topicId: Option[Uuid],
+                                                     currentLeaderEpoch: Int,
+                                                     leaderLogStartOffset: Long): PartitionFetchState = {
+    fetchOffsetAndApplyFun(topicPartition, topicId, currentLeaderEpoch,
+      leaderLocalLogStartOffset =>
+        buildRemoteLogAuxState(topicPartition, currentLeaderEpoch, leaderLocalLogStartOffset, leaderLogStartOffset))
+  }
+
+  /**
+   * Handle the offset-moved-to-tiered-storage error.
+   *
+   * Returns false if
+   * 1) it was able to build the required remote log auxiliary state, or
+   * 2) the fetch was fenced and this thread has not yet received a new epoch,
+   * in which case there is no need to back off and retry. Returns true if there was a retriable error.
+   */
+  private def handleOffsetMovedToTieredStorage(topicPartition: TopicPartition,

Review comment:
    The return value is not very intuitive. I'd expect a true return value to indicate that the request was handled successfully.
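For illustration, one way to make the contract read the way the comment expects is to flip the boolean so that true means the error was handled (auxiliary state built, or fenced with no new epoch yet) and false means a retriable failure. The body below is only a sketch against the signatures quoted in the hunk above; the exception types caught are assumptions about how fencing and remote-storage failures surface, and the real method would also record the new fetch state:

```scala
  // Sketch only (fragment of AbstractFetcherThread): true = handled, no retry
  // needed; false = retriable failure, the caller should back off and retry.
  private def handleOffsetMovedToTieredStorage(topicPartition: TopicPartition,
                                               topicId: Option[Uuid],
                                               currentLeaderEpoch: Int,
                                               leaderLogStartOffset: Long): Boolean = {
    try {
      // Builds the remote log auxiliary state and computes the new fetch offset.
      // (The real method would also update the partition fetch state; omitted here.)
      fetchOffsetAndBuildRemoteLogAuxState(topicPartition, topicId, currentLeaderEpoch, leaderLogStartOffset)
      true
    } catch {
      case _: org.apache.kafka.common.errors.FencedLeaderEpochException =>
        true  // fenced and no new epoch received yet, so there is nothing to retry
      case e: org.apache.kafka.server.log.remote.storage.RemoteStorageException =>
        warn(s"Failed to build remote log auxiliary state for $topicPartition", e)
        false // retriable: add the partition to partitionsWithError and retry later
    }
  }
```

The caller would then read naturally as `if (!handleOffsetMovedToTieredStorage(...)) partitionsWithError += topicPartition`.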
##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +396,76 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   * the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,

Review comment:
    Could we add a comment on what this method does?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +396,76 @@ class ReplicaFetcherThread(name: String,
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                leaderLogStartOffset: Long): Unit = {
+    replicaMgr.localLog(partition).foreach(log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach(rlm => {

Review comment:
    Change from

        replicaMgr.remoteLogManager.foreach(rlm => {
        })

    to

        replicaMgr.remoteLogManager.foreach { rlm =>
        }

    Ditto in a few other places.
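For clarity, the two spellings being contrasted look like this when written out (bodies elided):

```scala
// Form currently in the PR: a lambda wrapped in parentheses plus braces.
replicaMgr.remoteLogManager.foreach(rlm => {
  // ... build the remote log auxiliary state using rlm ...
})

// Suggested form: pass the block directly, which is the more idiomatic
// Scala style for a multi-line foreach body.
replicaMgr.remoteLogManager.foreach { rlm =>
  // ... build the remote log auxiliary state using rlm ...
}
```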
##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +396,76 @@ class ReplicaFetcherThread(name: String,
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                leaderLogStartOffset: Long): Unit = {
+    replicaMgr.localLog(partition).foreach(log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach(rlm => {
+          var rlsMetadata: Optional[RemoteLogSegmentMetadata] = Optional.empty()
+          val epoch = log.leaderEpochCache.flatMap(cache => cache.epochForOffset(leaderLocalLogStartOffset))
+          if (epoch.isDefined) {
+            rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, epoch.get, leaderLocalLogStartOffset)
+          } else {
+            // If the epoch is not available, this broker may have lost its entire local storage.
+            // We may also have to build the leader epoch cache. To find the remote log segment metadata for
+            // leaderLocalLogStartOffset-1, start from the current leader epoch and decrement it until the
+            // metadata is found.
+            var previousLeaderEpoch = currentLeaderEpoch
+            while (!rlsMetadata.isPresent && previousLeaderEpoch >= 0) {
+              rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, previousLeaderEpoch, leaderLocalLogStartOffset - 1)
+              previousLeaderEpoch -= 1
+            }
+          }
+          if (rlsMetadata.isPresent) {
+            val epochStream = rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.LEADER_EPOCH)
+            val epochs = readLeaderEpochCheckpoint(epochStream, log.dir)
+
+            // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+            truncateFullyAndStartAt(partition, leaderLocalLogStartOffset)
+
+            log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
+            epochs.foreach(epochEntry => {
+              log.leaderEpochCache.map(cache => cache.assign(epochEntry.epoch, epochEntry.startOffset))
+            })
+            info(s"Updated the epoch cache from remote tier till offset: $leaderLocalLogStartOffset " +
+              s"with size: ${epochs.size} for $partition")
+
+            // Restore producer snapshot
+            val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir, leaderLocalLogStartOffset)
+            Files.copy(rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+              snapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+            log.producerStateManager.reloadSnapshots()
+            log.loadProducerState(leaderLocalLogStartOffset, reloadFromCleanShutdown = false)
+            info(s"Built the leader epoch cache and producer snapshots from remote tier for $partition. " +
+              s"Active producers: ${log.producerStateManager.activeProducers.size}, LeaderLogStartOffset: $leaderLogStartOffset")
+          } else {
+            throw new RemoteStorageException(s"Couldn't build the state from remote store for partition: $partition, " +
+              s"currentLeaderEpoch: $currentLeaderEpoch, leaderLocalLogStartOffset: $leaderLocalLogStartOffset, " +
+              s"leaderLogStartOffset: $leaderLogStartOffset, epoch: $epoch as the previous remote log segment " +
+              s"metadata was not found")
+          }
+        })
+      }
+    )
+  }
+
+  private def readLeaderEpochCheckpoint(stream: InputStream,
+                                        dir: File): List[EpochEntry] = {
+    val tmpFile = new File(dir, "leader-epoch-checkpoint" + TmpFileSuffix)
+    Files.copy(stream, tmpFile.toPath)
+    val epochEntries = new LeaderEpochCheckpointFile(tmpFile).checkpoint.read().toList

Review comment:
    It's awkward to stream into a temp file only to load it back into memory.
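One possible shape for avoiding the temp file is to parse the entries straight off the stream. This is only a sketch, assuming the usual checkpoint layout of a version line, an entry-count line, and then `epoch startOffset` pairs, and assuming the `EpochEntry` case class used elsewhere in core; it is not code from the PR:

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import kafka.server.epoch.EpochEntry

  // Sketch: read the leader epoch entries directly from the remote index stream,
  // skipping the version and entry-count header lines, so no temp file is needed.
  // Format validation and error handling are omitted for brevity.
  private def readLeaderEpochCheckpoint(stream: InputStream): List[EpochEntry] = {
    val reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))
    try {
      Iterator.continually(reader.readLine())
        .takeWhile(_ != null)
        .drop(2)                         // version line and entry-count line
        .map(_.trim)
        .filter(_.nonEmpty)
        .map { line =>
          val Array(epoch, startOffset) = line.split("\\s+")
          EpochEntry(epoch.toInt, startOffset.toLong)
        }
        .toList
    } finally {
      reader.close()
    }
  }
```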
##########
File path: clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
+import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
+
+public class RemoteLogInputStream implements LogInputStream<RecordBatch> {
+    private final InputStream is;
+    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);
+
+    public RemoteLogInputStream(InputStream is) {
+        this.is = is;
+    }
+
+    @Override
+    public RecordBatch nextBatch() throws IOException {
+        logHeaderBuffer.rewind();
+        Utils.readFully(is, logHeaderBuffer);
+
+        if (logHeaderBuffer.position() < HEADER_SIZE_UP_TO_MAGIC)
+            return null;
+
+        logHeaderBuffer.rewind();
+        logHeaderBuffer.getLong(OFFSET_OFFSET);
+        int size = logHeaderBuffer.getInt(SIZE_OFFSET);
+
+        // V0 has the smallest overhead, stricter checking is done later
+        if (size < LegacyRecord.RECORD_OVERHEAD_V0)
+            throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
+                    "overhead (%d).", size, LegacyRecord.RECORD_OVERHEAD_V0));
+
+        byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
+        ByteBuffer buffer = ByteBuffer.allocate(size + LOG_OVERHEAD);
+        buffer.put(logHeaderBuffer);
+//        System.arraycopy(logHeaderBuffer.array(), 0, buffer.array(), 0, logHeaderBuffer.limit());

Review comment:
    Should we remove this?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +396,76 @@ class ReplicaFetcherThread(name: String,
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                leaderLogStartOffset: Long): Unit = {
+    replicaMgr.localLog(partition).foreach(log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach(rlm => {
+          var rlsMetadata: Optional[RemoteLogSegmentMetadata] = Optional.empty()
+          val epoch = log.leaderEpochCache.flatMap(cache => cache.epochForOffset(leaderLocalLogStartOffset))
+          if (epoch.isDefined) {
+            rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, epoch.get, leaderLocalLogStartOffset)
+          } else {
+            // If the epoch is not available, this broker may have lost its entire local storage.
+            // We may also have to build the leader epoch cache. To find the remote log segment metadata for
+            // leaderLocalLogStartOffset-1, start from the current leader epoch and decrement it until the
+            // metadata is found.
+            var previousLeaderEpoch = currentLeaderEpoch
+            while (!rlsMetadata.isPresent && previousLeaderEpoch >= 0) {
+              rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, previousLeaderEpoch, leaderLocalLogStartOffset - 1)

Review comment:
    This is kind of awkward and inefficient. Could we add a better API to avoid the trial-and-error approach? For example, we could have an API that waits for the remote log segment metadata up to offset leaderLocalLogStartOffset - 1 to become available with a leader epoch less than or equal to currentLeaderEpoch.
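As a rough illustration of the kind of API that comment suggests, the lookup could take an upper bound on the epoch and a wait timeout. The trait, method name, and parameters below are hypothetical, sketched only from the suggestion above, not an agreed interface:

```scala
import java.util.Optional

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata

trait RemoteLogMetadataLookup {
  /**
   * Hypothetical lookup: wait up to timeoutMs for remote log segment metadata
   * covering `offset` to become available with a leader epoch less than or equal
   * to `maxLeaderEpoch`, instead of probing one epoch value at a time from
   * currentLeaderEpoch down to 0.
   */
  def fetchRemoteLogSegmentMetadata(partition: TopicPartition,
                                    maxLeaderEpoch: Int,
                                    offset: Long,
                                    timeoutMs: Long): Optional[RemoteLogSegmentMetadata]
}
```

With such an API, the while loop above could collapse to a single call along the lines of `rlm.fetchRemoteLogSegmentMetadata(partition, currentLeaderEpoch, leaderLocalLogStartOffset - 1, waitTimeoutMs)`.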
##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +396,76 @@ class ReplicaFetcherThread(name: String,
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                leaderLogStartOffset: Long): Unit = {
+    replicaMgr.localLog(partition).foreach(log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach(rlm => {
+          var rlsMetadata: Optional[RemoteLogSegmentMetadata] = Optional.empty()
+          val epoch = log.leaderEpochCache.flatMap(cache => cache.epochForOffset(leaderLocalLogStartOffset))
+          if (epoch.isDefined) {
+            rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, epoch.get, leaderLocalLogStartOffset)

Review comment:
    Hmm, the remote data could end at leaderLocalLogStartOffset - 1. In that case, we won't find a corresponding rlsMetadata.

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +396,76 @@ class ReplicaFetcherThread(name: String,
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                leaderLogStartOffset: Long): Unit = {
+    replicaMgr.localLog(partition).foreach(log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {

Review comment:
    In the else case, should we truncate the entire local log?
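If the answer to that question is yes, the else branch might look roughly like the following. This is a sketch of one possible resolution only, not settled behaviour in the PR:

```scala
replicaMgr.localLog(partition).foreach { log =>
  if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
    // ... build the remote log auxiliary state from the remote tier, as in the PR ...
  } else {
    // OFFSET_MOVED_TO_TIERED_STORAGE was returned but tiered storage is not enabled
    // for this log, so start clean from the leader's local log start offset.
    truncateFullyAndStartAt(partition, leaderLocalLogStartOffset)
  }
}
```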
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -396,7 +399,12 @@ abstract class AbstractFetcherThread(name: String,
               case Errors.OFFSET_OUT_OF_RANGE =>
                 if (handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                   partitionsWithError += topicPartition
-
+              case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
+                // No need to retry this as it indicates that the requested offset is moved to tiered storage.
+                // Check whether topicId is available here.

Review comment:
    Was this comment addressed?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org