divijvaidya commented on code in PR #14212: URL: https://github.com/apache/kafka/pull/14212#discussion_r1294964226
########## core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala: ########## @@ -85,6 +85,64 @@ class ReplicaFetcherTierStateMachineTest { assertEquals(9L, replicaState.logEndOffset) } + @Test + def testFollowerFetchMovedToAndDeletedFromTieredStore(): Unit = { + val partition = new TopicPartition("topic", 0) + + val replicaLog = Seq( + mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)), + mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)), + mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes))) + + val replicaState = PartitionState(replicaLog, leaderEpoch = 7, highWatermark = 0L, rlmEnabled = true) + + val mockLeaderEndpoint = new MockLeaderEndPoint + val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) + val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) + + fetcher.setReplicaState(partition, replicaState) + fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 7))) + + val leaderLog = Seq( + mkBatch(baseOffset = 7, leaderEpoch = 7, new SimpleRecord("h".getBytes)), + mkBatch(baseOffset = 8, leaderEpoch = 7, new SimpleRecord("i".getBytes)), + mkBatch(baseOffset = 9, leaderEpoch = 7, new SimpleRecord("j".getBytes)), + mkBatch(baseOffset = 10, leaderEpoch = 7, new SimpleRecord("k".getBytes))) + + val leaderState = PartitionState(leaderLog, leaderEpoch = 7, highWatermark = 10L, rlmEnabled = true) + // Overriding the log start offset to 5 for mocking the scenario of segments 5-6 moved to remote store and + // segments 0-4 expired. + leaderState.logStartOffset = 5 + fetcher.mockLeader.setLeaderState(partition, leaderState) + fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState) + + assertEquals(3L, replicaState.logEndOffset) + val expectedState = if (truncateOnFetch) Option(Fetching) else Option(Truncating) + assertEquals(expectedState, fetcher.fetchState(partition).map(_.state)) + + fetcher.doWork() + // Verify that the out of range error is triggered and the fetch offset is reset to the global log start offset. + assertEquals(0L, replicaState.logStartOffset) Review Comment: Isn't this incorrect? logStartOffset should be 5 here after the first fetch call to the leader. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org