rajinisivaram commented on a change in pull request #9275:
URL: https://github.com/apache/kafka/pull/9275#discussion_r487974401
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -232,19 +263,25 @@ public FetchRequest build(short version) {
             // We collect the partitions in a single FetchTopic only if they appear sequentially in the fetchData
             FetchRequestData.FetchTopic fetchTopic = null;
             for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
-                if (fetchTopic == null || !entry.getKey().topic().equals(fetchTopic.topic())) {
+                TopicPartition topicPartition = entry.getKey();
+                PartitionData partitionData = entry.getValue();
+
+                if (fetchTopic == null || !topicPartition.topic().equals(fetchTopic.topic())) {
                     fetchTopic = new FetchRequestData.FetchTopic()
-                        .setTopic(entry.getKey().topic())
+                        .setTopic(topicPartition.topic())
                         .setPartitions(new ArrayList<>());
                     fetchRequestData.topics().add(fetchTopic);
                 }
-                fetchTopic.partitions().add(
-                    new FetchRequestData.FetchPartition().setPartition(entry.getKey().partition())
-                        .setCurrentLeaderEpoch(entry.getValue().currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
-                        .setFetchOffset(entry.getValue().fetchOffset)
-                        .setLogStartOffset(entry.getValue().logStartOffset)
-                        .setPartitionMaxBytes(entry.getValue().maxBytes));
+                FetchRequestData.FetchPartition fetchPartition = new FetchRequestData.FetchPartition()
+                    .setPartition(topicPartition.partition())
+                    .setCurrentLeaderEpoch(partitionData.currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                    .setFetchOffset(partitionData.fetchOffset)
+                    .setLogStartOffset(partitionData.logStartOffset)
+                    .setPartitionMaxBytes(partitionData.maxBytes);
+                partitionData.lastFetchedEpoch.ifPresent(fetchPartition::setLastFetchedEpoch);

Review comment:
   `currentLeaderEpoch` and `lastFetchedEpoch` are both set to -1 by default, but they are set in different ways above. Is that deliberate?
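For context, a minimal sketch of the symmetric alternative the question hints at, using only the builder setters visible in the diff: both optional epochs resolved with `orElse`, so the -1 default (`RecordBatch.NO_PARTITION_LEADER_EPOCH`) is applied the same way to both fields. This is illustrative only, not code from the PR.

```java
// Sketch only: treat both optional epochs identically instead of mixing
// orElse(...) for currentLeaderEpoch with ifPresent(...) for lastFetchedEpoch.
FetchRequestData.FetchPartition fetchPartition = new FetchRequestData.FetchPartition()
    .setPartition(topicPartition.partition())
    .setCurrentLeaderEpoch(partitionData.currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
    .setLastFetchedEpoch(partitionData.lastFetchedEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
    .setFetchOffset(partitionData.fetchOffset)
    .setLogStartOffset(partitionData.logStartOffset)
    .setPartitionMaxBytes(partitionData.maxBytes);
```

Whether this is equivalent to the PR's version depends on the generated default for `lastFetchedEpoch` actually being -1, which is what the comment above asserts.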
##########
File path: core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
##########
@@ -212,6 +212,44 @@ class FetchRequestTest extends BaseRequestTest {
     assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, partitionData.error)
   }
 
+  @Test
+  def testLastFetchedEpochValidation(): Unit = {
+    val topic = "topic"
+    val topicPartition = new TopicPartition(topic, 0)
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+    val firstLeaderId = partitionToLeader(topicPartition.partition)
+    val firstLeaderEpoch = TestUtils.findLeaderEpoch(firstLeaderId, topicPartition, servers)
+
+    initProducer()
+
+    // Write some data in epoch 0
+    produceData(Seq(topicPartition), 100)
+    // Force a leader change
+    killBroker(firstLeaderId)
+    // Write some more data
+    produceData(Seq(topicPartition), 100)
+
+    val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, firstLeaderId)
+    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, servers)
+
+    // Build a fetch request at offset 150 with last fetched epoch 0
+    val fetchOffset = 150
+    val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+    partitionMap.put(topicPartition, new FetchRequest.PartitionData(fetchOffset, 0L, 1024,
+      Optional.of(secondLeaderEpoch), Optional.of(firstLeaderEpoch)))
+    val fetchRequest = FetchRequest.Builder.forConsumer(0, 1, partitionMap).build()
+
+    // Validate the expected truncation
+    val fetchResponse = sendFetchRequest(secondLeaderId, fetchRequest)
+    val partitionData = fetchResponse.responseData.get(topicPartition)
+    assertEquals(Errors.NONE, partitionData.error)
+    assertEquals(0L, partitionData.records.sizeInBytes())
+    assertTrue(partitionData.truncationOffset.isPresent)
+
+    // Should be exactly 100, but use a fuzzy truncation estimate in case there were produce retries
+    assertTrue(partitionData.truncationOffset.get < 150)

Review comment:
   We could produce without retries, wait for completion, and then check the exact value? (A sketch of that idea is at the end of this thread.)

##########
File path: core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
##########
@@ -91,8 +95,23 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
     }
   }
 
+  def removeEntries(predicate: EpochEntry => Boolean): Seq[EpochEntry] = {
+    val removedEpochs = mutable.ListBuffer.empty[EpochEntry]
+    val iterator = epochs.entrySet().iterator()
+
+    while (iterator.hasNext) {

Review comment:
   I guess we would always be removing a range from the start or end. Are we going through the whole map and checking each entry because there is no suitable API to remove multiple entries from the start or end?
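On the removeEntries question just above: if removals are always a contiguous range at the start or end of the cache, `java.util.TreeMap` does offer bulk removal through its backed range views, so a full scan with a predicate is avoidable. A rough sketch, assuming the entries live in a `TreeMap` keyed by epoch; the class name, method names, and the `Long` value type are placeholders, not the PR's code.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class EpochRangeRemovalSketch {
    // Hypothetical stand-in for the cache's epoch -> start offset entries.
    private final TreeMap<Integer, Long> epochs = new TreeMap<>();

    // Remove every entry with an epoch strictly below the given one (a prefix of the map).
    public void removeEpochsBelow(int epoch) {
        NavigableMap<Integer, Long> prefix = epochs.headMap(epoch, false);
        prefix.clear(); // the view is backed by the map, so clearing it removes the prefix in place
    }

    // Remove every entry with an epoch at or above the given one (a suffix of the map).
    public void removeEpochsFrom(int epoch) {
        epochs.tailMap(epoch, true).clear();
    }
}
```

Clearing the view still touches each removed entry, but it confines the work to the affected range instead of testing a predicate against every entry in the map.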
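And on the earlier FetchRequestTest.scala comment about producing without retries: the general idea is to configure the producer with retries disabled and block on each send, so the end offsets are deterministic and the truncation offset can be asserted exactly (100, per the test's own comment) rather than with the fuzzy `< 150` bound. A standalone Java sketch of that pattern using the public producer API; the helper name and serializer choice are illustrative, and the real test would presumably go through its own initProducer/produceData helpers.

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class NoRetryProduceSketch {

    // Produce numRecords records with retries disabled, waiting for each ack,
    // and return the offset of the last appended record.
    static long produceWithoutRetries(String bootstrapServers, String topic, int numRecords)
            throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0); // no retries, so no duplicate appends

        long lastOffset = -1L;
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < numRecords; i++) {
                // Block on each send so the record is known to be appended exactly once
                // before the next one is sent.
                RecordMetadata metadata =
                    producer.send(new ProducerRecord<>(topic, "key-" + i, "value-" + i)).get();
                lastOffset = metadata.offset();
            }
        }
        return lastOffset;
    }
}
```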