This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new a49da104982 KAFKA-16319: Divide DeleteRecords requests by leader node 
(#15479)
a49da104982 is described below

commit a49da1049823e4d7a76a1b8c3343095affd0a199
Author: Andrew Schofield <aschofi...@confluent.io>
AuthorDate: Thu Mar 7 15:24:11 2024 +0000

    KAFKA-16319: Divide DeleteRecords requests by leader node (#15479)
    
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Kirk True 
<k...@kirktrue.pro>, Daniel Gospodinow <dgospodi...@confluent.io>
---
 .../admin/internals/DeleteRecordsHandler.java      |  6 +-
 .../admin/internals/DeleteRecordsHandlerTest.java  | 73 +++++++++++++++++++---
 2 files changed, 69 insertions(+), 10 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
index 2daad226034..9f40d19f00b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
@@ -79,15 +79,15 @@ public final class DeleteRecordsHandler extends 
Batched<TopicPartition, DeletedR
     @Override
     public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, 
Set<TopicPartition> keys) {
         Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> 
deletionsForTopic = new HashMap<>();
-        for (Map.Entry<TopicPartition, RecordsToDelete> entry: 
recordsToDelete.entrySet()) {
-            TopicPartition topicPartition = entry.getKey();
+        for (TopicPartition topicPartition : keys) {
+            RecordsToDelete toDelete = recordsToDelete.get(topicPartition);
             DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = 
deletionsForTopic.computeIfAbsent(
                     topicPartition.topic(),
                     key -> new 
DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic())
             );
             deleteRecords.partitions().add(new 
DeleteRecordsRequestData.DeleteRecordsPartition()
                     .setPartitionIndex(topicPartition.partition())
-                    .setOffset(entry.getValue().beforeOffset()));
+                    .setOffset(toDelete.beforeOffset()));
         }
 
         DeleteRecordsRequestData data = new DeleteRecordsRequestData()
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
index c39747f1fba..58492696c4d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
@@ -22,15 +22,24 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.kafka.clients.admin.DeletedRecords;
 import org.apache.kafka.clients.admin.RecordsToDelete;
+import 
org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy.LookupResult;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.DeleteRecordsRequestData;
 import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.junit.jupiter.api.Test;
 
@@ -41,6 +50,7 @@ import static java.util.Collections.singleton;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class DeleteRecordsHandlerTest {
@@ -50,7 +60,8 @@ public class DeleteRecordsHandlerTest {
     private final TopicPartition t0p1 = new TopicPartition("t0", 1);
     private final TopicPartition t0p2 = new TopicPartition("t0", 2);
     private final TopicPartition t0p3 = new TopicPartition("t0", 3);
-    private final Node node = new Node(1, "host", 1234);
+    private final Node node1 = new Node(1, "host", 1234);
+    private final Node node2 = new Node(2, "host", 1235);
     private final Map<TopicPartition, RecordsToDelete> recordsToDelete = new 
HashMap<TopicPartition, RecordsToDelete>() {
         {
             put(t0p0, RecordsToDelete.beforeOffset(10L));
@@ -63,11 +74,11 @@ public class DeleteRecordsHandlerTest {
     @Test
     public void testBuildRequestSimple() {
         DeleteRecordsHandler handler = new 
DeleteRecordsHandler(recordsToDelete, logContext, timeout);
-        DeleteRecordsRequest request = handler.buildBatchedRequest(node.id(), 
mkSet(t0p0, t0p1)).build();
-        List<DeleteRecordsRequestData.DeleteRecordsTopic> topicPartitions = 
request.data().topics();
-        assertEquals(1, topicPartitions.size());
-        DeleteRecordsRequestData.DeleteRecordsTopic topic = 
topicPartitions.get(0);
-        assertEquals(4, topic.partitions().size());
+        DeleteRecordsRequest request = handler.buildBatchedRequest(node1.id(), 
mkSet(t0p0, t0p1)).build();
+        List<DeleteRecordsRequestData.DeleteRecordsTopic> topics = 
request.data().topics();
+        assertEquals(1, topics.size());
+        DeleteRecordsRequestData.DeleteRecordsTopic topic = topics.get(0);
+        assertEquals(2, topic.partitions().size());
     }
 
     @Test
@@ -199,6 +210,54 @@ public class DeleteRecordsHandlerTest {
         assertTrue(result.unmappedKeys.isEmpty());
     }
 
+    // This is a more complicated test which ensures that DeleteRecords 
requests for multiple
+    // leader nodes are correctly divided up among the nodes based on 
leadership.
+    // node1 leads t0p0 and t0p2, while node2 leads t0p1 and t0p3.
+    @Test
+    public void testBuildRequestMultipleLeaders() {
+        MetadataResponseData metadataResponseData = new MetadataResponseData();
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic();
+        topicMetadata.setName("t0").setErrorCode(Errors.NONE.code());
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                
.setPartitionIndex(0).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                
.setPartitionIndex(1).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                
.setPartitionIndex(2).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                
.setPartitionIndex(3).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code()));
+        metadataResponseData.topics().add(topicMetadata);
+        MetadataResponse metadataResponse = new 
MetadataResponse(metadataResponseData, ApiKeys.METADATA.latestVersion());
+
+        DeleteRecordsHandler handler = new 
DeleteRecordsHandler(recordsToDelete, logContext, timeout);
+        AdminApiLookupStrategy<TopicPartition> strategy = 
handler.lookupStrategy();
+        assertInstanceOf(PartitionLeaderStrategy.class, strategy);
+        PartitionLeaderStrategy specificStrategy = (PartitionLeaderStrategy) 
strategy;
+        MetadataRequest request = specificStrategy.buildRequest(mkSet(t0p0, 
t0p1, t0p2, t0p3)).build();
+        assertEquals(mkSet("t0"), new HashSet<>(request.topics()));
+
+        Set<TopicPartition> tpSet = mkSet(t0p0, t0p1, t0p2, t0p3);
+        LookupResult<TopicPartition> lookupResult = 
strategy.handleResponse(tpSet, metadataResponse);
+        assertEquals(emptyMap(), lookupResult.failedKeys);
+        assertEquals(tpSet, lookupResult.mappedKeys.keySet());
+
+        Map<Integer, Set<TopicPartition>> partitionsPerBroker = new 
HashMap<>();
+        lookupResult.mappedKeys.forEach((tp, node) -> 
partitionsPerBroker.computeIfAbsent(node, key -> new HashSet<>()).add(tp));
+
+        DeleteRecordsRequest deleteRequest = 
handler.buildBatchedRequest(node1.id(), 
partitionsPerBroker.get(node1.id())).build();
+        assertEquals(2, 
deleteRequest.data().topics().get(0).partitions().size());
+        assertEquals(mkSet(t0p0, t0p2),
+                deleteRequest.data().topics().get(0).partitions().stream()
+                        .map(drp -> new TopicPartition("t0", 
drp.partitionIndex()))
+                        .collect(Collectors.toSet()));
+        deleteRequest = handler.buildBatchedRequest(node2.id(), 
partitionsPerBroker.get(node2.id())).build();
+        assertEquals(2, 
deleteRequest.data().topics().get(0).partitions().size());
+        assertEquals(mkSet(t0p1, t0p3),
+                deleteRequest.data().topics().get(0).partitions().stream()
+                        .map(drp -> new TopicPartition("t0", 
drp.partitionIndex()))
+                        .collect(Collectors.toSet()));
+    }
+
     private DeleteRecordsResponse createResponse(Map<TopicPartition, Short> 
errorsByPartition) {
         return createResponse(errorsByPartition, recordsToDelete.keySet());
     }
@@ -227,7 +286,7 @@ public class DeleteRecordsHandlerTest {
     private AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> 
handleResponse(DeleteRecordsResponse response) {
         DeleteRecordsHandler handler =
                 new DeleteRecordsHandler(recordsToDelete, logContext, timeout);
-        return handler.handleResponse(node, recordsToDelete.keySet(), 
response);
+        return handler.handleResponse(node1, recordsToDelete.keySet(), 
response);
     }
 
     private void assertResult(

Reply via email to