http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java index def4c85..3a1de51 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -29,21 +32,68 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.BOOLEAN; +import static org.apache.kafka.common.protocol.types.Type.INT16; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class CreateTopicsRequest extends AbstractRequest { private static final String REQUESTS_KEY_NAME = "create_topic_requests"; private static final String TIMEOUT_KEY_NAME = "timeout"; private static final String VALIDATE_ONLY_KEY_NAME = "validate_only"; - private static final String TOPIC_KEY_NAME = "topic"; private static final String NUM_PARTITIONS_KEY_NAME = "num_partitions"; private static final String REPLICATION_FACTOR_KEY_NAME = "replication_factor"; private static final String REPLICA_ASSIGNMENT_KEY_NAME = "replica_assignment"; - private static final String REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME = "partition_id"; private static final String REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME = "replicas"; - private static final String CONFIG_KEY_KEY_NAME = "config_name"; + private static final String CONFIG_NAME_KEY_NAME = "config_name"; private static final String CONFIG_VALUE_KEY_NAME = "config_value"; - private static final String CONFIGS_KEY_NAME = "config_entries"; + private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries"; + + private static final Schema CONFIG_ENTRY = new Schema( + new Field(CONFIG_NAME_KEY_NAME, STRING, "Configuration name"), + new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING, "Configuration value")); + + private static final Schema PARTITION_REPLICA_ASSIGNMENT_ENTRY = new Schema( + PARTITION_ID, + new Field(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, new ArrayOf(INT32), "The set of all nodes that should " + + "host this partition. The first replica in the list is the preferred leader.")); + + private static final Schema SINGLE_CREATE_TOPIC_REQUEST_V0 = new Schema( + TOPIC_NAME, + new Field(NUM_PARTITIONS_KEY_NAME, INT32, "Number of partitions to be created. -1 indicates unset."), + new Field(REPLICATION_FACTOR_KEY_NAME, INT16, "Replication factor for the topic. -1 indicates unset."), + new Field(REPLICA_ASSIGNMENT_KEY_NAME, new ArrayOf(PARTITION_REPLICA_ASSIGNMENT_ENTRY), + "Replica assignment among kafka brokers for this topic partitions. If this is set num_partitions " + + "and replication_factor must be unset."), + new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY), "Topic level configuration for topic to be set.")); + + private static final Schema SINGLE_CREATE_TOPIC_REQUEST_V1 = SINGLE_CREATE_TOPIC_REQUEST_V0; + + private static final Schema CREATE_TOPICS_REQUEST_V0 = new Schema( + new Field(REQUESTS_KEY_NAME, new ArrayOf(SINGLE_CREATE_TOPIC_REQUEST_V0), + "An array of single topic creation requests. Can not have multiple entries for the same topic."), + new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a topic to be completely created on the " + + "controller node. Values <= 0 will trigger topic creation and return immediately")); + + private static final Schema CREATE_TOPICS_REQUEST_V1 = new Schema( + new Field(REQUESTS_KEY_NAME, new ArrayOf(SINGLE_CREATE_TOPIC_REQUEST_V1), "An array of single " + + "topic creation requests. Can not have multiple entries for the same topic."), + new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a topic to be completely created on the " + + "controller node. Values <= 0 will trigger topic creation and return immediately"), + new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN, "If this is true, the request will be validated, but the " + + "topic won't be created.")); + + /* v2 request is the same as v1. Throttle time has been added to the response */ + private static final Schema CREATE_TOPICS_REQUEST_V2 = CREATE_TOPICS_REQUEST_V1; + + public static Schema[] schemaVersions() { + return new Schema[]{CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2}; + } public static final class TopicDetails { public final int numPartitions; @@ -157,7 +207,7 @@ public class CreateTopicsRequest extends AbstractRequest { for (Object requestStructObj : requestStructs) { Struct singleRequestStruct = (Struct) requestStructObj; - String topic = singleRequestStruct.getString(TOPIC_KEY_NAME); + String topic = singleRequestStruct.get(TOPIC_NAME); if (topics.containsKey(topic)) duplicateTopics.add(topic); @@ -171,7 +221,7 @@ public class CreateTopicsRequest extends AbstractRequest { for (Object assignmentStructObj : assignmentsArray) { Struct assignmentStruct = (Struct) assignmentStructObj; - Integer partitionId = assignmentStruct.getInt(REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME); + Integer partitionId = assignmentStruct.get(PARTITION_ID); Object[] replicasArray = assignmentStruct.getArray(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME); List<Integer> replicas = new ArrayList<>(replicasArray.length); @@ -182,12 +232,12 @@ public class CreateTopicsRequest extends AbstractRequest { partitionReplicaAssignments.put(partitionId, replicas); } - Object[] configArray = singleRequestStruct.getArray(CONFIGS_KEY_NAME); + Object[] configArray = singleRequestStruct.getArray(CONFIG_ENTRIES_KEY_NAME); Map<String, String> configs = new HashMap<>(configArray.length); for (Object configStructObj : configArray) { Struct configStruct = (Struct) configStructObj; - String key = configStruct.getString(CONFIG_KEY_KEY_NAME); + String key = configStruct.getString(CONFIG_NAME_KEY_NAME); String value = configStruct.getString(CONFIG_VALUE_KEY_NAME); configs.put(key, value); @@ -262,7 +312,7 @@ public class CreateTopicsRequest extends AbstractRequest { String topic = entry.getKey(); TopicDetails args = entry.getValue(); - singleRequestStruct.set(TOPIC_KEY_NAME, topic); + singleRequestStruct.set(TOPIC_NAME, topic); singleRequestStruct.set(NUM_PARTITIONS_KEY_NAME, args.numPartitions); singleRequestStruct.set(REPLICATION_FACTOR_KEY_NAME, args.replicationFactor); @@ -270,7 +320,7 @@ public class CreateTopicsRequest extends AbstractRequest { List<Struct> replicaAssignmentsStructs = new ArrayList<>(args.replicasAssignments.size()); for (Map.Entry<Integer, List<Integer>> partitionReplicaAssignment : args.replicasAssignments.entrySet()) { Struct replicaAssignmentStruct = singleRequestStruct.instance(REPLICA_ASSIGNMENT_KEY_NAME); - replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME, partitionReplicaAssignment.getKey()); + replicaAssignmentStruct.set(PARTITION_ID, partitionReplicaAssignment.getKey()); replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, partitionReplicaAssignment.getValue().toArray()); replicaAssignmentsStructs.add(replicaAssignmentStruct); } @@ -279,12 +329,12 @@ public class CreateTopicsRequest extends AbstractRequest { // configs List<Struct> configsStructs = new ArrayList<>(args.configs.size()); for (Map.Entry<String, String> configEntry : args.configs.entrySet()) { - Struct configStruct = singleRequestStruct.instance(CONFIGS_KEY_NAME); - configStruct.set(CONFIG_KEY_KEY_NAME, configEntry.getKey()); + Struct configStruct = singleRequestStruct.instance(CONFIG_ENTRIES_KEY_NAME); + configStruct.set(CONFIG_NAME_KEY_NAME, configEntry.getKey()); configStruct.set(CONFIG_VALUE_KEY_NAME, configEntry.getValue()); configsStructs.add(configStruct); } - singleRequestStruct.set(CONFIGS_KEY_NAME, configsStructs.toArray()); + singleRequestStruct.set(CONFIG_ENTRIES_KEY_NAME, configsStructs.toArray()); createTopicRequestStructs.add(singleRequestStruct); } struct.set(REQUESTS_KEY_NAME, createTopicRequestStructs.toArray());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java index c34265d..b3c052b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -26,9 +29,37 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; + public class CreateTopicsResponse extends AbstractResponse { private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors"; - private static final String TOPIC_KEY_NAME = "topic"; + + private static final Schema TOPIC_ERROR_CODE = new Schema( + TOPIC_NAME, + ERROR_CODE); + + // Improves on TOPIC_ERROR_CODE by adding an error_message to complement the error_code + private static final Schema TOPIC_ERROR = new Schema( + TOPIC_NAME, + ERROR_CODE, + ERROR_MESSAGE); + + private static final Schema CREATE_TOPICS_RESPONSE_V0 = new Schema( + new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes.")); + + private static final Schema CREATE_TOPICS_RESPONSE_V1 = new Schema( + new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR), "An array of per topic errors.")); + + private static final Schema CREATE_TOPICS_RESPONSE_V2 = new Schema( + THROTTLE_TIME_MS, + new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR), "An array of per topic errors.")); + + public static Schema[] schemaVersions() { + return new Schema[]{CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2}; + } /** * Possible error codes: @@ -62,24 +93,23 @@ public class CreateTopicsResponse extends AbstractResponse { Map<String, ApiError> errors = new HashMap<>(); for (Object topicErrorStructObj : topicErrorStructs) { Struct topicErrorStruct = (Struct) topicErrorStructObj; - String topic = topicErrorStruct.getString(TOPIC_KEY_NAME); + String topic = topicErrorStruct.get(TOPIC_NAME); errors.put(topic, new ApiError(topicErrorStruct)); } - this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); this.errors = errors; } @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.CREATE_TOPICS.responseSchema(version)); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); List<Struct> topicErrorsStructs = new ArrayList<>(errors.size()); for (Map.Entry<String, ApiError> topicError : errors.entrySet()) { Struct topicErrorsStruct = struct.instance(TOPIC_ERRORS_KEY_NAME); - topicErrorsStruct.set(TOPIC_KEY_NAME, topicError.getKey()); + topicErrorsStruct.set(TOPIC_NAME, topicError.getKey()); topicError.getValue().write(topicErrorsStruct); topicErrorsStructs.add(topicErrorsStruct); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java index c05bec6..2d50ea6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java @@ -19,6 +19,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.acl.AccessControlEntryFilter; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.resource.ResourceFilter; import org.apache.kafka.common.utils.Utils; @@ -29,10 +32,29 @@ import java.util.Collections; import java.util.List; import static org.apache.kafka.common.protocol.ApiKeys.DELETE_ACLS; +import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER; +import static org.apache.kafka.common.protocol.CommonFields.OPERATION; +import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE; +import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER; +import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER; +import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE; public class DeleteAclsRequest extends AbstractRequest { private final static String FILTERS = "filters"; + private static final Schema DELETE_ACLS_REQUEST_V0 = new Schema( + new Field(FILTERS, new ArrayOf(new Schema( + RESOURCE_TYPE, + RESOURCE_NAME_FILTER, + PRINCIPAL_FILTER, + HOST_FILTER, + OPERATION, + PERMISSION_TYPE)))); + + public static Schema[] schemaVersions() { + return new Schema[]{DELETE_ACLS_REQUEST_V0}; + } + public static class Builder extends AbstractRequest.Builder<DeleteAclsRequest> { private final List<AclBindingFilter> filters; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java index 973aa8e..0857287 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java @@ -19,6 +19,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.utils.Utils; @@ -30,10 +33,42 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE; +import static org.apache.kafka.common.protocol.CommonFields.HOST; +import static org.apache.kafka.common.protocol.CommonFields.OPERATION; +import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE; +import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL; +import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME; +import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; + public class DeleteAclsResponse extends AbstractResponse { public static final Logger log = LoggerFactory.getLogger(DeleteAclsResponse.class); - private final static String FILTER_RESPONSES = "filter_responses"; - private final static String MATCHING_ACLS = "matching_acls"; + private final static String FILTER_RESPONSES_KEY_NAME = "filter_responses"; + private final static String MATCHING_ACLS_KEY_NAME = "matching_acls"; + + private static final Schema MATCHING_ACL = new Schema( + ERROR_CODE, + ERROR_MESSAGE, + RESOURCE_TYPE, + RESOURCE_NAME, + PRINCIPAL, + HOST, + OPERATION, + PERMISSION_TYPE); + + private static final Schema DELETE_ACLS_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + new Field(FILTER_RESPONSES_KEY_NAME, + new ArrayOf(new Schema( + ERROR_CODE, + ERROR_MESSAGE, + new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL), "The matching ACLs"))))); + + public static Schema[] schemaVersions() { + return new Schema[]{DELETE_ACLS_RESPONSE_V0}; + } public static class AclDeletionResult { private final ApiError error; @@ -99,13 +134,13 @@ public class DeleteAclsResponse extends AbstractResponse { } public DeleteAclsResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); + this.throttleTimeMs = struct.get(THROTTLE_TIME_MS); this.responses = new ArrayList<>(); - for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) { + for (Object responseStructObj : struct.getArray(FILTER_RESPONSES_KEY_NAME)) { Struct responseStruct = (Struct) responseStructObj; ApiError error = new ApiError(responseStruct); List<AclDeletionResult> deletions = new ArrayList<>(); - for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS)) { + for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS_KEY_NAME)) { Struct matchingAclStruct = (Struct) matchingAclStructObj; ApiError matchError = new ApiError(matchingAclStruct); AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct); @@ -119,23 +154,23 @@ public class DeleteAclsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DELETE_ACLS.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); List<Struct> responseStructs = new ArrayList<>(); for (AclFilterResponse response : responses) { - Struct responseStruct = struct.instance(FILTER_RESPONSES); + Struct responseStruct = struct.instance(FILTER_RESPONSES_KEY_NAME); response.error.write(responseStruct); List<Struct> deletionStructs = new ArrayList<>(); for (AclDeletionResult deletion : response.deletions()) { - Struct deletionStruct = responseStruct.instance(MATCHING_ACLS); + Struct deletionStruct = responseStruct.instance(MATCHING_ACLS_KEY_NAME); deletion.error.write(deletionStruct); RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct); RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct); deletionStructs.add(deletionStruct); } - responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new Struct[0])); + responseStruct.set(MATCHING_ACLS_KEY_NAME, deletionStructs.toArray(new Struct[0])); responseStructs.add(responseStruct); } - struct.set(FILTER_RESPONSES, responseStructs.toArray()); + struct.set(FILTER_RESPONSES_KEY_NAME, responseStructs.toArray()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java index fcd9836..350238e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java @@ -20,14 +20,23 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.INT64; + public class DeleteRecordsRequest extends AbstractRequest { public static final long HIGH_WATERMARK = -1L; @@ -37,13 +46,28 @@ public class DeleteRecordsRequest extends AbstractRequest { private static final String TIMEOUT_KEY_NAME = "timeout"; // topic level key names - private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; // partition level key names - private static final String PARTITION_KEY_NAME = "partition"; private static final String OFFSET_KEY_NAME = "offset"; + + private static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema( + PARTITION_ID, + new Field(OFFSET_KEY_NAME, INT64, "The offset before which the messages will be deleted.")); + + private static final Schema DELETE_RECORDS_REQUEST_TOPIC_V0 = new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(DELETE_RECORDS_REQUEST_PARTITION_V0))); + + private static final Schema DELETE_RECORDS_REQUEST_V0 = new Schema( + new Field(TOPICS_KEY_NAME, new ArrayOf(DELETE_RECORDS_REQUEST_TOPIC_V0)), + new Field(TIMEOUT_KEY_NAME, INT32, "The maximum time to await a response in ms.")); + + public static Schema[] schemaVersions() { + return new Schema[]{DELETE_RECORDS_REQUEST_V0}; + } + private final int timeout; private final Map<TopicPartition, Long> partitionOffsets; @@ -79,10 +103,10 @@ public class DeleteRecordsRequest extends AbstractRequest { partitionOffsets = new HashMap<>(); for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.getString(TOPIC_KEY_NAME); + String topic = topicStruct.get(TOPIC_NAME); for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { Struct partitionStruct = (Struct) partitionStructObj; - int partition = partitionStruct.getInt(PARTITION_KEY_NAME); + int partition = partitionStruct.get(PARTITION_ID); long offset = partitionStruct.getLong(OFFSET_KEY_NAME); partitionOffsets.put(new TopicPartition(topic, partition), offset); } @@ -103,11 +127,11 @@ public class DeleteRecordsRequest extends AbstractRequest { List<Struct> topicStructArray = new ArrayList<>(); for (Map.Entry<String, Map<Integer, Long>> offsetsByTopicEntry : offsetsByTopic.entrySet()) { Struct topicStruct = struct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_KEY_NAME, offsetsByTopicEntry.getKey()); + topicStruct.set(TOPIC_NAME, offsetsByTopicEntry.getKey()); List<Struct> partitionStructArray = new ArrayList<>(); for (Map.Entry<Integer, Long> offsetsByPartitionEntry : offsetsByTopicEntry.getValue().entrySet()) { Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME); - partitionStruct.set(PARTITION_KEY_NAME, offsetsByPartitionEntry.getKey()); + partitionStruct.set(PARTITION_ID, offsetsByPartitionEntry.getKey()); partitionStruct.set(OFFSET_KEY_NAME, offsetsByPartitionEntry.getValue()); partitionStructArray.add(partitionStruct); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java index f19f933..aeea1cd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java @@ -20,14 +20,24 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.INT64; + public class DeleteRecordsResponse extends AbstractResponse { public static final long INVALID_LOW_WATERMARK = -1L; @@ -36,13 +46,27 @@ public class DeleteRecordsResponse extends AbstractResponse { private static final String TOPICS_KEY_NAME = "topics"; // topic level key names - private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; // partition level key names - private static final String PARTITION_KEY_NAME = "partition"; private static final String LOW_WATERMARK_KEY_NAME = "low_watermark"; - private static final String ERROR_CODE_KEY_NAME = "error_code"; + + private static final Schema DELETE_RECORDS_RESPONSE_PARTITION_V0 = new Schema( + PARTITION_ID, + new Field(LOW_WATERMARK_KEY_NAME, INT64, "Smallest available offset of all live replicas"), + ERROR_CODE); + + private static final Schema DELETE_RECORDS_RESPONSE_TOPIC_V0 = new Schema( + TOPIC_NAME, + new Field(PARTITIONS_KEY_NAME, new ArrayOf(DELETE_RECORDS_RESPONSE_PARTITION_V0))); + + private static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0))); + + public static Schema[] schemaVersions() { + return new Schema[]{DELETE_RECORDS_RESPONSE_V0}; + } private final int throttleTimeMs; private final Map<TopicPartition, PartitionResponse> responses; @@ -81,16 +105,16 @@ public class DeleteRecordsResponse extends AbstractResponse { } public DeleteRecordsResponse(Struct struct) { - this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); responses = new HashMap<>(); for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.getString(TOPIC_KEY_NAME); + String topic = topicStruct.get(TOPIC_NAME); for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { Struct partitionStruct = (Struct) partitionStructObj; - int partition = partitionStruct.getInt(PARTITION_KEY_NAME); + int partition = partitionStruct.get(PARTITION_ID); long lowWatermark = partitionStruct.getLong(LOW_WATERMARK_KEY_NAME); - Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME)); + Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE)); responses.put(new TopicPartition(topic, partition), new PartitionResponse(lowWatermark, error)); } } @@ -107,20 +131,19 @@ public class DeleteRecordsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DELETE_RECORDS.responseSchema(version)); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); Map<String, Map<Integer, PartitionResponse>> responsesByTopic = CollectionUtils.groupDataByTopic(responses); List<Struct> topicStructArray = new ArrayList<>(); for (Map.Entry<String, Map<Integer, PartitionResponse>> responsesByTopicEntry : responsesByTopic.entrySet()) { Struct topicStruct = struct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_KEY_NAME, responsesByTopicEntry.getKey()); + topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey()); List<Struct> partitionStructArray = new ArrayList<>(); for (Map.Entry<Integer, PartitionResponse> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) { Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME); PartitionResponse response = responsesByPartitionEntry.getValue(); - partitionStruct.set(PARTITION_KEY_NAME, responsesByPartitionEntry.getKey()); + partitionStruct.set(PARTITION_ID, responsesByPartitionEntry.getKey()); partitionStruct.set(LOW_WATERMARK_KEY_NAME, response.lowWatermark); - partitionStruct.set(ERROR_CODE_KEY_NAME, response.error.code()); + partitionStruct.set(ERROR_CODE, response.error.code()); partitionStructArray.add(partitionStruct); } topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray()); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java index 2ea8c21..4696b50 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.Utils; @@ -27,10 +30,26 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class DeleteTopicsRequest extends AbstractRequest { private static final String TOPICS_KEY_NAME = "topics"; private static final String TIMEOUT_KEY_NAME = "timeout"; + /* DeleteTopic api */ + private static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema( + new Field(TOPICS_KEY_NAME, new ArrayOf(STRING), "An array of topics to be deleted."), + new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a topic to be completely deleted on the " + + "controller node. Values <= 0 will trigger topic deletion and return immediately")); + + /* v1 request is the same as v0. Throttle time has been added to the response */ + private static final Schema DELETE_TOPICS_REQUEST_V1 = DELETE_TOPICS_REQUEST_V0; + + public static Schema[] schemaVersions() { + return new Schema[]{DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1}; + } + private final Set<String> topics; private final Integer timeout; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java index 3f11167..9c84c11 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -26,10 +29,29 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; + public class DeleteTopicsResponse extends AbstractResponse { private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes"; - private static final String TOPIC_KEY_NAME = "topic"; - private static final String ERROR_CODE_KEY_NAME = "error_code"; + + private static final Schema TOPIC_ERROR_CODE = new Schema( + TOPIC_NAME, + ERROR_CODE); + + private static final Schema DELETE_TOPICS_RESPONSE_V0 = new Schema( + new Field(TOPIC_ERROR_CODES_KEY_NAME, + new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes.")); + + private static final Schema DELETE_TOPICS_RESPONSE_V1 = new Schema( + THROTTLE_TIME_MS, + new Field(TOPIC_ERROR_CODES_KEY_NAME, new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes.")); + + public static Schema[] schemaVersions() { + return new Schema[]{DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1}; + } + /** * Possible error codes: @@ -52,13 +74,13 @@ public class DeleteTopicsResponse extends AbstractResponse { } public DeleteTopicsResponse(Struct struct) { - this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME); Map<String, Errors> errors = new HashMap<>(); for (Object topicErrorCodeStructObj : topicErrorCodesStructs) { Struct topicErrorCodeStruct = (Struct) topicErrorCodeStructObj; - String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME); - Errors error = Errors.forCode(topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME)); + String topic = topicErrorCodeStruct.get(TOPIC_NAME); + Errors error = Errors.forCode(topicErrorCodeStruct.get(ERROR_CODE)); errors.put(topic, error); } @@ -68,13 +90,12 @@ public class DeleteTopicsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DELETE_TOPICS.responseSchema(version)); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size()); for (Map.Entry<String, Errors> topicError : errors.entrySet()) { Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME); - topicErrorCodeStruct.set(TOPIC_KEY_NAME, topicError.getKey()); - topicErrorCodeStruct.set(ERROR_CODE_KEY_NAME, topicError.getValue().code()); + topicErrorCodeStruct.set(TOPIC_NAME, topicError.getKey()); + topicErrorCodeStruct.set(ERROR_CODE, topicError.getValue().code()); topicErrorCodeStructs.add(topicErrorCodeStruct); } struct.set(TOPIC_ERROR_CODES_KEY_NAME, topicErrorCodeStructs.toArray()); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java index 58ce539..1bacac7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java @@ -20,13 +20,33 @@ import org.apache.kafka.common.acl.AccessControlEntryFilter; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.resource.ResourceFilter; import java.nio.ByteBuffer; import java.util.Collections; +import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER; +import static org.apache.kafka.common.protocol.CommonFields.OPERATION; +import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE; +import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER; +import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER; +import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE; + public class DescribeAclsRequest extends AbstractRequest { + private static final Schema DESCRIBE_ACLS_REQUEST_V0 = new Schema( + RESOURCE_TYPE, + RESOURCE_NAME_FILTER, + PRINCIPAL_FILTER, + HOST_FILTER, + OPERATION, + PERMISSION_TYPE); + + public static Schema[] schemaVersions() { + return new Schema[]{DESCRIBE_ACLS_REQUEST_V0}; + } + public static class Builder extends AbstractRequest.Builder<DescribeAclsRequest> { private final AclBindingFilter filter; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java index 993a45f..f8b9695 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java @@ -20,6 +20,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.resource.Resource; @@ -30,9 +33,38 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE; +import static org.apache.kafka.common.protocol.CommonFields.HOST; +import static org.apache.kafka.common.protocol.CommonFields.OPERATION; +import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE; +import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL; +import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME; +import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; + public class DescribeAclsResponse extends AbstractResponse { - private final static String RESOURCES = "resources"; - private final static String ACLS = "acls"; + private final static String RESOURCES_KEY_NAME = "resources"; + private final static String ACLS_KEY_NAME = "acls"; + + private static final Schema DESCRIBE_ACLS_RESOURCE = new Schema( + RESOURCE_TYPE, + RESOURCE_NAME, + new Field(ACLS_KEY_NAME, new ArrayOf(new Schema( + PRINCIPAL, + HOST, + OPERATION, + PERMISSION_TYPE)))); + + private static final Schema DESCRIBE_ACLS_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + ERROR_CODE, + ERROR_MESSAGE, + new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_ACLS_RESOURCE), "The resources and their associated ACLs.")); + + public static Schema[] schemaVersions() { + return new Schema[]{DESCRIBE_ACLS_RESPONSE_V0}; + } private final int throttleTimeMs; private final ApiError error; @@ -45,13 +77,13 @@ public class DescribeAclsResponse extends AbstractResponse { } public DescribeAclsResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); + this.throttleTimeMs = struct.get(THROTTLE_TIME_MS); this.error = new ApiError(struct); this.acls = new ArrayList<>(); - for (Object resourceStructObj : struct.getArray(RESOURCES)) { + for (Object resourceStructObj : struct.getArray(RESOURCES_KEY_NAME)) { Struct resourceStruct = (Struct) resourceStructObj; Resource resource = RequestUtils.resourceFromStructFields(resourceStruct); - for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) { + for (Object aclDataStructObj : resourceStruct.getArray(ACLS_KEY_NAME)) { Struct aclDataStruct = (Struct) aclDataStructObj; AccessControlEntry entry = RequestUtils.aceFromStructFields(aclDataStruct); this.acls.add(new AclBinding(resource, entry)); @@ -62,7 +94,7 @@ public class DescribeAclsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); error.write(struct); Map<Resource, List<AccessControlEntry>> resourceToData = new HashMap<>(); @@ -78,18 +110,18 @@ public class DescribeAclsResponse extends AbstractResponse { List<Struct> resourceStructs = new ArrayList<>(); for (Map.Entry<Resource, List<AccessControlEntry>> tuple : resourceToData.entrySet()) { Resource resource = tuple.getKey(); - Struct resourceStruct = struct.instance(RESOURCES); + Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME); RequestUtils.resourceSetStructFields(resource, resourceStruct); List<Struct> dataStructs = new ArrayList<>(); for (AccessControlEntry entry : tuple.getValue()) { - Struct dataStruct = resourceStruct.instance(ACLS); + Struct dataStruct = resourceStruct.instance(ACLS_KEY_NAME); RequestUtils.aceSetStructFields(entry, dataStruct); dataStructs.add(dataStruct); } - resourceStruct.set(ACLS, dataStructs.toArray()); + resourceStruct.set(ACLS_KEY_NAME, dataStructs.toArray()); resourceStructs.add(resourceStruct); } - struct.set(RESOURCES, resourceStructs.toArray()); + struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java index 64fae0e..74e25f4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java @@ -17,6 +17,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -27,6 +30,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.types.Type.INT8; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class DescribeConfigsRequest extends AbstractRequest { private static final String RESOURCES_KEY_NAME = "resources"; @@ -34,6 +40,18 @@ public class DescribeConfigsRequest extends AbstractRequest { private static final String RESOURCE_NAME_KEY_NAME = "resource_name"; private static final String CONFIG_NAMES_KEY_NAME = "config_names"; + private static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( + new Field(RESOURCE_TYPE_KEY_NAME, INT8), + new Field(RESOURCE_NAME_KEY_NAME, STRING), + new Field(CONFIG_NAMES_KEY_NAME, ArrayOf.nullable(STRING))); + + private static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema( + new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned.")); + + public static Schema[] schemaVersions() { + return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0}; + } + public static class Builder extends AbstractRequest.Builder { private final Map<Resource, Collection<String>> resourceToConfigNames; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java index 8694e1f..9b2289d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -27,6 +30,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.types.Type.BOOLEAN; +import static org.apache.kafka.common.protocol.types.Type.INT8; +import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class DescribeConfigsResponse extends AbstractResponse { private static final String RESOURCES_KEY_NAME = "resources"; @@ -36,11 +47,31 @@ public class DescribeConfigsResponse extends AbstractResponse { private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries"; - private static final String CONFIG_NAME = "config_name"; - private static final String CONFIG_VALUE = "config_value"; - private static final String IS_SENSITIVE = "is_sensitive"; - private static final String IS_DEFAULT = "is_default"; - private static final String READ_ONLY = "read_only"; + private static final String CONFIG_NAME_KEY_NAME = "config_name"; + private static final String CONFIG_VALUE_KEY_NAME = "config_value"; + private static final String IS_SENSITIVE_KEY_NAME = "is_sensitive"; + private static final String IS_DEFAULT_KEY_NAME = "is_default"; + private static final String READ_ONLY_KEY_NAME = "read_only"; + + private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( + ERROR_CODE, + ERROR_MESSAGE, + new Field(RESOURCE_TYPE_KEY_NAME, INT8), + new Field(RESOURCE_NAME_KEY_NAME, STRING), + new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(new Schema( + new Field(CONFIG_NAME_KEY_NAME, STRING), + new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING), + new Field(READ_ONLY_KEY_NAME, BOOLEAN), + new Field(IS_DEFAULT_KEY_NAME, BOOLEAN), + new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN))))); + + private static final Schema DESCRIBE_CONFIGS_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0))); + + public static Schema[] schemaVersions() { + return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0}; + } public static class Config { private final ApiError error; @@ -105,7 +136,7 @@ public class DescribeConfigsResponse extends AbstractResponse { } public DescribeConfigsResponse(Struct struct) { - throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); + throttleTimeMs = struct.get(THROTTLE_TIME_MS); Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME); configs = new HashMap<>(resourcesArray.length); for (Object resourceObj : resourcesArray) { @@ -120,11 +151,11 @@ public class DescribeConfigsResponse extends AbstractResponse { List<ConfigEntry> configEntries = new ArrayList<>(configEntriesArray.length); for (Object configEntriesObj: configEntriesArray) { Struct configEntriesStruct = (Struct) configEntriesObj; - String configName = configEntriesStruct.getString(CONFIG_NAME); - String configValue = configEntriesStruct.getString(CONFIG_VALUE); - boolean isSensitive = configEntriesStruct.getBoolean(IS_SENSITIVE); - boolean isDefault = configEntriesStruct.getBoolean(IS_DEFAULT); - boolean readOnly = configEntriesStruct.getBoolean(READ_ONLY); + String configName = configEntriesStruct.getString(CONFIG_NAME_KEY_NAME); + String configValue = configEntriesStruct.getString(CONFIG_VALUE_KEY_NAME); + boolean isSensitive = configEntriesStruct.getBoolean(IS_SENSITIVE_KEY_NAME); + boolean isDefault = configEntriesStruct.getBoolean(IS_DEFAULT_KEY_NAME); + boolean readOnly = configEntriesStruct.getBoolean(READ_ONLY_KEY_NAME); configEntries.add(new ConfigEntry(configName, configValue, isSensitive, isDefault, readOnly)); } Config config = new Config(error, configEntries); @@ -147,7 +178,7 @@ public class DescribeConfigsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); List<Struct> resourceStructs = new ArrayList<>(configs.size()); for (Map.Entry<Resource, Config> entry : configs.entrySet()) { Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME); @@ -162,11 +193,11 @@ public class DescribeConfigsResponse extends AbstractResponse { List<Struct> configEntryStructs = new ArrayList<>(config.entries.size()); for (ConfigEntry configEntry : config.entries) { Struct configEntriesStruct = resourceStruct.instance(CONFIG_ENTRIES_KEY_NAME); - configEntriesStruct.set(CONFIG_NAME, configEntry.name); - configEntriesStruct.set(CONFIG_VALUE, configEntry.value); - configEntriesStruct.set(IS_SENSITIVE, configEntry.isSensitive); - configEntriesStruct.set(IS_DEFAULT, configEntry.isDefault); - configEntriesStruct.set(READ_ONLY, configEntry.readOnly); + configEntriesStruct.set(CONFIG_NAME_KEY_NAME, configEntry.name); + configEntriesStruct.set(CONFIG_VALUE_KEY_NAME, configEntry.value); + configEntriesStruct.set(IS_SENSITIVE_KEY_NAME, configEntry.isSensitive); + configEntriesStruct.set(IS_DEFAULT_KEY_NAME, configEntry.isDefault); + configEntriesStruct.set(READ_ONLY_KEY_NAME, configEntry.readOnly); configEntryStructs.add(configEntriesStruct); } resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0])); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java index b43e254..56117da 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.Utils; @@ -25,9 +28,23 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class DescribeGroupsRequest extends AbstractRequest { private static final String GROUP_IDS_KEY_NAME = "group_ids"; + /* Describe group api */ + private static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema( + new Field(GROUP_IDS_KEY_NAME, new ArrayOf(STRING), "List of groupIds to request metadata for (an " + + "empty groupId array will return empty group metadata).")); + + /* v1 request is the same as v0. Throttle time has been added to response */ + private static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0; + + public static Schema[] schemaVersions() { + return new Schema[]{DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1}; + } + public static class Builder extends AbstractRequest.Builder<DescribeGroupsRequest> { private final List<String> groupIds; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 0e1d6bd..9241165 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -27,11 +30,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.types.Type.BYTES; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class DescribeGroupsResponse extends AbstractResponse { private static final String GROUPS_KEY_NAME = "groups"; - private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String GROUP_STATE_KEY_NAME = "state"; private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; @@ -44,6 +51,36 @@ public class DescribeGroupsResponse extends AbstractResponse { private static final String MEMBER_METADATA_KEY_NAME = "member_metadata"; private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment"; + private static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema( + new Field(MEMBER_ID_KEY_NAME, STRING, "The memberId assigned by the coordinator"), + new Field(CLIENT_ID_KEY_NAME, STRING, "The client id used in the member's latest join group request"), + new Field(CLIENT_HOST_KEY_NAME, STRING, "The client host used in the request session corresponding to the " + + "member's join group."), + new Field(MEMBER_METADATA_KEY_NAME, BYTES, "The metadata corresponding to the current group protocol in " + + "use (will only be present if the group is stable)."), + new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "The current assignment provided by the group leader " + + "(will only be present if the group is stable).")); + + private static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema( + ERROR_CODE, + new Field(GROUP_ID_KEY_NAME, STRING), + new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one of: Dead, Stable, AwaitingSync, " + + "PreparingRebalance, or empty if there is no active group)"), + new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol type (will be empty if there is no active group)"), + new Field(PROTOCOL_KEY_NAME, STRING, "The current group protocol (only provided if the group is Stable)"), + new Field(MEMBERS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0), "Current group members " + + "(only provided if the group is not Dead)")); + + private static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema( + new Field(GROUPS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0))); + private static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema( + THROTTLE_TIME_MS, + new Field(GROUPS_KEY_NAME, new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0))); + + public static Schema[] schemaVersions() { + return new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1}; + } + public static final String UNKNOWN_STATE = ""; public static final String UNKNOWN_PROTOCOL_TYPE = ""; public static final String UNKNOWN_PROTOCOL = ""; @@ -70,13 +107,13 @@ public class DescribeGroupsResponse extends AbstractResponse { } public DescribeGroupsResponse(Struct struct) { - this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); this.groups = new HashMap<>(); for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { Struct groupStruct = (Struct) groupObj; String groupId = groupStruct.getString(GROUP_ID_KEY_NAME); - Errors error = Errors.forCode(groupStruct.getShort(ERROR_CODE_KEY_NAME)); + Errors error = Errors.forCode(groupStruct.get(ERROR_CODE)); String state = groupStruct.getString(GROUP_STATE_KEY_NAME); String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME); String protocol = groupStruct.getString(PROTOCOL_KEY_NAME); @@ -209,15 +246,14 @@ public class DescribeGroupsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.responseSchema(version)); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); List<Struct> groupStructs = new ArrayList<>(); for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) { Struct groupStruct = struct.instance(GROUPS_KEY_NAME); GroupMetadata group = groupEntry.getValue(); groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey()); - groupStruct.set(ERROR_CODE_KEY_NAME, group.error.code()); + groupStruct.set(ERROR_CODE, group.error.code()); groupStruct.set(GROUP_STATE_KEY_NAME, group.state); groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType); groupStruct.set(PROTOCOL_KEY_NAME, group.protocol); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java index 338d684..0169da5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java @@ -19,8 +19,12 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -29,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.INT32; public class DescribeLogDirsRequest extends AbstractRequest { @@ -36,9 +42,17 @@ public class DescribeLogDirsRequest extends AbstractRequest { private static final String TOPICS_KEY_NAME = "topics"; // topic level key names - private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; + private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V0 = new Schema( + new Field("topics", ArrayOf.nullable(new Schema( + TOPIC_NAME, + new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic."))))); + + public static Schema[] schemaVersions() { + return new Schema[]{DESCRIBE_LOG_DIRS_REQUEST_V0}; + } + private final Set<TopicPartition> topicPartitions; public static class Builder extends AbstractRequest.Builder<DescribeLogDirsRequest> { @@ -75,7 +89,7 @@ public class DescribeLogDirsRequest extends AbstractRequest { topicPartitions = new HashSet<>(); for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.getString(TOPIC_KEY_NAME); + String topic = topicStruct.get(TOPIC_NAME); for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { int partition = (Integer) partitionObj; topicPartitions.add(new TopicPartition(topic, partition)); @@ -109,7 +123,7 @@ public class DescribeLogDirsRequest extends AbstractRequest { List<Struct> topicStructArray = new ArrayList<>(); for (Map.Entry<String, List<Integer>> partitionsByTopicEntry : partitionsByTopic.entrySet()) { Struct topicStruct = struct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_KEY_NAME, partitionsByTopicEntry.getKey()); + topicStruct.set(TOPIC_NAME, partitionsByTopicEntry.getKey()); topicStruct.set(PARTITIONS_KEY_NAME, partitionsByTopicEntry.getValue().toArray()); topicStructArray.add(topicStruct); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java index f6b31ae..e35056e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java @@ -20,14 +20,26 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import static org.apache.kafka.common.protocol.types.Type.BOOLEAN; +import static org.apache.kafka.common.protocol.types.Type.INT64; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class DescribeLogDirsResponse extends AbstractResponse { @@ -37,40 +49,58 @@ public class DescribeLogDirsResponse extends AbstractResponse { private static final String LOG_DIRS_KEY_NAME = "log_dirs"; // dir level key names - private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String LOG_DIR_KEY_NAME = "log_dir"; private static final String TOPICS_KEY_NAME = "topics"; // topic level key names - private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; // partition level key names - private static final String PARTITION_KEY_NAME = "partition"; private static final String SIZE_KEY_NAME = "size"; private static final String OFFSET_LAG_KEY_NAME = "offset_lag"; private static final String IS_FUTURE_KEY_NAME = "is_future"; + private static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + new Field("log_dirs", new ArrayOf(new Schema( + ERROR_CODE, + new Field("log_dir", STRING, "The absolute log directory path."), + new Field("topics", new ArrayOf(new Schema( + TOPIC_NAME, + new Field("partitions", new ArrayOf(new Schema( + PARTITION_ID, + new Field("size", INT64, "The size of the log segments of the partition in bytes."), + new Field("offset_lag", INT64, "The lag of the log's LEO w.r.t. partition's HW " + + "(if it is the current log for the partition) or current replica's LEO " + + "(if it is the future log for the partition)"), + new Field("is_future", BOOLEAN, "True if this log is created by " + + "AlterReplicaDirRequest and will replace the current log of the replica " + + "in the future."))))))))))); + + public static Schema[] schemaVersions() { + return new Schema[]{DESCRIBE_LOG_DIRS_RESPONSE_V0}; + } + private final int throttleTimeMs; private final Map<String, LogDirInfo> logDirInfos; public DescribeLogDirsResponse(Struct struct) { - throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); + throttleTimeMs = struct.get(THROTTLE_TIME_MS); logDirInfos = new HashMap<>(); for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) { Struct logDirStruct = (Struct) logDirStructObj; - Errors error = Errors.forCode(logDirStruct.getShort(ERROR_CODE_KEY_NAME)); + Errors error = Errors.forCode(logDirStruct.get(ERROR_CODE)); String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME); Map<TopicPartition, ReplicaInfo> replicaInfos = new HashMap<>(); for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) { Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.getString(TOPIC_KEY_NAME); + String topic = topicStruct.get(TOPIC_NAME); for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { Struct partitionStruct = (Struct) partitionStructObj; - int partition = partitionStruct.getInt(PARTITION_KEY_NAME); + int partition = partitionStruct.get(PARTITION_ID); long size = partitionStruct.getLong(SIZE_KEY_NAME); long offsetLag = partitionStruct.getLong(OFFSET_LAG_KEY_NAME); boolean isFuture = partitionStruct.getBoolean(IS_FUTURE_KEY_NAME); @@ -94,25 +124,25 @@ public class DescribeLogDirsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); List<Struct> logDirStructArray = new ArrayList<>(); for (Map.Entry<String, LogDirInfo> logDirInfosEntry : logDirInfos.entrySet()) { LogDirInfo logDirInfo = logDirInfosEntry.getValue(); Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME); - logDirStruct.set(ERROR_CODE_KEY_NAME, logDirInfo.error.code()); + logDirStruct.set(ERROR_CODE, logDirInfo.error.code()); logDirStruct.set(LOG_DIR_KEY_NAME, logDirInfosEntry.getKey()); Map<String, Map<Integer, ReplicaInfo>> replicaInfosByTopic = CollectionUtils.groupDataByTopic(logDirInfo.replicaInfos); List<Struct> topicStructArray = new ArrayList<>(); for (Map.Entry<String, Map<Integer, ReplicaInfo>> replicaInfosByTopicEntry : replicaInfosByTopic.entrySet()) { Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_KEY_NAME, replicaInfosByTopicEntry.getKey()); + topicStruct.set(TOPIC_NAME, replicaInfosByTopicEntry.getKey()); List<Struct> partitionStructArray = new ArrayList<>(); for (Map.Entry<Integer, ReplicaInfo> replicaInfosByPartitionEntry : replicaInfosByTopicEntry.getValue().entrySet()) { Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME); ReplicaInfo replicaInfo = replicaInfosByPartitionEntry.getValue(); - partitionStruct.set(PARTITION_KEY_NAME, replicaInfosByPartitionEntry.getKey()); + partitionStruct.set(PARTITION_ID, replicaInfosByPartitionEntry.getKey()); partitionStruct.set(SIZE_KEY_NAME, replicaInfo.size); partitionStruct.set(OFFSET_LAG_KEY_NAME, replicaInfo.offsetLag); partitionStruct.set(IS_FUTURE_KEY_NAME, replicaInfo.isFuture); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java index 01d73b2..243e9f5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java @@ -18,16 +18,33 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.types.Type.BOOLEAN; +import static org.apache.kafka.common.protocol.types.Type.INT16; +import static org.apache.kafka.common.protocol.types.Type.INT64; +import static org.apache.kafka.common.protocol.types.Type.STRING; + public class EndTxnRequest extends AbstractRequest { private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result"; + private static final Schema END_TXN_REQUEST_V0 = new Schema( + new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."), + new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."), + new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."), + new Field(TRANSACTION_RESULT_KEY_NAME, BOOLEAN, "The result of the transaction (0 = ABORT, 1 = COMMIT)")); + + public static Schema[] schemaVersions() { + return new Schema[]{END_TXN_REQUEST_V0}; + } + public static class Builder extends AbstractRequest.Builder<EndTxnRequest> { private final String transactionalId; private final long producerId; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java index 47a6623..a3bae58 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java @@ -18,12 +18,22 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; + public class EndTxnResponse extends AbstractResponse { - private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final Schema END_TXN_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + ERROR_CODE); + + public static Schema[] schemaVersions() { + return new Schema[]{END_TXN_RESPONSE_V0}; + } // Possible error codes: // NotCoordinator @@ -43,8 +53,8 @@ public class EndTxnResponse extends AbstractResponse { } public EndTxnResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); - this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + this.throttleTimeMs = struct.get(THROTTLE_TIME_MS); + this.error = Errors.forCode(struct.get(ERROR_CODE)); } public int throttleTimeMs() { @@ -58,8 +68,8 @@ public class EndTxnResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.END_TXN.responseSchema(version)); - struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); - struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.set(THROTTLE_TIME_MS, throttleTimeMs); + struct.set(ERROR_CODE, error.code()); return struct; }