MINOR: Use an explicit `Errors` object when possible instead of a numeric error 
code

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2475 from vahidhashemian/minor/use_explicit_Errors_type_when_possible


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9898d665
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9898d665
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9898d665

Branch: refs/heads/trunk
Commit: 9898d665d1ab201405d66c70e3ea9710d9dcecd7
Parents: a15fcea
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Thu Feb 9 21:03:46 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Feb 9 21:03:46 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |   4 +-
 .../consumer/internals/AbstractCoordinator.java |  10 +-
 .../consumer/internals/ConsumerCoordinator.java |   4 +-
 .../clients/consumer/internals/Fetcher.java     |   4 +-
 .../common/requests/ApiVersionsRequest.java     |   3 +-
 .../common/requests/ApiVersionsResponse.java    |  18 +-
 .../requests/ControlledShutdownRequest.java     |   2 +-
 .../requests/ControlledShutdownResponse.java    |  15 +-
 .../common/requests/CreateTopicsResponse.java   |   4 +-
 .../common/requests/DeleteTopicsResponse.java   |   4 +-
 .../common/requests/DescribeGroupsResponse.java |  18 +-
 .../kafka/common/requests/FetchRequest.java     |   2 +-
 .../kafka/common/requests/FetchResponse.java    |  17 +-
 .../requests/GroupCoordinatorRequest.java       |   2 +-
 .../requests/GroupCoordinatorResponse.java      |  17 +-
 .../kafka/common/requests/HeartbeatRequest.java |   2 +-
 .../common/requests/HeartbeatResponse.java      |  18 +-
 .../kafka/common/requests/JoinGroupRequest.java |   2 +-
 .../common/requests/JoinGroupResponse.java      |  19 +-
 .../common/requests/LeaderAndIsrRequest.java    |   6 +-
 .../common/requests/LeaderAndIsrResponse.java   |  32 +-
 .../common/requests/LeaveGroupRequest.java      |   2 +-
 .../common/requests/LeaveGroupResponse.java     |  15 +-
 .../common/requests/ListGroupsRequest.java      |   3 +-
 .../common/requests/ListGroupsResponse.java     |  16 +-
 .../common/requests/ListOffsetRequest.java      |   4 +-
 .../common/requests/ListOffsetResponse.java     |  23 +-
 .../common/requests/OffsetCommitRequest.java    |   8 +-
 .../common/requests/OffsetCommitResponse.java   |  23 +-
 .../common/requests/SaslHandshakeRequest.java   |   2 +-
 .../common/requests/SaslHandshakeResponse.java  |  15 +-
 .../common/requests/StopReplicaRequest.java     |   6 +-
 .../common/requests/StopReplicaResponse.java    |  32 +-
 .../kafka/common/requests/SyncGroupRequest.java |   2 +-
 .../common/requests/SyncGroupResponse.java      |  15 +-
 .../common/requests/UpdateMetadataRequest.java  |   2 +-
 .../common/requests/UpdateMetadataResponse.java |  15 +-
 .../authenticator/SaslClientAuthenticator.java  |   4 +-
 .../authenticator/SaslServerAuthenticator.java  |   4 +-
 .../clients/consumer/KafkaConsumerTest.java     |  56 ++--
 .../internals/AbstractCoordinatorTest.java      |   8 +-
 .../internals/ConsumerCoordinatorTest.java      | 320 +++++++++----------
 .../internals/ConsumerNetworkClientTest.java    |  16 +-
 .../clients/consumer/internals/FetcherTest.java |  48 +--
 .../clients/producer/internals/SenderTest.java  |  21 +-
 .../common/requests/RequestResponseTest.java    |  46 +--
 .../authenticator/SaslAuthenticatorTest.java    |   6 +-
 .../distributed/WorkerCoordinatorTest.java      |  40 +--
 .../main/scala/kafka/admin/AdminClient.scala    |   8 +-
 .../kafka/admin/ConsumerGroupCommand.scala      |   4 +-
 .../kafka/api/ControlledShutdownRequest.scala   |   4 +-
 .../kafka/api/ControlledShutdownResponse.scala  |   8 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   2 +-
 .../main/scala/kafka/api/FetchResponse.scala    |   8 +-
 .../kafka/api/GroupCoordinatorRequest.scala     |   2 +-
 .../kafka/api/GroupCoordinatorResponse.scala    |  10 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |   4 +-
 .../scala/kafka/api/OffsetCommitResponse.scala  |  10 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |   4 +-
 .../scala/kafka/api/OffsetFetchResponse.scala   |  16 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |   2 +-
 .../main/scala/kafka/api/OffsetResponse.scala   |  10 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   2 +-
 .../main/scala/kafka/api/ProducerResponse.scala |   8 +-
 .../main/scala/kafka/api/TopicMetadata.scala    |  20 +-
 .../scala/kafka/api/TopicMetadataRequest.scala  |   2 +-
 .../main/scala/kafka/client/ClientUtils.scala   |   2 +-
 .../main/scala/kafka/common/ErrorMapping.scala  |  20 +-
 .../kafka/common/OffsetMetadataAndError.scala   |  22 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |   5 +-
 .../scala/kafka/consumer/SimpleConsumer.scala   |   5 +-
 .../consumer/ZookeeperConsumerConnector.scala   |  20 +-
 .../kafka/controller/TopicDeletionManager.scala |   4 +-
 .../kafka/coordinator/GroupCoordinator.scala    | 138 ++++----
 .../coordinator/GroupMetadataManager.scala      |  14 +-
 .../kafka/coordinator/MemberMetadata.scala      |   4 +-
 .../scala/kafka/javaapi/FetchResponse.scala     |   4 +-
 .../javaapi/GroupCoordinatorResponse.scala      |   4 +-
 .../kafka/javaapi/OffsetCommitResponse.scala    |   6 +-
 .../scala/kafka/javaapi/OffsetResponse.scala    |   4 +-
 .../scala/kafka/javaapi/TopicMetadata.scala     |   9 +-
 .../kafka/producer/BrokerPartitionInfo.scala    |  12 +-
 .../producer/async/DefaultEventHandler.scala    |   6 +-
 .../kafka/security/auth/ResourceType.scala      |   8 +-
 .../kafka/server/AbstractFetcherThread.scala    |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 112 +++----
 .../main/scala/kafka/server/KafkaServer.scala   |   8 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   6 +-
 .../scala/kafka/server/ReplicaManager.scala     |  40 +--
 .../kafka/tools/ConsumerOffsetChecker.scala     |   4 +-
 .../kafka/tools/ReplicaVerificationTool.scala   |   2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  38 +--
 .../ReplicaFetcherThreadFatalErrorTest.scala    |   2 +-
 .../tools/ReplicaVerificationToolTest.scala     |   2 +-
 .../scala/other/kafka/TestOffsetManager.scala   |   4 +-
 .../api/RequestResponseSerializationTest.scala  |  28 +-
 .../GroupCoordinatorResponseTest.scala          | 300 ++++++++---------
 .../coordinator/GroupMetadataManagerTest.scala  |  36 +--
 .../integration/BaseTopicMetadataTest.scala     |  26 +-
 .../kafka/integration/PrimitiveApiTest.scala    |   4 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |   6 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |  18 +-
 .../AbstractCreateTopicsRequestTest.scala       |   2 +-
 .../server/AbstractFetcherThreadTest.scala      |   2 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |   2 +-
 .../kafka/server/DeleteTopicsRequestTest.scala  |   2 +-
 .../unit/kafka/server/FetchRequestTest.scala    |   8 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   2 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   2 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |  50 +--
 .../unit/kafka/server/ReplicaManagerTest.scala  |   8 +-
 .../server/SaslApiVersionsRequestTest.scala     |   4 +-
 112 files changed, 1054 insertions(+), 1034 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3a75288..890bf56 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -544,9 +544,9 @@ public class NetworkClient implements KafkaClient {
     private void handleApiVersionsResponse(List<ClientResponse> responses,
                                            InFlightRequest req, long now, 
ApiVersionsResponse apiVersionsResponse) {
         final String node = req.destination;
-        if (apiVersionsResponse.errorCode() != Errors.NONE.code()) {
+        if (apiVersionsResponse.error() != Errors.NONE) {
             log.warn("Node {} got error {} when making an ApiVersionsRequest.  
Disconnecting.",
-                    node, Errors.forCode(apiVersionsResponse.errorCode()));
+                    node, apiVersionsResponse.error());
             this.selector.close(node);
             processDisconnection(responses, node, now);
             return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2fdf802..350a84b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -420,7 +420,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
     private class JoinGroupResponseHandler extends 
CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
         @Override
         public void handle(JoinGroupResponse joinResponse, 
RequestFuture<ByteBuffer> future) {
-            Errors error = Errors.forCode(joinResponse.errorCode());
+            Errors error = joinResponse.error();
             if (error == Errors.NONE) {
                 log.debug("Received successful JoinGroup response for group 
{}: {}", groupId, joinResponse);
                 sensors.joinLatency.record(response.requestLatencyMs());
@@ -509,7 +509,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         @Override
         public void handle(SyncGroupResponse syncResponse,
                            RequestFuture<ByteBuffer> future) {
-            Errors error = Errors.forCode(syncResponse.errorCode());
+            Errors error = syncResponse.error();
             if (error == Errors.NONE) {
                 sensors.syncLatency.record(response.requestLatencyMs());
                 future.complete(syncResponse.memberAssignment());
@@ -562,7 +562,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
             // use MAX_VALUE - node.id as the coordinator id to mimic separate 
connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
-            Errors error = 
Errors.forCode(groupCoordinatorResponse.errorCode());
+            Errors error = groupCoordinatorResponse.error();
             clearFindCoordinatorFuture();
             if (error == Errors.NONE) {
                 synchronized (AbstractCoordinator.this) {
@@ -688,7 +688,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
     private class LeaveGroupResponseHandler extends 
CoordinatorResponseHandler<LeaveGroupResponse, Void> {
         @Override
         public void handle(LeaveGroupResponse leaveResponse, 
RequestFuture<Void> future) {
-            Errors error = Errors.forCode(leaveResponse.errorCode());
+            Errors error = leaveResponse.error();
             if (error == Errors.NONE) {
                 log.debug("LeaveGroup request for group {} returned 
successfully", groupId);
                 future.complete(null);
@@ -712,7 +712,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         @Override
         public void handle(HeartbeatResponse heartbeatResponse, 
RequestFuture<Void> future) {
             sensors.heartbeatLatency.record(response.requestLatencyMs());
-            Errors error = Errors.forCode(heartbeatResponse.errorCode());
+            Errors error = heartbeatResponse.error();
             if (error == Errors.NONE) {
                 log.debug("Received successful Heartbeat response for group 
{}", groupId);
                 future.complete(null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8669527..0dc073e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -722,12 +722,12 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             sensors.commitLatency.record(response.requestLatencyMs());
             Set<String> unauthorizedTopics = new HashSet<>();
 
-            for (Map.Entry<TopicPartition, Short> entry : 
commitResponse.responseData().entrySet()) {
+            for (Map.Entry<TopicPartition, Errors> entry : 
commitResponse.responseData().entrySet()) {
                 TopicPartition tp = entry.getKey();
                 OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                 long offset = offsetAndMetadata.offset();
 
-                Errors error = Errors.forCode(entry.getValue());
+                Errors error = entry.getValue();
                 if (error == Errors.NONE) {
                     log.debug("Group {} committed offset {} for partition {}", 
groupId, offset, tp);
                     if (subscriptions.isAssigned(tp))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6a13d46..02f34e5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -637,7 +637,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
         for (Map.Entry<TopicPartition, Long> entry : 
timestampsToSearch.entrySet()) {
             TopicPartition topicPartition = entry.getKey();
             ListOffsetResponse.PartitionData partitionData = 
listOffsetResponse.responseData().get(topicPartition);
-            Errors error = Errors.forCode(partitionData.errorCode);
+            Errors error = partitionData.error;
             if (error == Errors.NONE) {
                 if (partitionData.offsets != null) {
                     // Handle v0 response
@@ -750,7 +750,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
         int bytes = 0;
         int recordsCount = 0;
         PartitionRecords<K, V> parsedRecords = null;
-        Errors error = Errors.forCode(partition.errorCode);
+        Errors error = partition.error;
 
         try {
             if (!subscriptions.isFetchable(tp)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 17e6d5e..fe7c348 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -51,8 +51,7 @@ public class ApiVersionsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                short errorCode = Errors.forException(e).code();
-                return new ApiVersionsResponse(errorCode, 
Collections.<ApiVersionsResponse.ApiVersion>emptyList());
+                return new ApiVersionsResponse(Errors.forException(e), 
Collections.<ApiVersionsResponse.ApiVersion>emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.API_VERSIONS.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 81be9c3..7d8bcc5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -42,7 +42,7 @@ public class ApiVersionsResponse extends AbstractResponse {
      *
      * UNSUPPORTED_VERSION (33)
      */
-    private final short errorCode;
+    private final Errors error;
     private final Map<Short, ApiVersion> apiKeyToApiVersion;
 
     public static final class ApiVersion {
@@ -66,9 +66,9 @@ public class ApiVersionsResponse extends AbstractResponse {
         }
     }
 
-    public ApiVersionsResponse(short errorCode, List<ApiVersion> apiVersions) {
+    public ApiVersionsResponse(Errors error, List<ApiVersion> apiVersions) {
         super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
         List<Struct> apiVersionList = new ArrayList<>();
         for (ApiVersion apiVersion : apiVersions) {
             Struct apiVersionStruct = struct.instance(API_VERSIONS_KEY_NAME);
@@ -78,13 +78,13 @@ public class ApiVersionsResponse extends AbstractResponse {
             apiVersionList.add(apiVersionStruct);
         }
         struct.set(API_VERSIONS_KEY_NAME, apiVersionList.toArray());
-        this.errorCode = errorCode;
+        this.error = error;
         this.apiKeyToApiVersion = buildApiKeyToApiVersion(apiVersions);
     }
 
     public ApiVersionsResponse(Struct struct) {
         super(struct);
-        this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         List<ApiVersion> tempApiVersions = new ArrayList<>();
         for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) {
             Struct apiVersionStruct = (Struct) apiVersionsObj;
@@ -104,8 +104,8 @@ public class ApiVersionsResponse extends AbstractResponse {
         return apiKeyToApiVersion.get(apiKey);
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public static ApiVersionsResponse parse(ByteBuffer buffer) {
@@ -113,7 +113,7 @@ public class ApiVersionsResponse extends AbstractResponse {
     }
 
     public static ApiVersionsResponse fromError(Errors error) {
-        return new ApiVersionsResponse(error.code(), 
Collections.<ApiVersion>emptyList());
+        return new ApiVersionsResponse(error, 
Collections.<ApiVersion>emptyList());
     }
 
     private static ApiVersionsResponse createApiVersionsResponse() {
@@ -121,7 +121,7 @@ public class ApiVersionsResponse extends AbstractResponse {
         for (ApiKeys apiKey : ApiKeys.values()) {
             versionList.add(new ApiVersion(apiKey.id, 
ProtoUtils.oldestVersion(apiKey.id), ProtoUtils.latestVersion(apiKey.id)));
         }
-        return new ApiVersionsResponse(Errors.NONE.code(), versionList);
+        return new ApiVersionsResponse(Errors.NONE, versionList);
     }
 
     private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> 
apiVersions) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 337ccfc..8f44e5c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -68,7 +68,7 @@ public class ControlledShutdownRequest extends 
AbstractRequest {
                 throw new IllegalArgumentException("Version 0 is not 
supported. It is only supported by " +
                         "the Scala request class for controlled shutdown");
             case 1:
-                return new 
ControlledShutdownResponse(Errors.forException(e).code(), 
Collections.<TopicPartition>emptySet());
+                return new ControlledShutdownResponse(Errors.forException(e), 
Collections.<TopicPartition>emptySet());
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index 1996f82..b3922f9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -41,14 +42,14 @@ public class ControlledShutdownResponse extends 
AbstractResponse {
      * BROKER_NOT_AVAILABLE(8)
      * STALE_CONTROLLER_EPOCH(11)
      */
-    private final short errorCode;
+    private final Errors error;
 
     private final Set<TopicPartition> partitionsRemaining;
 
-    public ControlledShutdownResponse(short errorCode, Set<TopicPartition> 
partitionsRemaining) {
+    public ControlledShutdownResponse(Errors error, Set<TopicPartition> 
partitionsRemaining) {
         super(new Struct(CURRENT_SCHEMA));
 
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
 
         List<Struct> partitionsRemainingList = new 
ArrayList<>(partitionsRemaining.size());
         for (TopicPartition topicPartition : partitionsRemaining) {
@@ -59,13 +60,13 @@ public class ControlledShutdownResponse extends 
AbstractResponse {
         }
         struct.set(PARTITIONS_REMAINING_KEY_NAME, 
partitionsRemainingList.toArray());
 
-        this.errorCode = errorCode;
+        this.error = error;
         this.partitionsRemaining = partitionsRemaining;
     }
 
     public ControlledShutdownResponse(Struct struct) {
         super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         Set<TopicPartition> partitions = new HashSet<>();
         for (Object topicPartitionObj : 
struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) {
             Struct topicPartition = (Struct) topicPartitionObj;
@@ -76,8 +77,8 @@ public class ControlledShutdownResponse extends 
AbstractResponse {
         partitionsRemaining = partitions;
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public Set<TopicPartition> partitionsRemaining() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 9807283..b09795f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -113,11 +113,11 @@ public class CreateTopicsResponse extends 
AbstractResponse {
         for (Object topicErrorStructObj : topicErrorStructs) {
             Struct topicErrorCodeStruct = (Struct) topicErrorStructObj;
             String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME);
-            short errorCode = 
topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME);
+            Errors error = 
Errors.forCode(topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME));
             String errorMessage = null;
             if (topicErrorCodeStruct.hasField(ERROR_MESSAGE_KEY_NAME))
                 errorMessage = 
topicErrorCodeStruct.getString(ERROR_MESSAGE_KEY_NAME);
-            errors.put(topic, new Error(Errors.forCode(errorCode), 
errorMessage));
+            errors.put(topic, new Error(error, errorMessage));
         }
 
         this.errors = errors;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index ed6a63d..5c8b3d5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -67,8 +67,8 @@ public class DeleteTopicsResponse extends AbstractResponse {
         for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
             Struct topicErrorCodeStruct = (Struct) topicErrorCodeStructObj;
             String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME);
-            short errorCode = 
topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME);
-            errors.put(topic, Errors.forCode(errorCode));
+            Errors error = 
Errors.forCode(topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME));
+            errors.put(topic, error);
         }
 
         this.errors = errors;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 2eff628..56b387e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -67,7 +67,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
             Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
             GroupMetadata group = groupEntry.getValue();
             groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey());
-            groupStruct.set(ERROR_CODE_KEY_NAME, group.errorCode);
+            groupStruct.set(ERROR_CODE_KEY_NAME, group.error.code());
             groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
             groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
             groupStruct.set(PROTOCOL_KEY_NAME, group.protocol);
@@ -95,7 +95,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
             Struct groupStruct = (Struct) groupObj;
 
             String groupId = groupStruct.getString(GROUP_ID_KEY_NAME);
-            short errorCode = groupStruct.getShort(ERROR_CODE_KEY_NAME);
+            Errors error = 
Errors.forCode(groupStruct.getShort(ERROR_CODE_KEY_NAME));
             String state = groupStruct.getString(GROUP_STATE_KEY_NAME);
             String protocolType = 
groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
             String protocol = groupStruct.getString(PROTOCOL_KEY_NAME);
@@ -111,7 +111,7 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
                 members.add(new GroupMember(memberId, clientId, clientHost,
                         memberMetadata, memberAssignment));
             }
-            this.groups.put(groupId, new GroupMetadata(errorCode, state, 
protocolType, protocol, members));
+            this.groups.put(groupId, new GroupMetadata(error, state, 
protocolType, protocol, members));
         }
     }
 
@@ -121,26 +121,26 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
 
 
     public static class GroupMetadata {
-        private final short errorCode;
+        private final Errors error;
         private final String state;
         private final String protocolType;
         private final String protocol;
         private final List<GroupMember> members;
 
-        public GroupMetadata(short errorCode,
+        public GroupMetadata(Errors error,
                              String state,
                              String protocolType,
                              String protocol,
                              List<GroupMember> members) {
-            this.errorCode = errorCode;
+            this.error = error;
             this.state = state;
             this.protocolType = protocolType;
             this.protocol = protocol;
             this.members = members;
         }
 
-        public short errorCode() {
-            return errorCode;
+        public Errors error() {
+            return error;
         }
 
         public String state() {
@@ -161,7 +161,7 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
 
         public static GroupMetadata forError(Errors error) {
             return new DescribeGroupsResponse.GroupMetadata(
-                    error.code(),
+                    error,
                     DescribeGroupsResponse.UNKNOWN_STATE,
                     DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
                     DescribeGroupsResponse.UNKNOWN_PROTOCOL,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 5700e9e..4f270e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -212,7 +212,7 @@ public class FetchRequest extends AbstractRequest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> 
responseData = new LinkedHashMap<>();
 
         for (Map.Entry<TopicPartition, PartitionData> entry: 
fetchData.entrySet()) {
-            FetchResponse.PartitionData partitionResponse = new 
FetchResponse.PartitionData(Errors.forException(e).code(),
+            FetchResponse.PartitionData partitionResponse = new 
FetchResponse.PartitionData(Errors.forException(e),
                     FetchResponse.INVALID_HIGHWATERMARK, MemoryRecords.EMPTY);
 
             responseData.put(entry.getKey(), partitionResponse);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 965b207..64bd3d3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.network.ByteBufferSend;
 import org.apache.kafka.common.network.MultiSend;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -37,7 +38,7 @@ import java.util.Map;
  * This wrapper supports all versions of the Fetch API
  */
 public class FetchResponse extends AbstractResponse {
-    
+
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
     private static final String RESPONSES_KEY_NAME = "responses";
 
@@ -73,19 +74,19 @@ public class FetchResponse extends AbstractResponse {
     private final int throttleTime;
 
     public static final class PartitionData {
-        public final short errorCode;
+        public final Errors error;
         public final long highWatermark;
         public final Records records;
 
-        public PartitionData(short errorCode, long highWatermark, Records 
records) {
-            this.errorCode = errorCode;
+        public PartitionData(Errors error, long highWatermark, Records 
records) {
+            this.error = error;
             this.highWatermark = highWatermark;
             this.records = records;
         }
 
         @Override
         public String toString() {
-            return "(errorCode=" + errorCode + ", highWaterMark=" + 
highWatermark +
+            return "(error=" + error.toString() + ", highWaterMark=" + 
highWatermark +
                     ", records=" + records + ")";
         }
     }
@@ -128,10 +129,10 @@ public class FetchResponse extends AbstractResponse {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 Struct partitionResponseHeader = 
partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
                 int partition = 
partitionResponseHeader.getInt(PARTITION_KEY_NAME);
-                short errorCode = 
partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME);
+                Errors error = 
Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME));
                 long highWatermark = 
partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
                 Records records = 
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                PartitionData partitionData = new PartitionData(errorCode, 
highWatermark, records);
+                PartitionData partitionData = new PartitionData(error, 
highWatermark, records);
                 responseData.put(new TopicPartition(topic, partition), 
partitionData);
             }
         }
@@ -237,7 +238,7 @@ public class FetchResponse extends AbstractResponse {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 Struct partitionDataHeader = 
partitionData.instance(PARTITION_HEADER_KEY_NAME);
                 partitionDataHeader.set(PARTITION_KEY_NAME, 
partitionEntry.getKey());
-                partitionDataHeader.set(ERROR_CODE_KEY_NAME, 
fetchPartitionData.errorCode);
+                partitionDataHeader.set(ERROR_CODE_KEY_NAME, 
fetchPartitionData.error.code());
                 partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, 
fetchPartitionData.highWatermark);
                 partitionData.set(PARTITION_HEADER_KEY_NAME, 
partitionDataHeader);
                 partitionData.set(RECORD_SET_KEY_NAME, 
fetchPartitionData.records);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
index d8ccdf6..ed56f39 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
@@ -65,7 +65,7 @@ public class GroupCoordinatorRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new 
GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), 
Node.noNode());
+                return new 
GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode());
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.GROUP_COORDINATOR.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
index 1f447f7..fc3d358 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -21,7 +22,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class GroupCoordinatorResponse extends AbstractResponse {
-    
+
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String COORDINATOR_KEY_NAME = "coordinator";
@@ -40,24 +41,24 @@ public class GroupCoordinatorResponse extends 
AbstractResponse {
     private static final String HOST_KEY_NAME = "host";
     private static final String PORT_KEY_NAME = "port";
 
-    private final short errorCode;
+    private final Errors error;
     private final Node node;
 
-    public GroupCoordinatorResponse(short errorCode, Node node) {
+    public GroupCoordinatorResponse(Errors error, Node node) {
         super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
         Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
         coordinator.set(NODE_ID_KEY_NAME, node.id());
         coordinator.set(HOST_KEY_NAME, node.host());
         coordinator.set(PORT_KEY_NAME, node.port());
         struct.set(COORDINATOR_KEY_NAME, coordinator);
-        this.errorCode = errorCode;
+        this.error = error;
         this.node = node;
     }
 
     public GroupCoordinatorResponse(Struct struct) {
         super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
         int nodeId = broker.getInt(NODE_ID_KEY_NAME);
         String host = broker.getString(HOST_KEY_NAME);
@@ -65,8 +66,8 @@ public class GroupCoordinatorResponse extends 
AbstractResponse {
         node = new Node(nodeId, host, port);
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public Node node() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 0e5c17a..7e79c8a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -80,7 +80,7 @@ public class HeartbeatRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new HeartbeatResponse(Errors.forException(e).code());
+                return new HeartbeatResponse(Errors.forException(e));
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 72f0175..f36dec4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -20,7 +21,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class HeartbeatResponse extends AbstractResponse {
-    
+
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
@@ -35,20 +36,21 @@ public class HeartbeatResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
 
-    private final short errorCode;
-    public HeartbeatResponse(short errorCode) {
+    private final Errors error;
+
+    public HeartbeatResponse(Errors error) {
         super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        this.errorCode = errorCode;
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        this.error = error;
     }
 
     public HeartbeatResponse(Struct struct) {
         super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public static HeartbeatResponse parse(ByteBuffer buffer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 3f00ed1..ad0cdd0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -170,7 +170,7 @@ public class JoinGroupRequest extends AbstractRequest {
             case 1:
                 return new JoinGroupResponse(
                         versionId,
-                        Errors.forException(e).code(),
+                        Errors.forException(e),
                         JoinGroupResponse.UNKNOWN_GENERATION_ID,
                         JoinGroupResponse.UNKNOWN_PROTOCOL,
                         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 8fd77ce..bc9366a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -53,24 +54,24 @@ public class JoinGroupResponse extends AbstractResponse {
     public static final int UNKNOWN_GENERATION_ID = -1;
     public static final String UNKNOWN_MEMBER_ID = "";
 
-    private final short errorCode;
+    private final Errors error;
     private final int generationId;
     private final String groupProtocol;
     private final String memberId;
     private final String leaderId;
     private final Map<String, ByteBuffer> members;
 
-    public JoinGroupResponse(short errorCode,
+    public JoinGroupResponse(Errors error,
                              int generationId,
                              String groupProtocol,
                              String memberId,
                              String leaderId,
                              Map<String, ByteBuffer> groupMembers) {
-        this(CURRENT_VERSION, errorCode, generationId, groupProtocol, 
memberId, leaderId, groupMembers);
+        this(CURRENT_VERSION, error, generationId, groupProtocol, memberId, 
leaderId, groupMembers);
     }
 
     public JoinGroupResponse(int version,
-                             short errorCode,
+                             Errors error,
                              int generationId,
                              String groupProtocol,
                              String memberId,
@@ -78,7 +79,7 @@ public class JoinGroupResponse extends AbstractResponse {
                              Map<String, ByteBuffer> groupMembers) {
         super(new Struct(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, 
version)));
 
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
         struct.set(GENERATION_ID_KEY_NAME, generationId);
         struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol);
         struct.set(MEMBER_ID_KEY_NAME, memberId);
@@ -93,7 +94,7 @@ public class JoinGroupResponse extends AbstractResponse {
         }
         struct.set(MEMBERS_KEY_NAME, memberArray.toArray());
 
-        this.errorCode = errorCode;
+        this.error = error;
         this.generationId = generationId;
         this.groupProtocol = groupProtocol;
         this.memberId = memberId;
@@ -111,15 +112,15 @@ public class JoinGroupResponse extends AbstractResponse {
             ByteBuffer memberMetadata = 
memberData.getBytes(MEMBER_METADATA_KEY_NAME);
             members.put(memberId, memberMetadata);
         }
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         generationId = struct.getInt(GENERATION_ID_KEY_NAME);
         groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME);
         memberId = struct.getString(MEMBER_ID_KEY_NAME);
         leaderId = struct.getString(LEADER_ID_KEY_NAME);
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public int generationId() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 1f09a12..fde184a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -178,15 +178,15 @@ public class LeaderAndIsrRequest extends AbstractRequest {
 
     @Override
     public AbstractResponse getErrorResponse(Throwable e) {
-        Map<TopicPartition, Short> responses = new 
HashMap<>(partitionStates.size());
+        Map<TopicPartition, Errors> responses = new 
HashMap<>(partitionStates.size());
         for (TopicPartition partition : partitionStates.keySet()) {
-            responses.put(partition, Errors.forException(e).code());
+            responses.put(partition, Errors.forException(e));
         }
 
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
+                return new LeaderAndIsrResponse(Errors.NONE, responses);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.LEADER_AND_ISR.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index a754def..4d0a05d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -41,34 +41,32 @@ public class LeaderAndIsrResponse extends AbstractResponse {
      *
      * STALE_CONTROLLER_EPOCH (11)
      */
-    private final short errorCode;
+    private final Errors error;
 
-    private final Map<TopicPartition, Short> responses;
+    private final Map<TopicPartition, Errors> responses;
 
-    public LeaderAndIsrResponse(Map<TopicPartition, Short> responses) {
-        this(Errors.NONE.code(), responses);
+    public LeaderAndIsrResponse(Map<TopicPartition, Errors> responses) {
+        this(Errors.NONE, responses);
     }
 
-    public LeaderAndIsrResponse(short errorCode, Map<TopicPartition, Short> 
responses) {
+    public LeaderAndIsrResponse(Errors error, Map<TopicPartition, Errors> 
responses) {
         super(new Struct(CURRENT_SCHEMA));
 
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-
         List<Struct> responseDatas = new ArrayList<>(responses.size());
-        for (Map.Entry<TopicPartition, Short> response : responses.entrySet()) 
{
+        for (Map.Entry<TopicPartition, Errors> response : 
responses.entrySet()) {
             Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
             TopicPartition partition = response.getKey();
             partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
             partitionData.set(PARTITIONS_PARTITION_KEY_NAME, 
partition.partition());
-            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, 
response.getValue());
+            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, 
response.getValue().code());
             responseDatas.add(partitionData);
         }
 
         struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
 
         this.responses = responses;
-        this.errorCode = errorCode;
+        this.error = error;
     }
 
     public LeaderAndIsrResponse(Struct struct) {
@@ -79,19 +77,19 @@ public class LeaderAndIsrResponse extends AbstractResponse {
             Struct responseData = (Struct) responseDataObj;
             String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME);
             int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME);
-            short errorCode = 
responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME);
-            responses.put(new TopicPartition(topic, partition), errorCode);
+            Errors error = 
Errors.forCode(responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME));
+            responses.put(new TopicPartition(topic, partition), error);
         }
 
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
-    public Map<TopicPartition, Short> responses() {
+    public Map<TopicPartition, Errors> responses() {
         return responses;
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public static LeaderAndIsrResponse parse(ByteBuffer buffer, int version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 573ebc8..2a7b70e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -71,7 +71,7 @@ public class LeaveGroupRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new LeaveGroupResponse(Errors.forException(e).code());
+                return new LeaveGroupResponse(Errors.forException(e));
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.LEAVE_GROUP.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 9c7998b..bd1c84d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.requests;
 
 import java.nio.ByteBuffer;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -33,20 +34,20 @@ public class LeaveGroupResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
 
-    private final short errorCode;
-    public LeaveGroupResponse(short errorCode) {
+    private final Errors error;
+    public LeaveGroupResponse(Errors error) {
         super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        this.errorCode = errorCode;
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        this.error = error;
     }
 
     public LeaveGroupResponse(Struct struct) {
         super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public static LeaveGroupResponse parse(ByteBuffer buffer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 8d0a1af..235f4e4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -51,8 +51,7 @@ public class ListGroupsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                short errorCode = Errors.forException(e).code();
-                return new ListGroupsResponse(errorCode, 
Collections.<ListGroupsResponse.Group>emptyList());
+                return new ListGroupsResponse(Errors.forException(e), 
Collections.<ListGroupsResponse.Group>emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.LIST_GROUPS.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 98573f8..f421064 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -39,12 +39,12 @@ public class ListGroupsResponse extends AbstractResponse {
      * AUTHORIZATION_FAILED (29)
      */
 
-    private final short errorCode;
+    private final Errors error;
     private final List<Group> groups;
 
-    public ListGroupsResponse(short errorCode, List<Group> groups) {
+    public ListGroupsResponse(Errors error, List<Group> groups) {
         super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
         List<Struct> groupList = new ArrayList<>();
         for (Group group : groups) {
             Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
@@ -53,13 +53,13 @@ public class ListGroupsResponse extends AbstractResponse {
             groupList.add(groupStruct);
         }
         struct.set(GROUPS_KEY_NAME, groupList.toArray());
-        this.errorCode = errorCode;
+        this.error = error;
         this.groups = groups;
     }
 
     public ListGroupsResponse(Struct struct) {
         super(struct);
-        this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         this.groups = new ArrayList<>();
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
             Struct groupStruct = (Struct) groupObj;
@@ -73,8 +73,8 @@ public class ListGroupsResponse extends AbstractResponse {
         return groups;
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public static class Group {
@@ -101,7 +101,7 @@ public class ListGroupsResponse extends AbstractResponse {
     }
 
     public static ListGroupsResponse fromError(Errors error) {
-        return new ListGroupsResponse(error.code(), 
Collections.<Group>emptyList());
+        return new ListGroupsResponse(error, Collections.<Group>emptyList());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 79251ed..6214a56 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -241,12 +241,12 @@ public class ListOffsetRequest extends AbstractRequest {
         short versionId = version();
         if (versionId == 0) {
             for (Map.Entry<TopicPartition, PartitionData> entry : 
offsetData.entrySet()) {
-                ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(Errors.forException(e).code(), new 
ArrayList<Long>());
+                ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(Errors.forException(e), new ArrayList<Long>());
                 responseData.put(entry.getKey(), partitionResponse);
             }
         } else {
             for (Map.Entry<TopicPartition, Long> entry : 
partitionTimestamps.entrySet()) {
-                ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(Errors.forException(e).code(), -1L, -1L);
+                ListOffsetResponse.PartitionData partitionResponse = new 
ListOffsetResponse.PartitionData(Errors.forException(e), -1L, -1L);
                 responseData.put(entry.getKey(), partitionResponse);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 2eddf1e..b815a53 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -33,7 +34,7 @@ import java.util.Map;
 public class ListOffsetResponse extends AbstractResponse {
     public static final long UNKNOWN_TIMESTAMP = -1L;
     public static final long UNKNOWN_OFFSET = -1L;
-    
+
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
     private static final String RESPONSES_KEY_NAME = "responses";
 
@@ -63,7 +64,7 @@ public class ListOffsetResponse extends AbstractResponse {
     private final Map<TopicPartition, PartitionData> responseData;
 
     public static final class PartitionData {
-        public final short errorCode;
+        public final Errors error;
         // The offsets list is only used in ListOffsetResponse v0.
         @Deprecated
         public final List<Long> offsets;
@@ -74,8 +75,8 @@ public class ListOffsetResponse extends AbstractResponse {
          * Constructor for ListOffsetResponse v0
          */
         @Deprecated
-        public PartitionData(short errorCode, List<Long> offsets) {
-            this.errorCode = errorCode;
+        public PartitionData(Errors error, List<Long> offsets) {
+            this.error = error;
             this.offsets = offsets;
             this.timestamp = null;
             this.offset = null;
@@ -84,8 +85,8 @@ public class ListOffsetResponse extends AbstractResponse {
         /**
          * Constructor for ListOffsetResponse v1
          */
-        public PartitionData(short errorCode, long timestamp, long offset) {
-            this.errorCode = errorCode;
+        public PartitionData(Errors error, long timestamp, long offset) {
+            this.error = error;
             this.timestamp = timestamp;
             this.offset = offset;
             this.offsets = null;
@@ -95,7 +96,7 @@ public class ListOffsetResponse extends AbstractResponse {
         public String toString() {
             StringBuilder bld = new StringBuilder();
             bld.append("PartitionData{").
-                append("errorCode: ").append((int) errorCode).
+                append("errorCode: ").append((int) error.code()).
                 append(", timestamp: ").append(timestamp).
                 append(", offset: ").append(offset).
                 append(", offsets: ");
@@ -130,7 +131,7 @@ public class ListOffsetResponse extends AbstractResponse {
                 PartitionData offsetPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, 
offsetPartitionData.errorCode);
+                partitionData.set(ERROR_CODE_KEY_NAME, 
offsetPartitionData.error.code());
                 if (version == 0)
                     partitionData.set(OFFSETS_KEY_NAME, 
offsetPartitionData.offsets.toArray());
                 else {
@@ -155,18 +156,18 @@ public class ListOffsetResponse extends AbstractResponse {
             for (Object partitionResponseObj : 
topicResponse.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = 
partitionResponse.getShort(ERROR_CODE_KEY_NAME);
+                Errors error = 
Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME));
                 PartitionData partitionData;
                 if (partitionResponse.hasField(OFFSETS_KEY_NAME)) {
                     Object[] offsets = 
partitionResponse.getArray(OFFSETS_KEY_NAME);
                     List<Long> offsetsList = new ArrayList<Long>();
                     for (Object offset : offsets)
                         offsetsList.add((Long) offset);
-                    partitionData = new PartitionData(errorCode, offsetsList);
+                    partitionData = new PartitionData(error, offsetsList);
                 } else {
                     long timestamp = 
partitionResponse.getLong(TIMESTAMP_KEY_NAME);
                     long offset = partitionResponse.getLong(OFFSET_KEY_NAME);
-                    partitionData = new PartitionData(errorCode, timestamp, 
offset);
+                    partitionData = new PartitionData(error, timestamp, 
offset);
                 }
                 responseData.put(new TopicPartition(topic, partition), 
partitionData);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 361fd15..6dd1197 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -285,9 +285,9 @@ public class OffsetCommitRequest extends AbstractRequest {
 
     @Override
     public AbstractResponse getErrorResponse(Throwable e) {
-        Map<TopicPartition, Short> responseData = new HashMap<>();
+        Map<TopicPartition, Errors> responseData = new HashMap<>();
         for (Map.Entry<TopicPartition, PartitionData> entry: 
offsetData.entrySet()) {
-            responseData.put(entry.getKey(), Errors.forException(e).code());
+            responseData.put(entry.getKey(), Errors.forException(e));
         }
 
         short versionId = version();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index abb260e..8a00c6b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -26,7 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 public class OffsetCommitResponse extends AbstractResponse {
-    
+
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
     private static final String RESPONSES_KEY_NAME = "responses";
 
@@ -54,22 +55,22 @@ public class OffsetCommitResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
 
-    private final Map<TopicPartition, Short> responseData;
+    private final Map<TopicPartition, Errors> responseData;
 
-    public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {
+    public OffsetCommitResponse(Map<TopicPartition, Errors> responseData) {
         super(new Struct(CURRENT_SCHEMA));
 
-        Map<String, Map<Integer, Short>> topicsData = 
CollectionUtils.groupDataByTopic(responseData);
+        Map<String, Map<Integer, Errors>> topicsData = 
CollectionUtils.groupDataByTopic(responseData);
 
         List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, Short>> entries: 
topicsData.entrySet()) {
+        for (Map.Entry<String, Map<Integer, Errors>> entries: 
topicsData.entrySet()) {
             Struct topicData = struct.instance(RESPONSES_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, entries.getKey());
             List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, Short> partitionEntry : 
entries.getValue().entrySet()) {
+            for (Map.Entry<Integer, Errors> partitionEntry : 
entries.getValue().entrySet()) {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, 
partitionEntry.getValue());
+                partitionData.set(ERROR_CODE_KEY_NAME, 
partitionEntry.getValue().code());
                 partitionArray.add(partitionData);
             }
             topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
@@ -81,20 +82,20 @@ public class OffsetCommitResponse extends AbstractResponse {
 
     public OffsetCommitResponse(Struct struct) {
         super(struct);
-        responseData = new HashMap<TopicPartition, Short>();
+        responseData = new HashMap<TopicPartition, Errors>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);
             for (Object partitionResponseObj : 
topicResponse.getArray(PARTITIONS_KEY_NAME)) {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = 
partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                responseData.put(new TopicPartition(topic, partition), 
errorCode);
+                Errors error = 
Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME));
+                responseData.put(new TopicPartition(topic, partition), error);
             }
         }
     }
 
-    public Map<TopicPartition, Short> responseData() {
+    public Map<TopicPartition, Errors> responseData() {
         return responseData;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index 81bc249..d244f0a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -66,7 +66,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
         switch (versionId) {
             case 0:
                 List<String> enabledMechanisms = Collections.emptyList();
-                return new 
SaslHandshakeResponse(Errors.forException(e).code(), enabledMechanisms);
+                return new SaslHandshakeResponse(Errors.forException(e), 
enabledMechanisms);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index 6d7f734..f50c5be 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -45,20 +46,20 @@ public class SaslHandshakeResponse extends AbstractResponse 
{
      *   UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server
      *   ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake
      */
-    private final short errorCode;
+    private final Errors error;
     private final List<String> enabledMechanisms;
 
-    public SaslHandshakeResponse(short errorCode, Collection<String> 
enabledMechanisms) {
+    public SaslHandshakeResponse(Errors error, Collection<String> 
enabledMechanisms) {
         super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
         struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray());
-        this.errorCode = errorCode;
+        this.error = error;
         this.enabledMechanisms = new ArrayList<>(enabledMechanisms);
     }
 
     public SaslHandshakeResponse(Struct struct) {
         super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME);
         ArrayList<String> enabledMechanisms = new ArrayList<>();
         for (Object mechanism : mechanisms)
@@ -66,8 +67,8 @@ public class SaslHandshakeResponse extends AbstractResponse {
         this.enabledMechanisms = enabledMechanisms;
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public List<String> enabledMechanisms() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index d687d99..ff2638b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -117,15 +117,15 @@ public class StopReplicaRequest extends AbstractRequest {
 
     @Override
     public AbstractResponse getErrorResponse(Throwable e) {
-        Map<TopicPartition, Short> responses = new 
HashMap<>(partitions.size());
+        Map<TopicPartition, Errors> responses = new 
HashMap<>(partitions.size());
         for (TopicPartition partition : partitions) {
-            responses.put(partition, Errors.forException(e).code());
+            responses.put(partition, Errors.forException(e));
         }
 
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new StopReplicaResponse(Errors.NONE.code(), responses);
+                return new StopReplicaResponse(Errors.NONE, responses);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.STOP_REPLICA.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 92d9e58..b39fb19 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -36,8 +36,8 @@ public class StopReplicaResponse extends AbstractResponse {
     private static final String PARTITIONS_PARTITION_KEY_NAME = "partition";
     private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code";
 
-    private final Map<TopicPartition, Short> responses;
-    private final short errorCode;
+    private final Map<TopicPartition, Errors> responses;
+    private final Errors error;
 
     /**
      * Possible error code:
@@ -45,30 +45,28 @@ public class StopReplicaResponse extends AbstractResponse {
      * STALE_CONTROLLER_EPOCH (11)
      */
 
-    public StopReplicaResponse(Map<TopicPartition, Short> responses) {
-        this(Errors.NONE.code(), responses);
+    public StopReplicaResponse(Map<TopicPartition, Errors> responses) {
+        this(Errors.NONE, responses);
     }
 
-    public StopReplicaResponse(short errorCode, Map<TopicPartition, Short> 
responses) {
+    public StopReplicaResponse(Errors error, Map<TopicPartition, Errors> 
responses) {
         super(new Struct(CURRENT_SCHEMA));
 
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-
         List<Struct> responseDatas = new ArrayList<>(responses.size());
-        for (Map.Entry<TopicPartition, Short> response : responses.entrySet()) 
{
+        for (Map.Entry<TopicPartition, Errors> response : 
responses.entrySet()) {
             Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
             TopicPartition partition = response.getKey();
             partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
             partitionData.set(PARTITIONS_PARTITION_KEY_NAME, 
partition.partition());
-            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, 
response.getValue());
+            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, 
response.getValue().code());
             responseDatas.add(partitionData);
         }
 
         struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
 
         this.responses = responses;
-        this.errorCode = errorCode;
+        this.error = error;
     }
 
     public StopReplicaResponse(Struct struct) {
@@ -79,19 +77,19 @@ public class StopReplicaResponse extends AbstractResponse {
             Struct responseData = (Struct) responseDataObj;
             String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME);
             int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME);
-            short errorCode = 
responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME);
-            responses.put(new TopicPartition(topic, partition), errorCode);
+            Errors error = 
Errors.forCode(responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME));
+            responses.put(new TopicPartition(topic, partition), error);
         }
 
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
-    public Map<TopicPartition, Short> responses() {
+    public Map<TopicPartition, Errors> responses() {
         return responses;
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public static StopReplicaResponse parse(ByteBuffer buffer, int versionId) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 55b7308..937bf98 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -117,7 +117,7 @@ public class SyncGroupRequest extends AbstractRequest {
         switch (versionId) {
             case 0:
                 return new SyncGroupResponse(
-                        Errors.forException(e).code(),
+                        Errors.forException(e),
                         ByteBuffer.wrap(new byte[]{}));
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index f459656..e598975 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -40,28 +41,28 @@ public class SyncGroupResponse extends AbstractResponse {
      * GROUP_AUTHORIZATION_FAILED (30)
      */
 
-    private final short errorCode;
+    private final Errors error;
     private final ByteBuffer memberState;
 
-    public SyncGroupResponse(short errorCode, ByteBuffer memberState) {
+    public SyncGroupResponse(Errors error, ByteBuffer memberState) {
         super(new Struct(CURRENT_SCHEMA));
 
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
         struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
 
-        this.errorCode = errorCode;
+        this.error = error;
         this.memberState = memberState;
     }
 
     public SyncGroupResponse(Struct struct) {
         super(struct);
 
-        this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public ByteBuffer memberAssignment() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 95e5683..ef680ff 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -280,7 +280,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     public AbstractResponse getErrorResponse(Throwable e) {
         short versionId = version();
         if (versionId <= 3)
-            return new UpdateMetadataResponse(Errors.forException(e).code());
+            return new UpdateMetadataResponse(Errors.forException(e));
         else
             throw new IllegalArgumentException(String.format("Version %d is 
not valid. Valid versions for %s are 0 to %d",
                     versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));

Reply via email to