rajinisivaram commented on a change in pull request #9275:
URL: https://github.com/apache/kafka/pull/9275#discussion_r487974401



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -232,19 +263,25 @@ public FetchRequest build(short version) {
             // We collect the partitions in a single FetchTopic only if they appear sequentially in the fetchData
             FetchRequestData.FetchTopic fetchTopic = null;
             for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
-                if (fetchTopic == null || !entry.getKey().topic().equals(fetchTopic.topic())) {
+                TopicPartition topicPartition = entry.getKey();
+                PartitionData partitionData = entry.getValue();
+
+                if (fetchTopic == null || !topicPartition.topic().equals(fetchTopic.topic())) {
                     fetchTopic = new FetchRequestData.FetchTopic()
-                       .setTopic(entry.getKey().topic())
+                       .setTopic(topicPartition.topic())
                        .setPartitions(new ArrayList<>());
                     fetchRequestData.topics().add(fetchTopic);
                 }
 
-                fetchTopic.partitions().add(
-                    new FetchRequestData.FetchPartition().setPartition(entry.getKey().partition())
-                        .setCurrentLeaderEpoch(entry.getValue().currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
-                        .setFetchOffset(entry.getValue().fetchOffset)
-                        .setLogStartOffset(entry.getValue().logStartOffset)
-                        .setPartitionMaxBytes(entry.getValue().maxBytes));
+                FetchRequestData.FetchPartition fetchPartition = new FetchRequestData.FetchPartition()
+                    .setPartition(topicPartition.partition())
+                    .setCurrentLeaderEpoch(partitionData.currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                    .setFetchOffset(partitionData.fetchOffset)
+                    .setLogStartOffset(partitionData.logStartOffset)
+                    .setPartitionMaxBytes(partitionData.maxBytes);
+                partitionData.lastFetchedEpoch.ifPresent(fetchPartition::setLastFetchedEpoch);

Review comment:
       `currentLeaderEpoch` and `lastFetchedEpoch` both default to -1, but they are set in different ways above. Is that deliberate?
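
       Just for illustration, a minimal sketch of one way to make the two consistent: give `lastFetchedEpoch` the same `orElse` fallback as `currentLeaderEpoch`. This is only the loop body from the builder above (not a complete method), and it assumes the generated setter accepts the -1 sentinel directly:

```java
// Sketch only: set both epoch fields via orElse so the -1 default is explicit
// in the builder rather than relying on ifPresent plus the message default.
FetchRequestData.FetchPartition fetchPartition = new FetchRequestData.FetchPartition()
    .setPartition(topicPartition.partition())
    .setCurrentLeaderEpoch(partitionData.currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
    .setLastFetchedEpoch(partitionData.lastFetchedEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
    .setFetchOffset(partitionData.fetchOffset)
    .setLogStartOffset(partitionData.logStartOffset)
    .setPartitionMaxBytes(partitionData.maxBytes);
```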

##########
File path: core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
##########
@@ -212,6 +212,44 @@ class FetchRequestTest extends BaseRequestTest {
     assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, partitionData.error)
   }
 
+  @Test
+  def testLastFetchedEpochValidation(): Unit = {
+    val topic = "topic"
+    val topicPartition = new TopicPartition(topic, 0)
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+    val firstLeaderId = partitionToLeader(topicPartition.partition)
+    val firstLeaderEpoch = TestUtils.findLeaderEpoch(firstLeaderId, topicPartition, servers)
+
+    initProducer()
+
+    // Write some data in epoch 0
+    produceData(Seq(topicPartition), 100)
+    // Force a leader change
+    killBroker(firstLeaderId)
+    // Write some more data
+    produceData(Seq(topicPartition), 100)
+
+    val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, firstLeaderId)
+    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, servers)
+
+    // Build a fetch request at offset 150 with last fetched epoch 0
+    val fetchOffset = 150
+    val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+    partitionMap.put(topicPartition, new FetchRequest.PartitionData(fetchOffset, 0L, 1024,
+      Optional.of(secondLeaderEpoch), Optional.of(firstLeaderEpoch)))
+    val fetchRequest = FetchRequest.Builder.forConsumer(0, 1, partitionMap).build()
+
+    // Validate the expected truncation
+    val fetchResponse = sendFetchRequest(secondLeaderId, fetchRequest)
+    val partitionData = fetchResponse.responseData.get(topicPartition)
+    assertEquals(Errors.NONE, partitionData.error)
+    assertEquals(0L, partitionData.records.sizeInBytes())
+    assertTrue(partitionData.truncationOffset.isPresent)
+
+    // Should be exactly 100, but use a fuzzy truncation estimate in case there were produce retries
+    assertTrue(partitionData.truncationOffset.get < 150)

Review comment:
       Could we produce without retries, wait for completion, and then check the exact value?
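
       As a rough sketch of what that could look like with a plain producer (the bootstrap address, topic, and record values are placeholders; in the test itself this would presumably go through `produceData` with the producer configured this way):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NoRetryProduce {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, "0"); // no retries => no duplicate appends, offsets stay exact
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // Block on each send so the test only proceeds once every write is acknowledged,
                // letting the assertion check the exact truncation offset instead of a bound.
                producer.send(new ProducerRecord<>("topic", 0, null, "value-" + i)).get();
            }
        }
    }
}
```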

##########
File path: core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
##########
@@ -91,8 +95,23 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
     }
   }
 
+  def removeEntries(predicate: EpochEntry => Boolean): Seq[EpochEntry] = {
+    val removedEpochs = mutable.ListBuffer.empty[EpochEntry]
+    val iterator = epochs.entrySet().iterator()
+
+    while (iterator.hasNext) {

Review comment:
       I guess we would always be removing a range from the start or end. Are we going through the whole map and checking each one because there is no suitable API to remove multiple entries from the start or end?
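
       For what it's worth, if the removals really are always a prefix or suffix, the range views of `java.util.TreeMap` can clear them without scanning every entry. A self-contained sketch (the `Integer -> Long` map and the retained-epoch bounds are stand-ins for the cache's `epochs` map, not the actual types):

```java
import java.util.TreeMap;

public class EpochRangeRemoval {
    public static void main(String[] args) {
        // Stand-in for the cache's epochs map: epoch -> start offset.
        TreeMap<Integer, Long> epochs = new TreeMap<>();
        epochs.put(0, 0L);
        epochs.put(1, 100L);
        epochs.put(2, 250L);
        epochs.put(3, 400L);

        // Remove the prefix: every entry with epoch strictly below the bound.
        int firstRetainedEpoch = 2;                        // hypothetical bound
        epochs.headMap(firstRetainedEpoch, false).clear();

        // Remove the suffix: every entry with epoch strictly above the bound.
        int lastRetainedEpoch = 2;                         // hypothetical bound
        epochs.tailMap(lastRetainedEpoch, false).clear();

        System.out.println(epochs);                        // {2=250}
        // Caveat: clear() discards the removed entries, so if the caller needs
        // them back (as removeEntries returns a Seq[EpochEntry] here), they
        // would have to be copied out of the view before clearing it.
    }
}
```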




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

