kowshik commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r814640948



##########
File path: core/src/main/scala/kafka/log/BaseIndex.scala
##########
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{Closeable, File}
+import java.nio.file.Path
+
+import org.apache.kafka.common.utils.Utils
+
+/**
+ * This class represents a common abstraction for operations like delete and rename of the index files.

Review comment:
       > This class represents a common abstraction for operations like delete and rename of the index files.
   
   This class is slim in functionality, and I don't feel there is any real benefit to introducing it.
   It is not clear to me what operations can be included in this class going forward, and which ones can't.
   Are you planning to add new functionality to this class in the future, such that you want to introduce it in this PR?
   If not, I feel that the earlier design without this base class was simpler.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -623,40 +660,25 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   /**
-   * Handle the out of range error. Return false if
-   * 1) the request succeeded or
-   * 2) was fenced and this thread haven't received new epoch,
-   * which means we need not backoff and retry. True if there was a retriable error.
-   */
-  private def handleOutOfRangeError(topicPartition: TopicPartition,
-                                    fetchState: PartitionFetchState,
-                                    requestEpoch: Optional[Integer]): Boolean = {
-    try {
-      val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch)
-      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-      info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
-        s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
-      false
-    } catch {
-      case _: FencedLeaderEpochException =>
-        onPartitionFenced(topicPartition, requestEpoch)
-
-      case e @ (_ : UnknownTopicOrPartitionException |
-                _ : UnknownLeaderEpochException |
-                _ : NotLeaderOrFollowerException) =>
-        info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
-        true
-
-      case e: Throwable =>
-        error(s"Error getting offset for partition $topicPartition", e)
-        true
-    }
-  }
-
-  /**
-   * Handle a partition whose offset is out of range and return a new fetch offset.
+   * It returns the next fetch state. It fetches the log-start-offset or local-log-start-offset based on the
+   * `fetchFromLocalLogStartOffset` flag. This is used in truncation by passing it to the given `truncateAndBuild`
+   * function.
+   *
+   * @param topicPartition               topic partition
+   * @param topicId                      topic id
+   * @param currentLeaderEpoch           current leader epoch maintained by this follower replica.
+   * @param truncateAndBuild             Function to truncate for the given epoch and offset. It returns the next fetch offset value.
+   * @param fetchFromLocalLogStartOffset Whether to fetch from local-log-start-offset or log-start-offset. If true, it
+   *                                     requests the local-log-start-offset from the leader, else it requests the
+   *                                     log-start-offset from the leader. This is used in sending the value to the
+   *                                     given `truncateAndBuild` function.
+   * @return the next partition fetch state.
    */
-  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+  private def fetchOffsetAndApplyTruncateAndBuild(topicPartition: TopicPartition,

Review comment:
       This method `fetchOffsetAndApplyTruncateAndBuild` is currently doing a number of things, which is clear from the method name. It will be hard to cover all the test cases in a unit test, so it would be better if it were simplified.
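   One hedged sketch of a possible decomposition (all names and signatures below are hypothetical, not from the PR): separate the pure leader-offset lookup from the stateful truncate-and-build step so each piece can be unit-tested on its own.
   
   ```scala
   // Illustrative sketch only: hypothetical helpers, not the PR's code.
   private def fetchLeaderStartOffset(topicPartition: TopicPartition,
                                      currentLeaderEpoch: Int,
                                      fetchFromLocalLogStartOffset: Boolean): (Int, Long) = {
     // Resolves (offsetEpoch, offset) for the leader's log-start-offset or
     // local-log-start-offset; no truncation or state changes here.
     ???
   }
   
   private def applyTruncateAndBuild(topicPartition: TopicPartition,
                                     offsetEpoch: Int,
                                     leaderStartOffset: Long,
                                     truncateAndBuild: (Int, Long) => Long): PartitionFetchState = {
     // Runs the callback and assembles the next PartitionFetchState from the
     // fetch offset it returns.
     ???
   }
   ```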
   

##########
File path: core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
##########
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.log._
+import kafka.log.remote.RemoteIndexCache.DirName
+import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
+import org.apache.kafka.common.errors.CorruptRecordException
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
+import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{File, InputStream}
+import java.nio.file.{Files, Path}
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+
+object RemoteIndexCache {
+  val DirName = "remote-log-index-cache"
+  val TmpFileSuffix = ".tmp"
+}
+
+class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) {
+  private val markedForCleanup = new AtomicBoolean(false)
+
+  def lookupOffset(targetOffset: Long): OffsetPosition = {
+    if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup")
+    else offsetIndex.lookup(targetOffset)
+  }
+
+  def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = {
+    if (markedForCleanup.get()) throw new IllegalStateException("This entry is marked for cleanup")
+
+    val timestampOffset = timeIndex.lookup(timestamp)
+    offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset))
+  }
+
+  def markForCleanup(): Unit = {
+    if (!markedForCleanup.getAndSet(true)) {
+      Array(offsetIndex, timeIndex, txnIndex).foreach(x =>

Review comment:
       Let's rename `x` to something more readable, e.g. `index`.
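   A minimal sketch of the suggested rename (the `foreach` body is elided in the quoted hunk, so the placeholder comment below only marks where the original per-index logic would go):
   
   ```scala
   // Rename `x` to `index` for readability; behavior unchanged.
   Array(offsetIndex, timeIndex, txnIndex).foreach { index =>
     // ... original per-index cleanup logic, unchanged ...
   }
   ```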

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -715,6 +745,102 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  /**
+   * Handle a partition whose offset is out of range and return a new fetch offset.
+   */
+  private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+    fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch,
+      (_, leaderLogStartOffset) => {
+        truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)
+        leaderLogStartOffset
+      },
+      // In this case, it will fetch from the leader's log-start-offset as before, instead of fetching from the
+      // local-log-start-offset. This handles both scenarios, whether tiered storage is enabled or not.
+      // If tiered storage is enabled, the next fetch after fetching from log-start-offset may result in an
+      // OffsetMovedToTieredStorage error, which will handle building the remote log state.
+      fetchFromLocalLogStartOffset = false)
+  }
+
+  /**
+   * Handles the out of range error for the given topic partition.
+   *
+   * Returns true if
+   *    - the request succeeded, or
+   *    - it was fenced and this thread hasn't received a new epoch, which means we need not back off and retry as
+   *    the partition is moved to the failed state.
+   *
+   * Returns false if there was a retriable error.
+   *
+   * @param topicPartition topic partition
+   * @param fetchState current fetch state
+   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
+   */
+  private def handleOutOfRangeError(topicPartition: TopicPartition,
+                                    fetchState: PartitionFetchState,
+                                    leaderEpochInRequest: Optional[Integer]): Boolean = {
+    try {
+      val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch)
+      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
+      info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
+        s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
+      true
+    } catch {
+      case _: FencedLeaderEpochException =>
+        onPartitionFenced(topicPartition, leaderEpochInRequest)
+
+      case e@(_: UnknownTopicOrPartitionException |
+              _: UnknownLeaderEpochException |
+              _: NotLeaderOrFollowerException) =>
+        info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
+        false
+
+      case e: Throwable =>
+        error(s"Error getting offset for partition $topicPartition", e)
+        false
+    }
+  }
+
+  /**
+   * Handles the offset moved to tiered storage error for the given topic partition.
+   *
+   * Returns true if
+   *    - the request succeeded, or
+   *    - it was fenced and this thread hasn't received a new epoch, which means we need not back off and retry as
+   *    the partition is moved to the failed state.
+   *
+   * Returns false if there was a retriable error.
+   *
+   * @param topicPartition topic partition
+   * @param fetchState current partition fetch state.
+   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
+   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   */
+  private def handleOffsetsMovedToTieredStorage(topicPartition: TopicPartition,
+                                                fetchState: PartitionFetchState,
+                                                leaderEpochInRequest: Optional[Integer],
+                                                leaderLogStartOffset: Long): Boolean = {
+    try {
+      val newFetchState = fetchOffsetAndApplyTruncateAndBuild(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch,
+        (offsetEpoch, leaderLocalLogStartOffset) => buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset))

Review comment:
       Here, to build the remote log aux state, we only need the leader's local log start offset, right?
   In such a case, I think it gets complicated if we try to repurpose `fetchOffsetAndApplyTruncateAndBuild` here. Can we just introduce a separate method that would only attempt to get the leader's local log start offset, and pass it into `buildRemoteLogAuxState`?
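   For illustration, a hedged sketch of that suggestion; the helper name and its return type are hypothetical, not from the PR:
   
   ```scala
   // Illustrative only: a narrow helper that just asks the leader for its
   // local-log-start-offset and the epoch it belongs to.
   private def fetchLeaderLocalLogStartOffset(topicPartition: TopicPartition,
                                              currentLeaderEpoch: Int): (Int, Long) = {
     // e.g. a list-offsets style request to the leader; details elided.
     ???
   }
   
   // The caller could then invoke buildRemoteLogAuxState directly:
   //   val (offsetEpoch, leaderLocalLogStartOffset) =
   //     fetchLeaderLocalLogStartOffset(topicPartition, fetchState.currentLeaderEpoch)
   //   buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch,
   //     leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset)
   ```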
   

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -394,9 +427,13 @@ abstract class AbstractFetcherThread(name: String,
                       markPartitionFailed(topicPartition)
                   }
                 case Errors.OFFSET_OUT_OF_RANGE =>
-                  if (handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
+                  if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
+                    partitionsWithError += topicPartition
+                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
+                  debug(s"Received error related to offset moved to tiered storage, fetch offset: ${currentFetchState.fetchOffset}")

Review comment:
       s/related to offset moved to tiered storage/OFFSET_MOVED_TO_TIERED_STORAGE
   
   Can we also log the topicPartition?
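   For example, with both fixes applied the log line might read (sketch):
   
   ```scala
   debug(s"Received OFFSET_MOVED_TO_TIERED_STORAGE error for partition $topicPartition, " +
     s"fetch offset: ${currentFetchState.fetchOffset}")
   ```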

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +397,143 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
Review comment:
       This method is doing a lot of things, and it is worth thinking about how to simplify it. In its current form, it could be hard to test.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

