kamalcph commented on code in PR #16884:
URL: https://github.com/apache/kafka/pull/16884#discussion_r2295697173


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1180,4 +1180,551 @@ class AbstractFetcherThreadTest {
     fetcher.processFetchRequest(partitionData, fetchRequestOpt)
     assertEquals(0, replicaState.logEndOffset, "FetchResponse should be 
ignored when leader epoch does not match")
   }
+
+  private def emptyReplicaState(rlmEnabled: Boolean, partition: 
TopicPartition, fetcher: MockFetcherThread) = {
+    // Follower begins with an empty log
+    val replicaState = PartitionState(Seq(), leaderEpoch = 0, highWatermark = 
0L, rlmEnabled = rlmEnabled)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 
initialFetchState(topicIds.get(partition.topic), fetchOffset = 0, leaderEpoch = 
0)))
+    replicaState
+  }
+
+  /**
+   * Test: Empty Follower Fetch with TieredStorage Disabled and Leader 
LogStartOffset = 0
+   *
+   * Purpose:
+   * - Simulate a leader with logs starting at offset 0 and validate how the 
follower
+   *   behaves when TieredStorage is disabled.
+   *
+   * Conditions:
+   * - TieredStorage: **Disabled**
+   * - Leader LogStartOffset: **0**
+   *
+   * Scenario:
+   * - The leader starts with a log at offset 0, containing three record 
batches offset at 0, 150, and 199.
+   * - The follower begins fetching, and we validate the correctness of its 
replica state as it fetches.
+   *
+   * Expected Outcomes:
+   * 1. The follower fetch state should transition to `FETCHING` initially.
+   * 2. After the first poll, one record batch is fetched.
+   * 3. After subsequent polls, the entire leader log is fetched:
+   *    - Replica log size: 3
+   *    - Replica LogStartOffset: 0
+   *    - Replica LogEndOffset: 200
+   *    - Replica HighWatermark: 199
+   */
+  @Test
+  def testEmptyFollowerFetchTieredStorageDisabledLeaderLogStartOffsetZero(): 
Unit = {
+    val rlmEnabled = false
+    val partition = new TopicPartition("topic1", 0)
+    val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndpoint, 
mockTierStateMachine)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 0
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(1, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(1, replicaState.logEndOffset)
+
+    // Only 1 record batch is returned after a poll so calling 'n' number of 
times to get the desired result.
+    for (_ <- 1 to 2) fetcher.doWork()
+    assertEquals(3, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Empty Follower Fetch with TieredStorage Disabled and Leader 
LogStartOffset != 0
+   *
+   * Purpose:
+   * - Validate follower behavior when the leader's log starts at a non-zero 
offset (10).
+   *
+   * Conditions:
+   * - TieredStorage: **Disabled**
+   * - Leader LogStartOffset: **10**
+   *
+   * Scenario:
+   * - The leader log starts at offset 10 with batches at 10, 150, and 199.
+   * - The follower starts fetching from offset 10.
+   *
+   * Expected Outcomes:
+   * 1. The follower's initial log is empty.
+   * 2. Replica offsets after polls:
+   *    - LogStartOffset = 10
+   *    - LogEndOffset = 200
+   *    - HighWatermark = 199
+   */
+  @Test
+  def 
testEmptyFollowerFetchTieredStorageDisabledLeaderLogStartOffsetNonZero(): Unit 
= {
+    val rlmEnabled = false
+    val partition = new TopicPartition("topic1", 0)
+    val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndpoint, 
mockTierStateMachine)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)

Review Comment:
   Could you add a comment explaining why the replica log size is zero after
the first follower fetch when the fetch offset is non-zero?



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1180,4 +1180,551 @@ class AbstractFetcherThreadTest {
     fetcher.processFetchRequest(partitionData, fetchRequestOpt)
     assertEquals(0, replicaState.logEndOffset, "FetchResponse should be 
ignored when leader epoch does not match")
   }
+
+  private def emptyReplicaState(rlmEnabled: Boolean, partition: 
TopicPartition, fetcher: MockFetcherThread) = {
+    // Follower begins with an empty log
+    val replicaState = PartitionState(Seq(), leaderEpoch = 0, highWatermark = 
0L, rlmEnabled = rlmEnabled)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 
initialFetchState(topicIds.get(partition.topic), fetchOffset = 0, leaderEpoch = 
0)))
+    replicaState
+  }
+
+  /**
+   * Test: Empty Follower Fetch with TieredStorage Disabled and Leader 
LogStartOffset = 0
+   *
+   * Purpose:
+   * - Simulate a leader with logs starting at offset 0 and validate how the 
follower
+   *   behaves when TieredStorage is disabled.
+   *
+   * Conditions:
+   * - TieredStorage: **Disabled**
+   * - Leader LogStartOffset: **0**
+   *
+   * Scenario:
+   * - The leader starts with a log at offset 0, containing three record 
batches offset at 0, 150, and 199.
+   * - The follower begins fetching, and we validate the correctness of its 
replica state as it fetches.
+   *
+   * Expected Outcomes:
+   * 1. The follower fetch state should transition to `FETCHING` initially.
+   * 2. After the first poll, one record batch is fetched.
+   * 3. After subsequent polls, the entire leader log is fetched:
+   *    - Replica log size: 3
+   *    - Replica LogStartOffset: 0
+   *    - Replica LogEndOffset: 200
+   *    - Replica HighWatermark: 199
+   */
+  @Test
+  def testEmptyFollowerFetchTieredStorageDisabledLeaderLogStartOffsetZero(): 
Unit = {
+    val rlmEnabled = false
+    val partition = new TopicPartition("topic1", 0)
+    val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndpoint, 
mockTierStateMachine)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 0
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(1, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(1, replicaState.logEndOffset)
+
+    // Only 1 record batch is returned after a poll so calling 'n' number of 
times to get the desired result.
+    for (_ <- 1 to 2) fetcher.doWork()
+    assertEquals(3, replicaState.log.size)
+    assertEquals(0, replicaState.logStartOffset)
+    assertEquals(200, replicaState.logEndOffset)
+    assertEquals(199, replicaState.highWatermark)
+  }
+
+  /**
+   * Test: Empty Follower Fetch with TieredStorage Disabled and Leader 
LogStartOffset != 0
+   *
+   * Purpose:
+   * - Validate follower behavior when the leader's log starts at a non-zero 
offset (10).
+   *
+   * Conditions:
+   * - TieredStorage: **Disabled**
+   * - Leader LogStartOffset: **10**
+   *
+   * Scenario:
+   * - The leader log starts at offset 10 with batches at 10, 150, and 199.
+   * - The follower starts fetching from offset 10.
+   *
+   * Expected Outcomes:
+   * 1. The follower's initial log is empty.
+   * 2. Replica offsets after polls:
+   *    - LogStartOffset = 10
+   *    - LogEndOffset = 200
+   *    - HighWatermark = 199
+   */
+  @Test
+  def 
testEmptyFollowerFetchTieredStorageDisabledLeaderLogStartOffsetNonZero(): Unit 
= {
+    val rlmEnabled = false
+    val partition = new TopicPartition("topic1", 0)
+    val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndpoint, 
mockTierStateMachine)
+
+    val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+    val leaderLog = Seq(
+      // LogStartOffset = LocalLogStartOffset = 10
+      mkBatch(baseOffset = 10, leaderEpoch = 0, new 
SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 150, leaderEpoch = 0, new 
SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 199, leaderEpoch = 0, new 
SimpleRecord("e".getBytes))
+    )
+
+    val leaderState = PartitionState(
+      leaderLog,
+      leaderEpoch = 0,
+      highWatermark = 199L,
+      rlmEnabled = rlmEnabled
+    )
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    assertEquals(Option(ReplicaState.FETCHING), 
fetcher.fetchState(partition).map(_.state))
+    assertEquals(0, replicaState.log.size)
+    assertEquals(10, replicaState.logStartOffset)
+    assertEquals(10, replicaState.logEndOffset)

Review Comment:
   Ditto: please add a comment to help the reader understand why logStartOffset ==
logEndOffset after the first follower fetch when the fetch offset is non-zero.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to