kamalcph commented on code in PR #20428:
URL: https://github.com/apache/kafka/pull/20428#discussion_r2757622885


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -781,7 +787,15 @@ abstract class AbstractFetcherThread(name: String,
                                                 leaderEpochInRequest: 
Optional[Integer],
                                                 fetchPartitionData: 
PartitionData): Boolean = {
     try {
-      val newFetchState = fetchTierStateMachine.start(topicPartition, 
fetchState, fetchPartitionData)
+      val fetchFromLastTieredOffset = 
shouldFetchFromLastTieredOffset(topicPartition, fetchState)
+      val epochAndLogStartOffset = leader.fetchEarliestOffset(topicPartition, 
fetchState.currentLeaderEpoch())

Review Comment:
   `epochAndLogStartOffset` -> `leaderLogStartOffsetAndEpoch`



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -781,7 +787,15 @@ abstract class AbstractFetcherThread(name: String,
                                                 leaderEpochInRequest: 
Optional[Integer],
                                                 fetchPartitionData: 
PartitionData): Boolean = {
     try {
-      val newFetchState = fetchTierStateMachine.start(topicPartition, 
fetchState, fetchPartitionData)
+      val fetchFromLastTieredOffset = 
shouldFetchFromLastTieredOffset(topicPartition, fetchState)

Review Comment:
   Shall we rename `fetchFromLastTieredOffset` to 
`isLastTieredOffsetFetchEnabled`?



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -671,7 +671,13 @@ abstract class AbstractFetcherThread(name: String,
      */
     val offsetAndEpoch = leader.fetchLatestOffset(topicPartition, 
currentLeaderEpoch)
     val leaderEndOffset = offsetAndEpoch.offset
-    if (leaderEndOffset < replicaEndOffset) {
+    val fetchFromLastTieredOffset = 
shouldFetchFromLastTieredOffset(topicPartition, leaderEndOffset, 
replicaEndOffset)
+
+    if (fetchFromLastTieredOffset) {
+      val leaderStartOffset = leader.fetchEarliestOffset(topicPartition, 
currentLeaderEpoch)

Review Comment:
   Can `leaderStartOffset` be renamed to `leaderStartOffsetAndEpoch`, and 
`epochAndStartingOffset` to `earliestPendingUploadOffsetAndEpoch`?



##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -182,7 +179,7 @@ private void buildProducerSnapshotFile(UnifiedLog 
unifiedLog,
      * fetching records from the leader. The return value is the next offset 
to fetch from the leader, which is the
      * next offset following the end offset of the remote log portion.
      */
-    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+    protected Long buildRemoteLogAuxState(TopicPartition topicPartition,

Review Comment:
   Why do we change the method access specifier from `private` to `protected`?



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -671,7 +671,13 @@ abstract class AbstractFetcherThread(name: String,
      */
     val offsetAndEpoch = leader.fetchLatestOffset(topicPartition, 
currentLeaderEpoch)
     val leaderEndOffset = offsetAndEpoch.offset
-    if (leaderEndOffset < replicaEndOffset) {
+    val fetchFromLastTieredOffset = 
shouldFetchFromLastTieredOffset(topicPartition, leaderEndOffset, 
replicaEndOffset)
+
+    if (fetchFromLastTieredOffset) {
+      val leaderStartOffset = leader.fetchEarliestOffset(topicPartition, 
currentLeaderEpoch)
+      val epochAndStartingOffset = earliestPendingUploadOffset(topicPartition, 
currentLeaderEpoch, leaderStartOffset)
+      fetchTierStateMachine.start(topicPartition, topicId.asJava, 
currentLeaderEpoch, epochAndStartingOffset, leaderStartOffset.offset())

Review Comment:
   replace: 
   
   topicId.asJava -> topicId.toJava to avoid using deprecated methods



##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -156,7 +156,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
             new OffsetAndEpoch(-1L, -1)
           }
         case _ =>
-          val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1, 
logStartOffset.offset())
+          val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1, 
Math.max(logStartOffset.offset(), localLogStartOffset.offset()))

Review Comment:
   Shall we add a unit test for this case in LocalLeaderEndPointTest? 



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -805,6 +819,29 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  /**
+   * Determines the earliest offset for pending uploads, taking into account
+   * both local and remote storage conditions.
+   */
+  private def earliestPendingUploadOffset(topicPartition: TopicPartition, 
currentLeaderEpoch: Int, leaderLogStartOffset: OffsetAndEpoch): OffsetAndEpoch 
= {
+    val earliestPendingUploadOffset = 
leader.fetchEarliestPendingUploadOffset(topicPartition, currentLeaderEpoch)
+    if (earliestPendingUploadOffset.offset == -1L) {
+      val leaderLocalStartOffset = 
leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)
+      if (leaderLocalStartOffset.offset == leaderLogStartOffset.offset) {
+        return leaderLocalStartOffset
+      }
+      throw new OffsetNotAvailableException("Segments are uploaded to remote 
storage, but the leader does not have the information about the uploaded 
segments")

Review Comment:
   Shall we update the error message to be specific? 
   
   ```
   Segments are uploaded to remote storage, but the leader does not know the 
earliest pending upload offset. 
   ```



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -781,7 +787,15 @@ abstract class AbstractFetcherThread(name: String,
                                                 leaderEpochInRequest: 
Optional[Integer],
                                                 fetchPartitionData: 
PartitionData): Boolean = {
     try {
-      val newFetchState = fetchTierStateMachine.start(topicPartition, 
fetchState, fetchPartitionData)
+      val fetchFromLastTieredOffset = 
shouldFetchFromLastTieredOffset(topicPartition, fetchState)
+      val epochAndLogStartOffset = leader.fetchEarliestOffset(topicPartition, 
fetchState.currentLeaderEpoch())
+      val epochAndStartingOffset = if (fetchFromLastTieredOffset) {

Review Comment:
   `epochAndStartingOffset` -> `fetchOffsetAndEpoch`



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -805,6 +819,29 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  /**
+   * Determines the earliest offset for pending uploads, taking into account
+   * both local and remote storage conditions.
+   */
+  private def earliestPendingUploadOffset(topicPartition: TopicPartition, 
currentLeaderEpoch: Int, leaderLogStartOffset: OffsetAndEpoch): OffsetAndEpoch 
= {

Review Comment:
   Can this method be renamed to `fetchEarliestPendingUploadOffset` for 
clarity? Also, rename the parameter `leaderLogStartOffset` -> 
`leaderLogStartOffsetAndEpoch`.



##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##########
@@ -18,7 +18,6 @@ package kafka.server
 
 import kafka.cluster.Partition
 import kafka.log.LogManager
-

Review Comment:
   can this change be avoided?



##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -85,21 +85,19 @@ public TierStateMachine(LeaderEndPoint leader,
     /**
      * Start the tier state machine for the provided topic partition.
      *
-     * @param topicPartition the topic partition
-     * @param currentFetchState the current PartitionFetchState which will
-     *                          be used to derive the return value
-     * @param fetchPartitionData the data from the fetch response that 
returned the offset moved to tiered storage error
-     *
-     * @return the new PartitionFetchState after the successful start of the
-     *         tier state machine
+     * @param topicPartition          the topic partition for which the tier 
state machine is to be started
+     * @param topicId                 the optional unique identifier of the 
topic
+     * @param currentLeaderEpoch      the current leader epoch of the partition
+     * @param epochAndStartingOffset  the offset on the leader's local log 
from which to start replicating logs
+     * @param leaderLogStartOffset    the starting offset in the leader's log
+     * @return the new PartitionFetchState after the successful start of the 
tier state machine
+     * @throws Exception if an error occurs during the process, such as issues 
with remote storage
      */
     PartitionFetchState start(TopicPartition topicPartition,
-                              PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws 
Exception {
-        OffsetAndEpoch epochAndLeaderLocalStartOffset = 
leader.fetchEarliestLocalOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
-        int epoch = epochAndLeaderLocalStartOffset.epoch();
-        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
-
+                              Optional<Uuid> topicId,
+                              int currentLeaderEpoch,
+                              OffsetAndEpoch epochAndStartingOffset,

Review Comment:
   Shall we rename `epochAndStartingOffset` parameter to `fetchOffsetAndEpoch` 
/ `fetchStartOffsetAndEpoch` or something similar?



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1740,4 +1741,1157 @@ class AbstractFetcherThreadTest {
     assertEquals(151, replicaState.logEndOffset)
     assertEquals(151, replicaState.highWatermark)
   }
+
+  @Test

Review Comment:
   Please add Javadoc to these tests to make them easier to understand. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to