This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push: new a49da104982 KAFKA-16319: Divide DeleteRecords requests by leader node (#15479) a49da104982 is described below commit a49da1049823e4d7a76a1b8c3343095affd0a199 Author: Andrew Schofield <aschofi...@confluent.io> AuthorDate: Thu Mar 7 15:24:11 2024 +0000 KAFKA-16319: Divide DeleteRecords requests by leader node (#15479) Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Kirk True <k...@kirktrue.pro>, Daniel Gospodinow <dgospodi...@confluent.io> --- .../admin/internals/DeleteRecordsHandler.java | 6 +- .../admin/internals/DeleteRecordsHandlerTest.java | 73 +++++++++++++++++++--- 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java index 2daad226034..9f40d19f00b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java @@ -79,15 +79,15 @@ public final class DeleteRecordsHandler extends Batched<TopicPartition, DeletedR @Override public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) { Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> deletionsForTopic = new HashMap<>(); - for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) { - TopicPartition topicPartition = entry.getKey(); + for (TopicPartition topicPartition : keys) { + RecordsToDelete toDelete = recordsToDelete.get(topicPartition); DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = deletionsForTopic.computeIfAbsent( topicPartition.topic(), key -> new DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic()) ); deleteRecords.partitions().add(new DeleteRecordsRequestData.DeleteRecordsPartition()
.setPartitionIndex(topicPartition.partition()) - .setOffset(entry.getValue().beforeOffset())); + .setOffset(toDelete.beforeOffset())); } DeleteRecordsRequestData data = new DeleteRecordsRequestData() diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java index c39747f1fba..58492696c4d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java @@ -22,15 +22,24 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; + import org.apache.kafka.clients.admin.DeletedRecords; import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy.LookupResult; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.DeleteRecordsRequestData; import org.apache.kafka.common.message.DeleteRecordsResponseData; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.DeleteRecordsRequest; import org.apache.kafka.common.requests.DeleteRecordsResponse; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.Test; @@ -41,6 +50,7 @@ import static java.util.Collections.singleton; import static org.apache.kafka.common.utils.Utils.mkSet; import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertTrue; public class DeleteRecordsHandlerTest { @@ -50,7 +60,8 @@ public class DeleteRecordsHandlerTest { private final TopicPartition t0p1 = new TopicPartition("t0", 1); private final TopicPartition t0p2 = new TopicPartition("t0", 2); private final TopicPartition t0p3 = new TopicPartition("t0", 3); - private final Node node = new Node(1, "host", 1234); + private final Node node1 = new Node(1, "host", 1234); + private final Node node2 = new Node(2, "host", 1235); private final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<TopicPartition, RecordsToDelete>() { { put(t0p0, RecordsToDelete.beforeOffset(10L)); @@ -63,11 +74,11 @@ public class DeleteRecordsHandlerTest { @Test public void testBuildRequestSimple() { DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout); - DeleteRecordsRequest request = handler.buildBatchedRequest(node.id(), mkSet(t0p0, t0p1)).build(); - List<DeleteRecordsRequestData.DeleteRecordsTopic> topicPartitions = request.data().topics(); - assertEquals(1, topicPartitions.size()); - DeleteRecordsRequestData.DeleteRecordsTopic topic = topicPartitions.get(0); - assertEquals(4, topic.partitions().size()); + DeleteRecordsRequest request = handler.buildBatchedRequest(node1.id(), mkSet(t0p0, t0p1)).build(); + List<DeleteRecordsRequestData.DeleteRecordsTopic> topics = request.data().topics(); + assertEquals(1, topics.size()); + DeleteRecordsRequestData.DeleteRecordsTopic topic = topics.get(0); + assertEquals(2, topic.partitions().size()); } @Test @@ -199,6 +210,54 @@ public class DeleteRecordsHandlerTest { assertTrue(result.unmappedKeys.isEmpty()); } + // This is a more complicated test which ensures that DeleteRecords requests for multiple + // leader nodes are correctly divided up among the nodes based on leadership. 
+ // node1 leads t0p0 and t0p2, while node2 leads t0p1 and t0p3. + @Test + public void testBuildRequestMultipleLeaders() { + MetadataResponseData metadataResponseData = new MetadataResponseData(); + MetadataResponseTopic topicMetadata = new MetadataResponseTopic(); + topicMetadata.setName("t0").setErrorCode(Errors.NONE.code()); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(0).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(1).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(2).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(3).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code())); + metadataResponseData.topics().add(topicMetadata); + MetadataResponse metadataResponse = new MetadataResponse(metadataResponseData, ApiKeys.METADATA.latestVersion()); + + DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout); + AdminApiLookupStrategy<TopicPartition> strategy = handler.lookupStrategy(); + assertInstanceOf(PartitionLeaderStrategy.class, strategy); + PartitionLeaderStrategy specificStrategy = (PartitionLeaderStrategy) strategy; + MetadataRequest request = specificStrategy.buildRequest(mkSet(t0p0, t0p1, t0p2, t0p3)).build(); + assertEquals(mkSet("t0"), new HashSet<>(request.topics())); + + Set<TopicPartition> tpSet = mkSet(t0p0, t0p1, t0p2, t0p3); + LookupResult<TopicPartition> lookupResult = strategy.handleResponse(tpSet, metadataResponse); + assertEquals(emptyMap(), lookupResult.failedKeys); + assertEquals(tpSet, lookupResult.mappedKeys.keySet()); + + Map<Integer, Set<TopicPartition>> partitionsPerBroker = new HashMap<>(); + lookupResult.mappedKeys.forEach((tp, node) -> 
partitionsPerBroker.computeIfAbsent(node, key -> new HashSet<>()).add(tp)); + + DeleteRecordsRequest deleteRequest = handler.buildBatchedRequest(node1.id(), partitionsPerBroker.get(node1.id())).build(); + assertEquals(2, deleteRequest.data().topics().get(0).partitions().size()); + assertEquals(mkSet(t0p0, t0p2), + deleteRequest.data().topics().get(0).partitions().stream() + .map(drp -> new TopicPartition("t0", drp.partitionIndex())) + .collect(Collectors.toSet())); + deleteRequest = handler.buildBatchedRequest(node2.id(), partitionsPerBroker.get(node2.id())).build(); + assertEquals(2, deleteRequest.data().topics().get(0).partitions().size()); + assertEquals(mkSet(t0p1, t0p3), + deleteRequest.data().topics().get(0).partitions().stream() + .map(drp -> new TopicPartition("t0", drp.partitionIndex())) + .collect(Collectors.toSet())); + } + private DeleteRecordsResponse createResponse(Map<TopicPartition, Short> errorsByPartition) { return createResponse(errorsByPartition, recordsToDelete.keySet()); } @@ -227,7 +286,7 @@ public class DeleteRecordsHandlerTest { private AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> handleResponse(DeleteRecordsResponse response) { DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout); - return handler.handleResponse(node, recordsToDelete.keySet(), response); + return handler.handleResponse(node1, recordsToDelete.keySet(), response); } private void assertResult(