KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133) Author: Ismael Juma <ism...@juma.me.uk>
Reviewers: Jun Rao <jun...@gmail.com> Closes #3076 from ijuma/kafka-3267-describe-alter-configs-protocol Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/972b7545 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/972b7545 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/972b7545 Branch: refs/heads/trunk Commit: 972b7545363ae85a55f94cf7ea83614be8840b75 Parents: e1abf17 Author: Ismael Juma <ism...@juma.me.uk> Authored: Thu May 18 06:51:02 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Thu May 18 06:51:02 2017 +0100 ---------------------------------------------------------------------- checkstyle/suppressions.xml | 4 +- .../kafka/clients/admin/AclOperation.java | 12 +- .../apache/kafka/clients/admin/AdminClient.java | 60 +++++ .../clients/admin/AlterConfigsOptions.java | 45 ++++ .../clients/admin/AlterConfigsResults.java | 42 ++++ .../org/apache/kafka/clients/admin/Config.java | 63 ++++++ .../apache/kafka/clients/admin/ConfigEntry.java | 59 +++++ .../kafka/clients/admin/ConfigResource.java | 65 ++++++ .../clients/admin/DescribeConfigsOptions.java | 37 +++ .../clients/admin/DescribeConfigsResults.java | 59 +++++ .../kafka/clients/admin/KafkaAdminClient.java | 184 ++++++++++++++- .../kafka/clients/admin/ResourceType.java | 7 +- .../kafka/common/config/AbstractConfig.java | 7 + .../errors/BrokerAuthorizationException.java | 23 ++ .../apache/kafka/common/protocol/ApiKeys.java | 4 +- .../apache/kafka/common/protocol/Errors.java | 20 +- .../apache/kafka/common/protocol/Protocol.java | 182 ++++++++++----- .../kafka/common/protocol/types/Schema.java | 16 +- .../kafka/common/requests/AbstractRequest.java | 6 + .../kafka/common/requests/AbstractResponse.java | 4 + .../common/requests/AlterConfigsRequest.java | 179 +++++++++++++++ .../common/requests/AlterConfigsResponse.java | 88 ++++++++ .../apache/kafka/common/requests/ApiError.java | 99 ++++++++ 
.../common/requests/CreateTopicsRequest.java | 12 +- .../common/requests/CreateTopicsResponse.java | 68 +----- .../common/requests/DescribeConfigsRequest.java | 142 ++++++++++++ .../requests/DescribeConfigsResponse.java | 186 +++++++++++++++ .../apache/kafka/common/requests/Resource.java | 60 +++++ .../kafka/common/requests/ResourceType.java | 42 ++++ .../kafka/server/policy/CreateTopicPolicy.java | 6 +- .../kafka/clients/admin/AclOperationTest.java | 4 +- .../clients/admin/KafkaAdminClientTest.java | 20 +- .../kafka/clients/admin/ResourceTypeTest.java | 3 +- .../common/requests/RequestResponseTest.java | 60 ++++- .../src/main/scala/kafka/admin/AclCommand.scala | 14 +- .../src/main/scala/kafka/admin/AdminUtils.scala | 33 ++- .../main/scala/kafka/admin/ConfigCommand.scala | 17 +- core/src/main/scala/kafka/log/LogConfig.scala | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 2 +- .../scala/kafka/security/auth/Operation.scala | 20 +- .../kafka/security/auth/ResourceType.scala | 7 +- .../security/auth/SimpleAclAuthorizer.scala | 2 +- .../main/scala/kafka/server/AdminManager.scala | 109 ++++++++- .../kafka/server/DelayedCreateTopics.scala | 8 +- .../src/main/scala/kafka/server/KafkaApis.scala | 82 ++++++- .../main/scala/kafka/server/KafkaServer.scala | 6 +- .../kafka/api/AuthorizerIntegrationTest.scala | 132 ++++++----- .../api/KafkaAdminClientIntegrationTest.scala | 224 ++++++++++++++++++- .../scala/unit/kafka/admin/AclCommandTest.scala | 11 +- .../AbstractCreateTopicsRequestTest.scala | 8 +- .../unit/kafka/server/RequestQuotaTest.scala | 18 +- .../processor/internals/StreamsKafkaClient.java | 3 +- 52 files changed, 2271 insertions(+), 295 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 
66548d9..dc00bee 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -19,7 +19,7 @@ files=".*/requests/AbstractResponse.java"/> <suppress checks="MethodLength" - files="KerberosLogin.java"/> + files="KerberosLogin.java|RequestResponseTest.java"/> <suppress checks="ParameterNumber" files="NetworkClient.java"/> @@ -46,7 +46,7 @@ files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|SsLTransportLayer|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer).java"/> <suppress checks="JavaNCSS" - files="KerberosLogin.java"/> + files="AbstractRequest.java|KerberosLogin.java"/> <suppress checks="JavaNCSS" files="AbstractRequest.java"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java index 14fb61b..062e5e3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java @@ -73,7 +73,17 @@ public enum AclOperation { /** * CLUSTER_ACTION operation. */ - CLUSTER_ACTION((byte) 9); + CLUSTER_ACTION((byte) 9), + + /** + * DESCRIBE_CONFIGS operation. + */ + DESCRIBE_CONFIGS((byte) 10), + + /** + * ALTER_CONFIGS operation. 
+ */ + ALTER_CONFIGS((byte) 11); private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 8bb495c..4cfc174 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -32,6 +32,7 @@ import java.util.Properties; */ @InterfaceStability.Unstable public abstract class AdminClient implements AutoCloseable { + /** * Create a new AdminClient with the given configuration. * @@ -196,6 +197,7 @@ public abstract class AdminClient implements AutoCloseable { public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options); /** +<<<<<<< HEAD * Similar to #{@link AdminClient#describeAcls(AclBindingFilter, DescribeAclsOptions), * but uses the default options. * @@ -260,4 +262,62 @@ public abstract class AdminClient implements AutoCloseable { * @return The DeleteAclsResult. */ public abstract DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options); + + + /** + * Get the configuration for the specified resources with the default options. + * + * See {@link #describeConfigs(Collection, DescribeConfigsOptions)} for more details. + * + * @param resources The resources (topic and broker resource types are currently supported) + * @return The DescribeConfigsResults + */ + public DescribeConfigsResults describeConfigs(Collection<ConfigResource> resources) { + return describeConfigs(resources, new DescribeConfigsOptions()); + } + + /** + * Get the configuration for the specified resources. 
+ * + * The returned configuration includes default values and the isDefault() method can be used to distinguish them + * from user supplied values. + * + * The value of config entries where isSensitive() is true is always {@code null} so that sensitive information + * is not disclosed. + * + * Config entries where isReadOnly() is true cannot be updated. + * + * @param resources The resources (topic and broker resource types are currently supported) + * @param options The options to use when describing configs + * @return The DescribeConfigsResults + */ + public abstract DescribeConfigsResults describeConfigs(Collection<ConfigResource> resources, + DescribeConfigsOptions options); + + /** + * Update the configuration for the specified resources with the default options. + * + * See {@link #alterConfigs(Map, AlterConfigsOptions)} for more details. + * + * @param configs The resources with their configs (topic is the only resource type with configs that can + * be updated currently) + * @return The AlterConfigsResults + */ + public AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs) { + return alterConfigs(configs, new AlterConfigsOptions()); + } + + /** + * Update the configuration for the specified resources. + * + * Updates are not transactional so they may succeed for some resources while failing for others. The configs for + * a particular resource are updated atomically.
+ * + * @param configs The resources with their configs (topic is the only resource type with configs that can + * be updated currently) + * @param options The options to use when altering configs + * @return The AlterConfigsResults + */ + public abstract AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options); + + } http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java new file mode 100644 index 0000000..5698fed --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +@InterfaceStability.Unstable +public class AlterConfigsOptions { + + private Integer timeoutMs = null; + private boolean validateOnly = false; + + public Integer timeoutMs() { + return timeoutMs; + } + + public boolean isValidateOnly() { + return validateOnly; + } + + public AlterConfigsOptions timeoutMs(Integer timeout) { + this.timeoutMs = timeout; + return this; + } + + public AlterConfigsOptions validateOnly(boolean validateOnly) { + this.validateOnly = validateOnly; + return this; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java new file mode 100644 index 0000000..3f44cfd --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +@InterfaceStability.Unstable +public class AlterConfigsResults { + + private final Map<ConfigResource, KafkaFuture<Void>> futures; + + AlterConfigsResults(Map<ConfigResource, KafkaFuture<Void>> futures) { + this.futures = futures; + } + + public Map<ConfigResource, KafkaFuture<Void>> results() { + return futures; + } + + public KafkaFuture<Void> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/Config.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Config.java b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java new file mode 100644 index 0000000..189a0b3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import java.util.Collection; +import java.util.Collections; + +public class Config { + + private final Collection<ConfigEntry> entries; + + public Config(Collection<ConfigEntry> entries) { + this.entries = entries; + } + + public Collection<ConfigEntry> entries() { + return Collections.unmodifiableCollection(entries); + } + + public ConfigEntry get(String name) { + for (ConfigEntry entry : entries) + if (entry.name().equals(name)) + return entry; + return null; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + Config config = (Config) o; + + return entries.equals(config.entries); + } + + @Override + public int hashCode() { + return entries.hashCode(); + } + + @Override + public String toString() { + return "Config(entries=" + entries + ")"; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java new file mode 100644 index 0000000..cafc8fb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +public class ConfigEntry { + + private final String name; + private final String value; + private final boolean isDefault; + private final boolean isSensitive; + private final boolean isReadOnly; + + public ConfigEntry(String name, String value) { + this(name, value, false, false, false); + } + + public ConfigEntry(String name, String value, boolean isDefault, boolean isSensitive, boolean isReadOnly) { + this.name = name; + this.value = value; + this.isDefault = isDefault; + this.isSensitive = isSensitive; + this.isReadOnly = isReadOnly; + } + + public String name() { + return name; + } + + public String value() { + return value; + } + + public boolean isDefault() { + return isDefault; + } + + public boolean isSensitive() { + return isSensitive; + } + + public boolean isReadOnly() { + return isReadOnly; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java new file mode 100644 index 0000000..61af4a8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +public final class ConfigResource { + + public enum Type { + BROKER, TOPIC, UNKNOWN; + } + + private final Type type; + private final String name; + + public ConfigResource(Type type, String name) { + this.type = type; + this.name = name; + } + + public Type type() { + return type; + } + + public String name() { + return name; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ConfigResource that = (ConfigResource) o; + + return type == that.type && name.equals(that.name); + } + + @Override + public int hashCode() { + int result = type.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ConfigResource{type=" + type + ", name='" + name + "'}"; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java new file mode 100644 index 
0000000..f167bab --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Options for describeConfigs. 
+ */ +@InterfaceStability.Unstable +public class DescribeConfigsOptions { + private Integer timeoutMs = null; + + public DescribeConfigsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; + return this; + } + + public Integer timeoutMs() { + return timeoutMs; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java new file mode 100644 index 0000000..c29872a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +@InterfaceStability.Unstable +public class DescribeConfigsResults { + + private final Map<ConfigResource, KafkaFuture<Config>> futures; + + DescribeConfigsResults(Map<ConfigResource, KafkaFuture<Config>> futures) { + this.futures = futures; + } + + public Map<ConfigResource, KafkaFuture<Config>> results() { + return futures; + } + + public KafkaFuture<Map<ConfigResource, Config>> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). + thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() { + @Override + public Map<ConfigResource, Config> apply(Void v) { + Map<ConfigResource, Config> configs = new HashMap<>(futures.size()); + for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) { + try { + configs.put(entry.getKey(), entry.getValue().get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, because allOf ensured that all the futures + // completed successfully. 
+ throw new RuntimeException(e); + } + } + return configs; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 9f1b1b2..76919ee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -53,6 +53,8 @@ import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.AlterConfigsRequest; +import org.apache.kafka.common.requests.AlterConfigsResponse; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.CreateAclsRequest; @@ -69,8 +71,13 @@ import org.apache.kafka.common.requests.DeleteTopicsRequest; import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DescribeAclsRequest; import org.apache.kafka.common.requests.DescribeAclsResponse; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.requests.DescribeConfigsRequest; +import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.Resource; +import org.apache.kafka.common.requests.ResourceType; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; @@ -355,13 +362,26 @@ public class 
KafkaAdminClient extends AdminClient { Node provide(); } + private class ConstantNodeIdProvider implements NodeProvider { + private final int nodeId; + + ConstantNodeIdProvider(int nodeId) { + this.nodeId = nodeId; + } + + @Override + public Node provide() { + return metadata.fetch().nodeById(nodeId); + } + } + /** * Provides a constant node which is known at construction time. */ - private static class ConstantAdminNodeProvider implements NodeProvider { + private static class ConstantNodeProvider implements NodeProvider { private final Node node; - ConstantAdminNodeProvider(Node node) { + ConstantNodeProvider(Node node) { this.node = node; } @@ -853,7 +873,7 @@ public class KafkaAdminClient extends AdminClient { public void handleResponse(AbstractResponse abstractResponse) { CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse; // Handle server responses for particular topics. - for (Map.Entry<String, CreateTopicsResponse.Error> entry : response.errors().entrySet()) { + for (Map.Entry<String, ApiError> entry : response.errors().entrySet()) { KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey()); if (future == null) { log.warn("Server response mentioned unknown topic {}", entry.getKey()); @@ -1071,7 +1091,7 @@ public class KafkaAdminClient extends AdminClient { continue; final KafkaFutureImpl<NodeApiVersions> nodeFuture = new KafkaFutureImpl<>(); nodeFutures.put(node, nodeFuture); - runnable.call(new Call("apiVersions", deadlineMs, new ConstantAdminNodeProvider(node)) { + runnable.call(new Call("apiVersions", deadlineMs, new ConstantNodeProvider(node)) { @Override public AbstractRequest.Builder createRequest(int timeoutMs) { return new ApiVersionsRequest.Builder(); @@ -1229,4 +1249,160 @@ public class KafkaAdminClient extends AdminClient { }, now); return new DeleteAclsResults(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures)); } + + @Override + public DescribeConfigsResults describeConfigs(Collection<ConfigResource> 
configResources, final DescribeConfigsOptions options) { + final Map<ConfigResource, KafkaFutureImpl<Config>> singleRequestFutures = new HashMap<>(); + final Collection<Resource> singleRequestResources = new ArrayList<>(configResources.size()); + + final Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<>(configResources.size()); + final Collection<Resource> brokerResources = new ArrayList<>(); + + for (ConfigResource resource : configResources) { + if (resource.type() != ConfigResource.Type.BROKER) { + singleRequestFutures.put(resource, new KafkaFutureImpl<Config>()); + singleRequestResources.add(configResourceToResource(resource)); + } else { + brokerFutures.put(resource, new KafkaFutureImpl<Config>()); + brokerResources.add(configResourceToResource(resource)); + } + } + + final long now = time.milliseconds(); + runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new DescribeConfigsRequest.Builder(singleRequestResources); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse; + for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : singleRequestFutures.entrySet()) { + ConfigResource configResource = entry.getKey(); + KafkaFutureImpl<Config> future = entry.getValue(); + DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource)); + if (!config.error().is(Errors.NONE)) { + future.completeExceptionally(config.error().exception()); + continue; + } + List<ConfigEntry> configEntries = new ArrayList<>(); + for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) { + configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(), + configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly())); + } + 
future.complete(new Config(configEntries)); + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(singleRequestFutures.values(), throwable); + } + }, now); + + for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : brokerFutures.entrySet()) { + final KafkaFutureImpl<Config> brokerFuture = entry.getValue(); + final Resource resource = configResourceToResource(entry.getKey()); + int nodeId = Integer.parseInt(resource.name()); + runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()), + new ConstantNodeIdProvider(nodeId)) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new DescribeConfigsRequest.Builder(Collections.singleton(resource)); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse; + DescribeConfigsResponse.Config config = response.configs().get(resource); + + if (!config.error().is(Errors.NONE)) + brokerFuture.completeExceptionally(config.error().exception()); + else { + List<ConfigEntry> configEntries = new ArrayList<>(); + for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) { + configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(), + configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly())); + } + brokerFuture.complete(new Config(configEntries)); + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(singleRequestFutures.values(), throwable); + } + }, now); + } + + Map<ConfigResource, KafkaFutureImpl<Config>> allFutures = new HashMap<>(configResources.size()); + allFutures.putAll(singleRequestFutures); + allFutures.putAll(brokerFutures); + return new DescribeConfigsResults(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures)); + } + + private Resource configResourceToResource(ConfigResource configResource) { + ResourceType resourceType; + 
switch (configResource.type()) { + case TOPIC: + resourceType = ResourceType.TOPIC; + break; + case BROKER: + resourceType = ResourceType.BROKER; + break; + default: + throw new IllegalArgumentException("Unexpected resource type " + configResource.type()); + } + return new Resource(resourceType, configResource.name()); + } + /** Alters the configs of every resource in {@code configs} with a single AlterConfigs request routed through LeastLoadedNodeProvider; one future is created per resource and {@code options} supplies the timeout and the validate-only flag. */ + @Override + public AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) { + final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>(configs.size()); + for (ConfigResource configResource : configs.keySet()) { + futures.put(configResource, new KafkaFutureImpl<Void>()); + } + /* Translate each Config into request-level name/value entries, keyed by the converted Resource. */ final Map<Resource, AlterConfigsRequest.Config> requestMap = new HashMap<>(configs.size()); + for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) { + List<AlterConfigsRequest.ConfigEntry> configEntries = new ArrayList<>(); + for (ConfigEntry configEntry: entry.getValue().entries()) + configEntries.add(new AlterConfigsRequest.ConfigEntry(configEntry.name(), configEntry.value())); + ConfigResource resource = entry.getKey(); + requestMap.put(configResourceToResource(resource), new AlterConfigsRequest.Config(configEntries)); + } + + final long now = time.milliseconds(); + runnable.call(new Call("alterConfigs", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + public AbstractRequest.Builder createRequest(int timeoutMs) { + return new AlterConfigsRequest.Builder(requestMap, options.isValidateOnly()); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + AlterConfigsResponse response = (AlterConfigsResponse) abstractResponse; + for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) { + KafkaFutureImpl<Void> future = entry.getValue(); + /* NOTE(review): assumes the response holds an error entry for every requested resource; a missing entry would make get(...) return null here - confirm against AlterConfigsResponse. */ ApiException exception = response.errors().get(configResourceToResource(entry.getKey())).exception(); + if (exception != null) { + 
future.completeExceptionally(exception); + } else { + future.complete(null); + } + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(futures.values(), throwable); + } + }, now); + return new AlterConfigsResults(new HashMap<ConfigResource, KafkaFuture<Void>>(futures)); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java index 66a91e3..ca4fa0a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java @@ -48,7 +48,12 @@ public enum ResourceType { /** * The cluster as a whole. */ - CLUSTER((byte) 4); + CLUSTER((byte) 4), + + /** + * A broker. + */ + BROKER((byte) 5); private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index dc7fd7c..d2b6d34 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -126,6 +126,13 @@ public class AbstractConfig { return (String) get(key); } + public ConfigDef.Type typeOf(String key) { + ConfigDef.ConfigKey configKey = definition.configKeys().get(key); + if (configKey == null) + return null; + return configKey.type; + } + public Password getPassword(String key) { return (Password) get(key); } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java new file mode 100644 index 0000000..9f7211e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.errors; + +public class BrokerAuthorizationException extends ApiException { + public BrokerAuthorizationException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 709d927..36f6403 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -65,7 +65,9 @@ public enum ApiKeys { TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false), DESCRIBE_ACLS(29, "DescribeAcls", false), CREATE_ACLS(30, "CreateAcls", false), - DELETE_ACLS(31, "DeleteAcls", false); + DELETE_ACLS(31, "DeleteAcls", false), + DESCRIBE_CONFIGS(32, "DescribeConfigs", false), + ALTER_CONFIGS(33, "AlterConfigs", false); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index c15edc1..db94b2c 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.BrokerAuthorizationException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.ClusterAuthorizationException; 
import org.apache.kafka.common.errors.ConcurrentTransactionsException; @@ -489,13 +490,18 @@ public enum Errors { return new ProducerIdAuthorizationException(message); } }), - SECURITY_DISABLED(55, "Security features are disabled.", - new ApiExceptionBuilder() { - @Override - public ApiException build(String message) { - return new SecurityDisabledException(message); - } - }); + SECURITY_DISABLED(55, "Security features are disabled.", new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new SecurityDisabledException(message); + } + }), + BROKER_AUTHORIZATION_FAILED(56, "Broker authorization failed", new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new BrokerAuthorizationException(message); + } + }); private interface ApiExceptionBuilder { ApiException build(String message); http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index e970eb1..d5ce469 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -142,9 +142,8 @@ public class Protocol { "The broker id of the controller broker."), new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1))); - - public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3}; - public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3}; + public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3}; + 
public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3}; /* Produce api */ @@ -227,8 +226,8 @@ public class Protocol { newThrottleTimeField()); public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2; - public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3}; - public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3}; + public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3}; + public static final Schema[] PRODUCE_RESPONSE = {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3}; /* Offset commit api */ public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition", @@ -337,7 +336,7 @@ public class Protocol { public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0))); - public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3}; + public static final Schema[] OFFSET_COMMIT_REQUEST = {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3}; /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. 
*/ public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0; @@ -348,7 +347,7 @@ public class Protocol { new Field("responses", new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0))); - public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3}; + public static final Schema[] OFFSET_COMMIT_RESPONSE = {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3}; /* Offset fetch api */ @@ -423,8 +422,8 @@ public class Protocol { new Field("error_code", INT16)); - public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3}; - public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3}; + public static final Schema[] OFFSET_FETCH_REQUEST = {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3}; + public static final Schema[] OFFSET_FETCH_RESPONSE = {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3}; /* List offset api */ public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition", @@ -520,8 +519,8 @@ public class Protocol { new Field("responses", new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1))); - public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2}; - public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2}; + public static final Schema[] LIST_OFFSET_REQUEST = {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2}; + public static final Schema[] LIST_OFFSET_RESPONSE 
= {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2}; /* Fetch api */ public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition", @@ -748,8 +747,8 @@ public class Protocol { newThrottleTimeField(), new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5))); - public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5}; - public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5}; + public static final Schema[] FETCH_REQUEST = {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5}; + public static final Schema[] FETCH_RESPONSE = {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5}; /* List groups api */ public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema(); @@ -766,8 +765,8 @@ public class Protocol { new Field("error_code", INT16), new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); - public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1}; - public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1}; + public static final Schema[] LIST_GROUPS_REQUEST = {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1}; + public static final Schema[] LIST_GROUPS_RESPONSE = {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1}; /* Describe group api */ public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids", @@ -814,8 +813,8 @@ public class Protocol { newThrottleTimeField(), new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0))); - public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] 
{DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1}; - public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1}; + public static final Schema[] DESCRIBE_GROUPS_REQUEST = {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1}; + public static final Schema[] DESCRIBE_GROUPS_RESPONSE = {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1}; /* Find coordinator api */ public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema( @@ -853,8 +852,8 @@ public class Protocol { "Host and port information for the coordinator for a consumer group.")); - public static final Schema[] FIND_COORDINATOR_REQUEST = new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1}; - public static final Schema[] FIND_COORDINATOR_RESPONSE = new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1}; + public static final Schema[] FIND_COORDINATOR_REQUEST = {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1}; + public static final Schema[] FIND_COORDINATOR_RESPONSE = {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1}; /* Controlled shutdown api */ public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id", @@ -872,8 +871,8 @@ public class Protocol { "The partitions that the broker still leads.")); /* V0 is not supported as it would require changes to the request header not to include `clientId` */ - public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = new Schema[] {null, CONTROLLED_SHUTDOWN_REQUEST_V1}; - public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = new Schema[] {null, CONTROLLED_SHUTDOWN_RESPONSE_V1}; + public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = {null, CONTROLLED_SHUTDOWN_REQUEST_V1}; + public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = {null, CONTROLLED_SHUTDOWN_RESPONSE_V1}; /* Join group api */ public static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(new 
Field("protocol_name", STRING), @@ -937,6 +936,7 @@ public class Protocol { new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0; + public static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema( newThrottleTimeField(), new Field("error_code", INT16), @@ -956,8 +956,8 @@ public class Protocol { new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); - public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2}; - public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2}; + public static final Schema[] JOIN_GROUP_REQUEST = {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2}; + public static final Schema[] JOIN_GROUP_RESPONSE = {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2}; /* SyncGroup api */ public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING), @@ -976,8 +976,8 @@ public class Protocol { newThrottleTimeField(), new Field("error_code", INT16), new Field("member_assignment", BYTES)); - public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1}; - public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1}; + public static final Schema[] SYNC_GROUP_REQUEST = {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1}; + public static final Schema[] SYNC_GROUP_RESPONSE = {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1}; /* Heartbeat api */ public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."), @@ -996,8 +996,8 @@ public class Protocol { newThrottleTimeField(), new Field("error_code", INT16)); - public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1}; - public static final Schema[] 
HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1}; + public static final Schema[] HEARTBEAT_REQUEST = {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1}; + public static final Schema[] HEARTBEAT_RESPONSE = {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1}; /* Leave group api */ public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."), @@ -1013,8 +1013,8 @@ public class Protocol { newThrottleTimeField(), new Field("error_code", INT16)); - public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1}; - public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1}; + public static final Schema[] LEAVE_GROUP_REQUEST = {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1}; + public static final Schema[] LEAVE_GROUP_RESPONSE = {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1}; /* Leader and ISR api */ public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 = @@ -1046,8 +1046,8 @@ public class Protocol { new Field("partitions", new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0))); - public static final Schema[] LEADER_AND_ISR_REQUEST = new Schema[] {LEADER_AND_ISR_REQUEST_V0}; - public static final Schema[] LEADER_AND_ISR_RESPONSE = new Schema[] {LEADER_AND_ISR_RESPONSE_V0}; + public static final Schema[] LEADER_AND_ISR_REQUEST = {LEADER_AND_ISR_REQUEST_V0}; + public static final Schema[] LEADER_AND_ISR_RESPONSE = {LEADER_AND_ISR_RESPONSE_V0}; /* Replica api */ public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."), @@ -1068,8 +1068,8 @@ public class Protocol { new Field("partitions", new ArrayOf(STOP_REPLICA_RESPONSE_PARTITION_V0))); - public static final Schema[] STOP_REPLICA_REQUEST = new Schema[] {STOP_REPLICA_REQUEST_V0}; - public static final Schema[] STOP_REPLICA_RESPONSE = new Schema[] {STOP_REPLICA_RESPONSE_V0}; + 
public static final Schema[] STOP_REPLICA_REQUEST = {STOP_REPLICA_REQUEST_V0}; + public static final Schema[] STOP_REPLICA_RESPONSE = {STOP_REPLICA_RESPONSE_V0}; /* Update metadata api */ @@ -1148,9 +1148,9 @@ public class Protocol { public static final Schema UPDATE_METADATA_RESPONSE_V3 = UPDATE_METADATA_RESPONSE_V2; - public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, + public static final Schema[] UPDATE_METADATA_REQUEST = {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3}; - public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, + public static final Schema[] UPDATE_METADATA_RESPONSE = {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3}; /* SASL handshake api */ @@ -1161,8 +1161,8 @@ public class Protocol { new Field("error_code", INT16), new Field("enabled_mechanisms", new ArrayOf(Type.STRING), "Array of mechanisms enabled in the server.")); - public static final Schema[] SASL_HANDSHAKE_REQUEST = new Schema[] {SASL_HANDSHAKE_REQUEST_V0}; - public static final Schema[] SASL_HANDSHAKE_RESPONSE = new Schema[] {SASL_HANDSHAKE_RESPONSE_V0}; + public static final Schema[] SASL_HANDSHAKE_REQUEST = {SASL_HANDSHAKE_REQUEST_V0}; + public static final Schema[] SASL_HANDSHAKE_RESPONSE = {SASL_HANDSHAKE_RESPONSE_V0}; /* ApiVersion api */ public static final Schema API_VERSIONS_REQUEST_V0 = new Schema(); @@ -1185,8 +1185,8 @@ public class Protocol { public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1}; /* Admin requests common */ - public static final Schema CONFIG_ENTRY = new Schema(new Field("config_key", STRING, "Configuration key name"), - new Field("config_value", STRING, "Configuration value")); + public static final Schema 
CONFIG_ENTRY = new Schema(new Field("config_name", STRING, "Configuration name"), + new Field("config_value", NULLABLE_STRING, "Configuration value")); public static final Schema PARTITION_REPLICA_ASSIGNMENT_ENTRY = new Schema( new Field("partition_id", INT32), @@ -1212,7 +1212,7 @@ public class Protocol { new Field("replica_assignment", new ArrayOf(PARTITION_REPLICA_ASSIGNMENT_ENTRY), "Replica assignment among kafka brokers for this topic partitions. If this is set num_partitions and replication_factor must be unset."), - new Field("configs", + new Field("config_entries", new ArrayOf(CONFIG_ENTRY), "Topic level configuration for topic to be set.")); @@ -1254,8 +1254,8 @@ public class Protocol { new ArrayOf(TOPIC_ERROR), "An array of per topic errors.")); - public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2}; - public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2}; + public static final Schema[] CREATE_TOPICS_REQUEST = {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2}; + public static final Schema[] CREATE_TOPICS_RESPONSE = {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2}; /* DeleteTopic api */ public static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema( @@ -1278,8 +1278,8 @@ public class Protocol { new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes.")); - public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1}; - public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1}; + public static final Schema[] DELETE_TOPICS_REQUEST = {DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1}; + public static final Schema[] DELETE_TOPICS_RESPONSE = {DELETE_TOPICS_RESPONSE_V0, 
DELETE_TOPICS_RESPONSE_V1}; public static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), new Field("offset", INT64, "The offset before which the messages will be deleted.")); @@ -1301,8 +1301,8 @@ public class Protocol { newThrottleTimeField(), new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0))); - public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0}; - public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0}; + public static final Schema[] DELETE_RECORDS_REQUEST = {DELETE_RECORDS_REQUEST_V0}; + public static final Schema[] DELETE_RECORDS_RESPONSE = {DELETE_RECORDS_RESPONSE_V0}; /* Transactions API */ public static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema( @@ -1327,9 +1327,9 @@ public class Protocol { "The epoch for the producer id. Will always be 0 if no transactional id was specified in the request.") ); - public static final Schema[] INIT_PRODUCER_ID_REQUEST = new Schema[] {INIT_PRODUCER_ID_REQUEST_V0}; + public static final Schema[] INIT_PRODUCER_ID_REQUEST = {INIT_PRODUCER_ID_REQUEST_V0}; - public static final Schema[] INIT_PRODUCER_ID_RESPONSE = new Schema[] {INIT_PRODUCER_ID_RESPONSE_V0}; + public static final Schema[] INIT_PRODUCER_ID_RESPONSE = {INIT_PRODUCER_ID_RESPONSE_V0}; /* Offsets for Leader Epoch api */ public static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0 = new Schema( @@ -1378,8 +1378,8 @@ public class Protocol { new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0), "An array of topics for which we have leader offsets for some requested Partition Leader Epoch")); - public static final Schema[] OFFSET_FOR_LEADER_EPOCH_REQUEST = new Schema[] {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0}; - public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = new Schema[] {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0}; + public static final Schema[] 
OFFSET_FOR_LEADER_EPOCH_REQUEST = {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0}; + public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0}; public static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema( new Field("transactional_id", @@ -1408,8 +1408,8 @@ public class Protocol { INT16))))))) ); - public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = new Schema[] {ADD_PARTITIONS_TO_TXN_REQUEST_V0}; - public static final Schema[] ADD_PARTITIONS_TO_TXN_RESPONSE = new Schema[] {ADD_PARTITIONS_TO_TXN_RESPONSE_V0}; + public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = {ADD_PARTITIONS_TO_TXN_REQUEST_V0}; + public static final Schema[] ADD_PARTITIONS_TO_TXN_RESPONSE = {ADD_PARTITIONS_TO_TXN_RESPONSE_V0}; public static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema( new Field("transactional_id", @@ -1432,8 +1432,8 @@ public class Protocol { "An integer error code.") ); - public static final Schema[] ADD_OFFSETS_TO_TXN_REQUEST = new Schema[] {ADD_OFFSETS_TO_TXN_REQUEST_V0}; - public static final Schema[] ADD_OFFSETS_TO_TXN_RESPONSE = new Schema[] {ADD_OFFSETS_TO_TXN_RESPONSE_V0}; + public static final Schema[] ADD_OFFSETS_TO_TXN_REQUEST = {ADD_OFFSETS_TO_TXN_REQUEST_V0}; + public static final Schema[] ADD_OFFSETS_TO_TXN_RESPONSE = {ADD_OFFSETS_TO_TXN_RESPONSE_V0}; public static final Schema END_TXN_REQUEST_V0 = new Schema( new Field("transactional_id", @@ -1457,8 +1457,8 @@ public class Protocol { "An integer error code.") ); - public static final Schema[] END_TXN_REQUEST = new Schema[] {END_TXN_REQUEST_V0}; - public static final Schema[] END_TXN_RESPONSE = new Schema[] {END_TXN_RESPONSE_V0}; + public static final Schema[] END_TXN_REQUEST = {END_TXN_REQUEST_V0}; + public static final Schema[] END_TXN_RESPONSE = {END_TXN_RESPONSE_V0}; public static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema( new Field("producer_id", @@ -1506,8 +1506,8 @@ public class Protocol { new Field("transaction_markers", 
new ArrayOf(WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0), "Errors per partition from writing markers.") ); - public static final Schema[] WRITE_TXN_REQUEST = new Schema[] {WRITE_TXN_MARKERS_REQUEST_V0}; - public static final Schema[] WRITE_TXN_RESPONSE = new Schema[] {WRITE_TXN_MARKERS_RESPONSE_V0}; + public static final Schema[] WRITE_TXN_REQUEST = {WRITE_TXN_MARKERS_REQUEST_V0}; + public static final Schema[] WRITE_TXN_RESPONSE = {WRITE_TXN_MARKERS_RESPONSE_V0}; public static final Schema TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0 = new Schema( new Field("partition", INT32), @@ -1546,8 +1546,66 @@ public class Protocol { "Errors per partition from writing markers.") ); - public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = new Schema[] {TXN_OFFSET_COMMIT_REQUEST_V0}; - public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = new Schema[] {TXN_OFFSET_COMMIT_RESPONSE_V0}; + public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = {TXN_OFFSET_COMMIT_REQUEST_V0}; + public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = {TXN_OFFSET_COMMIT_RESPONSE_V0}; + + /* DescribeConfigs API */ + + public static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( + new Field("resource_type", INT8), + new Field("resource_name", STRING), + new Field("config_names", ArrayOf.nullable(STRING)) + ); + + public static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema( + new Field("resources", new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), + "An array of config resources to be returned.")); + + public static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( + new Field("error_code", INT16), + new Field("error_message", NULLABLE_STRING), + new Field("resource_type", INT8), + new Field("resource_name", STRING), + new Field("config_entries", new ArrayOf(new Schema( + new Field("config_name", STRING), + new Field("config_value", NULLABLE_STRING), + new Field("read_only", BOOLEAN), + new Field("is_default", BOOLEAN), + new Field("is_sensitive", 
BOOLEAN) + )) + )); + + public static final Schema DESCRIBE_CONFIGS_RESPONSE_V0 = new Schema( + newThrottleTimeField(), + new Field("resources", new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0))); + + public static final Schema[] DESCRIBE_CONFIGS_REQUEST = {DESCRIBE_CONFIGS_REQUEST_V0}; + public static final Schema[] DESCRIBE_CONFIGS_RESPONSE = {DESCRIBE_CONFIGS_RESPONSE_V0}; + + /* AlterConfigs API */ + + public static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( + new Field("resource_type", INT8), + new Field("resource_name", STRING), + new Field("config_entries", new ArrayOf(CONFIG_ENTRY))); + + public static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema( + new Field("resources", new ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0), + "An array of resources to update with the provided configs."), + new Field("validate_only", BOOLEAN)); + + public static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( + new Field("error_code", INT16), + new Field("error_message", NULLABLE_STRING), + new Field("resource_type", INT8), + new Field("resource_name", STRING)); + + public static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema( + newThrottleTimeField(), + new Field("resources", new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0))); + + public static final Schema[] ALTER_CONFIGS_REQUEST = {ALTER_CONFIGS_REQUEST_V0}; + public static final Schema[] ALTER_CONFIGS_RESPONSE = {ALTER_CONFIGS_RESPONSE_V0}; public static final Schema DESCRIBE_ACLS_REQUEST_V0 = new Schema( new Field("resource_type", INT8, "The filter resource type."), @@ -1675,6 +1733,8 @@ public class Protocol { REQUESTS[ApiKeys.DESCRIBE_ACLS.id] = DESCRIBE_ACLS_REQUEST; REQUESTS[ApiKeys.CREATE_ACLS.id] = CREATE_ACLS_REQUEST; REQUESTS[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_REQUEST; + REQUESTS[ApiKeys.DESCRIBE_CONFIGS.id] = DESCRIBE_CONFIGS_REQUEST; + REQUESTS[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; RESPONSES[ApiKeys.FETCH.id] = 
FETCH_RESPONSE; @@ -1708,6 +1768,8 @@ public class Protocol { RESPONSES[ApiKeys.DESCRIBE_ACLS.id] = DESCRIBE_ACLS_RESPONSE; RESPONSES[ApiKeys.CREATE_ACLS.id] = CREATE_ACLS_RESPONSE; RESPONSES[ApiKeys.DELETE_ACLS.id] = DELETE_ACLS_RESPONSE; + RESPONSES[ApiKeys.DESCRIBE_CONFIGS.id] = DESCRIBE_CONFIGS_RESPONSE; + RESPONSES[ApiKeys.ALTER_CONFIGS.id] = ALTER_CONFIGS_RESPONSE; /* set the minimum and maximum version of each api */ for (ApiKeys api : ApiKeys.values()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java index f275ada..fbb520c 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java @@ -56,8 +56,7 @@ public class Schema extends Type { Object value = field.type().validate(r.get(field)); field.type.write(buffer, value); } catch (Exception e) { - throw new SchemaException("Error writing field '" + field.name + - "': " + + throw new SchemaException("Error writing field '" + field.name + "': " + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); } } @@ -73,8 +72,7 @@ public class Schema extends Type { try { objects[i] = fields[i].type.read(buffer); } catch (Exception e) { - throw new SchemaException("Error reading field '" + fields[i].name + - "': " + + throw new SchemaException("Error reading field '" + fields[i].name + "': " + (e.getMessage() == null ? 
e.getClass().getName() : e.getMessage())); } } @@ -88,8 +86,14 @@ public class Schema extends Type { public int sizeOf(Object o) { int size = 0; Struct r = (Struct) o; - for (Field field : fields) - size += field.type.sizeOf(r.get(field)); + for (Field field : fields) { + try { + size += field.type.sizeOf(r.get(field)); + } catch (Exception e) { + throw new SchemaException("Error computing size for field '" + field.name + "': " + + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); + } + } return size; } http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 16c0c21..2cd88e1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -208,6 +208,12 @@ public abstract class AbstractRequest extends AbstractRequestResponse { case DELETE_ACLS: request = new DeleteAclsRequest(struct, version); break; + case DESCRIBE_CONFIGS: + request = new DescribeConfigsRequest(struct, version); + break; + case ALTER_CONFIGS: + request = new AlterConfigsRequest(struct, version); + break; default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " + "code should be updated to do so.", apiKey)); http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index aee4f5e..1000ef5 100644 
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -114,6 +114,10 @@ public abstract class AbstractResponse extends AbstractRequestResponse { return new CreateAclsResponse(struct); case DELETE_ACLS: return new DeleteAclsResponse(struct); + case DESCRIBE_CONFIGS: + return new DescribeConfigsResponse(struct); + case ALTER_CONFIGS: + return new AlterConfigsResponse(struct); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " + "code should be updated to do so.", apiKey)); http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java new file mode 100644 index 0000000..a964f85 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Request for the AlterConfigs API. It carries a map from each {@link Resource} to the
+ * {@link Config} entries supplied for it, plus the boolean sent in the {@code validate_only} field.
+ */
+public class AlterConfigsRequest extends AbstractRequest {
+
+    private static final String RESOURCES_KEY_NAME = "resources";
+    private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+    private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+    private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";
+
+    private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries";
+    private static final String CONFIG_NAME = "config_name";
+    private static final String CONFIG_VALUE = "config_value";
+
+    /** The config entries supplied for a single resource. */
+    public static class Config {
+        private final Collection<ConfigEntry> entries;
+
+        /**
+         * @param entries the entries for the resource; must not be null
+         * @throws NullPointerException if {@code entries} is null
+         */
+        public Config(Collection<ConfigEntry> entries) {
+            // Fail here with a named argument instead of later, when toStruct() dereferences the collection.
+            this.entries = Objects.requireNonNull(entries, "entries");
+        }
+
+        public Collection<ConfigEntry> entries() {
+            return entries;
+        }
+    }
+
+    /** A single config name/value pair. */
+    public static class ConfigEntry {
+        private final String name;
+        private final String value;
+
+        public ConfigEntry(String name, String value) {
+            this.name = name;
+            this.value = value;
+        }
+
+        public String name() {
+            return name;
+        }
+
+        public String value() {
+            return value;
+        }
+
+    }
+
+    /** Builder that captures the request contents and creates the request for a given version. */
+    public static class Builder extends AbstractRequest.Builder {
+
+        private final Map<Resource, Config> configs;
+        private final boolean validateOnly;
+
+        /**
+         * @param configs the config to send for each resource; must not be null
+         * @param validateOnly the value to send in the {@code validate_only} field
+         * @throws NullPointerException if {@code configs} is null
+         */
+        public Builder(Map<Resource, Config> configs, boolean validateOnly) {
+            super(ApiKeys.ALTER_CONFIGS);
+            this.configs = Objects.requireNonNull(configs, "configs");
+            this.validateOnly = validateOnly;
+        }
+
+        @Override
+        public AlterConfigsRequest build(short version) {
+            return new AlterConfigsRequest(version, configs, validateOnly);
+        }
+    }
+
+    private final Map<Resource, Config> configs;
+    private final boolean validateOnly;
+
+    /**
+     * @param version the request version
+     * @param configs the config to send for each resource; must not be null
+     * @param validateOnly the value to send in the {@code validate_only} field
+     * @throws NullPointerException if {@code configs} is null
+     */
+    public AlterConfigsRequest(short version, Map<Resource, Config> configs, boolean validateOnly) {
+        super(version);
+        // toStruct() and getErrorResponse() both dereference the map, so reject null up front.
+        this.configs = Objects.requireNonNull(configs, "configs");
+        this.validateOnly = validateOnly;
+    }
+
+    /**
+     * Rebuilds the request from its struct form: reads {@code validate_only}, then one
+     * {@link Resource} and its config entries per element of the {@code resources} array.
+     */
+    public AlterConfigsRequest(Struct struct, short version) {
+        super(version);
+        validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME);
+        Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
+        configs = new HashMap<>(resourcesArray.length);
+        for (Object resourcesObj : resourcesArray) {
+            Struct resourcesStruct = (Struct) resourcesObj;
+
+            ResourceType resourceType = ResourceType.forId(resourcesStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            String resourceName = resourcesStruct.getString(RESOURCE_NAME_KEY_NAME);
+            Resource resource = new Resource(resourceType, resourceName);
+
+            Object[] configEntriesArray = resourcesStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
+            List<ConfigEntry> configEntries = new ArrayList<>(configEntriesArray.length);
+            for (Object configEntriesObj : configEntriesArray) {
+                Struct configEntriesStruct = (Struct) configEntriesObj;
+                String configName = configEntriesStruct.getString(CONFIG_NAME);
+                String configValue = configEntriesStruct.getString(CONFIG_VALUE);
+                configEntries.add(new ConfigEntry(configName, configValue));
+            }
+            Config config = new Config(configEntries);
+            configs.put(resource, config);
+        }
+    }
+
+    public Map<Resource, Config> configs() {
+        return configs;
+    }
+
+    public boolean validateOnly() {
+        return validateOnly;
+    }
+
+    /**
+     * Converts this request to a struct using the request schema for {@link #version()}:
+     * one nested struct per resource, each holding one nested struct per config entry.
+     */
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.requestSchema(version()));
+        struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly);
+        List<Struct> resourceStructs = new ArrayList<>(configs.size());
+        for (Map.Entry<Resource, Config> entry : configs.entrySet()) {
+            Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
+
+            Resource resource = entry.getKey();
+            resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
+            resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
+
+            Config config = entry.getValue();
+            List<Struct> configEntryStructs = new ArrayList<>(config.entries.size());
+            for (ConfigEntry configEntry : config.entries) {
+                Struct configEntriesStruct = resourceStruct.instance(CONFIG_ENTRIES_KEY_NAME);
+                configEntriesStruct.set(CONFIG_NAME, configEntry.name);
+                configEntriesStruct.set(CONFIG_VALUE, configEntry.value);
+                configEntryStructs.add(configEntriesStruct);
+            }
+            resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0]));
+
+            resourceStructs.add(resourceStruct);
+        }
+        struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+        return struct;
+    }
+
+    /**
+     * Builds a response that reports the same error, derived from {@code e}, for every resource
+     * in this request.
+     *
+     * @throws IllegalArgumentException if this request's version is anything other than 0
+     */
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        short version = version();
+        switch (version) {
+            case 0:
+                ApiError error = ApiError.fromThrowable(e);
+                Map<Resource, ApiError> errors = new HashMap<>(configs.size());
+                for (Resource resource : configs.keySet())
+                    errors.put(resource, error);
+                return new AlterConfigsResponse(throttleTimeMs, errors);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        version, this.getClass().getSimpleName(), ApiKeys.ALTER_CONFIGS.latestVersion()));
+        }
+    }
+
+    /** Parses {@code buffer} with the request schema for {@code version} and wraps the resulting struct. */
+    public static AlterConfigsRequest parse(ByteBuffer buffer, short version) {
+        return new AlterConfigsRequest(ApiKeys.ALTER_CONFIGS.parseRequest(version, buffer), version);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
new file mode 100644
index 0000000..8f904d8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Response for the AlterConfigs API: the throttle time plus one {@link ApiError} per
+ * {@link Resource}.
+ */
+public class AlterConfigsResponse extends AbstractResponse {
+
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
+
+    private static final String RESOURCES_KEY_NAME = "resources";
+    private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+    private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+
+    private final int throttleTimeMs;
+    private final Map<Resource, ApiError> errors;
+
+    /**
+     * @param throttleTimeMs the value to send in the {@code throttle_time_ms} field
+     * @param errors the error to report for each resource; must not be null
+     * @throws NullPointerException if {@code errors} is null
+     */
+    public AlterConfigsResponse(int throttleTimeMs, Map<Resource, ApiError> errors) {
+        this.throttleTimeMs = throttleTimeMs;
+        // toStruct() dereferences the map, so reject null at construction with a named argument.
+        this.errors = Objects.requireNonNull(errors, "errors");
+    }
+
+    /**
+     * Rebuilds the response from its struct form. Each element of the {@code resources} array
+     * supplies both the {@link ApiError} and the resource type and name it belongs to.
+     */
+    public AlterConfigsResponse(Struct struct) {
+        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
+        errors = new HashMap<>(resourcesArray.length);
+        for (Object resourceObj : resourcesArray) {
+            Struct resourceStruct = (Struct) resourceObj;
+            ApiError error = new ApiError(resourceStruct);
+            ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+            errors.put(new Resource(resourceType, resourceName), error);
+        }
+    }
+
+    public Map<Resource, ApiError> errors() {
+        return errors;
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    /**
+     * Converts this response to a struct using the response schema for {@code version}:
+     * one nested struct per resource, filled by {@link ApiError#write(Struct)} and then
+     * the resource type and name.
+     */
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.ALTER_CONFIGS.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        List<Struct> resourceStructs = new ArrayList<>(errors.size());
+        for (Map.Entry<Resource, ApiError> entry : errors.entrySet()) {
+            Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
+            Resource resource = entry.getKey();
+            entry.getValue().write(resourceStruct);
+            resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
+            resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
+            resourceStructs.add(resourceStruct);
+        }
+        struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+        return struct;
+    }
+
+    /** Parses {@code buffer} with the response schema for {@code version} and wraps the resulting struct. */
+    public static AlterConfigsResponse parse(ByteBuffer buffer, short version) {
+        return new AlterConfigsResponse(ApiKeys.ALTER_CONFIGS.parseResponse(version, buffer));
+    }
+
+}