http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 6dd1197..bf14f10 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -124,17 +124,15 @@ public class OffsetCommitRequest extends AbstractRequest {
         }
 
         @Override
-        public OffsetCommitRequest build() {
-            short version = version();
+        public OffsetCommitRequest build(short version) {
             switch (version) {
                 case 0:
-                    return new OffsetCommitRequest(groupId, offsetData);
+                    return new OffsetCommitRequest(groupId, DEFAULT_GENERATION_ID, DEFAULT_MEMBER_ID,
+                            DEFAULT_RETENTION_TIME, offsetData, version);
                 case 1:
-                    return new OffsetCommitRequest(groupId, generationId, memberId,
-                            offsetData);
                 case 2:
-                    return new OffsetCommitRequest(groupId, generationId, memberId,
-                            retentionTime, offsetData, version);
+                    long retentionTime = version == 1 ? DEFAULT_RETENTION_TIME : this.retentionTime;
+                    return new OffsetCommitRequest(groupId, generationId, memberId, retentionTime, offsetData, version);
                 default:
                     throw new UnsupportedVersionException("Unsupported version 
" + version);
             }
@@ -154,57 +152,9 @@ public class OffsetCommitRequest extends AbstractRequest {
         }
     }
 
-    /**
-     * Constructor for version 0.
-     * @param groupId
-     * @param offsetData
-     */
-    private OffsetCommitRequest(String groupId, Map<TopicPartition, 
PartitionData> offsetData) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 
0)), (short) 0);
-
-        initCommonFields(groupId, offsetData);
-        this.groupId = groupId;
-        this.generationId = DEFAULT_GENERATION_ID;
-        this.memberId = DEFAULT_MEMBER_ID;
-        this.retentionTime = DEFAULT_RETENTION_TIME;
-        this.offsetData = offsetData;
-    }
-
-    /**
-     * Constructor for version 1.
-     * @param groupId
-     * @param generationId
-     * @param memberId
-     * @param offsetData
-     */
-    private OffsetCommitRequest(String groupId, int generationId, String memberId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)), (short) 1);
-
-        initCommonFields(groupId, offsetData);
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
-        this.groupId = groupId;
-        this.generationId = generationId;
-        this.memberId = memberId;
-        this.retentionTime = DEFAULT_RETENTION_TIME;
-        this.offsetData = offsetData;
-    }
-
-    /**
-     * Constructor for version 2 and above.
-     * @param groupId
-     * @param generationId
-     * @param memberId
-     * @param retentionTime
-     * @param offsetData
-     */
     private OffsetCommitRequest(String groupId, int generationId, String memberId, long retentionTime,
                                 Map<TopicPartition, PartitionData> offsetData, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, version)), version);
-        initCommonFields(groupId, offsetData);
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
-        struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
+        super(version);
         this.groupId = groupId;
         this.generationId = generationId;
         this.memberId = memberId;
@@ -212,35 +162,8 @@ public class OffsetCommitRequest extends AbstractRequest {
         this.offsetData = offsetData;
     }
 
-    private void initCommonFields(String groupId, Map<TopicPartition, 
PartitionData> offsetData) {
-        Map<String, Map<Integer, PartitionData>> topicsData = 
CollectionUtils.groupDataByTopic(offsetData);
-
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        List<Struct> topicArray = new ArrayList<Struct>();
-
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: 
topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : 
topicEntry.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(COMMIT_OFFSET_KEY_NAME, 
fetchPartitionData.offset);
-                // Only for v1
-                if (partitionData.hasField(TIMESTAMP_KEY_NAME))
-                    partitionData.set(TIMESTAMP_KEY_NAME, 
fetchPartitionData.timestamp);
-                partitionData.set(METADATA_KEY_NAME, 
fetchPartitionData.metadata);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
-    }
-
     public OffsetCommitRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+        super(versionId);
 
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         // This field only exists in v1.
@@ -284,6 +207,42 @@ public class OffsetCommitRequest extends AbstractRequest {
     }
 
     @Override
+    public Struct toStruct() {
+        short version = version();
+        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, version));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+
+        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
+        List<Struct> topicArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
+                // Only for v1
+                if (partitionData.hasField(TIMESTAMP_KEY_NAME))
+                    partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
+                partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        if (struct.hasField(GENERATION_ID_KEY_NAME))
+            struct.set(GENERATION_ID_KEY_NAME, generationId);
+        if (struct.hasField(MEMBER_ID_KEY_NAME))
+            struct.set(MEMBER_ID_KEY_NAME, memberId);
+        if (struct.hasField(RETENTION_TIME_KEY_NAME))
+            struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
+        return struct;
+    }
+
+    @Override
     public AbstractResponse getErrorResponse(Throwable e) {
         Map<TopicPartition, Errors> responseData = new HashMap<>();
         for (Map.Entry<TopicPartition, PartitionData> entry: 
offsetData.entrySet()) {
@@ -322,12 +281,8 @@ public class OffsetCommitRequest extends AbstractRequest {
         return offsetData;
     }
 
-    public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
+    public static OffsetCommitRequest parse(ByteBuffer buffer, short versionId) {
         Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId);
-        return new OffsetCommitRequest(schema.read(buffer), (short) versionId);
-    }
-
-    public static OffsetCommitRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.OFFSET_COMMIT.id));
+        return new OffsetCommitRequest(schema.read(buffer), versionId);
     }
 }

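Note: the change repeated across every file in this commit is the same refactoring. Request and
response objects no longer build their wire-format Struct eagerly in the constructor; they keep
plain Java fields, and a toStruct() override materializes the Struct against the schema for the
object's version only when it is serialized. A condensed sketch of the pattern, using a
hypothetical ExampleRequest (it assumes the kafka-clients module and will not compile on its own):

    public class ExampleRequest extends AbstractRequest {
        private final String groupId;   // plain field; no Struct is kept around

        public ExampleRequest(String groupId, short version) {
            super(version);             // only the version is stored up front
            this.groupId = groupId;
        }

        @Override
        protected Struct toStruct() {
            // Built lazily, against the schema matching this request's version.
            Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, version()));
            struct.set("group_id", groupId);
            return struct;
        }
    }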
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 8a00c6b..b5709e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -16,7 +16,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -28,7 +27,6 @@ import java.util.Map;
 
 public class OffsetCommitResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level fields
@@ -58,15 +56,33 @@ public class OffsetCommitResponse extends AbstractResponse {
     private final Map<TopicPartition, Errors> responseData;
 
     public OffsetCommitResponse(Map<TopicPartition, Errors> responseData) {
-        super(new Struct(CURRENT_SCHEMA));
+        this.responseData = responseData;
+    }
 
-        Map<String, Map<Integer, Errors>> topicsData = 
CollectionUtils.groupDataByTopic(responseData);
+    public OffsetCommitResponse(Struct struct) {
+        responseData = new HashMap<>();
+        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
+            Struct topicResponse = (Struct) topicResponseObj;
+            String topic = topicResponse.getString(TOPIC_KEY_NAME);
+            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionResponse = (Struct) partitionResponseObj;
+                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
+                Errors error = Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME));
+                responseData.put(new TopicPartition(topic, partition), error);
+            }
+        }
+    }
+
+    @Override
+    public Struct toStruct(short version) {
+        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_COMMIT.id, version));
 
-        List<Struct> topicArray = new ArrayList<Struct>();
+        Map<String, Map<Integer, Errors>> topicsData = 
CollectionUtils.groupDataByTopic(responseData);
+        List<Struct> topicArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, Errors>> entries: 
topicsData.entrySet()) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
+            List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, Errors> partitionEntry : 
entries.getValue().entrySet()) {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
@@ -77,34 +93,16 @@ public class OffsetCommitResponse extends AbstractResponse {
             topicArray.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        this.responseData = responseData;
-    }
 
-    public OffsetCommitResponse(Struct struct) {
-        super(struct);
-        responseData = new HashMap<TopicPartition, Errors>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                Errors error = Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME));
-                responseData.put(new TopicPartition(topic, partition), error);
-            }
-        }
+        return struct;
     }
 
     public Map<TopicPartition, Errors> responseData() {
         return responseData;
     }
 
-    public static OffsetCommitResponse parse(ByteBuffer buffer, int version) {
-        Schema schema = ProtoUtils.responseSchema(ApiKeys.OFFSET_COMMIT.id, version);
-        return new OffsetCommitResponse(schema.read(buffer));
+    public static OffsetCommitResponse parse(ByteBuffer buffer, short version) {
+        return new OffsetCommitResponse(ProtoUtils.parseResponse(ApiKeys.OFFSET_COMMIT.id, version, buffer));
     }
 
-    public static OffsetCommitResponse parse(ByteBuffer buffer) {
-        return new OffsetCommitResponse(CURRENT_SCHEMA.read(buffer));
-    }
 }
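Note: on the response side the pattern is the same, except toStruct(short version) takes the
version as a parameter instead of storing it, because one response object may be serialized
against whichever version the corresponding request used. A hypothetical usage sketch
(errorsByPartition is an assumed Map<TopicPartition, Errors>):

    OffsetCommitResponse response = new OffsetCommitResponse(errorsByPartition);
    Struct v0Struct = response.toStruct((short) 0);   // OffsetCommit v0 response schema
    Struct v2Struct = response.toStruct((short) 2);   // same data, v2 response schema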

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 43ddf88..2a550e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -58,11 +58,11 @@ public class OffsetFetchRequest extends AbstractRequest {
         }
 
         @Override
-        public OffsetFetchRequest build() {
-            if (isAllTopicPartitions() && version() < 2)
+        public OffsetFetchRequest build(short version) {
+            if (isAllTopicPartitions() && version < 2)
                 throw new UnsupportedVersionException("The broker only 
supports OffsetFetchRequest " +
-                        "v" + version() + ", but we need v2 or newer to 
request all topic partitions.");
-            return new OffsetFetchRequest(groupId, partitions, version());
+                        "v" + version + ", but we need v2 or newer to request 
all topic partitions.");
+            return new OffsetFetchRequest(groupId, partitions, version);
         }
 
         @Override
@@ -80,39 +80,18 @@ public class OffsetFetchRequest extends AbstractRequest {
     private final List<TopicPartition> partitions;
 
     public static OffsetFetchRequest forAllPartitions(String groupId) {
-        return new OffsetFetchRequest.Builder(groupId, null).setVersion((short) 2).build();
+        return new OffsetFetchRequest.Builder(groupId, null).build((short) 2);
     }
 
     // v0, v1, and v2 have the same fields.
     private OffsetFetchRequest(String groupId, List<TopicPartition> 
partitions, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_FETCH.id, 
version)), version);
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        if (partitions != null) {
-            Map<String, List<Integer>> topicsData = 
CollectionUtils.groupDataByTopic(partitions);
-
-            List<Struct> topicArray = new ArrayList<>();
-            for (Map.Entry<String, List<Integer>> entries : 
topicsData.entrySet()) {
-                Struct topicData = struct.instance(TOPICS_KEY_NAME);
-                topicData.set(TOPIC_KEY_NAME, entries.getKey());
-                List<Struct> partitionArray = new ArrayList<>();
-                for (Integer partitionId : entries.getValue()) {
-                    Struct partitionData = 
topicData.instance(PARTITIONS_KEY_NAME);
-                    partitionData.set(PARTITION_KEY_NAME, partitionId);
-                    partitionArray.add(partitionData);
-                }
-                topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-                topicArray.add(topicData);
-            }
-            struct.set(TOPICS_KEY_NAME, topicArray.toArray());
-        } else
-            struct.set(TOPICS_KEY_NAME, null);
-
+        super(version);
         this.groupId = groupId;
         this.partitions = partitions;
     }
 
-    public OffsetFetchRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+    public OffsetFetchRequest(Struct struct, short version) {
+        super(version);
 
         Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
         if (topicArray != null) {
@@ -150,7 +129,7 @@ public class OffsetFetchRequest extends AbstractRequest {
             case 0:
             case 1:
             case 2:
-                return new OffsetFetchResponse(error, responsePartitions, versionId);
+                return new OffsetFetchResponse(error, responsePartitions);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)));
@@ -170,16 +149,38 @@ public class OffsetFetchRequest extends AbstractRequest {
         return partitions;
     }
 
-    public static OffsetFetchRequest parse(ByteBuffer buffer, int versionId) {
-        return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer),
-                (short) versionId);
-    }
-
-    public static OffsetFetchRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id));
+    public static OffsetFetchRequest parse(ByteBuffer buffer, short versionId) {
+        return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer), versionId);
     }
 
     public boolean isAllPartitions() {
         return partitions == null;
     }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_FETCH.id, version()));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        if (partitions != null) {
+            Map<String, List<Integer>> topicsData = 
CollectionUtils.groupDataByTopic(partitions);
+
+            List<Struct> topicArray = new ArrayList<>();
+            for (Map.Entry<String, List<Integer>> entries : 
topicsData.entrySet()) {
+                Struct topicData = struct.instance(TOPICS_KEY_NAME);
+                topicData.set(TOPIC_KEY_NAME, entries.getKey());
+                List<Struct> partitionArray = new ArrayList<>();
+                for (Integer partitionId : entries.getValue()) {
+                    Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                    partitionData.set(PARTITION_KEY_NAME, partitionId);
+                    partitionArray.add(partitionData);
+                }
+                topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+                topicArray.add(topicData);
+            }
+            struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        } else
+            struct.set(TOPICS_KEY_NAME, null);
+
+        return struct;
+    }
 }
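Note: builders now receive the target version as an argument to build(short version) instead of
carrying it as mutable builder state, as the forAllPartitions() change above shows. A hypothetical
before/after sketch (groupId is an assumed String):

    // before: the version was set on the builder and read back inside build()
    OffsetFetchRequest old = new OffsetFetchRequest.Builder(groupId, null).setVersion((short) 2).build();
    // after: the version is passed in, so one builder can produce requests for several versions
    OffsetFetchRequest v2 = new OffsetFetchRequest.Builder(groupId, null).build((short) 2);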

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 9c14155..94de4b1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -24,14 +24,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
 public class OffsetFetchResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
-    private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id);
     private static final String RESPONSES_KEY_NAME = "responses";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
@@ -85,51 +82,16 @@ public class OffsetFetchResponse extends AbstractResponse {
     }
 
     /**
-     * Constructor for the latest version.
-     * @param error Potential coordinator or group level error code
-     * @param responseData Fetched offset information grouped by topic-partition
-     */
-    public OffsetFetchResponse(Errors error, Map<TopicPartition, 
PartitionData> responseData) {
-        this(error, responseData, CURRENT_VERSION);
-    }
-
-    /**
-     * Unified constructor for all versions.
+     * Constructor for all versions.
      * @param error Potential coordinator or group level error code (for api version 2 and later)
      * @param responseData Fetched offset information grouped by topic-partition
-     * @param version The request API version
      */
-    public OffsetFetchResponse(Errors error, Map<TopicPartition, 
PartitionData> responseData, int version) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, 
version)));
-
-        Map<String, Map<Integer, PartitionData>> topicsData = 
CollectionUtils.groupDataByTopic(responseData);
-        List<Struct> topicArray = new ArrayList<>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> entries : 
topicsData.entrySet()) {
-            Struct topicData = this.struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : 
entries.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(COMMIT_OFFSET_KEY_NAME, 
fetchPartitionData.offset);
-                partitionData.set(METADATA_KEY_NAME, 
fetchPartitionData.metadata);
-                partitionData.set(ERROR_CODE_KEY_NAME, 
fetchPartitionData.error.code());
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-
-        this.struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+    public OffsetFetchResponse(Errors error, Map<TopicPartition, 
PartitionData> responseData) {
         this.responseData = responseData;
         this.error = error;
-        if (version > 1)
-            this.struct.set(ERROR_CODE_KEY_NAME, this.error.code());
     }
 
     public OffsetFetchResponse(Struct struct) {
-        super(struct);
         Errors topLevelError = Errors.NONE;
         this.responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
@@ -175,12 +137,37 @@ public class OffsetFetchResponse extends AbstractResponse {
         return responseData;
     }
 
-    public static OffsetFetchResponse parse(ByteBuffer buffer, int version) {
-        Schema schema = ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version);
-        return new OffsetFetchResponse(schema.read(buffer));
+    public static OffsetFetchResponse parse(ByteBuffer buffer, short version) {
+        return new OffsetFetchResponse(ProtoUtils.parseResponse(ApiKeys.OFFSET_FETCH.id, version, buffer));
     }
 
-    public static OffsetFetchResponse parse(ByteBuffer buffer) {
-        return new OffsetFetchResponse(CURRENT_SCHEMA.read(buffer));
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version));
+
+        Map<String, Map<Integer, PartitionData>> topicsData = 
CollectionUtils.groupDataByTopic(responseData);
+        List<Struct> topicArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, PartitionData>> entries : 
topicsData.entrySet()) {
+            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entries.getKey());
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : 
entries.getValue().entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(COMMIT_OFFSET_KEY_NAME, 
fetchPartitionData.offset);
+                partitionData.set(METADATA_KEY_NAME, 
fetchPartitionData.metadata);
+                partitionData.set(ERROR_CODE_KEY_NAME, 
fetchPartitionData.error.code());
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+
+        if (version > 1)
+            struct.set(ERROR_CODE_KEY_NAME, this.error.code());
+
+        return struct;
     }
 }
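Note: fields that exist only in some protocol versions are handled inside these toStruct methods
in two ways, both visible in the diffs above: by asking the schema whether the field exists, or by
testing the version directly. Condensed from the code above:

    // guard by schema introspection (OffsetCommitRequest.toStruct):
    if (struct.hasField(RETENTION_TIME_KEY_NAME))
        struct.set(RETENTION_TIME_KEY_NAME, retentionTime);

    // guard by an explicit version check (OffsetFetchResponse.toStruct; error_code is v2+):
    if (version > 1)
        struct.set(ERROR_CODE_KEY_NAME, error.code());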

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index bd3ae8f..df70e20 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -55,11 +55,10 @@ public class ProduceRequest extends AbstractRequest {
         }
 
         @Override
-        public ProduceRequest build() {
-            short version = version();
-            if (version < 2) {
+        public ProduceRequest build(short version) {
+            if (version < 2)
                 throw new UnsupportedVersionException("ProduceRequest versions 
older than 2 are not supported.");
-            }
+
             return new ProduceRequest(version, acks, timeout, partitionRecords);
         }
 
@@ -80,33 +79,14 @@ public class ProduceRequest extends AbstractRequest {
     private final Map<TopicPartition, MemoryRecords> partitionRecords;
 
     private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, version)), version);
-        Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
-        struct.set(ACKS_KEY_NAME, acks);
-        struct.set(TIMEOUT_KEY_NAME, timeout);
-        List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
-        for (Map.Entry<String, Map<Integer, MemoryRecords>> entry : recordsByTopic.entrySet()) {
-            Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entry.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, MemoryRecords> partitionEntry : entry.getValue().entrySet()) {
-                MemoryRecords records = partitionEntry.getValue();
-                Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
-                                       .set(PARTITION_KEY_NAME, partitionEntry.getKey())
-                                       .set(RECORD_SET_KEY_NAME, records);
-                partitionArray.add(part);
-            }
-            topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
-            topicDatas.add(topicData);
-        }
-        struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
+        super(version);
         this.acks = acks;
         this.timeout = timeout;
         this.partitionRecords = partitionRecords;
     }
 
     public ProduceRequest(Struct struct, short version) {
-        super(struct, version);
+        super(version);
         partitionRecords = new HashMap<>();
         for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
             Struct topicData = (Struct) topicDataObj;
@@ -122,6 +102,34 @@ public class ProduceRequest extends AbstractRequest {
         timeout = struct.getInt(TIMEOUT_KEY_NAME);
     }
 
+    /**
+     * Visible for testing.
+     */
+    @Override
+    public Struct toStruct() {
+        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, version()));
+        Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
+        struct.set(ACKS_KEY_NAME, acks);
+        struct.set(TIMEOUT_KEY_NAME, timeout);
+        List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
+        for (Map.Entry<String, Map<Integer, MemoryRecords>> entry : 
recordsByTopic.entrySet()) {
+            Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, entry.getKey());
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, MemoryRecords> partitionEntry : 
entry.getValue().entrySet()) {
+                MemoryRecords records = partitionEntry.getValue();
+                Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
+                        .set(PARTITION_KEY_NAME, partitionEntry.getKey())
+                        .set(RECORD_SET_KEY_NAME, records);
+                partitionArray.add(part);
+            }
+            topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
+            topicDatas.add(topicData);
+        }
+        struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray());
+        return struct;
+    }
+
     @Override
     public AbstractResponse getErrorResponse(Throwable e) {
         /* In case the producer doesn't actually want any response */
@@ -137,10 +145,9 @@ public class ProduceRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new ProduceResponse(responseMap);
             case 1:
             case 2:
-                return new ProduceResponse(responseMap, ProduceResponse.DEFAULT_THROTTLE_TIME, versionId);
+                return new ProduceResponse(responseMap);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
@@ -160,15 +167,10 @@ public class ProduceRequest extends AbstractRequest {
     }
 
     public void clearPartitionRecords() {
-        struct.clear();
         partitionRecords.clear();
     }
 
-    public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
-        return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer), (short) versionId);
-    }
-
-    public static ProduceRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.PRODUCE.id));
+    public static ProduceRequest parse(ByteBuffer buffer, short versionId) {
+        return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer), versionId);
     }
 }
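Note: clearPartitionRecords() now clears only the records map, since there is no pre-built struct
left to clear. With lazy serialization this means the wire format must be materialized before the
records are released. A hypothetical usage sketch ('request' is assumed to be an already-built
ProduceRequest):

    Struct wire = request.toStruct();   // materialize the wire format first
    request.clearPartitionRecords();    // then drop the MemoryRecords references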

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 9eaaadf..7a022af 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -16,7 +16,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.utils.CollectionUtils;
@@ -31,8 +30,7 @@ import java.util.Map;
  * This wrapper supports both v0 and v1 of ProduceResponse.
  */
 public class ProduceResponse extends AbstractResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
+
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names
@@ -73,10 +71,7 @@ public class ProduceResponse extends AbstractResponse {
      * @param responses Produced data grouped by topic-partition
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0)));
-        initCommonFields(responses);
-        this.responses = responses;
-        this.throttleTime = DEFAULT_THROTTLE_TIME;
+        this(responses, DEFAULT_THROTTLE_TIME);
     }
 
     /**
@@ -85,30 +80,14 @@ public class ProduceResponse extends AbstractResponse {
      * @param throttleTime Time in milliseconds the response was throttled
      */
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, 
int throttleTime) {
-        this(responses, throttleTime, 
ProtoUtils.latestVersion(ApiKeys.PRODUCE.id));
-    }
-
-    /**
-     * Constructor for a specific version
-     * @param responses Produced data grouped by topic-partition
-     * @param throttleTime Time in milliseconds the response was throttled
-     * @param version the version of schema to use.
-     */
-    public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, 
int throttleTime, int version) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 
version)));
-        initCommonFields(responses);
-        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
-            struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
         this.responses = responses;
         this.throttleTime = throttleTime;
     }
 
     /**
-     * Constructor from a {@link Struct}. It is the caller's responsibility to pass in a struct with the latest schema.
-     * @param struct
+     * Constructor from a {@link Struct}.
      */
     public ProduceResponse(Struct struct) {
-        super(struct);
         responses = new HashMap<>();
         for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicRespStruct = (Struct) topicResponse;
@@ -126,7 +105,10 @@ public class ProduceResponse extends AbstractResponse {
         this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
     }
 
-    private void initCommonFields(Map<TopicPartition, PartitionResponse> 
responses) {
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, version));
+
         Map<String, Map<Integer, PartitionResponse>> responseByTopic = 
CollectionUtils.groupDataByTopic(responses);
         List<Struct> topicDatas = new ArrayList<>(responseByTopic.size());
         for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : 
responseByTopic.entrySet()) {
@@ -140,13 +122,17 @@ public class ProduceResponse extends AbstractResponse {
                         .set(ERROR_CODE_KEY_NAME, part.error.code())
                         .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
                 if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
-                        partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
+                    partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
                 partitionArray.add(partStruct);
             }
             topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray());
             topicDatas.add(topicData);
         }
         struct.set(RESPONSES_KEY_NAME, topicDatas.toArray());
+
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
+            struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+        return struct;
     }
 
     public Map<TopicPartition, PartitionResponse> responses() {
@@ -187,7 +173,7 @@ public class ProduceResponse extends AbstractResponse {
         }
     }
 
-    public static ProduceResponse parse(ByteBuffer buffer) {
-        return new ProduceResponse(CURRENT_SCHEMA.read(buffer));
+    public static ProduceResponse parse(ByteBuffer buffer, short version) {
+        return new ProduceResponse(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, version).read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/RequestAndSize.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestAndSize.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestAndSize.java
new file mode 100644
index 0000000..d2147b3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestAndSize.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+public class RequestAndSize {
+    public final AbstractRequest request;
+    public final int size;
+
+    public RequestAndSize(AbstractRequest request, int size) {
+        this.request = request;
+        this.size = size;
+    }
+}
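Note: RequestAndSize is a plain holder pairing a parsed request with the number of bytes it
occupied on the wire, so size accounting survives after the buffer has been consumed. A
hypothetical usage sketch ('buffer' and 'header' are assumed to come from the network layer):

    int size = buffer.remaining();
    AbstractRequest request = ProduceRequest.parse(buffer, header.apiVersion());
    RequestAndSize requestAndSize = new RequestAndSize(request, size);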

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 05b78cb..5e65132 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -35,8 +35,7 @@ public class RequestHeader extends AbstractRequestResponse {
     private final String clientId;
     private final int correlationId;
 
-    public RequestHeader(Struct header) {
-        super(header);
+    public RequestHeader(Struct struct) {
         apiKey = struct.getShort(API_KEY_FIELD);
         apiVersion = struct.getShort(API_VERSION_FIELD);
         clientId = struct.getString(CLIENT_ID_FIELD);
@@ -44,17 +43,21 @@ public class RequestHeader extends AbstractRequestResponse {
     }
 
     public RequestHeader(short apiKey, short version, String client, int correlation) {
-        super(new Struct(Protocol.REQUEST_HEADER));
-        struct.set(API_KEY_FIELD, apiKey);
-        struct.set(API_VERSION_FIELD, version);
-        struct.set(CLIENT_ID_FIELD, client);
-        struct.set(CORRELATION_ID_FIELD, correlation);
         this.apiKey = apiKey;
         this.apiVersion = version;
         this.clientId = client;
         this.correlationId = correlation;
     }
 
+    public Struct toStruct() {
+        Struct struct = new Struct(Protocol.REQUEST_HEADER);
+        struct.set(API_KEY_FIELD, apiKey);
+        struct.set(API_VERSION_FIELD, apiVersion);
+        struct.set(CLIENT_ID_FIELD, clientId);
+        struct.set(CORRELATION_ID_FIELD, correlationId);
+        return struct;
+    }
+
     public short apiKey() {
         return apiKey;
     }
@@ -71,6 +74,10 @@ public class RequestHeader extends AbstractRequestResponse {
         return correlationId;
     }
 
+    public ResponseHeader toResponseHeader() {
+        return new ResponseHeader(correlationId);
+    }
+
     public static RequestHeader parse(ByteBuffer buffer) {
         return new RequestHeader(Protocol.REQUEST_HEADER.read(buffer));
     }
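Note: headers follow the same lazy-serialization pattern, so a serialize/parse round trip goes
through toStruct() explicitly. A sketch under assumed values (only the class and method names come
from the patch):

    RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, (short) 2, "client-1", 42);
    ByteBuffer buffer = ByteBuffer.allocate(header.toStruct().sizeOf());
    header.toStruct().writeTo(buffer);
    buffer.flip();
    RequestHeader parsed = RequestHeader.parse(buffer);
    ResponseHeader responseHeader = parsed.toResponseHeader();   // carries correlationId 42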

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
index e68bd39..04390ea 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
@@ -34,17 +34,24 @@ public class ResponseHeader extends AbstractRequestResponse {
 
     private final int correlationId;
 
-    public ResponseHeader(Struct header) {
-        super(header);
+    public ResponseHeader(Struct struct) {
         correlationId = struct.getInt(CORRELATION_KEY_FIELD);
     }
 
     public ResponseHeader(int correlationId) {
-        super(new Struct(Protocol.RESPONSE_HEADER));
-        struct.set(CORRELATION_KEY_FIELD, correlationId);
         this.correlationId = correlationId;
     }
 
+    public int sizeOf() {
+        return toStruct().sizeOf();
+    }
+
+    public Struct toStruct() {
+        Struct struct = new Struct(Protocol.RESPONSE_HEADER);
+        struct.set(CORRELATION_KEY_FIELD, correlationId);
+        return struct;
+    }
+
     public int correlationId() {
         return correlationId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index d244f0a..a1f3f0e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 
@@ -40,19 +39,17 @@ import org.apache.kafka.common.protocol.types.Struct;
  */
 public class SaslHandshakeRequest extends AbstractRequest {
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.SASL_HANDSHAKE.id);
     public static final String MECHANISM_KEY_NAME = "mechanism";
 
     private final String mechanism;
 
     public SaslHandshakeRequest(String mechanism) {
-        super(new Struct(CURRENT_SCHEMA), ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id));
-        struct.set(MECHANISM_KEY_NAME, mechanism);
+        super(ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id));
         this.mechanism = mechanism;
     }
 
-    public SaslHandshakeRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+    public SaslHandshakeRequest(Struct struct, short version) {
+        super(version);
         mechanism = struct.getString(MECHANISM_KEY_NAME);
     }
 
@@ -73,13 +70,15 @@ public class SaslHandshakeRequest extends AbstractRequest {
         }
     }
 
-    public static SaslHandshakeRequest parse(ByteBuffer buffer, int versionId) {
-        return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer),
-                (short) versionId);
+    public static SaslHandshakeRequest parse(ByteBuffer buffer, short versionId) {
+        return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer), versionId);
     }
 
-    public static SaslHandshakeRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id));
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.SASL_HANDSHAKE.id, version()));
+        struct.set(MECHANISM_KEY_NAME, mechanism);
+        return struct;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index f50c5be..9d38c6a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -26,7 +26,6 @@ import java.util.List;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 
@@ -36,8 +35,6 @@ import org.apache.kafka.common.protocol.types.Struct;
  */
 public class SaslHandshakeResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SASL_HANDSHAKE.id);
-
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String ENABLED_MECHANISMS_KEY_NAME = "enabled_mechanisms";
 
@@ -50,15 +47,11 @@ public class SaslHandshakeResponse extends AbstractResponse {
     private final List<String> enabledMechanisms;
 
     public SaslHandshakeResponse(Errors error, Collection<String> 
enabledMechanisms) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray());
         this.error = error;
         this.enabledMechanisms = new ArrayList<>(enabledMechanisms);
     }
 
     public SaslHandshakeResponse(Struct struct) {
-        super(struct);
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME);
         ArrayList<String> enabledMechanisms = new ArrayList<>();
@@ -71,15 +64,19 @@ public class SaslHandshakeResponse extends AbstractResponse {
         return error;
     }
 
-    public List<String> enabledMechanisms() {
-        return enabledMechanisms;
+    @Override
+    public Struct toStruct(short version) {
+        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.SASL_HANDSHAKE.id, version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray());
+        return struct;
     }
 
-    public static SaslHandshakeResponse parse(ByteBuffer buffer) {
-        return new SaslHandshakeResponse(CURRENT_SCHEMA.read(buffer));
+    public List<String> enabledMechanisms() {
+        return enabledMechanisms;
     }
 
-    public static SaslHandshakeResponse parse(ByteBuffer buffer, int version) {
+    public static SaslHandshakeResponse parse(ByteBuffer buffer, short version) {
         return new SaslHandshakeResponse(ProtoUtils.parseResponse(ApiKeys.SASL_HANDSHAKE.id, version, buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index ff2638b..91806f1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -52,9 +52,9 @@ public class StopReplicaRequest extends AbstractRequest {
         }
 
         @Override
-        public StopReplicaRequest build() {
+        public StopReplicaRequest build(short version) {
             return new StopReplicaRequest(controllerId, controllerEpoch,
-                    deletePartitions, partitions, version());
+                    deletePartitions, partitions, version);
         }
 
         @Override
@@ -77,30 +77,15 @@ public class StopReplicaRequest extends AbstractRequest {
 
     private StopReplicaRequest(int controllerId, int controllerEpoch, boolean deletePartitions,
                                Set<TopicPartition> partitions, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.STOP_REPLICA.id, version)), version);
-
-        struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
-        struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
-        struct.set(DELETE_PARTITIONS_KEY_NAME, deletePartitions);
-
-        List<Struct> partitionDatas = new ArrayList<>(partitions.size());
-        for (TopicPartition partition : partitions) {
-            Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
-            partitionData.set(TOPIC_KEY_NAME, partition.topic());
-            partitionData.set(PARTITION_KEY_NAME, partition.partition());
-            partitionDatas.add(partitionData);
-        }
-
-        struct.set(PARTITIONS_KEY_NAME, partitionDatas.toArray());
-
+        super(version);
         this.controllerId = controllerId;
         this.controllerEpoch = controllerEpoch;
         this.deletePartitions = deletePartitions;
         this.partitions = partitions;
     }
 
-    public StopReplicaRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+    public StopReplicaRequest(Struct struct, short version) {
+        super(version);
 
         partitions = new HashSet<>();
         for (Object partitionDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
@@ -148,12 +133,27 @@ public class StopReplicaRequest extends AbstractRequest {
         return partitions;
     }
 
-    public static StopReplicaRequest parse(ByteBuffer buffer, int versionId) {
-        return new StopReplicaRequest(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer),
-                (short) versionId);
+    public static StopReplicaRequest parse(ByteBuffer buffer, short versionId) {
+        return new StopReplicaRequest(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer), versionId);
     }
 
-    public static StopReplicaRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.STOP_REPLICA.id));
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.STOP_REPLICA.id, version()));
+
+        struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
+        struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
+        struct.set(DELETE_PARTITIONS_KEY_NAME, deletePartitions);
+
+        List<Struct> partitionDatas = new ArrayList<>(partitions.size());
+        for (TopicPartition partition : partitions) {
+            Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
+            partitionData.set(TOPIC_KEY_NAME, partition.topic());
+            partitionData.set(PARTITION_KEY_NAME, partition.partition());
+            partitionDatas.add(partitionData);
+        }
+
+        struct.set(PARTITIONS_KEY_NAME, partitionDatas.toArray());
+        return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index b39fb19..5ae5cc1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -17,7 +17,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 
 public class StopReplicaResponse extends AbstractResponse {
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.STOP_REPLICA.id);
 
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String PARTITIONS_KEY_NAME = "partitions";
@@ -37,41 +35,20 @@ public class StopReplicaResponse extends AbstractResponse {
     private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code";
 
     private final Map<TopicPartition, Errors> responses;
-    private final Errors error;
 
     /**
      * Possible error code:
      *
      * STALE_CONTROLLER_EPOCH (11)
      */
-
-    public StopReplicaResponse(Map<TopicPartition, Errors> responses) {
-        this(Errors.NONE, responses);
-    }
+    private final Errors error;
 
     public StopReplicaResponse(Errors error, Map<TopicPartition, Errors> 
responses) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        List<Struct> responseDatas = new ArrayList<>(responses.size());
-        for (Map.Entry<TopicPartition, Errors> response : 
responses.entrySet()) {
-            Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
-            TopicPartition partition = response.getKey();
-            partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
-            partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition());
-            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code());
-            responseDatas.add(partitionData);
-        }
-
-        struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-
         this.responses = responses;
         this.error = error;
     }
 
     public StopReplicaResponse(Struct struct) {
-        super(struct);
-
         responses = new HashMap<>();
         for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
             Struct responseData = (Struct) responseDataObj;
@@ -92,11 +69,27 @@ public class StopReplicaResponse extends AbstractResponse {
         return error;
     }
 
-    public static StopReplicaResponse parse(ByteBuffer buffer, int versionId) {
-        return new StopReplicaResponse(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer));
+    public static StopReplicaResponse parse(ByteBuffer buffer, short versionId) {
+        return new StopReplicaResponse(ProtoUtils.parseResponse(ApiKeys.STOP_REPLICA.id, versionId, buffer));
     }
 
-    public static StopReplicaResponse parse(ByteBuffer buffer) {
-        return new StopReplicaResponse(CURRENT_SCHEMA.read(buffer));
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.STOP_REPLICA.id, version));
+
+        List<Struct> responseDatas = new ArrayList<>(responses.size());
+        for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) {
+            Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
+            TopicPartition partition = response.getKey();
+            partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
+            partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition());
+            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code());
+            responseDatas.add(partitionData);
+        }
+
+        struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+
+        return struct;
     }
 }
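
With struct-building moved into toStruct(short), creating a response is
plain field assignment. A usage sketch (the partition results below are
invented; the constructor signature is the one in the diff above):

    // Sketch: collect per-partition results, then wrap them in a response.
    Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
    partitionErrors.put(new TopicPartition("topic-a", 0), Errors.NONE);
    partitionErrors.put(new TopicPartition("topic-a", 1), Errors.STALE_CONTROLLER_EPOCH);
    StopReplicaResponse response = new StopReplicaResponse(Errors.NONE, partitionErrors);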

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 937bf98..7ad5c9a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -51,8 +51,8 @@ public class SyncGroupRequest extends AbstractRequest {
         }
 
         @Override
-        public SyncGroupRequest build() {
-            return new SyncGroupRequest(groupId, generationId, memberId, groupAssignment, version());
+        public SyncGroupRequest build(short version) {
+            return new SyncGroupRequest(groupId, generationId, memberId, groupAssignment, version);
         }
 
         @Override
@@ -75,20 +75,7 @@ public class SyncGroupRequest extends AbstractRequest {
 
     private SyncGroupRequest(String groupId, int generationId, String memberId,
                              Map<String, ByteBuffer> groupAssignment, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.SYNC_GROUP.id, version)), version);
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(MEMBER_ID_KEY_NAME, memberId);
-
-        List<Struct> memberArray = new ArrayList<>();
-        for (Map.Entry<String, ByteBuffer> entries: groupAssignment.entrySet()) {
-            Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME);
-            memberData.set(MEMBER_ID_KEY_NAME, entries.getKey());
-            memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue());
-            memberArray.add(memberData);
-        }
-        struct.set(GROUP_ASSIGNMENT_KEY_NAME, memberArray.toArray());
-
+        super(version);
         this.groupId = groupId;
         this.generationId = generationId;
         this.memberId = memberId;
@@ -96,7 +83,7 @@ public class SyncGroupRequest extends AbstractRequest {
     }
 
     public SyncGroupRequest(Struct struct, short version) {
-        super(struct, version);
+        super(version);
         this.groupId = struct.getString(GROUP_ID_KEY_NAME);
         this.generationId = struct.getInt(GENERATION_ID_KEY_NAME);
         this.memberId = struct.getString(MEMBER_ID_KEY_NAME);
@@ -141,12 +128,25 @@ public class SyncGroupRequest extends AbstractRequest {
         return memberId;
     }
 
-    public static SyncGroupRequest parse(ByteBuffer buffer, int versionId) {
-        return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer),
-                (short) versionId);
+    public static SyncGroupRequest parse(ByteBuffer buffer, short versionId) {
+        return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer), versionId);
     }
 
-    public static SyncGroupRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.SYNC_GROUP.id));
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.SYNC_GROUP.id, version()));
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
+
+        List<Struct> memberArray = new ArrayList<>();
+        for (Map.Entry<String, ByteBuffer> entries: groupAssignment.entrySet()) {
+            Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME);
+            memberData.set(MEMBER_ID_KEY_NAME, entries.getKey());
+            memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue());
+            memberArray.add(memberData);
+        }
+        struct.set(GROUP_ASSIGNMENT_KEY_NAME, memberArray.toArray());
+        return struct;
     }
 }
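
As in the other request classes, the builder now receives the version in
build(short) instead of reading it from internal state, so a single
builder can produce the request at whatever version a connection
negotiates. A hedged usage sketch (the Builder constructor arguments are
assumed from the fields build() uses; they are not shown in this hunk):

    // Sketch: one assignment entry per member, then a version-explicit build.
    Map<String, ByteBuffer> assignment = new HashMap<>();
    assignment.put("consumer-1", ByteBuffer.wrap(new byte[0]));
    SyncGroupRequest.Builder builder =
            new SyncGroupRequest.Builder("my-group", 1, "consumer-1", assignment); // assumed signature
    SyncGroupRequest v0Request = builder.build((short) 0);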

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index e598975..ff198aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -19,14 +19,12 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
 public class SyncGroupResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SYNC_GROUP.id);
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
 
@@ -45,18 +43,11 @@ public class SyncGroupResponse extends AbstractResponse {
     private final ByteBuffer memberState;
 
     public SyncGroupResponse(Errors error, ByteBuffer memberState) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
-
         this.error = error;
         this.memberState = memberState;
     }
 
     public SyncGroupResponse(Struct struct) {
-        super(struct);
-
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
     }
@@ -69,8 +60,16 @@ public class SyncGroupResponse extends AbstractResponse {
         return memberState;
     }
 
-    public static SyncGroupResponse parse(ByteBuffer buffer) {
-        return new SyncGroupResponse(CURRENT_SCHEMA.read(buffer));
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.SYNC_GROUP.id, version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
+        return struct;
+    }
+
+    public static SyncGroupResponse parse(ByteBuffer buffer, short version) {
+        return new SyncGroupResponse(ProtoUtils.parseResponse(ApiKeys.SYNC_GROUP.id, version, buffer));
     }
 
 }
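
The response side mirrors it: the constructor stores the fields,
toStruct(short) renders them under the schema for the requested version,
and the static parse(ByteBuffer, short) reverses that. A sketch
(receivedBuffer is assumed to hold a response body at the given version):

    // Sketch: construct outbound, parse inbound, both version-explicit.
    SyncGroupResponse outbound = new SyncGroupResponse(Errors.NONE, ByteBuffer.wrap(new byte[0]));
    SyncGroupResponse inbound = SyncGroupResponse.parse(receivedBuffer, (short) 0);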

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index ef680ff..8dd852d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -38,10 +38,9 @@ public class UpdateMetadataRequest extends AbstractRequest {
         private final Map<TopicPartition, PartitionState> partitionStates;
         private final Set<Broker> liveBrokers;
 
-        public Builder(int controllerId, int controllerEpoch,
-                       Map<TopicPartition, PartitionState> partitionStates,
-                       Set<Broker> liveBrokers) {
-            super(ApiKeys.UPDATE_METADATA_KEY);
+        public Builder(short version, int controllerId, int controllerEpoch,
+                       Map<TopicPartition, PartitionState> partitionStates, Set<Broker> liveBrokers) {
+            super(ApiKeys.UPDATE_METADATA_KEY, version);
             this.controllerId = controllerId;
             this.controllerEpoch = controllerEpoch;
             this.partitionStates = partitionStates;
@@ -49,8 +48,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
         }
 
         @Override
-        public UpdateMetadataRequest build() {
-            short version = version();
+        public UpdateMetadataRequest build(short version) {
             if (version == 0) {
                 for (Broker broker : liveBrokers) {
                     if (broker.endPoints.size() != 1 || broker.endPoints.get(0).securityProtocol != SecurityProtocol.PLAINTEXT) {
@@ -148,58 +146,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
 
     private UpdateMetadataRequest(short version, int controllerId, int controllerEpoch, Map<TopicPartition,
             PartitionState> partitionStates, Set<Broker> liveBrokers) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)), version);
-        struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
-        struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
-
-        List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size());
-        for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) {
-            Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME);
-            TopicPartition topicPartition = entry.getKey();
-            partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
-            partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
-            PartitionState partitionState = entry.getValue();
-            partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch);
-            partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
-            partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch);
-            partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
-            partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
-            partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
-            partitionStatesData.add(partitionStateData);
-        }
-        struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
-
-        List<Struct> brokersData = new ArrayList<>(liveBrokers.size());
-        for (Broker broker : liveBrokers) {
-            Struct brokerData = struct.instance(LIVE_BROKERS_KEY_NAME);
-            brokerData.set(BROKER_ID_KEY_NAME, broker.id);
-
-            if (version == 0) {
-                EndPoint endPoint = broker.endPoints.get(0);
-                brokerData.set(HOST_KEY_NAME, endPoint.host);
-                brokerData.set(PORT_KEY_NAME, endPoint.port);
-            } else {
-                List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size());
-                for (EndPoint endPoint : broker.endPoints) {
-                    Struct endPointData = brokerData.instance(ENDPOINTS_KEY_NAME);
-                    endPointData.set(PORT_KEY_NAME, endPoint.port);
-                    endPointData.set(HOST_KEY_NAME, endPoint.host);
-                    endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, endPoint.securityProtocol.id);
-                    if (version >= 3)
-                        endPointData.set(LISTENER_NAME_KEY_NAME, endPoint.listenerName.value());
-                    endPointsData.add(endPointData);
-
-                }
-                brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray());
-                if (version >= 2) {
-                    brokerData.set(RACK_KEY_NAME, broker.rack);
-                }
-            }
-
-            brokersData.add(brokerData);
-        }
-        struct.set(LIVE_BROKERS_KEY_NAME, brokersData.toArray());
-
+        super(version);
         this.controllerId = controllerId;
         this.controllerEpoch = controllerEpoch;
         this.partitionStates = partitionStates;
@@ -207,7 +154,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     }
 
     public UpdateMetadataRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+        super(versionId);
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) {
             Struct partitionStateData = (Struct) partitionStateDataObj;
@@ -277,6 +224,64 @@ public class UpdateMetadataRequest extends AbstractRequest {
     }
 
     @Override
+    protected Struct toStruct() {
+        short version = version();
+        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version));
+        struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
+        struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
+
+        List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size());
+        for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) {
+            Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME);
+            TopicPartition topicPartition = entry.getKey();
+            partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
+            partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
+            PartitionState partitionState = entry.getValue();
+            partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch);
+            partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
+            partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch);
+            partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
+            partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
+            partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
+            partitionStatesData.add(partitionStateData);
+        }
+        struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
+
+        List<Struct> brokersData = new ArrayList<>(liveBrokers.size());
+        for (Broker broker : liveBrokers) {
+            Struct brokerData = struct.instance(LIVE_BROKERS_KEY_NAME);
+            brokerData.set(BROKER_ID_KEY_NAME, broker.id);
+
+            if (version == 0) {
+                EndPoint endPoint = broker.endPoints.get(0);
+                brokerData.set(HOST_KEY_NAME, endPoint.host);
+                brokerData.set(PORT_KEY_NAME, endPoint.port);
+            } else {
+                List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size());
+                for (EndPoint endPoint : broker.endPoints) {
+                    Struct endPointData = brokerData.instance(ENDPOINTS_KEY_NAME);
+                    endPointData.set(PORT_KEY_NAME, endPoint.port);
+                    endPointData.set(HOST_KEY_NAME, endPoint.host);
+                    endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, endPoint.securityProtocol.id);
+                    if (version >= 3)
+                        endPointData.set(LISTENER_NAME_KEY_NAME, endPoint.listenerName.value());
+                    endPointsData.add(endPointData);
+
+                }
+                brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray());
+                if (version >= 2) {
+                    brokerData.set(RACK_KEY_NAME, broker.rack);
+                }
+            }
+
+            brokersData.add(brokerData);
+        }
+        struct.set(LIVE_BROKERS_KEY_NAME, brokersData.toArray());
+
+        return struct;
+    }
+
+    @Override
     public AbstractResponse getErrorResponse(Throwable e) {
         short versionId = version();
         if (versionId <= 3)
@@ -302,12 +307,9 @@ public class UpdateMetadataRequest extends AbstractRequest {
         return liveBrokers;
     }
 
-    public static UpdateMetadataRequest parse(ByteBuffer buffer, int versionId) {
+    public static UpdateMetadataRequest parse(ByteBuffer buffer, short versionId) {
         return new UpdateMetadataRequest(ProtoUtils.parseRequest(ApiKeys.UPDATE_METADATA_KEY.id, versionId, buffer),
-                (short) versionId);
+                versionId);
     }
 
-    public static UpdateMetadataRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id));
-    }
 }
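
build(short) also remains the natural place for version gating: version 0
predates multiple endpoints per broker, so the builder rejects brokers it
cannot represent (the PLAINTEXT check above). Constructing the builder now
takes the version first, per the signature in this diff; the values below
are invented, and PartitionState/Broker are assumed to be the request's
nested types:

    // Sketch: version-first builder; build() receives the version again
    // explicitly, as NetworkClient/MockClient do after version negotiation.
    short version = 3;
    Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
    Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>();
    UpdateMetadataRequest request =
            new UpdateMetadataRequest.Builder(version, 5 /* controllerId */,
                    100 /* controllerEpoch */, partitionStates, liveBrokers).build(version);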

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 865d6c6..0032fca 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -16,15 +16,12 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
 public class UpdateMetadataResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.UPDATE_METADATA_KEY.id);
-
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
@@ -35,13 +32,10 @@ public class UpdateMetadataResponse extends AbstractResponse {
     private final Errors error;
 
     public UpdateMetadataResponse(Errors error) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
         this.error = error;
     }
 
     public UpdateMetadataResponse(Struct struct) {
-        super(struct);
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
@@ -49,12 +43,14 @@ public class UpdateMetadataResponse extends AbstractResponse {
         return error;
     }
 
-    public static UpdateMetadataResponse parse(ByteBuffer buffer) {
-        return new UpdateMetadataResponse(CURRENT_SCHEMA.read(buffer));
-    }
-
-    public static UpdateMetadataResponse parse(ByteBuffer buffer, int version) {
+    public static UpdateMetadataResponse parse(ByteBuffer buffer, short version) {
         return new UpdateMetadataResponse(ProtoUtils.parseResponse(ApiKeys.UPDATE_METADATA_KEY.id, version, buffer));
     }
 
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.UPDATE_METADATA_KEY.id, version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        return struct;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 7f6b7aa..88f8959 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -308,11 +308,12 @@ public class SaslServerAuthenticator implements Authenticator {
 
             if (!Protocol.apiVersionSupported(requestHeader.apiKey(), requestHeader.apiVersion())) {
                 if (apiKey == ApiKeys.API_VERSIONS)
-                    sendKafkaResponse(requestHeader, ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION));
+                    sendKafkaResponse(ApiVersionsResponse.unsupportedVersionSend(node, requestHeader));
                 else
                     throw new UnsupportedVersionException("Version " + requestHeader.apiVersion() + " is not supported for apiKey " + apiKey);
             } else {
-                AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer);
+                AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(),
+                        requestBuffer).request;
 
                 LOG.debug("Handle Kafka request {}", apiKey);
                 switch (apiKey) {
@@ -373,7 +374,11 @@ public class SaslServerAuthenticator implements Authenticator {
     }
 
     private void sendKafkaResponse(RequestHeader requestHeader, AbstractResponse response) throws IOException {
-        netOutBuffer = response.toSend(node, requestHeader);
+        sendKafkaResponse(response.toSend(node, requestHeader));
+    }
+
+    private void sendKafkaResponse(Send send) throws IOException {
+        netOutBuffer = send;
         flushNetOutBufferAndUpdateInterestOps();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
index ba38637..106a7d4 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
@@ -26,14 +26,14 @@ public class CollectionUtils {
      * @param <T> Partition data type
      * @return partitioned data
      */
-    public static <T> Map<String, Map<Integer, T>> groupDataByTopic(Map<TopicPartition, T> data) {
-        Map<String, Map<Integer, T>> dataByTopic = new HashMap<String, Map<Integer, T>>();
-        for (Map.Entry<TopicPartition, T> entry: data.entrySet()) {
+    public static <T> Map<String, Map<Integer, T>> groupDataByTopic(Map<TopicPartition, ? extends T> data) {
+        Map<String, Map<Integer, T>> dataByTopic = new HashMap<>();
+        for (Map.Entry<TopicPartition, ? extends T> entry: data.entrySet()) {
             String topic = entry.getKey().topic();
             int partition = entry.getKey().partition();
             Map<Integer, T> topicData = dataByTopic.get(topic);
             if (topicData == null) {
-                topicData = new HashMap<Integer, T>();
+                topicData = new HashMap<>();
                 dataByTopic.put(topic, topicData);
             }
             topicData.put(partition, entry.getValue());
@@ -47,12 +47,12 @@ public class CollectionUtils {
      * @return partitions per topic
      */
     public static Map<String, List<Integer>> groupDataByTopic(List<TopicPartition> partitions) {
-        Map<String, List<Integer>> partitionsByTopic = new HashMap<String, List<Integer>>();
+        Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
         for (TopicPartition tp: partitions) {
             String topic = tp.topic();
             List<Integer> topicData = partitionsByTopic.get(topic);
             if (topicData == null) {
-                topicData = new ArrayList<Integer>();
+                topicData = new ArrayList<>();
                 partitionsByTopic.put(topic, topicData);
             }
             topicData.add(tp.partition());
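
The widened value type is the standard producer-side wildcard (PECS): the
method only reads from data, so any subtype of T should be accepted. For
example, a map with Integer values can now be grouped under a Number-typed
result, which the old invariant signature rejected:

    // Compiles with the new Map<TopicPartition, ? extends T> parameter;
    // with the old Map<TopicPartition, T>, T was forced to exactly Integer.
    Map<TopicPartition, Integer> offsets = new HashMap<>();
    offsets.put(new TopicPartition("topic-a", 0), 42);
    Map<String, Map<Integer, Number>> byTopic =
            CollectionUtils.<Number>groupDataByTopic(offsets);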

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 50ed131..7712d3c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -131,7 +131,8 @@ public class MockClient implements KafkaClient {
         while (iter.hasNext()) {
             ClientRequest request = iter.next();
             if (request.destination().equals(node)) {
-                responses.add(new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
+                short version = request.requestBuilder().desiredOrLatestVersion();
+                responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                         request.createdTimeMs(), now, true, null, null));
                 iter.remove();
             }
@@ -146,13 +147,11 @@ public class MockClient implements KafkaClient {
             FutureResponse futureResp = iterator.next();
             if (futureResp.node != null && !request.destination().equals(futureResp.node.idString()))
                 continue;
-            request.requestBuilder().setVersion(nodeApiVersions.usableVersion(
-                    request.requestBuilder().apiKey()));
-            AbstractRequest abstractRequest = request.requestBuilder().build();
+            short usableVersion = nodeApiVersions.usableVersion(request.requestBuilder().apiKey());
+            AbstractRequest abstractRequest = request.requestBuilder().build(usableVersion);
             if (!futureResp.requestMatcher.matches(abstractRequest))
                 throw new IllegalStateException("Next in line response did not match expected request");
-
-            ClientResponse resp = new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
+            ClientResponse resp = new ClientResponse(request.makeHeader(usableVersion), request.callback(), request.destination(),
                     request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody);
             responses.add(resp);
             iterator.remove();
@@ -192,7 +191,8 @@ public class MockClient implements KafkaClient {
 
     public void respond(AbstractResponse response, boolean disconnected) {
         ClientRequest request = requests.remove();
-        responses.add(new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
+        short version = request.requestBuilder().desiredOrLatestVersion();
+        responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                 request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
     }
 
@@ -206,7 +206,8 @@ public class MockClient implements KafkaClient {
             ClientRequest request = iterator.next();
             if (request.destination().equals(node.idString())) {
                 iterator.remove();
-                responses.add(new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
+                short version = request.requestBuilder().desiredOrLatestVersion();
+                responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                         request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
                 return;
             }
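
The test client now mirrors the production flow: the version is resolved
once, then threaded through both the request build and the header, so the
two can no longer disagree. Condensed, with names taken from the diff
(request is a pending ClientRequest; the surrounding wiring is omitted,
and the RequestHeader return type of makeHeader is assumed):

    // Sketch: one resolved version feeds both the body and the header.
    short version = request.requestBuilder().desiredOrLatestVersion();
    AbstractRequest body = request.requestBuilder().build(version);
    RequestHeader header = request.makeHeader(version);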

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index deaf2cc..c89cc24 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.requests.AbstractRequestResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
@@ -137,9 +136,10 @@ public class NetworkClientTest {
         ResponseHeader respHeader = new ResponseHeader(request.correlationId());
         Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
         resp.set("responses", new Object[0]);
-        int size = respHeader.sizeOf() + resp.sizeOf();
+        Struct responseHeaderStruct = respHeader.toStruct();
+        int size = responseHeaderStruct.sizeOf() + resp.sizeOf();
         ByteBuffer buffer = ByteBuffer.allocate(size);
-        respHeader.writeTo(buffer);
+        responseHeaderStruct.writeTo(buffer);
         resp.writeTo(buffer);
         buffer.flip();
         selector.completeReceive(new NetworkReceive(node.idString(), buffer));
@@ -152,9 +152,7 @@ public class NetworkClientTest {
     }
 
     private void maybeSetExpectedApiVersionsResponse() {
-        ResponseHeader responseHeader = new ResponseHeader(0);
-        ByteBuffer buffer = AbstractRequestResponse.serialize(responseHeader,
-                ApiVersionsResponse.API_VERSIONS_RESPONSE);
+        ByteBuffer buffer = ApiVersionsResponse.API_VERSIONS_RESPONSE.serialize((short) 0, new ResponseHeader(0));
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
     }
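
ResponseHeader no longer writes itself directly; it is first converted to
a Struct, which exposes sizeOf() and writeTo() like any other protocol
object. The allocate-then-write idiom from the test, in isolation
(respHeader and resp are assumed to be already populated, as above):

    // Sketch: size the buffer from both structs, write header before body.
    Struct headerStruct = respHeader.toStruct();
    ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + resp.sizeOf());
    headerStruct.writeTo(buffer);
    resp.writeTo(buffer);
    buffer.flip(); // ready to hand to the selector as a completed receive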
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index a355aa1..5c4590b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1436,7 +1436,7 @@ public class KafkaConsumerTest {
             partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
                     1L, partitionOffset.getValue()));
         }
-        return new ListOffsetResponse(partitionData, 1);
+        return new ListOffsetResponse(partitionData);
     }
 
     private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
