hachikuji commented on a change in pull request #9275:
URL: https://github.com/apache/kafka/pull/9275#discussion_r488271329



##########
File path: core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
##########
@@ -212,6 +212,44 @@ class FetchRequestTest extends BaseRequestTest {
     assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, partitionData.error)
   }
 
+  @Test
+  def testLastFetchedEpochValidation(): Unit = {
+    val topic = "topic"
+    val topicPartition = new TopicPartition(topic, 0)
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 
numPartitions = 1, replicationFactor = 3, servers)
+    val firstLeaderId = partitionToLeader(topicPartition.partition)
+    val firstLeaderEpoch = TestUtils.findLeaderEpoch(firstLeaderId, 
topicPartition, servers)
+
+    initProducer()
+
+    // Write some data in epoch 0
+    produceData(Seq(topicPartition), 100)
+    // Force a leader change
+    killBroker(firstLeaderId)
+    // Write some more data
+    produceData(Seq(topicPartition), 100)
+
+    val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, 
firstLeaderId)
+    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, 
topicPartition, servers)
+
+    // Build a fetch request at offset 150 with last fetched epoch 0
+    val fetchOffset = 150
+    val partitionMap = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+    partitionMap.put(topicPartition, new 
FetchRequest.PartitionData(fetchOffset, 0L, 1024,
+      Optional.of(secondLeaderEpoch), Optional.of(firstLeaderEpoch)))
+    val fetchRequest = FetchRequest.Builder.forConsumer(0, 1, 
partitionMap).build()
+
+    // Validate the expected truncation
+    val fetchResponse = sendFetchRequest(secondLeaderId, fetchRequest)
+    val partitionData = fetchResponse.responseData.get(topicPartition)
+    assertEquals(Errors.NONE, partitionData.error)
+    assertEquals(0L, partitionData.records.sizeInBytes())
+    assertTrue(partitionData.truncationOffset.isPresent)
+
+    // Should be exactly 100, but use a fuzzy truncation estimate in case 
there were produce retries
+    assertTrue(partitionData.truncationOffset.get < 150)

Review comment:
       That's a good idea. Let me try that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to