[ https://issues.apache.org/jira/browse/KAFKA-3362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704610#comment-16704610 ]
ASF GitHub Bot commented on KAFKA-3362: --------------------------------------- andrasbeni closed pull request #3495: KAFKA-3362: Update protocol schema and field doc strings URL: https://github.com/apache/kafka/pull/3495 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index 920c2957c4d..f0fbf268a30 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -64,19 +64,19 @@ public static final short CONSUMER_PROTOCOL_V0 = 0; public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema( - new Field(VERSION_KEY_NAME, Type.INT16)); + new Field(VERSION_KEY_NAME, Type.INT16, "Version number of consumer protocol.")); private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA) .set(VERSION_KEY_NAME, CONSUMER_PROTOCOL_V0); public static final Schema SUBSCRIPTION_V0 = new Schema( - new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)), - new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES)); + new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING), "Topics consumer subscribes to."), + new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES, "Custom user data sent to partition assignor.")); public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema( - new Field(TOPIC_KEY_NAME, Type.STRING), - new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32))); + new Field(TOPIC_KEY_NAME, Type.STRING, "Topic consumer subscribed to."), + new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32), "Partitions of 
the topic which are assigned to consumer.")); public static final Schema ASSIGNMENT_V0 = new Schema( - new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)), - new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES)); + new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0), "Topic partitions assigned to consumer."), + new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES, "Custom user data sent by partition assignor.")); public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) { Struct struct = new Struct(SUBSCRIPTION_V0); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java index 14b39ae9dac..8fd92632b11 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java @@ -51,14 +51,14 @@ new Field(CONFIG_VALUE, NULLABLE_STRING, "Configuration value")); private static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING), - new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY))); + new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Id of the resource type to alter configuration of. 
Value 2 means topic, 4 means broker."), + new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of the resource to alter configuration of."), + new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY), "Configuration entries to alter.")); private static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema( new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0), "An array of resources to update with the provided configs."), - new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN)); + new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN, "If true, only validation takes place and the changes are not applied.")); public static Schema[] schemaVersions() { return new Schema[] {ALTER_CONFIGS_REQUEST_V0}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java index f292ef6731b..337c391f086 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java @@ -45,12 +45,13 @@ private static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( ERROR_CODE, ERROR_MESSAGE, - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING)); + new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Type of resource this response entity is for."), + new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of resource this response entity is for.")); private static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema( THROTTLE_TIME_MS, - new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0))); + new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0), + "Result of change for each resource.")); public static Schema[] schemaVersions() { return new Schema[]{ALTER_CONFIGS_RESPONSE_V0}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java index f8d15466d62..fa1174a3423 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java @@ -52,7 +52,8 @@ TOPIC_NAME, new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, - ERROR_CODE))))))); + ERROR_CODE)), "Error codes for each partition."))), + "Result of the operation for each topic.")); public static Schema[] schemaVersions() { return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java index 41563381151..a9274d09ef4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java @@ -43,9 +43,9 @@ private static final String CONFIG_NAMES_KEY_NAME = "config_names"; private static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING), - new Field(CONFIG_NAMES_KEY_NAME, ArrayOf.nullable(STRING))); + new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Id of resource type to fetch configuration of. Value 2 means topic, 4 means broker."), + new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of the resource to fetch configuration of."), + new Field(CONFIG_NAMES_KEY_NAME, ArrayOf.nullable(STRING), "Configuration names requested. 
Null for all configurations.")); private static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema( new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned.")); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java index e463618e71b..4c80a6ed51d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java @@ -59,11 +59,11 @@ private static final String CONFIG_SOURCE_KEY_NAME = "config_source"; private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTRY_V0 = new Schema( - new Field(CONFIG_NAME_KEY_NAME, STRING), - new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING), - new Field(READ_ONLY_KEY_NAME, BOOLEAN), - new Field(IS_DEFAULT_KEY_NAME, BOOLEAN), - new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN)); + new Field(CONFIG_NAME_KEY_NAME, STRING, "Name of config requested."), + new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING, "Value of config requested."), + new Field(READ_ONLY_KEY_NAME, BOOLEAN, "True if configuration is read-only, false otherwise."), + new Field(IS_DEFAULT_KEY_NAME, BOOLEAN, "True if configuration is not overridden, false otherwise."), + new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN, "True if configuration is a password, false otherwise.")); private static final Schema DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1 = new Schema( new Field(CONFIG_NAME_KEY_NAME, STRING), @@ -81,8 +81,8 @@ private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( ERROR_CODE, ERROR_MESSAGE, - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING), + new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Type of the resource this response entity is for."), + new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of the resource this response 
entity is for."), new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V0))); private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V1 = new Schema( diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 65cf7fea6b1..d07aafbabc8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -66,7 +66,7 @@ private static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema( PARTITION_ID, - new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."), + new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset to begin this fetch from."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to fetch.")); // FETCH_REQUEST_PARTITION_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed. @@ -87,8 +87,14 @@ private static final Schema FETCH_REQUEST_V0 = new Schema( new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), - new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), - new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response " + + "if insufficient data is available at the time the request is issued."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response. " + + "If client sets this to 0 the server will always respond immediately. " + + "However, if there is no new data since the last request broker will respond with empty record sets." + + " If set to 1, the server will respond as soon at least one partition has at least" + + " one byte of data or the specified timeout occurs. 
By setting higher values in combination" + + " with the timeout consumer can tune for throughput trade a little additional latency for reading only large chunks of data. "), new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch.")); // The V1 Fetch Request body is the same as V0. @@ -102,8 +108,14 @@ // The partition ordering is now relevant - partitions will be processed in order they appear in request. private static final Schema FETCH_REQUEST_V3 = new Schema( new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), - new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), - new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response " + + "if insufficient data is available at the time the request is issued."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response." + + " If client sets this to 0 the server will always respond immediately." + + " However, if there is no new data since the last request broker will respond with empty record sets." + + " If set to 1, the server will respond as soon at least one partition has at least" + + " one byte of data or the specified timeout occurs. By setting higher values in combination" + + " with the timeout consumer can tune for throughput trade a little additional latency for reading only large chunks of data."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " + "if the first message in the first non-empty partition of the fetch is larger than this " + "value, the message will still be returned to ensure that progress can be made."), @@ -112,8 +124,14 @@ // The V4 Fetch Request adds the fetch isolation level and exposes magic v2 (via the response). 
private static final Schema FETCH_REQUEST_V4 = new Schema( new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), - new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), - new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response " + + "if insufficient data is available at the time the request is issued."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response." + + " If client sets this to 0 the server will always respond immediately." + + " However, if there is no new data since the last request broker will respond with empty record sets." + + " If set to 1, the server will respond as soon at least one partition has at least" + + " one byte of data or the specified timeout occurs. By setting higher values in combination" + + " with the timeout consumer can tune for throughput trade a little additional latency for reading only large chunks of data."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " + "if the first message in the first non-empty partition of the fetch is larger than this " + "value, the message will still be returned to ensure that progress can be made."), @@ -128,8 +146,14 @@ // FETCH_REQUEST_V5 added a per-partition log_start_offset field - the earliest available offset of partition data that can be consumed. private static final Schema FETCH_REQUEST_V5 = new Schema( new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. 
For normal consumers, use -1."), - new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), - new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response " + + "if insufficient data is available at the time the request is issued."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response." + + " If client sets this to 0 the server will always respond immediately." + + " However, if there is no new data since the last request broker will respond with empty record sets." + + " If set to 1, the server will respond as soon at least one partition has at least" + + " one byte of data or the specified timeout occurs. By setting higher values in combination" + + " with the timeout consumer can tune for throughput trade a little additional latency for reading only large chunks of data."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " + "if the first message in the first non-empty partition of the fetch is larger than this " + "value, the message will still be returned to ensure that progress can be made."), @@ -159,8 +183,14 @@ private static final Schema FETCH_REQUEST_V7 = new Schema( new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), - new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), - new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response " + + "if insufficient data is available at the time the request is issued."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response." + + " If client sets this to 0 the server will always respond immediately." 
+ + " However, if there is no new data since the last request broker will respond with empty record sets." + + " If set to 1, the server will respond as soon at least one partition has at least" + + " one byte of data or the specified timeout occurs. By setting higher values in combination" + + " with the timeout consumer can tune for throughput trade a little additional latency for reading only large chunks of data."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " + "if the first message in the first non-empty partition of the fetch is larger than this " + "value, the message will still be returned to ensure that progress can be made."), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index ed0f5a347dd..2cdfdffadb7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -75,7 +75,9 @@ new Field(HIGH_WATERMARK_KEY_NAME, INT64, "Last committed offset.")); private static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema( new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V0), - new Field(RECORD_SET_KEY_NAME, RECORDS)); + new Field(RECORD_SET_KEY_NAME, RECORDS, "Data fetch from the partition. " + + "In version 1 record set only includes messages of v0 (magic byte 0). 
" + + "In version 2 and 3, record set can include messages of v0 and v1 (magic byte 0 and 1)")); private static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema( TOPIC_NAME, @@ -123,11 +125,11 @@ private static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema( new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V4), - new Field(RECORD_SET_KEY_NAME, RECORDS)); + new Field(RECORD_SET_KEY_NAME, RECORDS, "Data fetched from this partition.")); private static final Schema FETCH_RESPONSE_PARTITION_V5 = new Schema( new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V5), - new Field(RECORD_SET_KEY_NAME, RECORDS)); + new Field(RECORD_SET_KEY_NAME, RECORDS, "Data fetched from this partition.")); private static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema( TOPIC_NAME, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index a7b62a98660..992f80df561 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -45,8 +45,8 @@ /* Join group api */ private static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema( - new Field(PROTOCOL_NAME_KEY_NAME, STRING), - new Field(PROTOCOL_METADATA_KEY_NAME, BYTES)); + new Field(PROTOCOL_NAME_KEY_NAME, STRING, "Protocol type name."), + new Field(PROTOCOL_METADATA_KEY_NAME, BYTES, "Protocol type specific member metadata.")); private static final Schema JOIN_GROUP_REQUEST_V0 = new Schema( GROUP_ID, @@ -55,7 +55,8 @@ MEMBER_ID, new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"), new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " + - "that the member supports")); + "that the member supports. Coordinator chooses a single protocol supported by all members. 
" + + "This enables for e.g. rolling upgrades without downtime.")); private static final Schema JOIN_GROUP_REQUEST_V1 = new Schema( GROUP_ID, @@ -66,7 +67,8 @@ MEMBER_ID, new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"), new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " + - "that the member supports")); + "that the member supports. Coordinator chooses a single protocol supported by all members. " + + "This enables for e.g. rolling upgrades without downtime.")); /* v2 request is the same as v1. Throttle time has been added to response */ private static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index 4bcd6e6d65f..0ecaa57bc3b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -47,7 +47,7 @@ private static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema( MEMBER_ID, - new Field(MEMBER_METADATA_KEY_NAME, BYTES)); + new Field(MEMBER_METADATA_KEY_NAME, BYTES, "Metadata supplied in this member's join group request.")); private static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema( ERROR_CODE, @@ -55,7 +55,9 @@ new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"), new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"), MEMBER_ID, - new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); + new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0), + "Leader will receive the full list of members along with associated metadata for the protocol chosen. 
" + + "Other members, followers, will receive an empty array of members.")); private static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0; @@ -66,7 +68,9 @@ new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"), new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"), MEMBER_ID, - new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); + new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0), + "Leader will receive the full list of members along with associated metadata for the protocol chosen. " + + "Other members, followers, will receive an empty array of members.")); public static Schema[] schemaVersions() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java index 9c82ae06bd6..9c7374afb13 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -40,14 +40,14 @@ private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema( GROUP_ID, - new Field(PROTOCOL_TYPE_KEY_NAME, STRING)); + new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Current group protocol's name.")); private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema( ERROR_CODE, - new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); + new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0), "Information about each group managed by this broker.")); private static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema( THROTTLE_TIME_MS, ERROR_CODE, - new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); + new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0), "Information about each group managed by this broker.")); public static Schema[] schemaVersions() { return new Schema[] {LIST_GROUPS_RESPONSE_V0, 
LIST_GROUPS_RESPONSE_V1}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 98f53bdfc44..258321c1fea 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -60,11 +60,16 @@ private static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema( PARTITION_ID, - new Field(TIMESTAMP_KEY_NAME, INT64, "Timestamp."), - new Field(MAX_NUM_OFFSETS_KEY_NAME, INT32, "Maximum offsets to return.")); + new Field(TIMESTAMP_KEY_NAME, INT64, "Target timestamp for partition. Used to ask for all messages before a certain time." + + " There are two special values: specify -1 to receive the latest offset (i.e offset of the next message)" + + " and -2 to receive the earliest available offset."), + new Field(MAX_NUM_OFFSETS_KEY_NAME, INT32, "Maximum number of offsets to return. " + + "Note that because offsets are puled in descending order, asking for the earliest offset will always return a single element.")); private static final Schema LIST_OFFSET_REQUEST_PARTITION_V1 = new Schema( PARTITION_ID, - new Field(TIMESTAMP_KEY_NAME, INT64, "The target timestamp for the partition.")); + new Field(TIMESTAMP_KEY_NAME, INT64, "The target timestamp for the partition. Used to ask for all messages before a certain time." 
+ + " There are two special values: specify -1 to receive the latest offset (i.e offset of the next message)" + + " and -2 to receive the earliest available offset.")); private static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema( TOPIC_NAME, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index de7c8f6cbc2..8fe3aa6d005 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -104,7 +104,7 @@ private static final Schema METADATA_RESPONSE_V0 = new Schema( new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V0), "Host and port information for all brokers."), - new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V0))); + new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V0), "Metadata for each topic.")); private static final Schema METADATA_BROKER_V1 = new Schema( new Field(NODE_ID_KEY_NAME, INT32, "The broker id."), @@ -139,20 +139,20 @@ private static final Schema METADATA_RESPONSE_V1 = new Schema( new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."), new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."), - new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1))); + new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1), "Metadata for each topic requested.")); private static final Schema METADATA_RESPONSE_V2 = new Schema( new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."), new Field(CLUSTER_ID_KEY_NAME, NULLABLE_STRING, "The cluster id that this broker belongs to."), new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."), - new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1))); + new 
Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1), "Metadata for each topic requested.")); private static final Schema METADATA_RESPONSE_V3 = new Schema( THROTTLE_TIME_MS, new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."), new Field(CLUSTER_ID_KEY_NAME, NULLABLE_STRING, "The cluster id that this broker belongs to."), new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."), - new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1))); + new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1), "Metadata for each topic requested.")); private static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3; @@ -162,7 +162,7 @@ new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."), new Field(CLUSTER_ID_KEY_NAME, NULLABLE_STRING, "The cluster id that this broker belongs to."), new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."), - new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V2))); + new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V2), "Metadata for each topic requested.")); public static Schema[] schemaVersions() { return new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 4686c3bc258..d5d032aef22 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -66,7 +66,9 @@ private static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema( PARTITION_ID, new Field(COMMIT_OFFSET_KEY_NAME, INT64, "Message offset to be committed."), - new Field(TIMESTAMP_KEY_NAME, INT64, "Timestamp of the 
commit"), + new Field(TIMESTAMP_KEY_NAME, INT64, "Timestamp of the commit. If the time stamp field is not set (-1)," + + " brokers will set commit time to receive time before committing offset. Users can explicitly set commit timestamp " + + "if they want to retain committed offset longer on the broker than configured offset retention time."), new Field(METADATA_KEY_NAME, NULLABLE_STRING, "Any associated metadata the client wants to keep.")); private static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema( @@ -100,7 +102,9 @@ GROUP_ID, GENERATION_ID, MEMBER_ID, - new Field(RETENTION_TIME_KEY_NAME, INT64, "Time period in ms to retain the offset."), + new Field(RETENTION_TIME_KEY_NAME, INT64, "Time period in ms to retain the offset." + + " Brokers will always retain offsets until its commit timestamp + user specified retetntion time in the commit request. " + + "I retention time is not set (-1), broker offset retention time will be used as default."), new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), "Topics to commit offsets.")); /* v3 request is same as v2. Throttle time has been added to response */ diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index e398442e847..914d6beae10 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -53,27 +53,30 @@ private static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema( PARTITION_ID, - new Field(COMMIT_OFFSET_KEY_NAME, INT64, "Last committed message offset."), - new Field(METADATA_KEY_NAME, NULLABLE_STRING, "Any associated metadata the client wants to keep."), + new Field(COMMIT_OFFSET_KEY_NAME, INT64, "Last committed message offset. 
" + + "Note that if there is no offset associated with a topicpartition for given consumer group. " + + "The broker does not set an error code (since it is not really an error), " + + "but sets the offset field to -1 and returns empty metadata. "), + new Field(METADATA_KEY_NAME, NULLABLE_STRING, "Associated metadata the client set when committing offset."), ERROR_CODE); private static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema( TOPIC_NAME, - new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0))); + new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0), "List of per-partition offset metadata.")); private static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema( - new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0))); + new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0), "List of per-topic offset metadata.")); private static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0; private static final Schema OFFSET_FETCH_RESPONSE_V2 = new Schema( - new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)), + new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0), "List of per-topic offset metadata."), ERROR_CODE); /* v3 request is the same as v2. 
Throttle time has been added to v3 response */ private static final Schema OFFSET_FETCH_RESPONSE_V3 = new Schema( THROTTLE_TIME_MS, - new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)), + new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0), "List of per-topic offset metadata."), ERROR_CODE); public static Schema[] schemaVersions() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 91e3aebcf3f..9b3ac7a62c4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -63,14 +63,16 @@ TOPIC_NAME, new Field(PARTITION_DATA_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, - new Field(RECORD_SET_KEY_NAME, RECORDS))))); + new Field(RECORD_SET_KEY_NAME, RECORDS, + "A set of records in the format described in <a href=\"#protocol_messages\">The messages</a> section"))), + "Data being published to a particular partition of the topic.")); private static final Schema PRODUCE_REQUEST_V0 = new Schema( new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " + "received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for " + "only the leader and -1 for the full ISR."), new Field(TIMEOUT_KEY_NAME, INT32, "The time to await a response in ms."), - new Field(TOPIC_DATA_KEY_NAME, new ArrayOf(TOPIC_PRODUCE_DATA_V0))); + new Field(TOPIC_DATA_KEY_NAME, new ArrayOf(TOPIC_PRODUCE_DATA_V0), "Data being produced to a particular topic.")); /** * The body of PRODUCE_REQUEST_V1 is the same as PRODUCE_REQUEST_V0. 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index afedc9d6e88..f4e9cba2c88 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -104,7 +104,8 @@ new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, ERROR_CODE, - new Field(BASE_OFFSET_KEY_NAME, INT64), + new Field(BASE_OFFSET_KEY_NAME, INT64, + "Offset assigned to the first record in the record set appended to the partition."), new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " + "the messages. If CreateTime is used for the topic, the timestamp will be -1. " + "If LogAppendTime is used for the topic, the timestamp will be " + @@ -131,7 +132,8 @@ new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, ERROR_CODE, - new Field(BASE_OFFSET_KEY_NAME, INT64), + new Field(BASE_OFFSET_KEY_NAME, INT64, + "Offset assigned to the first record in the record set appended to the partition."), new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " + "the messages. If CreateTime is used for the topic, the timestamp will be -1. 
" + "If LogAppendTime is used for the topic, the timestamp will be the broker local " + diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java index 14ed2625d5a..f60b5271753 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java @@ -41,12 +41,14 @@ private static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema( MEMBER_ID, - new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES)); + new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "Protocol specific state (e.g. partition assignments)")); private static final Schema SYNC_GROUP_REQUEST_V0 = new Schema( GROUP_ID, GENERATION_ID, MEMBER_ID, - new Field(GROUP_ASSIGNMENT_KEY_NAME, new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0))); + new Field(GROUP_ASSIGNMENT_KEY_NAME, new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0), + "All members send SyncGroup immediately after joining the group, " + + "but only the leader provides the group's assignment.")); /* v1 request is the same as v0. 
Throttle time has been added to response */ private static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java index 77f951251fc..532c322ab42 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java @@ -34,11 +34,11 @@ private static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema( ERROR_CODE, - new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES)); + new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "State assigned by group leader to this member.")); private static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema( THROTTLE_TIME_MS, ERROR_CODE, - new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES)); + new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "State assigned by group leader to this member.")); public static Schema[] schemaVersions() { return new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java index f4bf157adaf..ed0297c3e41 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java @@ -49,7 +49,8 @@ new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."), new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( TOPIC_NAME, - new Field(PARTITIONS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0)))), + new Field(PARTITIONS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0), + "Error codes for each partition."))), "Errors per partition from writing markers.")); private static final Schema 
WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema( ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update protocol schema and field doc strings > -------------------------------------------- > > Key: KAFKA-3362 > URL: https://issues.apache.org/jira/browse/KAFKA-3362 > Project: Kafka > Issue Type: Sub-task > Reporter: Grant Henke > Priority: Major > > In KAFKA-3361, auto generation of docs based on the definitions in > Protocol.java was added. There are some inconsistencies and missing > information in the doc strings in the code vs. the [wiki > page|https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol]. > > The code documentation strings should be reviewed and updated to be complete > and accurate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)