MINOR: Use an explicit `Errors` object when possible instead of a numeric error code
Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #2475 from vahidhashemian/minor/use_explicit_Errors_type_when_possible Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9898d665 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9898d665 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9898d665 Branch: refs/heads/trunk Commit: 9898d665d1ab201405d66c70e3ea9710d9dcecd7 Parents: a15fcea Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Authored: Thu Feb 9 21:03:46 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Feb 9 21:03:46 2017 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/clients/NetworkClient.java | 4 +- .../consumer/internals/AbstractCoordinator.java | 10 +- .../consumer/internals/ConsumerCoordinator.java | 4 +- .../clients/consumer/internals/Fetcher.java | 4 +- .../common/requests/ApiVersionsRequest.java | 3 +- .../common/requests/ApiVersionsResponse.java | 18 +- .../requests/ControlledShutdownRequest.java | 2 +- .../requests/ControlledShutdownResponse.java | 15 +- .../common/requests/CreateTopicsResponse.java | 4 +- .../common/requests/DeleteTopicsResponse.java | 4 +- .../common/requests/DescribeGroupsResponse.java | 18 +- .../kafka/common/requests/FetchRequest.java | 2 +- .../kafka/common/requests/FetchResponse.java | 17 +- .../requests/GroupCoordinatorRequest.java | 2 +- .../requests/GroupCoordinatorResponse.java | 17 +- .../kafka/common/requests/HeartbeatRequest.java | 2 +- .../common/requests/HeartbeatResponse.java | 18 +- .../kafka/common/requests/JoinGroupRequest.java | 2 +- .../common/requests/JoinGroupResponse.java | 19 +- .../common/requests/LeaderAndIsrRequest.java | 6 +- .../common/requests/LeaderAndIsrResponse.java | 32 +- .../common/requests/LeaveGroupRequest.java | 2 +- .../common/requests/LeaveGroupResponse.java | 15 +- .../common/requests/ListGroupsRequest.java | 3 +- .../common/requests/ListGroupsResponse.java | 16 +- .../common/requests/ListOffsetRequest.java | 4 +- .../common/requests/ListOffsetResponse.java | 23 +- .../common/requests/OffsetCommitRequest.java | 8 +- .../common/requests/OffsetCommitResponse.java | 23 +- .../common/requests/SaslHandshakeRequest.java | 2 +- .../common/requests/SaslHandshakeResponse.java | 15 +- .../common/requests/StopReplicaRequest.java | 6 +- .../common/requests/StopReplicaResponse.java | 32 +- .../kafka/common/requests/SyncGroupRequest.java | 2 +- .../common/requests/SyncGroupResponse.java | 15 +- .../common/requests/UpdateMetadataRequest.java | 2 +- .../common/requests/UpdateMetadataResponse.java | 15 +- .../authenticator/SaslClientAuthenticator.java | 4 +- .../authenticator/SaslServerAuthenticator.java | 4 +- .../clients/consumer/KafkaConsumerTest.java | 56 ++-- .../internals/AbstractCoordinatorTest.java | 8 +- .../internals/ConsumerCoordinatorTest.java | 320 +++++++++---------- .../internals/ConsumerNetworkClientTest.java | 16 +- .../clients/consumer/internals/FetcherTest.java | 48 +-- .../clients/producer/internals/SenderTest.java | 21 +- .../common/requests/RequestResponseTest.java | 46 +-- .../authenticator/SaslAuthenticatorTest.java | 6 +- .../distributed/WorkerCoordinatorTest.java | 40 +-- .../main/scala/kafka/admin/AdminClient.scala | 8 +- .../kafka/admin/ConsumerGroupCommand.scala | 4 +- .../kafka/api/ControlledShutdownRequest.scala | 4 +- .../kafka/api/ControlledShutdownResponse.scala | 8 +- .../src/main/scala/kafka/api/FetchRequest.scala | 2 +- .../main/scala/kafka/api/FetchResponse.scala | 8 +- .../kafka/api/GroupCoordinatorRequest.scala | 2 +- .../kafka/api/GroupCoordinatorResponse.scala | 10 +- .../scala/kafka/api/OffsetCommitRequest.scala | 4 +- .../scala/kafka/api/OffsetCommitResponse.scala | 10 +- .../scala/kafka/api/OffsetFetchRequest.scala | 4 +- .../scala/kafka/api/OffsetFetchResponse.scala | 16 +- .../main/scala/kafka/api/OffsetRequest.scala | 2 +- .../main/scala/kafka/api/OffsetResponse.scala | 10 +- .../main/scala/kafka/api/ProducerRequest.scala | 2 +- .../main/scala/kafka/api/ProducerResponse.scala | 8 +- .../main/scala/kafka/api/TopicMetadata.scala | 20 +- .../scala/kafka/api/TopicMetadataRequest.scala | 2 +- .../main/scala/kafka/client/ClientUtils.scala | 2 +- .../main/scala/kafka/common/ErrorMapping.scala | 20 +- .../kafka/common/OffsetMetadataAndError.scala | 22 +- .../kafka/consumer/ConsumerFetcherThread.scala | 5 +- .../scala/kafka/consumer/SimpleConsumer.scala | 5 +- .../consumer/ZookeeperConsumerConnector.scala | 20 +- .../kafka/controller/TopicDeletionManager.scala | 4 +- .../kafka/coordinator/GroupCoordinator.scala | 138 ++++---- .../coordinator/GroupMetadataManager.scala | 14 +- .../kafka/coordinator/MemberMetadata.scala | 4 +- .../scala/kafka/javaapi/FetchResponse.scala | 4 +- .../javaapi/GroupCoordinatorResponse.scala | 4 +- .../kafka/javaapi/OffsetCommitResponse.scala | 6 +- .../scala/kafka/javaapi/OffsetResponse.scala | 4 +- .../scala/kafka/javaapi/TopicMetadata.scala | 9 +- .../kafka/producer/BrokerPartitionInfo.scala | 12 +- .../producer/async/DefaultEventHandler.scala | 6 +- .../kafka/security/auth/ResourceType.scala | 8 +- .../kafka/server/AbstractFetcherThread.scala | 4 +- .../src/main/scala/kafka/server/KafkaApis.scala | 112 +++---- .../main/scala/kafka/server/KafkaServer.scala | 8 +- .../kafka/server/ReplicaFetcherThread.scala | 6 +- .../scala/kafka/server/ReplicaManager.scala | 40 +-- .../kafka/tools/ConsumerOffsetChecker.scala | 4 +- .../kafka/tools/ReplicaVerificationTool.scala | 2 +- .../kafka/api/AuthorizerIntegrationTest.scala | 38 +-- .../ReplicaFetcherThreadFatalErrorTest.scala | 2 +- .../tools/ReplicaVerificationToolTest.scala | 2 +- .../scala/other/kafka/TestOffsetManager.scala | 4 +- .../api/RequestResponseSerializationTest.scala | 28 +- .../GroupCoordinatorResponseTest.scala | 300 ++++++++--------- .../coordinator/GroupMetadataManagerTest.scala | 36 +-- .../integration/BaseTopicMetadataTest.scala | 26 +- .../kafka/integration/PrimitiveApiTest.scala | 4 +- .../unit/kafka/producer/AsyncProducerTest.scala | 6 +- .../unit/kafka/producer/SyncProducerTest.scala | 18 +- .../AbstractCreateTopicsRequestTest.scala | 2 +- .../server/AbstractFetcherThreadTest.scala | 2 +- .../kafka/server/ApiVersionsRequestTest.scala | 2 +- .../kafka/server/DeleteTopicsRequestTest.scala | 2 +- .../unit/kafka/server/FetchRequestTest.scala | 8 +- .../unit/kafka/server/LeaderElectionTest.scala | 2 +- .../scala/unit/kafka/server/LogOffsetTest.scala | 2 +- .../unit/kafka/server/OffsetCommitTest.scala | 50 +-- .../unit/kafka/server/ReplicaManagerTest.scala | 8 +- .../server/SaslApiVersionsRequestTest.scala | 4 +- 112 files changed, 1054 insertions(+), 1034 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 3a75288..890bf56 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -544,9 +544,9 @@ public class NetworkClient implements KafkaClient { private void handleApiVersionsResponse(List<ClientResponse> responses, InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) { final String node = req.destination; - if (apiVersionsResponse.errorCode() != Errors.NONE.code()) { + if (apiVersionsResponse.error() != Errors.NONE) { log.warn("Node {} got error {} when making an ApiVersionsRequest. Disconnecting.", - node, Errors.forCode(apiVersionsResponse.errorCode())); + node, apiVersionsResponse.error()); this.selector.close(node); processDisconnection(responses, node, now); return; http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 2fdf802..350a84b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -420,7 +420,7 @@ public abstract class AbstractCoordinator implements Closeable { private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> { @Override public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) { - Errors error = Errors.forCode(joinResponse.errorCode()); + Errors error = joinResponse.error(); if (error == Errors.NONE) { log.debug("Received successful JoinGroup response for group {}: {}", groupId, joinResponse); sensors.joinLatency.record(response.requestLatencyMs()); @@ -509,7 +509,7 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) { - Errors error = Errors.forCode(syncResponse.errorCode()); + Errors error = syncResponse.error(); if (error == Errors.NONE) { sensors.syncLatency.record(response.requestLatencyMs()); future.complete(syncResponse.memberAssignment()); @@ -562,7 +562,7 @@ public abstract class AbstractCoordinator implements Closeable { // use MAX_VALUE - node.id as the coordinator id to mimic separate connections // for the coordinator in the underlying network client layer // TODO: this needs to be better handled in KAFKA-1935 - Errors error = Errors.forCode(groupCoordinatorResponse.errorCode()); + Errors error = groupCoordinatorResponse.error(); clearFindCoordinatorFuture(); if (error == Errors.NONE) { synchronized (AbstractCoordinator.this) { @@ -688,7 +688,7 @@ public abstract class AbstractCoordinator implements Closeable { private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> { @Override public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) { - Errors error = Errors.forCode(leaveResponse.errorCode()); + Errors error = leaveResponse.error(); if (error == Errors.NONE) { log.debug("LeaveGroup request for group {} returned successfully", groupId); future.complete(null); @@ -712,7 +712,7 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { sensors.heartbeatLatency.record(response.requestLatencyMs()); - Errors error = Errors.forCode(heartbeatResponse.errorCode()); + Errors error = heartbeatResponse.error(); if (error == Errors.NONE) { log.debug("Received successful Heartbeat response for group {}", groupId); future.complete(null); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 8669527..0dc073e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -722,12 +722,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator { sensors.commitLatency.record(response.requestLatencyMs()); Set<String> unauthorizedTopics = new HashSet<>(); - for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) { + for (Map.Entry<TopicPartition, Errors> entry : commitResponse.responseData().entrySet()) { TopicPartition tp = entry.getKey(); OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp); long offset = offsetAndMetadata.offset(); - Errors error = Errors.forCode(entry.getValue()); + Errors error = entry.getValue(); if (error == Errors.NONE) { log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp); if (subscriptions.isAssigned(tp)) http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 6a13d46..02f34e5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -637,7 +637,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) { TopicPartition topicPartition = entry.getKey(); ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition); - Errors error = Errors.forCode(partitionData.errorCode); + Errors error = partitionData.error; if (error == Errors.NONE) { if (partitionData.offsets != null) { // Handle v0 response @@ -750,7 +750,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { int bytes = 0; int recordsCount = 0; PartitionRecords<K, V> parsedRecords = null; - Errors error = Errors.forCode(partition.errorCode); + Errors error = partition.error; try { if (!subscriptions.isFetchable(tp)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java index 17e6d5e..fe7c348 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java @@ -51,8 +51,7 @@ public class ApiVersionsRequest extends AbstractRequest { short versionId = version(); switch (versionId) { case 0: - short errorCode = Errors.forException(e).code(); - return new ApiVersionsResponse(errorCode, Collections.<ApiVersionsResponse.ApiVersion>emptyList()); + return new ApiVersionsResponse(Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.API_VERSIONS.id))); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 81be9c3..7d8bcc5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -42,7 +42,7 @@ public class ApiVersionsResponse extends AbstractResponse { * * UNSUPPORTED_VERSION (33) */ - private final short errorCode; + private final Errors error; private final Map<Short, ApiVersion> apiKeyToApiVersion; public static final class ApiVersion { @@ -66,9 +66,9 @@ public class ApiVersionsResponse extends AbstractResponse { } } - public ApiVersionsResponse(short errorCode, List<ApiVersion> apiVersions) { + public ApiVersionsResponse(Errors error, List<ApiVersion> apiVersions) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(ERROR_CODE_KEY_NAME, error.code()); List<Struct> apiVersionList = new ArrayList<>(); for (ApiVersion apiVersion : apiVersions) { Struct apiVersionStruct = struct.instance(API_VERSIONS_KEY_NAME); @@ -78,13 +78,13 @@ public class ApiVersionsResponse extends AbstractResponse { apiVersionList.add(apiVersionStruct); } struct.set(API_VERSIONS_KEY_NAME, apiVersionList.toArray()); - this.errorCode = errorCode; + this.error = error; this.apiKeyToApiVersion = buildApiKeyToApiVersion(apiVersions); } public ApiVersionsResponse(Struct struct) { super(struct); - this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); List<ApiVersion> tempApiVersions = new ArrayList<>(); for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) { Struct apiVersionStruct = (Struct) apiVersionsObj; @@ -104,8 +104,8 @@ public class ApiVersionsResponse extends AbstractResponse { return apiKeyToApiVersion.get(apiKey); } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public static ApiVersionsResponse parse(ByteBuffer buffer) { @@ -113,7 +113,7 @@ public class ApiVersionsResponse extends AbstractResponse { } public static ApiVersionsResponse fromError(Errors error) { - return new ApiVersionsResponse(error.code(), Collections.<ApiVersion>emptyList()); + return new ApiVersionsResponse(error, Collections.<ApiVersion>emptyList()); } private static ApiVersionsResponse createApiVersionsResponse() { @@ -121,7 +121,7 @@ public class ApiVersionsResponse extends AbstractResponse { for (ApiKeys apiKey : ApiKeys.values()) { versionList.add(new ApiVersion(apiKey.id, ProtoUtils.oldestVersion(apiKey.id), ProtoUtils.latestVersion(apiKey.id))); } - return new ApiVersionsResponse(Errors.NONE.code(), versionList); + return new ApiVersionsResponse(Errors.NONE, versionList); } private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java index 337ccfc..8f44e5c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java @@ -68,7 +68,7 @@ public class ControlledShutdownRequest extends AbstractRequest { throw new IllegalArgumentException("Version 0 is not supported. It is only supported by " + "the Scala request class for controlled shutdown"); case 1: - return new ControlledShutdownResponse(Errors.forException(e).code(), Collections.<TopicPartition>emptySet()); + return new ControlledShutdownResponse(Errors.forException(e), Collections.<TopicPartition>emptySet()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id))); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java index 1996f82..b3922f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java @@ -14,6 +14,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -41,14 +42,14 @@ public class ControlledShutdownResponse extends AbstractResponse { * BROKER_NOT_AVAILABLE(8) * STALE_CONTROLLER_EPOCH(11) */ - private final short errorCode; + private final Errors error; private final Set<TopicPartition> partitionsRemaining; - public ControlledShutdownResponse(short errorCode, Set<TopicPartition> partitionsRemaining) { + public ControlledShutdownResponse(Errors error, Set<TopicPartition> partitionsRemaining) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(ERROR_CODE_KEY_NAME, error.code()); List<Struct> partitionsRemainingList = new ArrayList<>(partitionsRemaining.size()); for (TopicPartition topicPartition : partitionsRemaining) { @@ -59,13 +60,13 @@ public class ControlledShutdownResponse extends AbstractResponse { } struct.set(PARTITIONS_REMAINING_KEY_NAME, partitionsRemainingList.toArray()); - this.errorCode = errorCode; + this.error = error; this.partitionsRemaining = partitionsRemaining; } public ControlledShutdownResponse(Struct struct) { super(struct); - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); Set<TopicPartition> partitions = new HashSet<>(); for (Object topicPartitionObj : struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) { Struct topicPartition = (Struct) topicPartitionObj; @@ -76,8 +77,8 @@ public class ControlledShutdownResponse extends AbstractResponse { partitionsRemaining = partitions; } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public Set<TopicPartition> partitionsRemaining() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java index 9807283..b09795f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java @@ -113,11 +113,11 @@ public class CreateTopicsResponse extends AbstractResponse { for (Object topicErrorStructObj : topicErrorStructs) { Struct topicErrorCodeStruct = (Struct) topicErrorStructObj; String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME); - short errorCode = topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME); + Errors error = Errors.forCode(topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME)); String errorMessage = null; if (topicErrorCodeStruct.hasField(ERROR_MESSAGE_KEY_NAME)) errorMessage = topicErrorCodeStruct.getString(ERROR_MESSAGE_KEY_NAME); - errors.put(topic, new Error(Errors.forCode(errorCode), errorMessage)); + errors.put(topic, new Error(error, errorMessage)); } this.errors = errors; http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java index ed6a63d..5c8b3d5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java @@ -67,8 +67,8 @@ public class DeleteTopicsResponse extends AbstractResponse { for (Object topicErrorCodeStructObj : topicErrorCodesStructs) { Struct topicErrorCodeStruct = (Struct) topicErrorCodeStructObj; String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME); - short errorCode = topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME); - errors.put(topic, Errors.forCode(errorCode)); + Errors error = Errors.forCode(topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME)); + errors.put(topic, error); } this.errors = errors; http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 2eff628..56b387e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -67,7 +67,7 @@ public class DescribeGroupsResponse extends AbstractResponse { Struct groupStruct = struct.instance(GROUPS_KEY_NAME); GroupMetadata group = groupEntry.getValue(); groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey()); - groupStruct.set(ERROR_CODE_KEY_NAME, group.errorCode); + groupStruct.set(ERROR_CODE_KEY_NAME, group.error.code()); groupStruct.set(GROUP_STATE_KEY_NAME, group.state); groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType); groupStruct.set(PROTOCOL_KEY_NAME, group.protocol); @@ -95,7 +95,7 @@ public class DescribeGroupsResponse extends AbstractResponse { Struct groupStruct = (Struct) groupObj; String groupId = groupStruct.getString(GROUP_ID_KEY_NAME); - short errorCode = groupStruct.getShort(ERROR_CODE_KEY_NAME); + Errors error = Errors.forCode(groupStruct.getShort(ERROR_CODE_KEY_NAME)); String state = groupStruct.getString(GROUP_STATE_KEY_NAME); String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME); String protocol = groupStruct.getString(PROTOCOL_KEY_NAME); @@ -111,7 +111,7 @@ public class DescribeGroupsResponse extends AbstractResponse { members.add(new GroupMember(memberId, clientId, clientHost, memberMetadata, memberAssignment)); } - this.groups.put(groupId, new GroupMetadata(errorCode, state, protocolType, protocol, members)); + this.groups.put(groupId, new GroupMetadata(error, state, protocolType, protocol, members)); } } @@ -121,26 +121,26 @@ public class DescribeGroupsResponse extends AbstractResponse { public static class GroupMetadata { - private final short errorCode; + private final Errors error; private final String state; private final String protocolType; private final String protocol; private final List<GroupMember> members; - public GroupMetadata(short errorCode, + public GroupMetadata(Errors error, String state, String protocolType, String protocol, List<GroupMember> members) { - this.errorCode = errorCode; + this.error = error; this.state = state; this.protocolType = protocolType; this.protocol = protocol; this.members = members; } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public String state() { @@ -161,7 +161,7 @@ public class DescribeGroupsResponse extends AbstractResponse { public static GroupMetadata forError(Errors error) { return new DescribeGroupsResponse.GroupMetadata( - error.code(), + error, DescribeGroupsResponse.UNKNOWN_STATE, DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE, DescribeGroupsResponse.UNKNOWN_PROTOCOL, http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 5700e9e..4f270e1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -212,7 +212,7 @@ public class FetchRequest extends AbstractRequest { LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>(); for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) { - FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(), + FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e), FetchResponse.INVALID_HIGHWATERMARK, MemoryRecords.EMPTY); responseData.put(entry.getKey(), partitionResponse); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 965b207..64bd3d3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.network.ByteBufferSend; import org.apache.kafka.common.network.MultiSend; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -37,7 +38,7 @@ import java.util.Map; * This wrapper supports all versions of the Fetch API */ public class FetchResponse extends AbstractResponse { - + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id); private static final String RESPONSES_KEY_NAME = "responses"; @@ -73,19 +74,19 @@ public class FetchResponse extends AbstractResponse { private final int throttleTime; public static final class PartitionData { - public final short errorCode; + public final Errors error; public final long highWatermark; public final Records records; - public PartitionData(short errorCode, long highWatermark, Records records) { - this.errorCode = errorCode; + public PartitionData(Errors error, long highWatermark, Records records) { + this.error = error; this.highWatermark = highWatermark; this.records = records; } @Override public String toString() { - return "(errorCode=" + errorCode + ", highWaterMark=" + highWatermark + + return "(error=" + error.toString() + ", highWaterMark=" + highWatermark + ", records=" + records + ")"; } } @@ -128,10 +129,10 @@ public class FetchResponse extends AbstractResponse { Struct partitionResponse = (Struct) partitionResponseObj; Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME); int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME); - short errorCode = partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME); + Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME)); long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME); Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME); - PartitionData partitionData = new PartitionData(errorCode, highWatermark, records); + PartitionData partitionData = new PartitionData(error, highWatermark, records); responseData.put(new TopicPartition(topic, partition), partitionData); } } @@ -237,7 +238,7 @@ public class FetchResponse extends AbstractResponse { Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); Struct partitionDataHeader = partitionData.instance(PARTITION_HEADER_KEY_NAME); partitionDataHeader.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode); + partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code()); partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark); partitionData.set(PARTITION_HEADER_KEY_NAME, partitionDataHeader); partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.records); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java index d8ccdf6..ed56f39 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java @@ -65,7 +65,7 @@ public class GroupCoordinatorRequest extends AbstractRequest { short versionId = version(); switch (versionId) { case 0: - return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode()); + return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_COORDINATOR.id))); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java index 1f447f7..fc3d358 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java @@ -14,6 +14,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Node; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -21,7 +22,7 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public class GroupCoordinatorResponse extends AbstractResponse { - + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id); private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String COORDINATOR_KEY_NAME = "coordinator"; @@ -40,24 +41,24 @@ public class GroupCoordinatorResponse extends AbstractResponse { private static final String HOST_KEY_NAME = "host"; private static final String PORT_KEY_NAME = "port"; - private final short errorCode; + private final Errors error; private final Node node; - public GroupCoordinatorResponse(short errorCode, Node node) { + public GroupCoordinatorResponse(Errors error, Node node) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(ERROR_CODE_KEY_NAME, error.code()); Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); coordinator.set(NODE_ID_KEY_NAME, node.id()); coordinator.set(HOST_KEY_NAME, node.host()); coordinator.set(PORT_KEY_NAME, node.port()); struct.set(COORDINATOR_KEY_NAME, coordinator); - this.errorCode = errorCode; + this.error = error; this.node = node; } public GroupCoordinatorResponse(Struct struct) { super(struct); - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME); int nodeId = broker.getInt(NODE_ID_KEY_NAME); String host = broker.getString(HOST_KEY_NAME); @@ -65,8 +66,8 @@ public class GroupCoordinatorResponse extends AbstractResponse { node = new Node(nodeId, host, port); } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public Node node() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index 0e5c17a..7e79c8a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -80,7 +80,7 @@ public class HeartbeatRequest extends AbstractRequest { short versionId = version(); switch (versionId) { case 0: - return new HeartbeatResponse(Errors.forException(e).code()); + return new HeartbeatResponse(Errors.forException(e)); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id))); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java index 72f0175..f36dec4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -20,7 +21,7 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public class HeartbeatResponse extends AbstractResponse { - + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id); private static final String ERROR_CODE_KEY_NAME = "error_code"; @@ -35,20 +36,21 @@ public class HeartbeatResponse extends AbstractResponse { * GROUP_AUTHORIZATION_FAILED (30) */ - private final short errorCode; - public HeartbeatResponse(short errorCode) { + private final Errors error; + + public HeartbeatResponse(Errors error) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); - this.errorCode = errorCode; + struct.set(ERROR_CODE_KEY_NAME, error.code()); + this.error = error; } public HeartbeatResponse(Struct struct) { super(struct); - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public static HeartbeatResponse parse(ByteBuffer buffer) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 3f00ed1..ad0cdd0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -170,7 +170,7 @@ public class JoinGroupRequest extends AbstractRequest { case 1: return new JoinGroupResponse( versionId, - Errors.forException(e).code(), + Errors.forException(e), JoinGroupResponse.UNKNOWN_GENERATION_ID, JoinGroupResponse.UNKNOWN_PROTOCOL, JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index 8fd77ce..bc9366a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -53,24 +54,24 @@ public class JoinGroupResponse extends AbstractResponse { public static final int UNKNOWN_GENERATION_ID = -1; public static final String UNKNOWN_MEMBER_ID = ""; - private final short errorCode; + private final Errors error; private final int generationId; private final String groupProtocol; private final String memberId; private final String leaderId; private final Map<String, ByteBuffer> members; - public JoinGroupResponse(short errorCode, + public JoinGroupResponse(Errors error, int generationId, String groupProtocol, String memberId, String leaderId, Map<String, ByteBuffer> groupMembers) { - this(CURRENT_VERSION, errorCode, generationId, groupProtocol, memberId, leaderId, groupMembers); + this(CURRENT_VERSION, error, generationId, groupProtocol, memberId, leaderId, groupMembers); } public JoinGroupResponse(int version, - short errorCode, + Errors error, int generationId, String groupProtocol, String memberId, @@ -78,7 +79,7 @@ public class JoinGroupResponse extends AbstractResponse { Map<String, ByteBuffer> groupMembers) { super(new Struct(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, version))); - struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(ERROR_CODE_KEY_NAME, error.code()); struct.set(GENERATION_ID_KEY_NAME, generationId); struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol); struct.set(MEMBER_ID_KEY_NAME, memberId); @@ -93,7 +94,7 @@ public class JoinGroupResponse extends AbstractResponse { } struct.set(MEMBERS_KEY_NAME, memberArray.toArray()); - this.errorCode = errorCode; + this.error = error; this.generationId = generationId; this.groupProtocol = groupProtocol; this.memberId = memberId; @@ -111,15 +112,15 @@ public class JoinGroupResponse extends AbstractResponse { ByteBuffer memberMetadata = memberData.getBytes(MEMBER_METADATA_KEY_NAME); members.put(memberId, memberMetadata); } - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); generationId = struct.getInt(GENERATION_ID_KEY_NAME); groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME); memberId = struct.getString(MEMBER_ID_KEY_NAME); leaderId = struct.getString(LEADER_ID_KEY_NAME); } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public int generationId() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 1f09a12..fde184a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -178,15 +178,15 @@ public class LeaderAndIsrRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(Throwable e) { - Map<TopicPartition, Short> responses = new HashMap<>(partitionStates.size()); + Map<TopicPartition, Errors> responses = new HashMap<>(partitionStates.size()); for (TopicPartition partition : partitionStates.keySet()) { - responses.put(partition, Errors.forException(e).code()); + responses.put(partition, Errors.forException(e)); } short versionId = version(); switch (versionId) { case 0: - return new LeaderAndIsrResponse(Errors.NONE.code(), responses); + return new LeaderAndIsrResponse(Errors.NONE, responses); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LEADER_AND_ISR.id))); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java index a754def..4d0a05d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java @@ -41,34 +41,32 @@ public class LeaderAndIsrResponse extends AbstractResponse { * * STALE_CONTROLLER_EPOCH (11) */ - private final short errorCode; + private final Errors error; - private final Map<TopicPartition, Short> responses; + private final Map<TopicPartition, Errors> responses; - public LeaderAndIsrResponse(Map<TopicPartition, Short> responses) { - this(Errors.NONE.code(), responses); + public LeaderAndIsrResponse(Map<TopicPartition, Errors> responses) { + this(Errors.NONE, responses); } - public LeaderAndIsrResponse(short errorCode, Map<TopicPartition, Short> responses) { + public LeaderAndIsrResponse(Errors error, Map<TopicPartition, Errors> responses) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); - List<Struct> responseDatas = new ArrayList<>(responses.size()); - for (Map.Entry<TopicPartition, Short> response : responses.entrySet()) { + for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) { Struct partitionData = struct.instance(PARTITIONS_KEY_NAME); TopicPartition partition = response.getKey(); partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic()); partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition()); - partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue()); + partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code()); responseDatas.add(partitionData); } struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray()); - struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(ERROR_CODE_KEY_NAME, error.code()); this.responses = responses; - this.errorCode = errorCode; + this.error = error; } public LeaderAndIsrResponse(Struct struct) { @@ -79,19 +77,19 @@ public class LeaderAndIsrResponse extends AbstractResponse { Struct responseData = (Struct) responseDataObj; String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME); int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME); - short errorCode = responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME); - responses.put(new TopicPartition(topic, partition), errorCode); + Errors error = Errors.forCode(responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME)); + responses.put(new TopicPartition(topic, partition), error); } - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); } - public Map<TopicPartition, Short> responses() { + public Map<TopicPartition, Errors> responses() { return responses; } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public static LeaderAndIsrResponse parse(ByteBuffer buffer, int version) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java index 573ebc8..2a7b70e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java @@ -71,7 +71,7 @@ public class LeaveGroupRequest extends AbstractRequest { short versionId = version(); switch (versionId) { case 0: - return new LeaveGroupResponse(Errors.forException(e).code()); + return new LeaveGroupResponse(Errors.forException(e)); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LEAVE_GROUP.id))); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java index 9c7998b..bd1c84d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java @@ -14,6 +14,7 @@ package org.apache.kafka.common.requests; import java.nio.ByteBuffer; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -33,20 +34,20 @@ public class LeaveGroupResponse extends AbstractResponse { * GROUP_AUTHORIZATION_FAILED (30) */ - private final short errorCode; - public LeaveGroupResponse(short errorCode) { + private final Errors error; + public LeaveGroupResponse(Errors error) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); - this.errorCode = errorCode; + struct.set(ERROR_CODE_KEY_NAME, error.code()); + this.error = error; } public LeaveGroupResponse(Struct struct) { super(struct); - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public static LeaveGroupResponse parse(ByteBuffer buffer) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java index 8d0a1af..235f4e4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java @@ -51,8 +51,7 @@ public class ListGroupsRequest extends AbstractRequest { short versionId = version(); switch (versionId) { case 0: - short errorCode = Errors.forException(e).code(); - return new ListGroupsResponse(errorCode, Collections.<ListGroupsResponse.Group>emptyList()); + return new ListGroupsResponse(Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_GROUPS.id))); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java index 98573f8..f421064 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -39,12 +39,12 @@ public class ListGroupsResponse extends AbstractResponse { * AUTHORIZATION_FAILED (29) */ - private final short errorCode; + private final Errors error; private final List<Group> groups; - public ListGroupsResponse(short errorCode, List<Group> groups) { + public ListGroupsResponse(Errors error, List<Group> groups) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(ERROR_CODE_KEY_NAME, error.code()); List<Struct> groupList = new ArrayList<>(); for (Group group : groups) { Struct groupStruct = struct.instance(GROUPS_KEY_NAME); @@ -53,13 +53,13 @@ public class ListGroupsResponse extends AbstractResponse { groupList.add(groupStruct); } struct.set(GROUPS_KEY_NAME, groupList.toArray()); - this.errorCode = errorCode; + this.error = error; this.groups = groups; } public ListGroupsResponse(Struct struct) { super(struct); - this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); this.groups = new ArrayList<>(); for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { Struct groupStruct = (Struct) groupObj; @@ -73,8 +73,8 @@ public class ListGroupsResponse extends AbstractResponse { return groups; } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public static class Group { @@ -101,7 +101,7 @@ public class ListGroupsResponse extends AbstractResponse { } public static ListGroupsResponse fromError(Errors error) { - return new ListGroupsResponse(error.code(), Collections.<Group>emptyList()); + return new ListGroupsResponse(error, Collections.<Group>emptyList()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 79251ed..6214a56 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -241,12 +241,12 @@ public class ListOffsetRequest extends AbstractRequest { short versionId = version(); if (versionId == 0) { for (Map.Entry<TopicPartition, PartitionData> entry : offsetData.entrySet()) { - ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>()); + ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e), new ArrayList<Long>()); responseData.put(entry.getKey(), partitionResponse); } } else { for (Map.Entry<TopicPartition, Long> entry : partitionTimestamps.entrySet()) { - ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), -1L, -1L); + ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e), -1L, -1L); responseData.put(entry.getKey(), partitionResponse); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java index 2eddf1e..b815a53 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -33,7 +34,7 @@ import java.util.Map; public class ListOffsetResponse extends AbstractResponse { public static final long UNKNOWN_TIMESTAMP = -1L; public static final long UNKNOWN_OFFSET = -1L; - + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id); private static final String RESPONSES_KEY_NAME = "responses"; @@ -63,7 +64,7 @@ public class ListOffsetResponse extends AbstractResponse { private final Map<TopicPartition, PartitionData> responseData; public static final class PartitionData { - public final short errorCode; + public final Errors error; // The offsets list is only used in ListOffsetResponse v0. @Deprecated public final List<Long> offsets; @@ -74,8 +75,8 @@ public class ListOffsetResponse extends AbstractResponse { * Constructor for ListOffsetResponse v0 */ @Deprecated - public PartitionData(short errorCode, List<Long> offsets) { - this.errorCode = errorCode; + public PartitionData(Errors error, List<Long> offsets) { + this.error = error; this.offsets = offsets; this.timestamp = null; this.offset = null; @@ -84,8 +85,8 @@ public class ListOffsetResponse extends AbstractResponse { /** * Constructor for ListOffsetResponse v1 */ - public PartitionData(short errorCode, long timestamp, long offset) { - this.errorCode = errorCode; + public PartitionData(Errors error, long timestamp, long offset) { + this.error = error; this.timestamp = timestamp; this.offset = offset; this.offsets = null; @@ -95,7 +96,7 @@ public class ListOffsetResponse extends AbstractResponse { public String toString() { StringBuilder bld = new StringBuilder(); bld.append("PartitionData{"). - append("errorCode: ").append((int) errorCode). + append("errorCode: ").append((int) error.code()). append(", timestamp: ").append(timestamp). append(", offset: ").append(offset). append(", offsets: "); @@ -130,7 +131,7 @@ public class ListOffsetResponse extends AbstractResponse { PartitionData offsetPartitionData = partitionEntry.getValue(); Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.errorCode); + partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.error.code()); if (version == 0) partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray()); else { @@ -155,18 +156,18 @@ public class ListOffsetResponse extends AbstractResponse { for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { Struct partitionResponse = (Struct) partitionResponseObj; int partition = partitionResponse.getInt(PARTITION_KEY_NAME); - short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME); + Errors error = Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME)); PartitionData partitionData; if (partitionResponse.hasField(OFFSETS_KEY_NAME)) { Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME); List<Long> offsetsList = new ArrayList<Long>(); for (Object offset : offsets) offsetsList.add((Long) offset); - partitionData = new PartitionData(errorCode, offsetsList); + partitionData = new PartitionData(error, offsetsList); } else { long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); long offset = partitionResponse.getLong(OFFSET_KEY_NAME); - partitionData = new PartitionData(errorCode, timestamp, offset); + partitionData = new PartitionData(error, timestamp, offset); } responseData.put(new TopicPartition(topic, partition), partitionData); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 361fd15..6dd1197 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -285,9 +285,9 @@ public class OffsetCommitRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(Throwable e) { - Map<TopicPartition, Short> responseData = new HashMap<>(); + Map<TopicPartition, Errors> responseData = new HashMap<>(); for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) { - responseData.put(entry.getKey(), Errors.forException(e).code()); + responseData.put(entry.getKey(), Errors.forException(e)); } short versionId = version(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index abb260e..8a00c6b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -14,6 +14,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -26,7 +27,7 @@ import java.util.List; import java.util.Map; public class OffsetCommitResponse extends AbstractResponse { - + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id); private static final String RESPONSES_KEY_NAME = "responses"; @@ -54,22 +55,22 @@ public class OffsetCommitResponse extends AbstractResponse { * GROUP_AUTHORIZATION_FAILED (30) */ - private final Map<TopicPartition, Short> responseData; + private final Map<TopicPartition, Errors> responseData; - public OffsetCommitResponse(Map<TopicPartition, Short> responseData) { + public OffsetCommitResponse(Map<TopicPartition, Errors> responseData) { super(new Struct(CURRENT_SCHEMA)); - Map<String, Map<Integer, Short>> topicsData = CollectionUtils.groupDataByTopic(responseData); + Map<String, Map<Integer, Errors>> topicsData = CollectionUtils.groupDataByTopic(responseData); List<Struct> topicArray = new ArrayList<Struct>(); - for (Map.Entry<String, Map<Integer, Short>> entries: topicsData.entrySet()) { + for (Map.Entry<String, Map<Integer, Errors>> entries: topicsData.entrySet()) { Struct topicData = struct.instance(RESPONSES_KEY_NAME); topicData.set(TOPIC_KEY_NAME, entries.getKey()); List<Struct> partitionArray = new ArrayList<Struct>(); - for (Map.Entry<Integer, Short> partitionEntry : entries.getValue().entrySet()) { + for (Map.Entry<Integer, Errors> partitionEntry : entries.getValue().entrySet()) { Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionData.set(ERROR_CODE_KEY_NAME, partitionEntry.getValue()); + partitionData.set(ERROR_CODE_KEY_NAME, partitionEntry.getValue().code()); partitionArray.add(partitionData); } topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); @@ -81,20 +82,20 @@ public class OffsetCommitResponse extends AbstractResponse { public OffsetCommitResponse(Struct struct) { super(struct); - responseData = new HashMap<TopicPartition, Short>(); + responseData = new HashMap<TopicPartition, Errors>(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; String topic = topicResponse.getString(TOPIC_KEY_NAME); for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { Struct partitionResponse = (Struct) partitionResponseObj; int partition = partitionResponse.getInt(PARTITION_KEY_NAME); - short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME); - responseData.put(new TopicPartition(topic, partition), errorCode); + Errors error = Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME)); + responseData.put(new TopicPartition(topic, partition), error); } } } - public Map<TopicPartition, Short> responseData() { + public Map<TopicPartition, Errors> responseData() { return responseData; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java index 81bc249..d244f0a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java @@ -66,7 +66,7 @@ public class SaslHandshakeRequest extends AbstractRequest { switch (versionId) { case 0: List<String> enabledMechanisms = Collections.emptyList(); - return new SaslHandshakeResponse(Errors.forException(e).code(), enabledMechanisms); + return new SaslHandshakeResponse(Errors.forException(e), enabledMechanisms); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id))); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java index 6d7f734..f50c5be 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.List; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -45,20 +46,20 @@ public class SaslHandshakeResponse extends AbstractResponse { * UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server * ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake */ - private final short errorCode; + private final Errors error; private final List<String> enabledMechanisms; - public SaslHandshakeResponse(short errorCode, Collection<String> enabledMechanisms) { + public SaslHandshakeResponse(Errors error, Collection<String> enabledMechanisms) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(ERROR_CODE_KEY_NAME, error.code()); struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray()); - this.errorCode = errorCode; + this.error = error; this.enabledMechanisms = new ArrayList<>(enabledMechanisms); } public SaslHandshakeResponse(Struct struct) { super(struct); - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME); ArrayList<String> enabledMechanisms = new ArrayList<>(); for (Object mechanism : mechanisms) @@ -66,8 +67,8 @@ public class SaslHandshakeResponse extends AbstractResponse { this.enabledMechanisms = enabledMechanisms; } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public List<String> enabledMechanisms() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java index d687d99..ff2638b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java @@ -117,15 +117,15 @@ public class StopReplicaRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(Throwable e) { - Map<TopicPartition, Short> responses = new HashMap<>(partitions.size()); + Map<TopicPartition, Errors> responses = new HashMap<>(partitions.size()); for (TopicPartition partition : partitions) { - responses.put(partition, Errors.forException(e).code()); + responses.put(partition, Errors.forException(e)); } short versionId = version(); switch (versionId) { case 0: - return new StopReplicaResponse(Errors.NONE.code(), responses); + return new StopReplicaResponse(Errors.NONE, responses); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.STOP_REPLICA.id))); http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java index 92d9e58..b39fb19 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java @@ -36,8 +36,8 @@ public class StopReplicaResponse extends AbstractResponse { private static final String PARTITIONS_PARTITION_KEY_NAME = "partition"; private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code"; - private final Map<TopicPartition, Short> responses; - private final short errorCode; + private final Map<TopicPartition, Errors> responses; + private final Errors error; /** * Possible error code: @@ -45,30 +45,28 @@ public class StopReplicaResponse extends AbstractResponse { * STALE_CONTROLLER_EPOCH (11) */ - public StopReplicaResponse(Map<TopicPartition, Short> responses) { - this(Errors.NONE.code(), responses); + public StopReplicaResponse(Map<TopicPartition, Errors> responses) { + this(Errors.NONE, responses); } - public StopReplicaResponse(short errorCode, Map<TopicPartition, Short> responses) { + public StopReplicaResponse(Errors error, Map<TopicPartition, Errors> responses) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); - List<Struct> responseDatas = new ArrayList<>(responses.size()); - for (Map.Entry<TopicPartition, Short> response : responses.entrySet()) { + for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) { Struct partitionData = struct.instance(PARTITIONS_KEY_NAME); TopicPartition partition = response.getKey(); partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic()); partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition()); - partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue()); + partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code()); responseDatas.add(partitionData); } struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray()); - struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(ERROR_CODE_KEY_NAME, error.code()); this.responses = responses; - this.errorCode = errorCode; + this.error = error; } public StopReplicaResponse(Struct struct) { @@ -79,19 +77,19 @@ public class StopReplicaResponse extends AbstractResponse { Struct responseData = (Struct) responseDataObj; String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME); int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME); - short errorCode = responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME); - responses.put(new TopicPartition(topic, partition), errorCode); + Errors error = Errors.forCode(responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME)); + responses.put(new TopicPartition(topic, partition), error); } - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); } - public Map<TopicPartition, Short> responses() { + public Map<TopicPartition, Errors> responses() { return responses; } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public static StopReplicaResponse parse(ByteBuffer buffer, int versionId) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java index 55b7308..937bf98 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java @@ -117,7 +117,7 @@ public class SyncGroupRequest extends AbstractRequest { switch (versionId) { case 0: return new SyncGroupResponse( - Errors.forException(e).code(), + Errors.forException(e), ByteBuffer.wrap(new byte[]{})); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java index f459656..e598975 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -40,28 +41,28 @@ public class SyncGroupResponse extends AbstractResponse { * GROUP_AUTHORIZATION_FAILED (30) */ - private final short errorCode; + private final Errors error; private final ByteBuffer memberState; - public SyncGroupResponse(short errorCode, ByteBuffer memberState) { + public SyncGroupResponse(Errors error, ByteBuffer memberState) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(ERROR_CODE_KEY_NAME, error.code()); struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState); - this.errorCode = errorCode; + this.error = error; this.memberState = memberState; } public SyncGroupResponse(Struct struct) { super(struct); - this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME); } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public ByteBuffer memberAssignment() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 95e5683..ef680ff 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -280,7 +280,7 @@ public class UpdateMetadataRequest extends AbstractRequest { public AbstractResponse getErrorResponse(Throwable e) { short versionId = version(); if (versionId <= 3) - return new UpdateMetadataResponse(Errors.forException(e).code()); + return new UpdateMetadataResponse(Errors.forException(e)); else throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));