http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index def4c85..3a1de51 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -29,21 +32,68 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class CreateTopicsRequest extends AbstractRequest {
     private static final String REQUESTS_KEY_NAME = "create_topic_requests";
 
     private static final String TIMEOUT_KEY_NAME = "timeout";
     private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String NUM_PARTITIONS_KEY_NAME = "num_partitions";
     private static final String REPLICATION_FACTOR_KEY_NAME = 
"replication_factor";
     private static final String REPLICA_ASSIGNMENT_KEY_NAME = 
"replica_assignment";
-    private static final String REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME = 
"partition_id";
     private static final String REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME = 
"replicas";
 
-    private static final String CONFIG_KEY_KEY_NAME = "config_name";
+    private static final String CONFIG_NAME_KEY_NAME = "config_name";
     private static final String CONFIG_VALUE_KEY_NAME = "config_value";
-    private static final String CONFIGS_KEY_NAME = "config_entries";
+    private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries";
+
+    private static final Schema CONFIG_ENTRY = new Schema(
+            new Field(CONFIG_NAME_KEY_NAME, STRING, "Configuration name"),
+            new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING, "Configuration 
value"));
+
+    private static final Schema PARTITION_REPLICA_ASSIGNMENT_ENTRY = new 
Schema(
+            PARTITION_ID,
+            new Field(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, new 
ArrayOf(INT32), "The set of all nodes that should " +
+                    "host this partition. The first replica in the list is the 
preferred leader."));
+
+    private static final Schema SINGLE_CREATE_TOPIC_REQUEST_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(NUM_PARTITIONS_KEY_NAME, INT32, "Number of partitions to 
be created. -1 indicates unset."),
+            new Field(REPLICATION_FACTOR_KEY_NAME, INT16, "Replication factor 
for the topic. -1 indicates unset."),
+            new Field(REPLICA_ASSIGNMENT_KEY_NAME, new 
ArrayOf(PARTITION_REPLICA_ASSIGNMENT_ENTRY),
+                    "Replica assignment among kafka brokers for this topic 
partitions. If this is set num_partitions " +
+                            "and replication_factor must be unset."),
+            new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY), 
"Topic level configuration for topic to be set."));
+
+    private static final Schema SINGLE_CREATE_TOPIC_REQUEST_V1 = 
SINGLE_CREATE_TOPIC_REQUEST_V0;
+
+    private static final Schema CREATE_TOPICS_REQUEST_V0 = new Schema(
+            new Field(REQUESTS_KEY_NAME, new 
ArrayOf(SINGLE_CREATE_TOPIC_REQUEST_V0),
+                    "An array of single topic creation requests. Can not have 
multiple entries for the same topic."),
+            new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a 
topic to be completely created on the " +
+                    "controller node. Values <= 0 will trigger topic creation 
and return immediately"));
+
+    private static final Schema CREATE_TOPICS_REQUEST_V1 = new Schema(
+            new Field(REQUESTS_KEY_NAME, new 
ArrayOf(SINGLE_CREATE_TOPIC_REQUEST_V1), "An array of single " +
+                    "topic creation requests. Can not have multiple entries 
for the same topic."),
+            new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a 
topic to be completely created on the " +
+                    "controller node. Values <= 0 will trigger topic creation 
and return immediately"),
+            new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN, "If this is true, the 
request will be validated, but the " +
+                    "topic won't be created."));
+
+    /* v2 request is the same as v1. Throttle time has been added to the 
response */
+    private static final Schema CREATE_TOPICS_REQUEST_V2 = 
CREATE_TOPICS_REQUEST_V1;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{CREATE_TOPICS_REQUEST_V0, 
CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2};
+    }
 
     public static final class TopicDetails {
         public final int numPartitions;
@@ -157,7 +207,7 @@ public class CreateTopicsRequest extends AbstractRequest {
 
         for (Object requestStructObj : requestStructs) {
             Struct singleRequestStruct = (Struct) requestStructObj;
-            String topic = singleRequestStruct.getString(TOPIC_KEY_NAME);
+            String topic = singleRequestStruct.get(TOPIC_NAME);
 
             if (topics.containsKey(topic))
                 duplicateTopics.add(topic);
@@ -171,7 +221,7 @@ public class CreateTopicsRequest extends AbstractRequest {
             for (Object assignmentStructObj : assignmentsArray) {
                 Struct assignmentStruct = (Struct) assignmentStructObj;
 
-                Integer partitionId = 
assignmentStruct.getInt(REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME);
+                Integer partitionId = assignmentStruct.get(PARTITION_ID);
 
                 Object[] replicasArray = 
assignmentStruct.getArray(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME);
                 List<Integer> replicas = new ArrayList<>(replicasArray.length);
@@ -182,12 +232,12 @@ public class CreateTopicsRequest extends AbstractRequest {
                 partitionReplicaAssignments.put(partitionId, replicas);
             }
 
-            Object[] configArray = 
singleRequestStruct.getArray(CONFIGS_KEY_NAME);
+            Object[] configArray = 
singleRequestStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
             Map<String, String> configs = new HashMap<>(configArray.length);
             for (Object configStructObj : configArray) {
                 Struct configStruct = (Struct) configStructObj;
 
-                String key = configStruct.getString(CONFIG_KEY_KEY_NAME);
+                String key = configStruct.getString(CONFIG_NAME_KEY_NAME);
                 String value = configStruct.getString(CONFIG_VALUE_KEY_NAME);
 
                 configs.put(key, value);
@@ -262,7 +312,7 @@ public class CreateTopicsRequest extends AbstractRequest {
             String topic = entry.getKey();
             TopicDetails args = entry.getValue();
 
-            singleRequestStruct.set(TOPIC_KEY_NAME, topic);
+            singleRequestStruct.set(TOPIC_NAME, topic);
             singleRequestStruct.set(NUM_PARTITIONS_KEY_NAME, 
args.numPartitions);
             singleRequestStruct.set(REPLICATION_FACTOR_KEY_NAME, 
args.replicationFactor);
 
@@ -270,7 +320,7 @@ public class CreateTopicsRequest extends AbstractRequest {
             List<Struct> replicaAssignmentsStructs = new 
ArrayList<>(args.replicasAssignments.size());
             for (Map.Entry<Integer, List<Integer>> partitionReplicaAssignment 
: args.replicasAssignments.entrySet()) {
                 Struct replicaAssignmentStruct = 
singleRequestStruct.instance(REPLICA_ASSIGNMENT_KEY_NAME);
-                
replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME, 
partitionReplicaAssignment.getKey());
+                replicaAssignmentStruct.set(PARTITION_ID, 
partitionReplicaAssignment.getKey());
                 
replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, 
partitionReplicaAssignment.getValue().toArray());
                 replicaAssignmentsStructs.add(replicaAssignmentStruct);
             }
@@ -279,12 +329,12 @@ public class CreateTopicsRequest extends AbstractRequest {
             // configs
             List<Struct> configsStructs = new ArrayList<>(args.configs.size());
             for (Map.Entry<String, String> configEntry : 
args.configs.entrySet()) {
-                Struct configStruct = 
singleRequestStruct.instance(CONFIGS_KEY_NAME);
-                configStruct.set(CONFIG_KEY_KEY_NAME, configEntry.getKey());
+                Struct configStruct = 
singleRequestStruct.instance(CONFIG_ENTRIES_KEY_NAME);
+                configStruct.set(CONFIG_NAME_KEY_NAME, configEntry.getKey());
                 configStruct.set(CONFIG_VALUE_KEY_NAME, 
configEntry.getValue());
                 configsStructs.add(configStruct);
             }
-            singleRequestStruct.set(CONFIGS_KEY_NAME, 
configsStructs.toArray());
+            singleRequestStruct.set(CONFIG_ENTRIES_KEY_NAME, 
configsStructs.toArray());
             createTopicRequestStructs.add(singleRequestStruct);
         }
         struct.set(REQUESTS_KEY_NAME, createTopicRequestStructs.toArray());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index c34265d..b3c052b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
 
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -26,9 +29,37 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
 public class CreateTopicsResponse extends AbstractResponse {
     private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
-    private static final String TOPIC_KEY_NAME = "topic";
+
+    private static final Schema TOPIC_ERROR_CODE = new Schema(
+            TOPIC_NAME,
+            ERROR_CODE);
+
+    // Improves on TOPIC_ERROR_CODE by adding an error_message to complement 
the error_code
+    private static final Schema TOPIC_ERROR = new Schema(
+            TOPIC_NAME,
+            ERROR_CODE,
+            ERROR_MESSAGE);
+
+    private static final Schema CREATE_TOPICS_RESPONSE_V0 = new Schema(
+            new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR_CODE), 
"An array of per topic error codes."));
+
+    private static final Schema CREATE_TOPICS_RESPONSE_V1 = new Schema(
+            new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR), "An 
array of per topic errors."));
+
+    private static final Schema CREATE_TOPICS_RESPONSE_V2 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR), "An 
array of per topic errors."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{CREATE_TOPICS_RESPONSE_V0, 
CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2};
+    }
 
     /**
      * Possible error codes:
@@ -62,24 +93,23 @@ public class CreateTopicsResponse extends AbstractResponse {
         Map<String, ApiError> errors = new HashMap<>();
         for (Object topicErrorStructObj : topicErrorStructs) {
             Struct topicErrorStruct = (Struct) topicErrorStructObj;
-            String topic = topicErrorStruct.getString(TOPIC_KEY_NAME);
+            String topic = topicErrorStruct.get(TOPIC_NAME);
             errors.put(topic, new ApiError(topicErrorStruct));
         }
 
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, 
DEFAULT_THROTTLE_TIME);
         this.errors = errors;
     }
 
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new 
Struct(ApiKeys.CREATE_TOPICS.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
 
         List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
         for (Map.Entry<String, ApiError> topicError : errors.entrySet()) {
             Struct topicErrorsStruct = struct.instance(TOPIC_ERRORS_KEY_NAME);
-            topicErrorsStruct.set(TOPIC_KEY_NAME, topicError.getKey());
+            topicErrorsStruct.set(TOPIC_NAME, topicError.getKey());
             topicError.getValue().write(topicErrorsStruct);
             topicErrorsStructs.add(topicErrorsStruct);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index c05bec6..2d50ea6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.utils.Utils;
@@ -29,10 +32,29 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.kafka.common.protocol.ApiKeys.DELETE_ACLS;
+import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
+import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
+import static 
org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class DeleteAclsRequest extends AbstractRequest {
     private final static String FILTERS = "filters";
 
+    private static final Schema DELETE_ACLS_REQUEST_V0 = new Schema(
+            new Field(FILTERS, new ArrayOf(new Schema(
+                    RESOURCE_TYPE,
+                    RESOURCE_NAME_FILTER,
+                    PRINCIPAL_FILTER,
+                    HOST_FILTER,
+                    OPERATION,
+                    PERMISSION_TYPE))));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DELETE_ACLS_REQUEST_V0};
+    }
+
     public static class Builder extends 
AbstractRequest.Builder<DeleteAclsRequest> {
         private final List<AclBindingFilter> filters;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 973aa8e..0857287 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.utils.Utils;
@@ -30,10 +33,42 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.CommonFields.HOST;
+import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
+import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
 public class DeleteAclsResponse extends AbstractResponse {
     public static final Logger log = 
LoggerFactory.getLogger(DeleteAclsResponse.class);
-    private final static String FILTER_RESPONSES = "filter_responses";
-    private final static String MATCHING_ACLS = "matching_acls";
+    private final static String FILTER_RESPONSES_KEY_NAME = "filter_responses";
+    private final static String MATCHING_ACLS_KEY_NAME = "matching_acls";
+
+    private static final Schema MATCHING_ACL = new Schema(
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            RESOURCE_TYPE,
+            RESOURCE_NAME,
+            PRINCIPAL,
+            HOST,
+            OPERATION,
+            PERMISSION_TYPE);
+
+    private static final Schema DELETE_ACLS_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(FILTER_RESPONSES_KEY_NAME,
+                    new ArrayOf(new Schema(
+                            ERROR_CODE,
+                            ERROR_MESSAGE,
+                            new Field(MATCHING_ACLS_KEY_NAME, new 
ArrayOf(MATCHING_ACL), "The matching ACLs")))));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DELETE_ACLS_RESPONSE_V0};
+    }
 
     public static class AclDeletionResult {
         private final ApiError error;
@@ -99,13 +134,13 @@ public class DeleteAclsResponse extends AbstractResponse {
     }
 
     public DeleteAclsResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
         this.responses = new ArrayList<>();
-        for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) {
+        for (Object responseStructObj : 
struct.getArray(FILTER_RESPONSES_KEY_NAME)) {
             Struct responseStruct = (Struct) responseStructObj;
             ApiError error = new ApiError(responseStruct);
             List<AclDeletionResult> deletions = new ArrayList<>();
-            for (Object matchingAclStructObj : 
responseStruct.getArray(MATCHING_ACLS)) {
+            for (Object matchingAclStructObj : 
responseStruct.getArray(MATCHING_ACLS_KEY_NAME)) {
                 Struct matchingAclStruct = (Struct) matchingAclStructObj;
                 ApiError matchError = new ApiError(matchingAclStruct);
                 AccessControlEntry entry = 
RequestUtils.aceFromStructFields(matchingAclStruct);
@@ -119,23 +154,23 @@ public class DeleteAclsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new 
Struct(ApiKeys.DELETE_ACLS.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         List<Struct> responseStructs = new ArrayList<>();
         for (AclFilterResponse response : responses) {
-            Struct responseStruct = struct.instance(FILTER_RESPONSES);
+            Struct responseStruct = struct.instance(FILTER_RESPONSES_KEY_NAME);
             response.error.write(responseStruct);
             List<Struct> deletionStructs = new ArrayList<>();
             for (AclDeletionResult deletion : response.deletions()) {
-                Struct deletionStruct = responseStruct.instance(MATCHING_ACLS);
+                Struct deletionStruct = 
responseStruct.instance(MATCHING_ACLS_KEY_NAME);
                 deletion.error.write(deletionStruct);
                 
RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct);
                 RequestUtils.aceSetStructFields(deletion.acl().entry(), 
deletionStruct);
                 deletionStructs.add(deletionStruct);
             }
-            responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new 
Struct[0]));
+            responseStruct.set(MATCHING_ACLS_KEY_NAME, 
deletionStructs.toArray(new Struct[0]));
             responseStructs.add(responseStruct);
         }
-        struct.set(FILTER_RESPONSES, responseStructs.toArray());
+        struct.set(FILTER_RESPONSES_KEY_NAME, responseStructs.toArray());
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
index fcd9836..350238e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
@@ -20,14 +20,23 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
 public class DeleteRecordsRequest extends AbstractRequest {
 
     public static final long HIGH_WATERMARK = -1L;
@@ -37,13 +46,28 @@ public class DeleteRecordsRequest extends AbstractRequest {
     private static final String TIMEOUT_KEY_NAME = "timeout";
 
     // topic level key names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
     // partition level key names
-    private static final String PARTITION_KEY_NAME = "partition";
     private static final String OFFSET_KEY_NAME = "offset";
 
+
+    private static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new 
Schema(
+            PARTITION_ID,
+            new Field(OFFSET_KEY_NAME, INT64, "The offset before which the 
messages will be deleted."));
+
+    private static final Schema DELETE_RECORDS_REQUEST_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new 
ArrayOf(DELETE_RECORDS_REQUEST_PARTITION_V0)));
+
+    private static final Schema DELETE_RECORDS_REQUEST_V0 = new Schema(
+            new Field(TOPICS_KEY_NAME, new 
ArrayOf(DELETE_RECORDS_REQUEST_TOPIC_V0)),
+            new Field(TIMEOUT_KEY_NAME, INT32, "The maximum time to await a 
response in ms."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DELETE_RECORDS_REQUEST_V0};
+    }
+
     private final int timeout;
     private final Map<TopicPartition, Long> partitionOffsets;
 
@@ -79,10 +103,10 @@ public class DeleteRecordsRequest extends AbstractRequest {
         partitionOffsets = new HashMap<>();
         for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicStruct = (Struct) topicStructObj;
-            String topic = topicStruct.getString(TOPIC_KEY_NAME);
+            String topic = topicStruct.get(TOPIC_NAME);
             for (Object partitionStructObj : 
topicStruct.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionStruct = (Struct) partitionStructObj;
-                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                int partition = partitionStruct.get(PARTITION_ID);
                 long offset = partitionStruct.getLong(OFFSET_KEY_NAME);
                 partitionOffsets.put(new TopicPartition(topic, partition), 
offset);
             }
@@ -103,11 +127,11 @@ public class DeleteRecordsRequest extends AbstractRequest {
         List<Struct> topicStructArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, Long>> offsetsByTopicEntry : 
offsetsByTopic.entrySet()) {
             Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
-            topicStruct.set(TOPIC_KEY_NAME, offsetsByTopicEntry.getKey());
+            topicStruct.set(TOPIC_NAME, offsetsByTopicEntry.getKey());
             List<Struct> partitionStructArray = new ArrayList<>();
             for (Map.Entry<Integer, Long> offsetsByPartitionEntry : 
offsetsByTopicEntry.getValue().entrySet()) {
                 Struct partitionStruct = 
topicStruct.instance(PARTITIONS_KEY_NAME);
-                partitionStruct.set(PARTITION_KEY_NAME, 
offsetsByPartitionEntry.getKey());
+                partitionStruct.set(PARTITION_ID, 
offsetsByPartitionEntry.getKey());
                 partitionStruct.set(OFFSET_KEY_NAME, 
offsetsByPartitionEntry.getValue());
                 partitionStructArray.add(partitionStruct);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index f19f933..aeea1cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -20,14 +20,24 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+
 public class DeleteRecordsResponse extends AbstractResponse {
 
     public static final long INVALID_LOW_WATERMARK = -1L;
@@ -36,13 +46,27 @@ public class DeleteRecordsResponse extends AbstractResponse {
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level key names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
     // partition level key names
-    private static final String PARTITION_KEY_NAME = "partition";
     private static final String LOW_WATERMARK_KEY_NAME = "low_watermark";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    private static final Schema DELETE_RECORDS_RESPONSE_PARTITION_V0 = new 
Schema(
+            PARTITION_ID,
+            new Field(LOW_WATERMARK_KEY_NAME, INT64, "Smallest available 
offset of all live replicas"),
+            ERROR_CODE);
+
+    private static final Schema DELETE_RECORDS_RESPONSE_TOPIC_V0 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new 
ArrayOf(DELETE_RECORDS_RESPONSE_PARTITION_V0)));
+
+    private static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS, // the array below is keyed by TOPICS_KEY_NAME, the same constant the constructor and toStruct() use
+            new Field(TOPICS_KEY_NAME, new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DELETE_RECORDS_RESPONSE_V0};
+    }
 
     private final int throttleTimeMs;
     private final Map<TopicPartition, PartitionResponse> responses;
@@ -81,16 +105,16 @@ public class DeleteRecordsResponse extends AbstractResponse {
     }
 
     public DeleteRecordsResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, 
DEFAULT_THROTTLE_TIME);
         responses = new HashMap<>();
         for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicStruct = (Struct) topicStructObj;
-            String topic = topicStruct.getString(TOPIC_KEY_NAME);
+            String topic = topicStruct.get(TOPIC_NAME);
             for (Object partitionStructObj : 
topicStruct.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionStruct = (Struct) partitionStructObj;
-                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                int partition = partitionStruct.get(PARTITION_ID);
                 long lowWatermark = 
partitionStruct.getLong(LOW_WATERMARK_KEY_NAME);
-                Errors error = 
Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+                Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
                 responses.put(new TopicPartition(topic, partition), new 
PartitionResponse(lowWatermark, error));
             }
         }
@@ -107,20 +131,19 @@ public class DeleteRecordsResponse extends 
AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new 
Struct(ApiKeys.DELETE_RECORDS.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
         Map<String, Map<Integer, PartitionResponse>> responsesByTopic = 
CollectionUtils.groupDataByTopic(responses);
         List<Struct> topicStructArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, PartitionResponse>> 
responsesByTopicEntry : responsesByTopic.entrySet()) {
             Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
-            topicStruct.set(TOPIC_KEY_NAME, responsesByTopicEntry.getKey());
+            topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey());
             List<Struct> partitionStructArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionResponse> 
responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
                 Struct partitionStruct = 
topicStruct.instance(PARTITIONS_KEY_NAME);
                 PartitionResponse response = 
responsesByPartitionEntry.getValue();
-                partitionStruct.set(PARTITION_KEY_NAME, 
responsesByPartitionEntry.getKey());
+                partitionStruct.set(PARTITION_ID, 
responsesByPartitionEntry.getKey());
                 partitionStruct.set(LOW_WATERMARK_KEY_NAME, 
response.lowWatermark);
-                partitionStruct.set(ERROR_CODE_KEY_NAME, 
response.error.code());
+                partitionStruct.set(ERROR_CODE, response.error.code());
                 partitionStructArray.add(partitionStruct);
             }
             topicStruct.set(PARTITIONS_KEY_NAME, 
partitionStructArray.toArray());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index 2ea8c21..4696b50 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -27,10 +30,26 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.common.protocol.types.Type.INT32;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class DeleteTopicsRequest extends AbstractRequest {
     private static final String TOPICS_KEY_NAME = "topics";
     private static final String TIMEOUT_KEY_NAME = "timeout";
 
+    /* DeleteTopic api */
+    private static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema(
+            new Field(TOPICS_KEY_NAME, new ArrayOf(STRING), "An array of 
topics to be deleted."),
+            new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a 
topic to be completely deleted on the " +
+                    "controller node. Values <= 0 will trigger topic deletion 
and return immediately"));
+
+    /* v1 request is the same as v0. Throttle time has been added to the 
response */
+    private static final Schema DELETE_TOPICS_REQUEST_V1 = 
DELETE_TOPICS_REQUEST_V0;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DELETE_TOPICS_REQUEST_V0, 
DELETE_TOPICS_REQUEST_V1};
+    }
+
     private final Set<String> topics;
     private final Integer timeout;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 3f11167..9c84c11 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -26,10 +29,29 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+
 public class DeleteTopicsResponse extends AbstractResponse {
     private static final String TOPIC_ERROR_CODES_KEY_NAME = 
"topic_error_codes";
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    private static final Schema TOPIC_ERROR_CODE = new Schema(
+            TOPIC_NAME,
+            ERROR_CODE);
+
+    private static final Schema DELETE_TOPICS_RESPONSE_V0 = new Schema(
+            new Field(TOPIC_ERROR_CODES_KEY_NAME,
+                    new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic 
error codes."));
+
+    private static final Schema DELETE_TOPICS_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(TOPIC_ERROR_CODES_KEY_NAME, new 
ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DELETE_TOPICS_RESPONSE_V0, 
DELETE_TOPICS_RESPONSE_V1};
+    }
+
 
     /**
      * Possible error codes:
@@ -52,13 +74,13 @@ public class DeleteTopicsResponse extends AbstractResponse {
     }
 
     public DeleteTopicsResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, 
DEFAULT_THROTTLE_TIME);
         Object[] topicErrorCodesStructs = 
struct.getArray(TOPIC_ERROR_CODES_KEY_NAME);
         Map<String, Errors> errors = new HashMap<>();
         for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
             Struct topicErrorCodeStruct = (Struct) topicErrorCodeStructObj;
-            String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME);
-            Errors error = 
Errors.forCode(topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME));
+            String topic = topicErrorCodeStruct.get(TOPIC_NAME);
+            Errors error = 
Errors.forCode(topicErrorCodeStruct.get(ERROR_CODE));
             errors.put(topic, error);
         }
 
@@ -68,13 +90,12 @@ public class DeleteTopicsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new 
Struct(ApiKeys.DELETE_TOPICS.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
         List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
         for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
             Struct topicErrorCodeStruct = 
struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
-            topicErrorCodeStruct.set(TOPIC_KEY_NAME, topicError.getKey());
-            topicErrorCodeStruct.set(ERROR_CODE_KEY_NAME, 
topicError.getValue().code());
+            topicErrorCodeStruct.set(TOPIC_NAME, topicError.getKey());
+            topicErrorCodeStruct.set(ERROR_CODE, topicError.getValue().code());
             topicErrorCodeStructs.add(topicErrorCodeStruct);
         }
         struct.set(TOPIC_ERROR_CODES_KEY_NAME, 
topicErrorCodeStructs.toArray());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 58ce539..1bacac7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -20,13 +20,33 @@ import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.ResourceFilter;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
+import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
+import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
+import static 
org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
+
 public class DescribeAclsRequest extends AbstractRequest {
+    private static final Schema DESCRIBE_ACLS_REQUEST_V0 = new Schema(
+            RESOURCE_TYPE,
+            RESOURCE_NAME_FILTER,
+            PRINCIPAL_FILTER,
+            HOST_FILTER,
+            OPERATION,
+            PERMISSION_TYPE);
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DESCRIBE_ACLS_REQUEST_V0};
+    }
+
     public static class Builder extends 
AbstractRequest.Builder<DescribeAclsRequest> {
         private final AclBindingFilter filter;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index 993a45f..f8b9695 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -20,6 +20,9 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
 
@@ -30,9 +33,38 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.CommonFields.HOST;
+import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
+import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
 public class DescribeAclsResponse extends AbstractResponse {
-    private final static String RESOURCES = "resources";
-    private final static String ACLS = "acls";
+    private final static String RESOURCES_KEY_NAME = "resources";
+    private final static String ACLS_KEY_NAME = "acls";
+
+    private static final Schema DESCRIBE_ACLS_RESOURCE = new Schema(
+            RESOURCE_TYPE,
+            RESOURCE_NAME,
+            new Field(ACLS_KEY_NAME, new ArrayOf(new Schema(
+                    PRINCIPAL,
+                    HOST,
+                    OPERATION,
+                    PERMISSION_TYPE))));
+
+    private static final Schema DESCRIBE_ACLS_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_ACLS_RESOURCE), 
"The resources and their associated ACLs."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DESCRIBE_ACLS_RESPONSE_V0};
+    }
 
     private final int throttleTimeMs;
     private final ApiError error;
@@ -45,13 +77,13 @@ public class DescribeAclsResponse extends AbstractResponse {
     }
 
     public DescribeAclsResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
         this.error = new ApiError(struct);
         this.acls = new ArrayList<>();
-        for (Object resourceStructObj : struct.getArray(RESOURCES)) {
+        for (Object resourceStructObj : struct.getArray(RESOURCES_KEY_NAME)) {
             Struct resourceStruct = (Struct) resourceStructObj;
             Resource resource = 
RequestUtils.resourceFromStructFields(resourceStruct);
-            for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) {
+            for (Object aclDataStructObj : 
resourceStruct.getArray(ACLS_KEY_NAME)) {
                 Struct aclDataStruct = (Struct) aclDataStructObj;
                 AccessControlEntry entry = 
RequestUtils.aceFromStructFields(aclDataStruct);
                 this.acls.add(new AclBinding(resource, entry));
@@ -62,7 +94,7 @@ public class DescribeAclsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new 
Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         error.write(struct);
 
         Map<Resource, List<AccessControlEntry>> resourceToData = new 
HashMap<>();
@@ -78,18 +110,18 @@ public class DescribeAclsResponse extends AbstractResponse 
{
         List<Struct> resourceStructs = new ArrayList<>();
         for (Map.Entry<Resource, List<AccessControlEntry>> tuple : 
resourceToData.entrySet()) {
             Resource resource = tuple.getKey();
-            Struct resourceStruct = struct.instance(RESOURCES);
+            Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
             RequestUtils.resourceSetStructFields(resource, resourceStruct);
             List<Struct> dataStructs = new ArrayList<>();
             for (AccessControlEntry entry : tuple.getValue()) {
-                Struct dataStruct = resourceStruct.instance(ACLS);
+                Struct dataStruct = resourceStruct.instance(ACLS_KEY_NAME);
                 RequestUtils.aceSetStructFields(entry, dataStruct);
                 dataStructs.add(dataStruct);
             }
-            resourceStruct.set(ACLS, dataStructs.toArray());
+            resourceStruct.set(ACLS_KEY_NAME, dataStructs.toArray());
             resourceStructs.add(resourceStruct);
         }
-        struct.set(RESOURCES, resourceStructs.toArray());
+        struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray());
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
index 64fae0e..74e25f4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,6 +30,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class DescribeConfigsRequest extends AbstractRequest {
 
     private static final String RESOURCES_KEY_NAME = "resources";
@@ -34,6 +40,18 @@ public class DescribeConfigsRequest extends AbstractRequest {
     private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
     private static final String CONFIG_NAMES_KEY_NAME = "config_names";
 
+    private static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new 
Schema(
+            new Field(RESOURCE_TYPE_KEY_NAME, INT8),
+            new Field(RESOURCE_NAME_KEY_NAME, STRING),
+            new Field(CONFIG_NAMES_KEY_NAME, ArrayOf.nullable(STRING)));
+
+    private static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema(
+            new Field(RESOURCES_KEY_NAME, new 
ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to 
be returned."));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0};
+    }
+
     public static class Builder extends AbstractRequest.Builder {
         private final Map<Resource, Collection<String>> resourceToConfigNames;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 8694e1f..9b2289d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -18,6 +18,9 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,6 +30,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class DescribeConfigsResponse extends AbstractResponse {
 
     private static final String RESOURCES_KEY_NAME = "resources";
@@ -36,11 +47,31 @@ public class DescribeConfigsResponse extends 
AbstractResponse {
 
     private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries";
 
-    private static final String CONFIG_NAME = "config_name";
-    private static final String CONFIG_VALUE = "config_value";
-    private static final String IS_SENSITIVE = "is_sensitive";
-    private static final String IS_DEFAULT = "is_default";
-    private static final String READ_ONLY = "read_only";
+    private static final String CONFIG_NAME_KEY_NAME = "config_name";
+    private static final String CONFIG_VALUE_KEY_NAME = "config_value";
+    private static final String IS_SENSITIVE_KEY_NAME = "is_sensitive";
+    private static final String IS_DEFAULT_KEY_NAME = "is_default";
+    private static final String READ_ONLY_KEY_NAME = "read_only";
+
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new 
Schema(
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            new Field(RESOURCE_TYPE_KEY_NAME, INT8),
+            new Field(RESOURCE_NAME_KEY_NAME, STRING),
+            new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(new Schema(
+                    new Field(CONFIG_NAME_KEY_NAME, STRING),
+                    new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
+                    new Field(READ_ONLY_KEY_NAME, BOOLEAN),
+                    new Field(IS_DEFAULT_KEY_NAME, BOOLEAN),
+                    new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN)))));
+
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(RESOURCES_KEY_NAME, new 
ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0};
+    }
 
     public static class Config {
         private final ApiError error;
@@ -105,7 +136,7 @@ public class DescribeConfigsResponse extends 
AbstractResponse {
     }
 
     public DescribeConfigsResponse(Struct struct) {
-        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        throttleTimeMs = struct.get(THROTTLE_TIME_MS);
         Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
         configs = new HashMap<>(resourcesArray.length);
         for (Object resourceObj : resourcesArray) {
@@ -120,11 +151,11 @@ public class DescribeConfigsResponse extends 
AbstractResponse {
             List<ConfigEntry> configEntries = new 
ArrayList<>(configEntriesArray.length);
             for (Object configEntriesObj: configEntriesArray) {
                 Struct configEntriesStruct = (Struct) configEntriesObj;
-                String configName = configEntriesStruct.getString(CONFIG_NAME);
-                String configValue = 
configEntriesStruct.getString(CONFIG_VALUE);
-                boolean isSensitive = 
configEntriesStruct.getBoolean(IS_SENSITIVE);
-                boolean isDefault = configEntriesStruct.getBoolean(IS_DEFAULT);
-                boolean readOnly = configEntriesStruct.getBoolean(READ_ONLY);
+                String configName = 
configEntriesStruct.getString(CONFIG_NAME_KEY_NAME);
+                String configValue = 
configEntriesStruct.getString(CONFIG_VALUE_KEY_NAME);
+                boolean isSensitive = 
configEntriesStruct.getBoolean(IS_SENSITIVE_KEY_NAME);
+                boolean isDefault = 
configEntriesStruct.getBoolean(IS_DEFAULT_KEY_NAME);
+                boolean readOnly = 
configEntriesStruct.getBoolean(READ_ONLY_KEY_NAME);
                 configEntries.add(new ConfigEntry(configName, configValue, 
isSensitive, isDefault, readOnly));
             }
             Config config = new Config(error, configEntries);
@@ -147,7 +178,7 @@ public class DescribeConfigsResponse extends 
AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new 
Struct(ApiKeys.DESCRIBE_CONFIGS.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         List<Struct> resourceStructs = new ArrayList<>(configs.size());
         for (Map.Entry<Resource, Config> entry : configs.entrySet()) {
             Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
@@ -162,11 +193,11 @@ public class DescribeConfigsResponse extends 
AbstractResponse {
             List<Struct> configEntryStructs = new 
ArrayList<>(config.entries.size());
             for (ConfigEntry configEntry : config.entries) {
                 Struct configEntriesStruct = 
resourceStruct.instance(CONFIG_ENTRIES_KEY_NAME);
-                configEntriesStruct.set(CONFIG_NAME, configEntry.name);
-                configEntriesStruct.set(CONFIG_VALUE, configEntry.value);
-                configEntriesStruct.set(IS_SENSITIVE, configEntry.isSensitive);
-                configEntriesStruct.set(IS_DEFAULT, configEntry.isDefault);
-                configEntriesStruct.set(READ_ONLY, configEntry.readOnly);
+                configEntriesStruct.set(CONFIG_NAME_KEY_NAME, 
configEntry.name);
+                configEntriesStruct.set(CONFIG_VALUE_KEY_NAME, 
configEntry.value);
+                configEntriesStruct.set(IS_SENSITIVE_KEY_NAME, 
configEntry.isSensitive);
+                configEntriesStruct.set(IS_DEFAULT_KEY_NAME, 
configEntry.isDefault);
+                configEntriesStruct.set(READ_ONLY_KEY_NAME, 
configEntry.readOnly);
                 configEntryStructs.add(configEntriesStruct);
             }
             resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, 
configEntryStructs.toArray(new Struct[0]));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index b43e254..56117da 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -25,9 +28,23 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class DescribeGroupsRequest extends AbstractRequest {
     private static final String GROUP_IDS_KEY_NAME = "group_ids";
 
+    /* Describe group api */
+    private static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(
+            new Field(GROUP_IDS_KEY_NAME, new ArrayOf(STRING), "List of 
groupIds to request metadata for (an " +
+                    "empty groupId array will return empty group metadata)."));
+
+    /* v1 request is the same as v0. Throttle time has been added to response 
*/
+    private static final Schema DESCRIBE_GROUPS_REQUEST_V1 = 
DESCRIBE_GROUPS_REQUEST_V0;
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DESCRIBE_GROUPS_REQUEST_V0, 
DESCRIBE_GROUPS_REQUEST_V1};
+    }
+
     public static class Builder extends 
AbstractRequest.Builder<DescribeGroupsRequest> {
         private final List<String> groupIds;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 0e1d6bd..9241165 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -18,6 +18,9 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,11 +30,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class DescribeGroupsResponse extends AbstractResponse {
 
     private static final String GROUPS_KEY_NAME = "groups";
 
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String GROUP_STATE_KEY_NAME = "state";
     private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
@@ -44,6 +51,36 @@ public class DescribeGroupsResponse extends AbstractResponse 
{
     private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
     private static final String MEMBER_ASSIGNMENT_KEY_NAME = 
"member_assignment";
 
+    private static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new 
Schema(
+            new Field(MEMBER_ID_KEY_NAME, STRING, "The memberId assigned by 
the coordinator"),
+            new Field(CLIENT_ID_KEY_NAME, STRING, "The client id used in the 
member's latest join group request"),
+            new Field(CLIENT_HOST_KEY_NAME, STRING, "The client host used in 
the request session corresponding to the " +
+                    "member's join group."),
+            new Field(MEMBER_METADATA_KEY_NAME, BYTES, "The metadata 
corresponding to the current group protocol in " +
+                    "use (will only be present if the group is stable)."),
+            new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "The current 
assignment provided by the group leader " +
+                    "(will only be present if the group is stable)."));
+
+    private static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = 
new Schema(
+            ERROR_CODE,
+            new Field(GROUP_ID_KEY_NAME, STRING),
+            new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the 
group (one of: Dead, Stable, AwaitingSync, " +
+                    "PreparingRebalance, or empty if there is no active 
group)"),
+            new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group 
protocol type (will be empty if there is no active group)"),
+            new Field(PROTOCOL_KEY_NAME, STRING, "The current group protocol 
(only provided if the group is Stable)"),
+            new Field(MEMBERS_KEY_NAME, new 
ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0), "Current group members " +
+                    "(only provided if the group is not Dead)"));
+
+    private static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(
+            new Field(GROUPS_KEY_NAME, new 
ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
+    private static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(GROUPS_KEY_NAME, new 
ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, 
DESCRIBE_GROUPS_RESPONSE_V1};
+    }
+
     public static final String UNKNOWN_STATE = "";
     public static final String UNKNOWN_PROTOCOL_TYPE = "";
     public static final String UNKNOWN_PROTOCOL = "";
@@ -70,13 +107,13 @@ public class DescribeGroupsResponse extends AbstractResponse {
     }
 
     public DescribeGroupsResponse(Struct struct) {
-        this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
         this.groups = new HashMap<>();
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
             Struct groupStruct = (Struct) groupObj;
 
             String groupId = groupStruct.getString(GROUP_ID_KEY_NAME);
-            Errors error = Errors.forCode(groupStruct.getShort(ERROR_CODE_KEY_NAME));
+            Errors error = Errors.forCode(groupStruct.get(ERROR_CODE));
             String state = groupStruct.getString(GROUP_STATE_KEY_NAME);
             String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
             String protocol = groupStruct.getString(PROTOCOL_KEY_NAME);
@@ -209,15 +246,14 @@ public class DescribeGroupsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.responseSchema(version));
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
 
         List<Struct> groupStructs = new ArrayList<>();
         for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
             Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
             GroupMetadata group = groupEntry.getValue();
             groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey());
-            groupStruct.set(ERROR_CODE_KEY_NAME, group.error.code());
+            groupStruct.set(ERROR_CODE, group.error.code());
             groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
             groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
             groupStruct.set(PROTOCOL_KEY_NAME, group.protocol);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
index 338d684..0169da5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java
@@ -19,8 +19,12 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,6 +33,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.INT32;
 
 public class DescribeLogDirsRequest extends AbstractRequest {
 
@@ -36,9 +42,17 @@ public class DescribeLogDirsRequest extends AbstractRequest {
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level key names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
+    private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V0 = new Schema(
+            new Field("topics", ArrayOf.nullable(new Schema(
+                    TOPIC_NAME,
+                    new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")))));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DESCRIBE_LOG_DIRS_REQUEST_V0};
+    }
+
     private final Set<TopicPartition> topicPartitions;
 
     public static class Builder extends AbstractRequest.Builder<DescribeLogDirsRequest> {
@@ -75,7 +89,7 @@ public class DescribeLogDirsRequest extends AbstractRequest {
             topicPartitions = new HashSet<>();
             for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
                 Struct topicStruct = (Struct) topicStructObj;
-                String topic = topicStruct.getString(TOPIC_KEY_NAME);
+                String topic = topicStruct.get(TOPIC_NAME);
                 for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
                     int partition = (Integer) partitionObj;
                     topicPartitions.add(new TopicPartition(topic, partition));
@@ -109,7 +123,7 @@ public class DescribeLogDirsRequest extends AbstractRequest {
         List<Struct> topicStructArray = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> partitionsByTopicEntry : partitionsByTopic.entrySet()) {
             Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
-            topicStruct.set(TOPIC_KEY_NAME, partitionsByTopicEntry.getKey());
+            topicStruct.set(TOPIC_NAME, partitionsByTopicEntry.getKey());
             topicStruct.set(PARTITIONS_KEY_NAME, partitionsByTopicEntry.getValue().toArray());
             topicStructArray.add(topicStruct);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index f6b31ae..e35056e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -20,14 +20,26 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 
 public class DescribeLogDirsResponse extends AbstractResponse {
 
@@ -37,40 +49,58 @@ public class DescribeLogDirsResponse extends AbstractResponse {
     private static final String LOG_DIRS_KEY_NAME = "log_dirs";
 
     // dir level key names
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String LOG_DIR_KEY_NAME = "log_dir";
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level key names
-    private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
     // partition level key names
-    private static final String PARTITION_KEY_NAME = "partition";
     private static final String SIZE_KEY_NAME = "size";
     private static final String OFFSET_LAG_KEY_NAME = "offset_lag";
     private static final String IS_FUTURE_KEY_NAME = "is_future";
 
+    private static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field("log_dirs", new ArrayOf(new Schema(
+                    ERROR_CODE,
+                    new Field("log_dir", STRING, "The absolute log directory path."),
+                    new Field("topics", new ArrayOf(new Schema(
+                            TOPIC_NAME,
+                            new Field("partitions", new ArrayOf(new Schema(
+                                    PARTITION_ID,
+                                    new Field("size", INT64, "The size of the log segments of the partition in bytes."),
+                                    new Field("offset_lag", INT64, "The lag of the log's LEO w.r.t. partition's HW " +
+                                            "(if it is the current log for the partition) or current replica's LEO " +
+                                            "(if it is the future log for the partition)"),
+                                    new Field("is_future", BOOLEAN, "True if this log is created by " +
+                                            "AlterReplicaDirRequest and will replace the current log of the replica " +
+                                            "in the future.")))))))))));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{DESCRIBE_LOG_DIRS_RESPONSE_V0};
+    }
+
     private final int throttleTimeMs;
     private final Map<String, LogDirInfo> logDirInfos;
 
     public DescribeLogDirsResponse(Struct struct) {
-        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        throttleTimeMs = struct.get(THROTTLE_TIME_MS);
         logDirInfos = new HashMap<>();
 
         for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
             Struct logDirStruct = (Struct) logDirStructObj;
-            Errors error = Errors.forCode(logDirStruct.getShort(ERROR_CODE_KEY_NAME));
+            Errors error = Errors.forCode(logDirStruct.get(ERROR_CODE));
             String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
             Map<TopicPartition, ReplicaInfo> replicaInfos = new HashMap<>();
 
             for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) {
                 Struct topicStruct = (Struct) topicStructObj;
-                String topic = topicStruct.getString(TOPIC_KEY_NAME);
+                String topic = topicStruct.get(TOPIC_NAME);
 
                 for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
                     Struct partitionStruct = (Struct) partitionStructObj;
-                    int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                    int partition = partitionStruct.get(PARTITION_ID);
                     long size = partitionStruct.getLong(SIZE_KEY_NAME);
                     long offsetLag = partitionStruct.getLong(OFFSET_LAG_KEY_NAME);
                     boolean isFuture = partitionStruct.getBoolean(IS_FUTURE_KEY_NAME);
@@ -94,25 +124,25 @@ public class DescribeLogDirsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
         List<Struct> logDirStructArray = new ArrayList<>();
         for (Map.Entry<String, LogDirInfo> logDirInfosEntry : logDirInfos.entrySet()) {
             LogDirInfo logDirInfo = logDirInfosEntry.getValue();
             Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
-            logDirStruct.set(ERROR_CODE_KEY_NAME, logDirInfo.error.code());
+            logDirStruct.set(ERROR_CODE, logDirInfo.error.code());
             logDirStruct.set(LOG_DIR_KEY_NAME, logDirInfosEntry.getKey());
 
             Map<String, Map<Integer, ReplicaInfo>> replicaInfosByTopic = CollectionUtils.groupDataByTopic(logDirInfo.replicaInfos);
             List<Struct> topicStructArray = new ArrayList<>();
             for (Map.Entry<String, Map<Integer, ReplicaInfo>> replicaInfosByTopicEntry : replicaInfosByTopic.entrySet()) {
                 Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
-                topicStruct.set(TOPIC_KEY_NAME, replicaInfosByTopicEntry.getKey());
+                topicStruct.set(TOPIC_NAME, replicaInfosByTopicEntry.getKey());
                 List<Struct> partitionStructArray = new ArrayList<>();
 
                 for (Map.Entry<Integer, ReplicaInfo> replicaInfosByPartitionEntry : replicaInfosByTopicEntry.getValue().entrySet()) {
                     Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
                     ReplicaInfo replicaInfo = replicaInfosByPartitionEntry.getValue();
-                    partitionStruct.set(PARTITION_KEY_NAME, replicaInfosByPartitionEntry.getKey());
+                    partitionStruct.set(PARTITION_ID, replicaInfosByPartitionEntry.getKey());
                     partitionStruct.set(SIZE_KEY_NAME, replicaInfo.size);
                     partitionStruct.set(OFFSET_LAG_KEY_NAME, replicaInfo.offsetLag);
                     partitionStruct.set(IS_FUTURE_KEY_NAME, replicaInfo.isFuture);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index 01d73b2..243e9f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -18,16 +18,33 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
+import static org.apache.kafka.common.protocol.types.Type.INT16;
+import static org.apache.kafka.common.protocol.types.Type.INT64;
+import static org.apache.kafka.common.protocol.types.Type.STRING;
+
 public class EndTxnRequest extends AbstractRequest {
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
     private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
 
+    private static final Schema END_TXN_REQUEST_V0 = new Schema(
+            new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."),
+            new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."),
+            new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."),
+            new Field(TRANSACTION_RESULT_KEY_NAME, BOOLEAN, "The result of the transaction (0 = ABORT, 1 = COMMIT)"));
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{END_TXN_REQUEST_V0};
+    }
+
     public static class Builder extends AbstractRequest.Builder<EndTxnRequest> {
         private final String transactionalId;
         private final long producerId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index 47a6623..a3bae58 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -18,12 +18,22 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
 public class EndTxnResponse extends AbstractResponse {
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final Schema END_TXN_RESPONSE_V0 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE);
+
+    public static Schema[] schemaVersions() {
+        return new Schema[]{END_TXN_RESPONSE_V0};
+    }
 
     // Possible error codes:
     //   NotCoordinator
@@ -43,8 +53,8 @@ public class EndTxnResponse extends AbstractResponse {
     }
 
     public EndTxnResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
-        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
+        this.error = Errors.forCode(struct.get(ERROR_CODE));
     }
 
     public int throttleTimeMs() {
@@ -58,8 +68,8 @@ public class EndTxnResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.END_TXN.responseSchema(version));
-        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(ERROR_CODE, error.code());
         return struct;
     }
 

Reply via email to