This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 981815c  KAFKA-8034: Use automatic RPC generation in DeleteTopics
981815c is described below

commit 981815c8d14daf1042e06f8fa9cb355187719b1d
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Fri Mar 29 21:32:36 2019 +0000

    KAFKA-8034: Use automatic RPC generation in DeleteTopics
    
    Reviewers: Colin P. McCabe <cmcc...@apache.org>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 16 ++--
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  6 +-
 .../kafka/common/requests/AbstractResponse.java    |  2 +-
 .../kafka/common/requests/DeleteTopicsRequest.java | 86 +++++++-------------
 .../common/requests/DeleteTopicsResponse.java      | 93 +++++-----------------
 .../common/message/DeleteTopicsResponse.json       |  6 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 16 +++-
 .../kafka/common/requests/RequestResponseTest.java | 20 +++--
 core/src/main/scala/kafka/server/KafkaApis.scala   | 75 ++++++++++-------
 .../kafka/api/AuthorizerIntegrationTest.scala      | 18 +++--
 .../kafka/server/DeleteTopicsRequestTest.scala     | 55 ++++++++-----
 ...leteTopicsRequestWithDeletionDisabledTest.scala | 17 ++--
 .../scala/unit/kafka/server/RequestQuotaTest.scala | 10 ++-
 13 files changed, 206 insertions(+), 214 deletions(-)
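
In short, this patch replaces the hand-written DeleteTopics request/response plumbing with the generated DeleteTopicsRequestData and DeleteTopicsResponseData message classes. As a quick orientation before the diff, here is a minimal sketch of the new builder call pattern, mirroring the KafkaAdminClient hunk below (the wrapper class, topic names and timeout are illustrative, not part of the commit):

    import java.util.Arrays;
    import org.apache.kafka.common.message.DeleteTopicsRequestData;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.DeleteTopicsRequest;

    public class DeleteTopicsRequestSketch {
        public static DeleteTopicsRequest newRequest() {
            // The builder now takes the generated data object instead of a
            // Set of topic names plus a timeout.
            return new DeleteTopicsRequest.Builder(
                    new DeleteTopicsRequestData()
                            .setTopicNames(Arrays.asList("topic-a", "topic-b")) // example names
                            .setTimeoutMs(30000))                               // example timeout
                    .build(ApiKeys.DELETE_TOPICS.latestVersion());
        }
    }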

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 317cd7c..336597f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -63,6 +63,8 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteTopicsRequestData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
@@ -1365,14 +1367,16 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new DeleteTopicsRequest.Builder(new HashSet<>(validTopicNames), timeoutMs);
+                return new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+                        .setTopicNames(validTopicNames)
+                        .setTimeoutMs(timeoutMs));
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
                 // Check for controller change
-                for (Errors error : response.errors().values()) {
+                for (Errors error : response.errorCounts().keySet()) {
                     if (error == Errors.NOT_CONTROLLER) {
                         metadataManager.clearController();
                         metadataManager.requestUpdate();
@@ -1380,12 +1384,12 @@ public class KafkaAdminClient extends AdminClient {
                     }
                 }
                 // Handle server responses for particular topics.
-                for (Map.Entry<String, Errors> entry : response.errors().entrySet()) {
-                    KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+                for (DeletableTopicResult result : response.data().responses()) {
+                    KafkaFutureImpl<Void> future = topicFutures.get(result.name());
                     if (future == null) {
-                        log.warn("Server response mentioned unknown topic {}", entry.getKey());
+                        log.warn("Server response mentioned unknown topic {}", result.name());
                     } else {
-                        ApiException exception = entry.getValue().exception();
+                        ApiException exception = Errors.forCode(result.errorCode()).exception();
                         if (exception != null) {
                             future.completeExceptionally(exception);
                         } else {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 3f8d80d..ed9787f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsRequestData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
@@ -61,8 +63,6 @@ import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
-import org.apache.kafka.common.requests.DeleteTopicsRequest;
-import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.DescribeAclsRequest;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
@@ -153,7 +153,7 @@ public enum ApiKeys {
         }
     },
     CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS),
-    DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequest.schemaVersions(), DeleteTopicsResponse.schemaVersions()),
+    DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS),
     DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()),
     INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(),
             InitProducerIdResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index eadd302..f74e1ae 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -111,7 +111,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case CREATE_TOPICS:
                 return new CreateTopicsResponse(struct, version);
             case DELETE_TOPICS:
-                return new DeleteTopicsResponse(struct);
+                return new DeleteTopicsResponse(struct, version);
             case DELETE_RECORDS:
                 return new DeleteRecordsResponse(struct);
             case INIT_PRODUCER_ID:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index facb55e..978d1c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -16,19 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.DeleteTopicsRequestData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
 
 import static org.apache.kafka.common.protocol.types.Type.INT32;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
@@ -62,85 +59,62 @@ public class DeleteTopicsRequest extends AbstractRequest {
             DELETE_TOPICS_REQUEST_V2, DELETE_TOPICS_REQUEST_V3};
     }
 
-    private final Set<String> topics;
-    private final Integer timeout;
+    private DeleteTopicsRequestData data;
+    private final short version;
 
     public static class Builder extends AbstractRequest.Builder<DeleteTopicsRequest> {
-        private final Set<String> topics;
-        private final Integer timeout;
+        private DeleteTopicsRequestData data;
 
-        public Builder(Set<String> topics, Integer timeout) {
+        public Builder(DeleteTopicsRequestData data) {
             super(ApiKeys.DELETE_TOPICS);
-            this.topics = topics;
-            this.timeout = timeout;
+            this.data = data;
         }
 
         @Override
         public DeleteTopicsRequest build(short version) {
-            return new DeleteTopicsRequest(topics, timeout, version);
+            return new DeleteTopicsRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=DeleteTopicsRequest").
-                append(", topics=(").append(Utils.join(topics, ", ")).append(")").
-                append(", timeout=").append(timeout).
-                append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private DeleteTopicsRequest(Set<String> topics, Integer timeout, short version) {
+    private DeleteTopicsRequest(DeleteTopicsRequestData data, short version) {
         super(ApiKeys.DELETE_TOPICS, version);
-        this.topics = topics;
-        this.timeout = timeout;
+        this.data = data;
+        this.version = version;
     }
 
     public DeleteTopicsRequest(Struct struct, short version) {
         super(ApiKeys.DELETE_TOPICS, version);
-        Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
-        Set<String> topics = new HashSet<>(topicsArray.length);
-        for (Object topic : topicsArray)
-            topics.add((String) topic);
-
-        this.topics = topics;
-        this.timeout = struct.getInt(TIMEOUT_KEY_NAME);
+        this.data = new DeleteTopicsRequestData(struct, version);
+        this.version = version;
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.DELETE_TOPICS.requestSchema(version()));
-        struct.set(TOPICS_KEY_NAME, topics.toArray());
-        struct.set(TIMEOUT_KEY_NAME, timeout);
-        return struct;
+        return data.toStruct(version);
+    }
+
+    public DeleteTopicsRequestData data() {
+        return data;
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Map<String, Errors> topicErrors = new HashMap<>();
-        for (String topic : topics)
-            topicErrors.put(topic, Errors.forException(e));
-
-        switch (version()) {
-            case 0:
-                return new DeleteTopicsResponse(topicErrors);
-            case 1:
-            case 2:
-            case 3:
-                return new DeleteTopicsResponse(throttleTimeMs, topicErrors);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                    version(), this.getClass().getSimpleName(), ApiKeys.DELETE_TOPICS.latestVersion()));
+        DeleteTopicsResponseData response = new DeleteTopicsResponseData();
+        if (version >= 1) {
+            response.setThrottleTimeMs(throttleTimeMs);
         }
-    }
-
-    public Set<String> topics() {
-        return topics;
-    }
-
-    public Integer timeout() {
-        return this.timeout;
+        ApiError apiError = ApiError.fromThrowable(e);
+        for (String topic : data.topicNames()) {
+            response.responses().add(new DeletableTopicResult()
+                    .setName(topic)
+                    .setErrorCode(apiError.error().code()));
+        }
+        return new DeleteTopicsResponse(response);
     }
 
     public static DeleteTopicsRequest parse(ByteBuffer buffer, short version) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 650caa8..aa8e552 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -16,54 +16,18 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
 
 public class DeleteTopicsResponse extends AbstractResponse {
-    private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
-
-    private static final Schema TOPIC_ERROR_CODE = new Schema(
-            TOPIC_NAME,
-            ERROR_CODE);
-
-    private static final Schema DELETE_TOPICS_RESPONSE_V0 = new Schema(
-            new Field(TOPIC_ERROR_CODES_KEY_NAME,
-                    new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes."));
-
-    private static final Schema DELETE_TOPICS_RESPONSE_V1 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(TOPIC_ERROR_CODES_KEY_NAME, new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema DELETE_TOPICS_RESPONSE_V2 = DELETE_TOPICS_RESPONSE_V1;
-
-    /**
-     * v3 request is the same that as v2. The response is different based on the request version.
-     * In v3 version a TopicDeletionDisabledException is returned
-     */
-    private static final Schema DELETE_TOPICS_RESPONSE_V3 = DELETE_TOPICS_RESPONSE_V2;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1,
-            DELETE_TOPICS_RESPONSE_V2, DELETE_TOPICS_RESPONSE_V3};
-    }
-
 
     /**
      * Possible error codes:
@@ -75,63 +39,42 @@ public class DeleteTopicsResponse extends AbstractResponse {
      * INVALID_REQUEST(42)
      * TOPIC_DELETION_DISABLED(73)
      */
-    private final Map<String, Errors> errors;
-    private final int throttleTimeMs;
+    private DeleteTopicsResponseData data;
 
-    public DeleteTopicsResponse(Map<String, Errors> errors) {
-        this(DEFAULT_THROTTLE_TIME, errors);
+    public DeleteTopicsResponse(DeleteTopicsResponseData data) {
+        this.data = data;
     }
 
-    public DeleteTopicsResponse(int throttleTimeMs, Map<String, Errors> errors) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.errors = errors;
-    }
-
-    public DeleteTopicsResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME);
-        Map<String, Errors> errors = new HashMap<>();
-        for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
-            Struct topicErrorCodeStruct = (Struct) topicErrorCodeStructObj;
-            String topic = topicErrorCodeStruct.get(TOPIC_NAME);
-            Errors error = Errors.forCode(topicErrorCodeStruct.get(ERROR_CODE));
-            errors.put(topic, error);
-        }
-
-        this.errors = errors;
+    public DeleteTopicsResponse(Struct struct, short version) {
+        this.data = new DeleteTopicsResponseData(struct, version);
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.DELETE_TOPICS.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-        List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
-        for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
-            Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
-            topicErrorCodeStruct.set(TOPIC_NAME, topicError.getKey());
-            topicErrorCodeStruct.set(ERROR_CODE, topicError.getValue().code());
-            topicErrorCodeStructs.add(topicErrorCodeStruct);
-        }
-        struct.set(TOPIC_ERROR_CODES_KEY_NAME, topicErrorCodeStructs.toArray());
-        return struct;
+        return data.toStruct(version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
-    public Map<String, Errors> errors() {
-        return errors;
+    public DeleteTopicsResponseData data() {
+        return data;
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(errors);
+        HashMap<Errors, Integer> counts = new HashMap<>();
+        for (DeletableTopicResult result : data.responses()) {
+            Errors error = Errors.forCode(result.errorCode());
+            counts.put(error, counts.getOrDefault(error, 0) + 1);
+        }
+        return counts;
     }
 
     public static DeleteTopicsResponse parse(ByteBuffer buffer, short version) {
-        return new DeleteTopicsResponse(ApiKeys.DELETE_TOPICS.responseSchema(version).read(buffer));
+        return new DeleteTopicsResponse(ApiKeys.DELETE_TOPICS.parseResponse(version, buffer), version);
     }
 
     @Override
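
With this change the response class above is just a thin wrapper around the generated DeleteTopicsResponseData (its schema is the JSON file in the next hunk): per-topic outcomes live in a collection of DeletableTopicResult entries keyed by topic name, each carrying an error code. An illustrative sketch of building and reading such a response, along the lines of the test helper and the new errorCounts() (the wrapper class and topic name are assumptions, not part of the commit):

    import org.apache.kafka.common.message.DeleteTopicsResponseData;
    import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
    import org.apache.kafka.common.protocol.Errors;

    public class DeleteTopicsResponseSketch {
        public static void example() {
            DeleteTopicsResponseData data = new DeleteTopicsResponseData().setThrottleTimeMs(0);
            // Per-topic results are added to the generated collection.
            data.responses().add(new DeletableTopicResult()
                    .setName("topic-a")                 // example topic
                    .setErrorCode(Errors.NONE.code()));
            // Errors are read back via Errors.forCode(result.errorCode()).
            for (DeletableTopicResult result : data.responses()) {
                System.out.println(result.name() + " -> " + Errors.forCode(result.errorCode()));
            }
        }
    }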
diff --git a/clients/src/main/resources/common/message/DeleteTopicsResponse.json b/clients/src/main/resources/common/message/DeleteTopicsResponse.json
index cf0837b..4cea44b 100644
--- a/clients/src/main/resources/common/message/DeleteTopicsResponse.json
+++ b/clients/src/main/resources/common/message/DeleteTopicsResponse.json
@@ -22,11 +22,11 @@
  // Starting in version 3, a TOPIC_DELETION_DISABLED error code may be returned.
   "validVersions": "0-3",
   "fields": [
-    { "name": "throttleTimeMs", "type": "int32", "versions": "1+",
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+",
-      "about": "The results for each topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      "about": "The results for each topic we tried to delete.", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
         "about": "The topic name" },
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The deletion error, or 0 if the deletion succeeded." }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index af6f49b..220fc50 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -53,6 +53,8 @@ import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
@@ -223,6 +225,14 @@ public class KafkaAdminClientTest {
         return new CreateTopicsResponse(data);
     }
 
+    private static DeleteTopicsResponse prepareDeleteTopicsResponse(String topicName, Errors error) {
+        DeleteTopicsResponseData data = new DeleteTopicsResponseData();
+        data.responses().add(new DeletableTopicResult()
+                .setName(topicName)
+                .setErrorCode(error.code()));
+        return new DeleteTopicsResponse(data);
+    }
+
     /**
      * Test that the client properly times out when we don't receive any metadata.
      */
@@ -394,19 +404,19 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
-                    new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.NONE)));
+                    prepareDeleteTopicsResponse("myTopic", Errors.NONE));
             KafkaFuture<Void> future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
                     new DeleteTopicsOptions()).all();
             future.get();
 
             env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
-                    new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.TOPIC_DELETION_DISABLED)));
+                    prepareDeleteTopicsResponse("myTopic", Errors.TOPIC_DELETION_DISABLED));
             future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
                     new DeleteTopicsOptions()).all();
             TestUtils.assertFutureError(future, TopicDeletionDisabledException.class);
 
             env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
-                    new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+                    prepareDeleteTopicsResponse("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION));
             future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
                     new DeleteTopicsOptions()).all();
             TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class);
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5690743..5b92b1b 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -38,6 +38,9 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteTopicsRequestData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
@@ -1124,14 +1127,21 @@ public class RequestResponseTest {
     }
 
     private DeleteTopicsRequest createDeleteTopicsRequest() {
-        return new DeleteTopicsRequest.Builder(Utils.mkSet("my_t1", "my_t2"), 10000).build();
+        return new DeleteTopicsRequest.Builder(
+                new DeleteTopicsRequestData()
+                .setTopicNames(Arrays.asList("my_t1", "my_t2"))
+                .setTimeoutMs(1000)).build();
     }
 
     private DeleteTopicsResponse createDeleteTopicsResponse() {
-        Map<String, Errors> errors = new HashMap<>();
-        errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION);
-        errors.put("t2", Errors.TOPIC_AUTHORIZATION_FAILED);
-        return new DeleteTopicsResponse(errors);
+        DeleteTopicsResponseData data = new DeleteTopicsResponseData();
+        data.responses().add(new DeletableTopicResult()
+                .setName("t1")
+                .setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code()));
+        data.responses().add(new DeletableTopicResult()
+                .setName("t2")
+                .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()));
+        return new DeleteTopicsResponse(data);
     }
 
     private InitProducerIdRequest createInitPidRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 68d0823..fc06162 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -47,6 +47,8 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.message.CreateTopicsResponseData
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
+import org.apache.kafka.common.message.DeleteTopicsResponseData
+import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultSet}
 import org.apache.kafka.common.message.DescribeGroupsResponseData
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
 import org.apache.kafka.common.message.JoinGroupResponseData
@@ -1565,51 +1567,66 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {
-    val deleteTopicRequest = request.body[DeleteTopicsRequest]
-
-    val unauthorizedTopicErrors = mutable.Map[String, Errors]()
-    val nonExistingTopicErrors = mutable.Map[String, Errors]()
-    val authorizedForDeleteTopics =  mutable.Set[String]()
-
-    for (topic <- deleteTopicRequest.topics.asScala) {
-      if (!authorize(request.session, Delete, Resource(Topic, topic, LITERAL)))
-        unauthorizedTopicErrors += topic -> Errors.TOPIC_AUTHORIZATION_FAILED
-      else if (!metadataCache.contains(topic))
-        nonExistingTopicErrors += topic -> Errors.UNKNOWN_TOPIC_OR_PARTITION
-      else
-        authorizedForDeleteTopics.add(topic)
-    }
-
-    def sendResponseCallback(authorizedTopicErrors: Map[String, Errors]): Unit = {
+    def sendResponseCallback(results: DeletableTopicResultSet): Unit = {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val completeResults = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ authorizedTopicErrors
-        val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava)
+        val responseData = new DeleteTopicsResponseData()
+          .setThrottleTimeMs(requestThrottleMs)
+          .setResponses(results)
+        val responseBody = new DeleteTopicsResponse(responseData)
         trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
     }
 
+    val deleteTopicRequest = request.body[DeleteTopicsRequest]
+    val results = new DeletableTopicResultSet(deleteTopicRequest.data.topicNames.size)
+    val toDelete = mutable.Set[String]()
     if (!controller.isActive) {
-      val results = deleteTopicRequest.topics.asScala.map { topic =>
-        (topic, Errors.NOT_CONTROLLER)
-      }.toMap
+      deleteTopicRequest.data.topicNames.asScala.foreach { case topic =>
+        results.add(new DeletableTopicResult()
+          .setName(topic)
+          .setErrorCode(Errors.NOT_CONTROLLER.code))
+      }
       sendResponseCallback(results)
     } else if (!config.deleteTopicEnable) {
       val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
-      val results = deleteTopicRequest.topics.asScala.map { topic =>
-        (topic, error)
-      }.toMap
+      deleteTopicRequest.data.topicNames.asScala.foreach { case topic =>
+        results.add(new DeletableTopicResult()
+          .setName(topic)
+          .setErrorCode(error.code))
+      }
       sendResponseCallback(results)
     } else {
+      deleteTopicRequest.data.topicNames.asScala.foreach { case topic =>
+        results.add(new DeletableTopicResult()
+          .setName(topic))
+      }
+      results.asScala.foreach(topic => {
+         if (!authorize(request.session, Delete, Resource(Topic, topic.name, LITERAL)))
+           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+         else if (!metadataCache.contains(topic.name))
+           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+         else
+           toDelete += topic.name
+      })
       // If no authorized topics return immediately
-      if (authorizedForDeleteTopics.isEmpty)
-        sendResponseCallback(Map())
+      if (toDelete.isEmpty)
+        sendResponseCallback(results)
       else {
+        def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = {
+          errors.foreach {
+            case (topicName, error) =>
+              results.find(topicName)
+                .setErrorCode(error.code)
+          }
+          sendResponseCallback(results)
+        }
+
         adminManager.deleteTopics(
-          deleteTopicRequest.timeout.toInt,
-          authorizedForDeleteTopics,
-          sendResponseCallback
+          deleteTopicRequest.data.timeoutMs.toInt,
+          toDelete,
+          handleDeleteTopicsResults
         )
       }
     }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 30cc161..11ee87c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData}
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -172,8 +172,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
    ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
    ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
    ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error),
-    ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => Errors.forCode(resp.data().topics().find(createTopic).errorCode())),
-    ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors.asScala.find(_._1 == deleteTopic).get._2),
+    ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => Errors.forCode(resp.data.topics.find(createTopic).errorCode())),
+    ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => Errors.forCode(resp.data.responses.find(deleteTopic).errorCode())),
    ApiKeys.DELETE_RECORDS -> ((resp: requests.DeleteRecordsResponse) => resp.responses.get(deleteRecordsPartition).error),
    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error),
     ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) =>
@@ -371,7 +371,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
         setName(createTopic).setNumPartitions(1).
           setReplicationFactor(1.toShort)).iterator))).build()
 
-  private def deleteTopicsRequest = new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
+  private def deleteTopicsRequest =
+    new DeleteTopicsRequest.Builder(
+      new DeleteTopicsRequestData()
+        .setTopicNames(Collections.singletonList(deleteTopic))
+        .setTimeoutMs(5000)).build()
 
   private def deleteRecordsRequest = new DeleteRecordsRequest.Builder(5000, Collections.singletonMap(deleteRecordsPartition, 0L)).build()
 
@@ -1184,7 +1188,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
     val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
-    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2)
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteResponse.data.responses.find(deleteTopic).errorCode)
   }
 
   @Test
@@ -1194,7 +1198,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
 
-    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2)
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteResponse.data.responses.find(deleteTopic).errorCode)
   }
 
   @Test
@@ -1204,7 +1208,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
 
-    assertEquals(Errors.NONE, deleteResponse.errors.asScala.head._2)
+    assertEquals(Errors.NONE.code, deleteResponse.data.responses.find(deleteTopic).errorCode)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 4388e64..2df7528 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -19,12 +19,15 @@ package kafka.server
 
 import kafka.network.SocketServer
 import kafka.utils._
+import org.apache.kafka.common.message.DeleteTopicsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse, MetadataRequest, MetadataResponse}
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.JavaConverters._
+import java.util.Collections
+import java.util.Arrays
 
 class DeleteTopicsRequestTest extends BaseRequestTest {
 
@@ -33,20 +36,24 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
     val timeout = 10000
     // Single topic
     createTopic("topic-1", 1, 1)
-    validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-1").asJava, timeout).build())
+    validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(
+        new DeleteTopicsRequestData()
+          .setTopicNames(Arrays.asList("topic-1"))
+          .setTimeoutMs(timeout)).build())
     // Multi topic
     createTopic("topic-3", 5, 2)
     createTopic("topic-4", 1, 2)
-    validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-3", "topic-4").asJava, timeout).build())
+    validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(
+        new DeleteTopicsRequestData()
+          .setTopicNames(Arrays.asList("topic-3", "topic-4"))
+          .setTimeoutMs(timeout)).build())
   }
 
   private def validateValidDeleteTopicRequests(request: DeleteTopicsRequest): Unit = {
     val response = sendDeleteTopicsRequest(request)
-
-    val error = response.errors.values.asScala.find(_ != Errors.NONE)
-    assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
-
-    request.topics.asScala.foreach { topic =>
+    val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
+    assertTrue(s"There should be no errors, found ${response.data.responses.asScala}", error.isEmpty)
+    request.data.topicNames.asScala.foreach { topic =>
       validateTopicIsDeleted(topic)
     }
   }
@@ -57,14 +64,18 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
     val timeoutTopic = "invalid-timeout"
 
     // Basic
-    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("invalid-topic").asJava, timeout).build(),
+    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
+        new DeleteTopicsRequestData()
+          .setTopicNames(Arrays.asList("invalid-topic"))
+          .setTimeoutMs(timeout)).build(),
       Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION))
 
     // Partial
     createTopic("partial-topic-1", 1, 1)
-    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(
-      "partial-topic-1",
-      "partial-invalid-topic").asJava, timeout).build(),
+    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
+        new DeleteTopicsRequestData()
+          .setTopicNames(Arrays.asList("partial-topic-1", "partial-invalid-topic"))
+          .setTimeoutMs(timeout)).build(),
       Map(
         "partial-topic-1" -> Errors.NONE,
         "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -74,7 +85,10 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
     // Timeout
     createTopic(timeoutTopic, 5, 2)
     // Must be a 0ms timeout to avoid transient test failures. Even a timeout of 1ms has succeeded in the past.
-    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(timeoutTopic).asJava, 0).build(),
+    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
+        new DeleteTopicsRequestData()
+          .setTopicNames(Arrays.asList(timeoutTopic))
+          .setTimeoutMs(0)).build(),
       Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT))
     // The topic should still get deleted eventually
     TestUtils.waitUntilTrue(() => !servers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic is never deleted")
@@ -83,11 +97,13 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
 
   private def validateErrorDeleteTopicRequests(request: DeleteTopicsRequest, expectedResponse: Map[String, Errors]): Unit = {
     val response = sendDeleteTopicsRequest(request)
-    val errors = response.errors.asScala
-    assertEquals("The response size should match", expectedResponse.size, response.errors.size)
+    val errors = response.data.responses
+
+    val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2)
+    assertEquals("The response size should match", expectedResponse.size, errorCount)
 
     expectedResponse.foreach { case (topic, expectedError) =>
-      assertEquals("The response error should match", expectedResponse(topic), errors(topic))
+      assertEquals("The response error should match", expectedResponse(topic).code, errors.find(topic).errorCode)
       // If no error validate the topic was deleted
       if (expectedError == Errors.NONE) {
         validateTopicIsDeleted(topic)
@@ -97,11 +113,14 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
 
   @Test
   def testNotController() {
-    val request = new DeleteTopicsRequest.Builder(Set("not-controller").asJava, 1000).build()
+    val request = new DeleteTopicsRequest.Builder(
+        new DeleteTopicsRequestData()
+          .setTopicNames(Collections.singletonList("not-controller"))
+          .setTimeoutMs(1000)).build()
     val response = sendDeleteTopicsRequest(request, notControllerSocketServer)
 
-    val error = response.errors.asScala.head._2
-    assertEquals("Expected controller error when routed incorrectly",  Errors.NOT_CONTROLLER, error)
+    val error = response.data.responses().find("not-controller").errorCode()
+    assertEquals("Expected controller error when routed incorrectly",  Errors.NOT_CONTROLLER.code, error)
   }
 
   private def validateTopicIsDeleted(topic: String): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
index 20e30c0..7240d77 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
@@ -19,12 +19,13 @@ package kafka.server
 
 import kafka.network.SocketServer
 import kafka.utils._
+import org.apache.kafka.common.message.DeleteTopicsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse}
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConverters._
+import java.util.Collections
 
 class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
 
@@ -42,13 +43,19 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
   @Test
   def testDeleteRecordsRequest() {
     val topic = "topic-1"
-    val request = new DeleteTopicsRequest.Builder(Set(topic).asJava, 1000).build()
+    val request = new DeleteTopicsRequest.Builder(
+        new DeleteTopicsRequestData()
+          .setTopicNames(Collections.singletonList(topic))
+          .setTimeoutMs(1000)).build()
     val response = sendDeleteTopicsRequest(request)
-    assertEquals(Errors.TOPIC_DELETION_DISABLED, response.errors.get(topic))
+    assertEquals(Errors.TOPIC_DELETION_DISABLED.code, response.data.responses.find(topic).errorCode)
 
-    val v2request = new DeleteTopicsRequest.Builder(Set(topic).asJava, 1000).build(2)
+    val v2request = new DeleteTopicsRequest.Builder(
+        new DeleteTopicsRequestData()
+        .setTopicNames(Collections.singletonList(topic))
+        .setTimeoutMs(1000)).build(2)
     val v2response = sendDeleteTopicsRequest(v2request)
-    assertEquals(Errors.INVALID_REQUEST, v2response.errors.get(topic))
+    assertEquals(Errors.INVALID_REQUEST.code, v2response.data.responses.find(topic).errorCode)
   }
 
   private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 22967e2..28f7e07 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,7 +24,7 @@ import kafka.security.auth._
 import kafka.utils.TestUtils
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
@@ -307,7 +307,10 @@ class RequestQuotaTest extends BaseRequestTest {
         }
 
         case ApiKeys.DELETE_TOPICS =>
-          new DeleteTopicsRequest.Builder(Set("topic-2").asJava, 5000)
+          new DeleteTopicsRequest.Builder(
+              new DeleteTopicsRequestData()
+              .setTopicNames(Collections.singletonList("topic-2"))
+              .setTimeoutMs(5000))
 
         case ApiKeys.DELETE_RECORDS =>
           new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
@@ -469,7 +472,8 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
       case ApiKeys.CREATE_TOPICS =>
         new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion()).throttleTimeMs
-      case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs
+      case ApiKeys.DELETE_TOPICS =>
+        new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion()).throttleTimeMs
       case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
       case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs
       case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs
