AndrewJSchofield commented on code in PR #15479:
URL: https://github.com/apache/kafka/pull/15479#discussion_r1514890396


##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##########
@@ -79,15 +79,15 @@ public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> newFuture(
     @Override
     public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
         Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> deletionsForTopic = new HashMap<>();
-        for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) {
-            TopicPartition topicPartition = entry.getKey();
+        for (TopicPartition topicPartition : keys) {

Review Comment:
   The `PartitionLeaderStrategy` builds a map from broker ID to a set of
topic-partitions, so it has already divided up the full set of
topic-partitions for which records are being deleted. Each call to
`buildBatchedRequest` is supposed to build a broker-specific list of
`DeleteRecordsPartition` entries. Instead, it was iterating over the complete
map for the entire multi-leader operation (`this.recordsToDelete`), so every
broker got every topic-partition, even ones that it didn't lead. Iterating
over the `keys` argument instead restricts each request to the subset of
topic-partitions relevant to the broker in question.
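
To make the difference concrete, here is a minimal, self-contained sketch of
the per-broker grouping. It is not the actual handler code: `Tp` is a
hypothetical stand-in for `TopicPartition`, plain `Long` offsets stand in for
`RecordsToDelete`, nested maps stand in for the `DeleteRecordsTopic` and
`DeleteRecordsPartition` request data, and the topic names and leader
assignment are invented for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

public class BatchedRequestSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.TopicPartition. */
    public static final class Tp {
        final String topic;
        final int partition;

        public Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Tp)) return false;
            Tp other = (Tp) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /**
     * Groups delete offsets by topic, then by partition, for ONE broker.
     * Only the partitions in keys are included; recordsToDelete (the map for
     * the whole operation) is used purely as an offset lookup.
     */
    public static Map<String, Map<Integer, Long>> deletionsForBroker(
            Map<Tp, Long> recordsToDelete, Set<Tp> keys) {
        Map<String, Map<Integer, Long>> deletionsForTopic = new TreeMap<>();
        for (Tp tp : keys) { // iterate the broker's subset, not recordsToDelete
            deletionsForTopic
                .computeIfAbsent(tp.topic, t -> new TreeMap<>())
                .put(tp.partition, recordsToDelete.get(tp));
        }
        return deletionsForTopic;
    }

    public static void main(String[] args) {
        // The full multi-leader operation: three partitions in total.
        Map<Tp, Long> recordsToDelete = new HashMap<>();
        recordsToDelete.put(new Tp("orders", 0), 100L);
        recordsToDelete.put(new Tp("orders", 1), 200L);
        recordsToDelete.put(new Tp("payments", 0), 50L);

        // Assumed leader assignment: one broker leads orders-0 and payments-0.
        Set<Tp> keys = new HashSet<>(Arrays.asList(
            new Tp("orders", 0), new Tp("payments", 0)));

        // orders-1 is absent from the result because this broker does not lead it.
        System.out.println(deletionsForBroker(recordsToDelete, keys));
    }
}
```

Looping over `recordsToDelete.entrySet()` in `deletionsForBroker` would
reproduce the old behaviour: `orders-1` would appear in this broker's request
as well.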



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
