This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3b1524c KAFKA-7466: Add IncrementalAlterConfigs API (KIP-339) (#6247) 3b1524c is described below commit 3b1524c5dfd2a94f3fb919dad0de70984963772b Author: Manikumar Reddy <manikumar.re...@gmail.com> AuthorDate: Wed Apr 17 04:56:33 2019 +0530 KAFKA-7466: Add IncrementalAlterConfigs API (KIP-339) (#6247) Reviewers: Colin P. McCabe <cmcc...@apache.org>, Viktor Somogyi <viktorsomo...@gmail.com>, Stanislav Kozlovski <stanislav_kozlov...@outlook.com>, Rajini Sivaram <rajinisiva...@googlemail.com>, Ismael Juma <ism...@juma.me.uk> --- checkstyle/suppressions.xml | 2 +- .../apache/kafka/clients/admin/AdminClient.java | 46 ++++++ .../apache/kafka/clients/admin/AlterConfigOp.java | 96 +++++++++++ .../kafka/clients/admin/KafkaAdminClient.java | 90 +++++++++++ .../org/apache/kafka/common/config/ConfigDef.java | 4 + .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../kafka/common/requests/AbstractRequest.java | 2 + .../kafka/common/requests/AbstractResponse.java | 2 + .../requests/IncrementalAlterConfigsRequest.java | 91 +++++++++++ .../requests/IncrementalAlterConfigsResponse.java | 99 ++++++++++++ .../message/IncrementalAlterConfigsRequest.json | 41 +++++ .../message/IncrementalAlterConfigsResponse.json | 36 +++++ .../kafka/clients/admin/KafkaAdminClientTest.java | 56 +++++++ .../kafka/clients/admin/MockAdminClient.java | 7 + .../kafka/common/requests/RequestResponseTest.java | 35 ++++ core/src/main/scala/kafka/log/LogConfig.scala | 2 + .../src/main/scala/kafka/server/AdminManager.scala | 180 +++++++++++++++------ core/src/main/scala/kafka/server/KafkaApis.scala | 31 ++++ .../kafka/api/AdminClientIntegrationTest.scala | 178 ++++++++++++++++++++ .../kafka/api/AuthorizerIntegrationTest.scala | 33 +++- .../server/DynamicBrokerReconfigurationTest.scala | 16 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 10 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 28 +++- 23 files changed, 1023 
insertions(+), 68 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index ce2706d..ca103a2 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -43,7 +43,7 @@ files="Sender.java"/> <suppress checks="ClassDataAbstractionCoupling" - files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/> + files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|AdminClient|KafkaAdminClient).java"/> <suppress checks="ClassDataAbstractionCoupling" files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index b823cdc..8826f83 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -368,7 +368,9 @@ public abstract class AdminClient implements AutoCloseable { * @param configs The resources with their configs (topic is the only resource type with configs that can * be updated currently) * @return The AlterConfigsResult + * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}. */ + @Deprecated public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) { return alterConfigs(configs, new AlterConfigsOptions()); } @@ -385,10 +387,54 @@ public abstract class AdminClient implements AutoCloseable { * be updated currently) * @param options The options to use when describing configs * @return The AlterConfigsResult + * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)}. 
*/ + @Deprecated public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options); /** + * Incrementally updates the configuration for the specified resources with default options. + * + * This is a convenience method for {@link AdminClient#incrementalAlterConfigs(Map, AlterConfigsOptions)} with default options. + * See the overload for more details. + * + * This operation is supported by brokers with version 2.3.0 or higher. + * + * @param configs The resources with their configs + * @return The AlterConfigsResult + */ + public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) { + return incrementalAlterConfigs(configs, new AlterConfigsOptions()); + } + + + /** + * Incrementally update the configuration for the specified resources. + * + * Updates are not transactional so they may succeed for some resources while failing for others. The configs for + * a particular resource are updated atomically. + * + * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from + * the returned {@code AlterConfigsResult}:</p> + * <ul> + * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException} + * if the authenticated user didn't have alter access to the cluster.</li> + * <li>{@link org.apache.kafka.common.errors.TopicAuthorizationException} + * if the authenticated user didn't have alter access to the Topic.</li> + * <li>{@link org.apache.kafka.common.errors.InvalidRequestException} + * if the request details are invalid. e.g., a configuration key was specified more than once for a resource</li> + * </ul> + * + * This operation is supported by brokers with version 2.3.0 or higher.
+ * + * @param configs The resources with their configs + * @param options The options to use when altering configs + * @return The AlterConfigsResult + */ + public abstract AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, + Collection<AlterConfigOp>> configs, AlterConfigsOptions options); + + /** + * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigOp.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigOp.java new file mode 100644 index 0000000..367c842 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigOp.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * A class representing an alter configuration entry containing name, value and operation type. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class AlterConfigOp { + + public enum OpType { + SET((byte) 0), DELETE((byte) 1), APPEND((byte) 2), SUBTRACT((byte) 3); + + private static final Map<Byte, OpType> OP_TYPES = Collections.unmodifiableMap( + Arrays.stream(values()).collect(Collectors.toMap(OpType::id, Function.identity())) + ); + + private final byte id; + + OpType(final byte id) { + this.id = id; + } + + public byte id() { + return id; + } + + public static OpType forId(final byte id) { + return OP_TYPES.get(id); + } + } + + private final ConfigEntry configEntry; + private final OpType opType; + + public AlterConfigOp(ConfigEntry configEntry, OpType operationType) { + this.configEntry = configEntry; + this.opType = operationType; + } + + public ConfigEntry configEntry() { + return configEntry; + }; + + public OpType opType() { + return opType; + }; + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final AlterConfigOp that = (AlterConfigOp) o; + return opType == that.opType && + Objects.equals(configEntry, that.configEntry); + } + + @Override + public int hashCode() { + return Objects.hash(opType, configEntry); + } + + @Override + public String toString() { + return "AlterConfigOp{" + + "opType=" + opType + + ", configEntry=" + configEntry + + '}'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 336597f..23d7fd5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -69,6 +69,10 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; import org.apache.kafka.common.message.MetadataRequestData; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -121,6 +125,8 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest; +import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.MetadataRequest; @@ -1885,6 +1891,7 @@ public class KafkaAdminClient extends AdminClient { } @Override + @Deprecated public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) { final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new 
HashMap<>(); // We must make a separate AlterConfigs request for every BROKER resource we want to alter @@ -1949,6 +1956,89 @@ public class KafkaAdminClient extends AdminClient { } @Override + public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, + final AlterConfigsOptions options) { + final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>(); + // We must make a separate AlterConfigs request for every BROKER resource we want to alter + // and send the request to that specific broker. Other resources are grouped together into + // a single request that may be sent to any broker. + final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(); + + for (ConfigResource resource : configs.keySet()) { + if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) { + NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name())); + allFutures.putAll(incrementalAlterConfigs(configs, options, Collections.singleton(resource), nodeProvider)); + } else + unifiedRequestResources.add(resource); + } + if (!unifiedRequestResources.isEmpty()) + allFutures.putAll(incrementalAlterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider())); + + return new AlterConfigsResult(new HashMap<>(allFutures)); + } + + private Map<ConfigResource, KafkaFutureImpl<Void>> incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, + final AlterConfigsOptions options, + Collection<ConfigResource> resources, + NodeProvider nodeProvider) { + final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>(); + for (ConfigResource resource : resources) + futures.put(resource, new KafkaFutureImpl<>()); + + final long now = time.milliseconds(); + runnable.call(new Call("incrementalAlterConfigs", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) { + + @Override + public AbstractRequest.Builder createRequest(int 
timeoutMs) { + return new IncrementalAlterConfigsRequest.Builder( + toIncrementalAlterConfigsRequestData(resources, configs, options.shouldValidateOnly())); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + IncrementalAlterConfigsResponse response = (IncrementalAlterConfigsResponse) abstractResponse; + Map<ConfigResource, ApiError> errors = IncrementalAlterConfigsResponse.fromResponseData(response.data()); + for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) { + KafkaFutureImpl<Void> future = entry.getValue(); + ApiException exception = errors.get(entry.getKey()).exception(); + if (exception != null) { + future.completeExceptionally(exception); + } else { + future.complete(null); + } + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(futures.values(), throwable); + } + }, now); + return futures; + } + + private IncrementalAlterConfigsRequestData toIncrementalAlterConfigsRequestData(final Collection<ConfigResource> resources, + final Map<ConfigResource, Collection<AlterConfigOp>> configs, + final boolean validateOnly) { + IncrementalAlterConfigsRequestData requestData = new IncrementalAlterConfigsRequestData(); + requestData.setValidateOnly(validateOnly); + for (ConfigResource resource : resources) { + AlterableConfigSet alterableConfigSet = new AlterableConfigSet(); + for (AlterConfigOp configEntry : configs.get(resource)) + alterableConfigSet.add(new AlterableConfig(). + setName(configEntry.configEntry().name()). + setValue(configEntry.configEntry().value()). + setConfigOperation(configEntry.opType().id())); + + AlterConfigsResource alterConfigsResource = new AlterConfigsResource(); + alterConfigsResource.setResourceType(resource.type().id()). 
+ setResourceName(resource.name()).setConfigs(alterableConfigSet); + requestData.resources().add(alterConfigsResource); + } + return requestData; + } + + @Override public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options) { final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size()); diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 1259882..9f28b56 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -1078,6 +1078,10 @@ public class ConfigDef { public boolean hasDefault() { return !NO_DEFAULT_VALUE.equals(this.defaultValue); } + + public Type type() { + return type; + } } protected List<String> headers() { diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 33d6736..5391135 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -38,6 +38,8 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData; import org.apache.kafka.common.message.SaslAuthenticateResponseData; import org.apache.kafka.common.message.SaslHandshakeRequestData; import org.apache.kafka.common.message.SaslHandshakeResponseData; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; @@ -189,7 +191,9 @@ public enum ApiKeys { DESCRIBE_DELEGATION_TOKEN(41, 
"DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()), DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()), ELECT_PREFERRED_LEADERS(43, "ElectPreferredLeaders", ElectPreferredLeadersRequestData.SCHEMAS, - ElectPreferredLeadersResponseData.SCHEMAS); + ElectPreferredLeadersResponseData.SCHEMAS), + INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS, + IncrementalAlterConfigsResponseData.SCHEMAS); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index c069bc9..c199f8e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -231,6 +231,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return new DeleteGroupsRequest(struct, apiVersion); case ELECT_PREFERRED_LEADERS: return new ElectPreferredLeadersRequest(struct, apiVersion); + case INCREMENTAL_ALTER_CONFIGS: + return new IncrementalAlterConfigsRequest(struct, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 50ae0b5..c21fa2b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -158,6 +158,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse { return new 
DeleteGroupsResponse(struct); case ELECT_PREFERRED_LEADERS: return new ElectPreferredLeadersResponse(struct, version); + case INCREMENTAL_ALTER_CONFIGS: + return new IncrementalAlterConfigsResponse(struct, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java new file mode 100644 index 0000000..3a87cdb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class IncrementalAlterConfigsRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder<IncrementalAlterConfigsRequest> { + private final IncrementalAlterConfigsRequestData data; + + public Builder(IncrementalAlterConfigsRequestData data) { + super(ApiKeys.INCREMENTAL_ALTER_CONFIGS); + this.data = data; + } + + @Override + public IncrementalAlterConfigsRequest build(short version) { + return new IncrementalAlterConfigsRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final IncrementalAlterConfigsRequestData data; + private final short version; + + private IncrementalAlterConfigsRequest(IncrementalAlterConfigsRequestData data, short version) { + super(ApiKeys.INCREMENTAL_ALTER_CONFIGS, version); + this.data = data; + this.version = version; + } + + IncrementalAlterConfigsRequest(final Struct struct, final short version) { + super(ApiKeys.INCREMENTAL_ALTER_CONFIGS, version); + this.data = new IncrementalAlterConfigsRequestData(struct, version); + this.version = version; + } + + public static IncrementalAlterConfigsRequest parse(ByteBuffer buffer, short version) { + return new IncrementalAlterConfigsRequest(ApiKeys.INCREMENTAL_ALTER_CONFIGS.parseRequest(version, buffer), version); + } + + public IncrementalAlterConfigsRequestData data() { + return data; + } + + @Override + protected Struct toStruct() { + return 
data.toStruct(version); + } + + @Override + public AbstractResponse getErrorResponse(final int throttleTimeMs, final Throwable e) { + IncrementalAlterConfigsResponseData response = new IncrementalAlterConfigsResponseData(); + ApiError apiError = ApiError.fromThrowable(e); + for (AlterConfigsResource resource : data.resources()) { + response.responses().add(new AlterConfigsResourceResult() + .setResourceName(resource.resourceName()) + .setResourceType(resource.resourceType()) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message())); + } + return new IncrementalAlterConfigsResponse(response); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java new file mode 100644 index 0000000..1e5aea1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class IncrementalAlterConfigsResponse extends AbstractResponse { + + public static IncrementalAlterConfigsResponseData toResponseData(final int requestThrottleMs, + final Map<ConfigResource, ApiError> results) { + IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData(); + responseData.setThrottleTimeMs(requestThrottleMs); + for (Map.Entry<ConfigResource, ApiError> entry : results.entrySet()) { + responseData.responses().add(new AlterConfigsResourceResult(). + setResourceName(entry.getKey().name()). + setResourceType(entry.getKey().type().id()). + setErrorCode(entry.getValue().error().code()). 
+ setErrorMessage(entry.getValue().message())); + } + return responseData; + } + + public static Map<ConfigResource, ApiError> fromResponseData(final IncrementalAlterConfigsResponseData data) { + Map<ConfigResource, ApiError> map = new HashMap<>(); + for (AlterConfigsResourceResult result : data.responses()) { + map.put(new ConfigResource(ConfigResource.Type.forId(result.resourceType()), result.resourceName()), + new ApiError(Errors.forCode(result.errorCode()), result.errorMessage())); + } + return map; + } + + private final IncrementalAlterConfigsResponseData data; + + public IncrementalAlterConfigsResponse(IncrementalAlterConfigsResponseData data) { + this.data = data; + } + + public IncrementalAlterConfigsResponse(final Struct struct, final short version) { + this.data = new IncrementalAlterConfigsResponseData(struct, version); + } + + public IncrementalAlterConfigsResponseData data() { + return data; + } + + @Override + public Map<Errors, Integer> errorCounts() { + HashMap<Errors, Integer> counts = new HashMap<>(); + for (AlterConfigsResourceResult result : data.responses()) { + Errors error = Errors.forCode(result.errorCode()); + counts.put(error, counts.getOrDefault(error, 0) + 1); + } + return counts; + } + + @Override + protected Struct toStruct(final short version) { + return data.toStruct(version); + } + + @Override + public boolean shouldClientThrottle(short version) { + return version >= 0; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + public static IncrementalAlterConfigsResponse parse(ByteBuffer buffer, short version) { + return new IncrementalAlterConfigsResponse( + ApiKeys.INCREMENTAL_ALTER_CONFIGS.responseSchema(version).read(buffer), version); + } +} diff --git a/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json b/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json new file mode 100644 index 0000000..d808c04 --- /dev/null +++ 
b/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 44, + "type": "request", + "name": "IncrementalAlterConfigsRequest", + "validVersions": "0", + "fields": [ + { "name": "Resources", "type": "[]AlterConfigsResource", "versions": "0+", + "about": "The incremental updates for each resource.", "fields": [ + { "name": "ResourceType", "type": "int8", "versions": "0+", "mapKey": true, + "about": "The resource type." }, + { "name": "ResourceName", "type": "string", "versions": "0+", "mapKey": true, + "about": "The resource name." }, + { "name": "Configs", "type": "[]AlterableConfig", "versions": "0+", + "about": "The configurations.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, + "about": "The configuration key name." }, + { "name": "ConfigOperation", "type": "int8", "versions": "0+", "mapKey": true, + "about": "The type (Set, Delete, Append, Subtract) of operation." 
}, + { "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The value to set for the configuration key."} + ]} + ]}, + { "name": "ValidateOnly", "type": "bool", "versions": "0+", + "about": "True if we should validate the request, but not change the configurations."} + ] +} diff --git a/clients/src/main/resources/common/message/IncrementalAlterConfigsResponse.json b/clients/src/main/resources/common/message/IncrementalAlterConfigsResponse.json new file mode 100644 index 0000000..71aa997 --- /dev/null +++ b/clients/src/main/resources/common/message/IncrementalAlterConfigsResponse.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 44, + "type": "response", + "name": "IncrementalAlterConfigsResponse", + "validVersions": "0", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, + { "name": "responses", "type": "[]AlterConfigsResourceResult", "versions": "0+", + "about": "The responses for each resource.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The resource error code." }, + { "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+", + "about": "The resource error message, or null if there was no error." }, + { "name": "ResourceType", "type": "int8", "versions": "0+", + "about": "The resource type." }, + { "name": "ResourceName", "type": "string", "versions": "0+", + "about": "The resource name." } + ]} + ] +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 220fc50..1367b94 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -59,6 +59,8 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateAclsResponse; @@ -78,6 +80,7 @@ import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse; import org.apache.kafka.common.requests.ElectPreferredLeadersResponse; import org.apache.kafka.common.requests.FindCoordinatorResponse; +import 
org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; @@ -1203,6 +1206,59 @@ public class KafkaAdminClientTest { } } + @Test + public void testIncrementalAlterConfigs() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + //test error scenarios + IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData(); + responseData.responses().add(new AlterConfigsResourceResult() + .setResourceName("") + .setResourceType(ConfigResource.Type.BROKER.id()) + .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()) + .setErrorMessage("authorization error")); + + responseData.responses().add(new AlterConfigsResourceResult() + .setResourceName("topic1") + .setResourceType(ConfigResource.Type.TOPIC.id()) + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Config value append is not allowed for config")); + + env.kafkaClient().prepareResponse(new IncrementalAlterConfigsResponse(responseData)); + + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic1"); + + AlterConfigOp alterConfigOp1 = new AlterConfigOp( + new ConfigEntry("log.segment.bytes", "1073741"), + AlterConfigOp.OpType.SET); + + AlterConfigOp alterConfigOp2 = new AlterConfigOp( + new ConfigEntry("compression.type", "gzip"), + AlterConfigOp.OpType.APPEND); + + final Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(); + configs.put(brokerResource, Collections.singletonList(alterConfigOp1)); + configs.put(topicResource, Collections.singletonList(alterConfigOp2)); + + AlterConfigsResult result = env.adminClient().incrementalAlterConfigs(configs); + 
TestUtils.assertFutureError(result.values().get(brokerResource), ClusterAuthorizationException.class); + TestUtils.assertFutureError(result.values().get(topicResource), InvalidRequestException.class); + + // Test a call where there are no errors. + responseData = new IncrementalAlterConfigsResponseData(); + responseData.responses().add(new AlterConfigsResourceResult() + .setResourceName("") + .setResourceType(ConfigResource.Type.BROKER.id()) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(ApiError.NONE.message())); + + env.kafkaClient().prepareResponse(new IncrementalAlterConfigsResponse(responseData)); + env.adminClient().incrementalAlterConfigs(Collections.singletonMap(brokerResource, asList(alterConfigOp1))).all().get(); + } + } + @SafeVarargs private static <T> void assertCollectionIs(Collection<T> collection, T... elements) { for (T element : elements) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index b669a32..9709372 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -375,11 +375,18 @@ public class MockAdminClient extends AdminClient { } @Override + @Deprecated public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } @Override + public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, + AlterConfigsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index ca695c7..e595195 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -62,6 +62,12 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData; import org.apache.kafka.common.message.SaslAuthenticateResponseData; import org.apache.kafka.common.message.SaslHandshakeRequestData; import org.apache.kafka.common.message.SaslHandshakeResponseData; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -338,6 +344,9 @@ public class RequestResponseTest { checkRequest(createElectPreferredLeadersRequestNullPartitions()); checkErrorResponse(createElectPreferredLeadersRequest(), new UnknownServerException()); checkResponse(createElectPreferredLeadersResponse(), 0); + checkRequest(createIncrementalAlterConfigsRequest()); + checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException()); + checkResponse(createIncrementalAlterConfigsResponse(), 0); } @Test @@ -1477,4 +1486,30 @@ public class RequestResponseTest { return new ElectPreferredLeadersResponse(data); } + private IncrementalAlterConfigsRequest 
createIncrementalAlterConfigsRequest() { + IncrementalAlterConfigsRequestData data = new IncrementalAlterConfigsRequestData(); + AlterableConfig alterableConfig = new AlterableConfig() + .setName("retention.ms") + .setConfigOperation((byte) 0) + .setValue("100"); + AlterableConfigSet alterableConfigs = new AlterableConfigSet(); + alterableConfigs.add(alterableConfig); + + data.resources().add(new AlterConfigsResource() + .setResourceName("testtopic") + .setResourceType(ResourceType.TOPIC.code()) + .setConfigs(alterableConfigs)); + return new IncrementalAlterConfigsRequest.Builder(data).build((short) 0); + } + + private IncrementalAlterConfigsResponse createIncrementalAlterConfigsResponse() { + IncrementalAlterConfigsResponseData data = new IncrementalAlterConfigsResponseData(); + + data.responses().add(new AlterConfigsResourceResult() + .setResourceName("testtopic") + .setResourceType(ResourceType.TOPIC.code()) + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Duplicate Keys")); + return new IncrementalAlterConfigsResponse(data); + } } diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index ab58949..c1e5c62 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -297,6 +297,8 @@ object LogConfig { throw new InvalidConfigurationException(s"Unknown topic config name: $name") } + private[kafka] def configKeys: Map[String, ConfigKey] = configDef.configKeys.asScala + /** * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 0cdaad6..d424700 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -24,6 +24,9 @@ import kafka.log.LogConfig import kafka.metrics.KafkaMetricsGroup import 
kafka.utils._ import kafka.zk.{AdminZkClient, KafkaZkClient} +import org.apache.kafka.clients.admin.AlterConfigOp +import org.apache.kafka.clients.admin.AlterConfigOp.OpType +import org.apache.kafka.common.config.ConfigDef.ConfigKey import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource} import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException} import org.apache.kafka.common.internals.Topic @@ -38,7 +41,7 @@ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, Describe import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata -import scala.collection.{mutable, _} +import scala.collection.{Map, mutable, _} import scala.collection.JavaConverters._ class AdminManager(val config: KafkaConfig, @@ -364,59 +367,113 @@ class AdminManager(val config: KafkaConfig, def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = { configs.map { case (resource, config) => - def validateConfigPolicy(resourceType: ConfigResource.Type): Unit = { - alterConfigPolicy match { - case Some(policy) => - val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap - policy.validate(new AlterConfigPolicy.RequestMetadata( - new ConfigResource(resourceType, resource.name), configEntriesMap.asJava)) - case None => - } - } try { + val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap + + val configProps = new Properties + config.entries.asScala.foreach { configEntry => + configProps.setProperty(configEntry.name, configEntry.value) + } + resource.`type` match { - case ConfigResource.Type.TOPIC => - val topic = resource.name + case 
ConfigResource.Type.TOPIC => alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap) + case ConfigResource.Type.BROKER => alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap) + case resourceType => + throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType") + } + } catch { + case e @ (_: ConfigException | _: IllegalArgumentException) => + val message = s"Invalid config value for resource $resource: ${e.getMessage}" + info(message) + resource -> ApiError.fromThrowable(new InvalidRequestException(message, e)) + case e: Throwable => + // Log client errors at a lower level than unexpected exceptions + val message = s"Error processing alter configs request for resource $resource, config $config" + if (e.isInstanceOf[ApiException]) + info(message, e) + else + error(message, e) + resource -> ApiError.fromThrowable(e) + } + }.toMap + } - val properties = new Properties - config.entries.asScala.foreach { configEntry => - properties.setProperty(configEntry.name, configEntry.value) - } + private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean, + configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = { + val topic = resource.name + adminZkClient.validateTopicConfig(topic, configProps) + validateConfigPolicy(resource, configEntriesMap) + if (!validateOnly) { + info(s"Updating topic $topic with new configuration $config") + adminZkClient.changeTopicConfig(topic, configProps) + } - adminZkClient.validateTopicConfig(topic, properties) - validateConfigPolicy(ConfigResource.Type.TOPIC) - if (!validateOnly) { - info(s"Updating topic $topic with new configuration $config") - adminZkClient.changeTopicConfig(topic, properties) - } + resource -> ApiError.NONE + } - resource -> ApiError.NONE + private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean, + configProps: Properties, configEntriesMap: 
Map[String, String]): (ConfigResource, ApiError) = { + val brokerId = getBrokerId(resource) + val perBrokerConfig = brokerId.nonEmpty + this.config.dynamicConfig.validate(configProps, perBrokerConfig) + validateConfigPolicy(resource, configEntriesMap) + if (!validateOnly) { + if (perBrokerConfig) + this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps) + adminZkClient.changeBrokerConfig(brokerId, + this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig)) + } - case ConfigResource.Type.BROKER => - val brokerId = if (resource.name == null || resource.name.isEmpty) - None - else { - val id = resourceNameToBrokerId(resource.name) - if (id != this.config.brokerId) - throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $resource.name") - Some(id) - } - val configProps = new Properties - config.entries.asScala.foreach { configEntry => - configProps.setProperty(configEntry.name, configEntry.value) - } + resource -> ApiError.NONE + } + + private def getBrokerId(resource: ConfigResource) = { + if (resource.name == null || resource.name.isEmpty) + None + else { + val id = resourceNameToBrokerId(resource.name) + if (id != this.config.brokerId) + throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $resource.name") + Some(id) + } + } + + private def validateConfigPolicy(resource: ConfigResource, configEntriesMap: Map[String, String]): Unit = { + alterConfigPolicy match { + case Some(policy) => + policy.validate(new AlterConfigPolicy.RequestMetadata( + new ConfigResource(resource.`type`(), resource.name), configEntriesMap.asJava)) + case None => + } + } + def incrementalAlterConfigs(configs: Map[ConfigResource, List[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = { + configs.map { case (resource, alterConfigOps) => + try { + //throw InvalidRequestException if any duplicate keys + val duplicateKeys = 
alterConfigOps.groupBy(config => config.configEntry().name()) + .mapValues(_.size).filter(_._2 > 1).keys.toSet + if (duplicateKeys.nonEmpty) + throw new InvalidRequestException(s"Error due to duplicate config keys : ${duplicateKeys.mkString(",")}") + + val configEntriesMap = alterConfigOps.map(entry => (entry.configEntry().name(), entry.configEntry().value())).toMap + + resource.`type` match { + case ConfigResource.Type.TOPIC => + val configProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, resource.name) + prepareIncrementalConfigs(alterConfigOps, configProps, LogConfig.configKeys) + alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap) + + case ConfigResource.Type.BROKER => + val brokerId = getBrokerId(resource) val perBrokerConfig = brokerId.nonEmpty - this.config.dynamicConfig.validate(configProps, perBrokerConfig) - validateConfigPolicy(ConfigResource.Type.BROKER) - if (!validateOnly) { - if (perBrokerConfig) - this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps) - adminZkClient.changeBrokerConfig(brokerId, - this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig)) - } - resource -> ApiError.NONE + val persistentProps = if (perBrokerConfig) adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.get.toString) + else adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default) + + val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig) + prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys) + alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap) case resourceType => throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType") } @@ -427,7 +484,7 @@ class AdminManager(val config: KafkaConfig, resource -> ApiError.fromThrowable(new InvalidRequestException(message, e)) case e: Throwable => // Log client errors at a lower level 
than unexpected exceptions - val message = s"Error processing alter configs request for resource $resource, config $config" + val message = s"Error processing alter configs request for resource $resource, config $alterConfigOps" if (e.isInstanceOf[ApiException]) info(message, e) else @@ -437,6 +494,37 @@ class AdminManager(val config: KafkaConfig, }.toMap } + private def prepareIncrementalConfigs(alterConfigOps: List[AlterConfigOp], configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = { + + def listType(configName: String, configKeys: Map[String, ConfigKey]): Boolean = { + val configKey = configKeys(configName) + if (configKey == null) + throw new InvalidConfigurationException(s"Unknown topic config name: $configName") + configKey.`type` == ConfigDef.Type.LIST + } + + alterConfigOps.foreach { alterConfigOp => + alterConfigOp.opType() match { + case OpType.SET => configProps.setProperty(alterConfigOp.configEntry().name(), alterConfigOp.configEntry().value()) + case OpType.DELETE => configProps.remove(alterConfigOp.configEntry().name()) + case OpType.APPEND => { + if (!listType(alterConfigOp.configEntry().name(), configKeys)) + throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry().name()}") + val oldValueList = configProps.getProperty(alterConfigOp.configEntry().name()).split(",").toList + val newValueList = oldValueList ::: alterConfigOp.configEntry().value().split(",").toList + configProps.setProperty(alterConfigOp.configEntry().name(), newValueList.mkString(",")) + } + case OpType.SUBTRACT => { + if (!listType(alterConfigOp.configEntry().name(), configKeys)) + throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry().name()}") + val oldValueList = configProps.getProperty(alterConfigOp.configEntry().name()).split(",").toList + val newValueList = oldValueList.diff(alterConfigOp.configEntry().value().split(",").toList) + 
configProps.setProperty(alterConfigOp.configEntry().name(), newValueList.mkString(",")) + } + } + } + } + def shutdown() { topicPurgatory.shutdown() CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 140fdd4..56adc9b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -39,6 +39,8 @@ import kafka.security.auth.{Resource, _} import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.utils.{CoreUtils, Logging} import kafka.zk.{AdminZkClient, KafkaZkClient} +import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} +import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ @@ -151,6 +153,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request) case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request) case ApiKeys.ELECT_PREFERRED_LEADERS => handleElectPreferredReplicaLeader(request) + case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request) } } catch { case e: FatalExitError => throw e @@ -2155,6 +2158,34 @@ class KafkaApis(val requestChannel: RequestChannel, new ApiError(error, null) } + def handleIncrementalAlterConfigsRequest(request: RequestChannel.Request): Unit = { + val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest] + + val configs = alterConfigsRequest.data().resources().iterator().asScala.map { alterConfigResource => + val configResource = new ConfigResource(ConfigResource.Type.forId(alterConfigResource.resourceType()), alterConfigResource.resourceName()) + configResource -> alterConfigResource.configs().iterator().asScala.map { + alterConfig => 
new AlterConfigOp(new ConfigEntry(alterConfig.name(), alterConfig.value()), OpType.forId(alterConfig.configOperation())) }.toList + }.toMap + + val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) => + resource.`type` match { + case ConfigResource.Type.BROKER => + authorize(request.session, AlterConfigs, Resource.ClusterResource) + case ConfigResource.Type.TOPIC => + authorize(request.session, AlterConfigs, Resource(Topic, resource.name, LITERAL)) + case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") + } + } + + val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data().validateOnly()) + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(request.session, resource) + } + sendResponseMaybeThrottle(request, requestThrottleMs => + new IncrementalAlterConfigsResponse(IncrementalAlterConfigsResponse.toResponseData(requestThrottleMs, + (authorizedResult ++ unauthorizedResult).asJava))) + } + def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = { val describeConfigsRequest = request.body[DescribeConfigsRequest] val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource => diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index dbb6213..9e35f40 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -1406,6 +1406,184 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(2, currentLeader(partition1)) assertEquals(2, currentLeader(partition2)) } + + @Test + def testValidIncrementalAlterConfigs(): Unit = { + client = AdminClient.create(createConfig) + + // Create 
topics + val topic1 = "incremental-alter-configs-topic-1" + val topic1Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic1) + val topic1CreateConfigs = new Properties + topic1CreateConfigs.setProperty(LogConfig.RetentionMsProp, "60000000") + topic1CreateConfigs.setProperty(LogConfig.CleanupPolicyProp, LogConfig.Compact) + createTopic(topic1, numPartitions = 1, replicationFactor = 1, topic1CreateConfigs) + + val topic2 = "incremental-alter-configs-topic-2" + val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2) + createTopic(topic2) + + // Alter topic configs + var topic1AlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.FlushMsProp, "1000"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Delete), AlterConfigOp.OpType.APPEND), + new AlterConfigOp(new ConfigEntry(LogConfig.RetentionMsProp, ""), AlterConfigOp.OpType.DELETE) + ).asJavaCollection + + val topic2AlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "lz4"), AlterConfigOp.OpType.SET) + ).asJavaCollection + + var alterResult = client.incrementalAlterConfigs(Map( + topic1Resource -> topic1AlterConfigs, + topic2Resource -> topic2AlterConfigs + ).asJava) + + assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) + alterResult.all.get + + // Verify that topics were updated correctly + var describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava) + var configs = describeResult.all.get + + assertEquals(2, configs.size) + + assertEquals("1000", configs.get(topic1Resource).get(LogConfig.FlushMsProp).value) + assertEquals("compact,delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value) + assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString, 
configs.get(topic1Resource).get(LogConfig.RetentionMsProp).value) + + assertEquals("0.9", configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value) + assertEquals("lz4", configs.get(topic2Resource).get(LogConfig.CompressionTypeProp).value) + + //verify subtract operation + topic1AlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.SUBTRACT) + ).asJava + + alterResult = client.incrementalAlterConfigs(Map( + topic1Resource -> topic1AlterConfigs + ).asJava) + alterResult.all.get + + // Verify that topics were updated correctly + describeResult = client.describeConfigs(Seq(topic1Resource).asJava) + configs = describeResult.all.get + + assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value) + assertEquals("1000", configs.get(topic1Resource).get(LogConfig.FlushMsProp).value) // verify previous change is still intact + + // Alter topics with validateOnly=true + topic1AlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.APPEND) + ).asJava + + alterResult = client.incrementalAlterConfigs(Map( + topic1Resource -> topic1AlterConfigs + ).asJava, new AlterConfigsOptions().validateOnly(true)) + alterResult.all.get + + // Verify that topics were not updated due to validateOnly = true + describeResult = client.describeConfigs(Seq(topic1Resource).asJava) + configs = describeResult.all.get + + assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value) + + //Alter topics with validateOnly=true with invalid configs + topic1AlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "zip"), AlterConfigOp.OpType.SET) + ).asJava + + alterResult = client.incrementalAlterConfigs(Map( + topic1Resource -> topic1AlterConfigs + ).asJava, new AlterConfigsOptions().validateOnly(true)) + + 
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], + Some("Invalid config value for resource")) + } + + @Test + def testInvalidIncrementalAlterConfigs(): Unit = { + client = AdminClient.create(createConfig) + + // Create topics + val topic1 = "incremental-alter-configs-topic-1" + val topic1Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic1) + createTopic(topic1) + + val topic2 = "incremental-alter-configs-topic-2" + val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2) + createTopic(topic2) + + //Add duplicate Keys for topic1 + var topic1AlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.75"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.65"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.SET) // valid entry + ).asJavaCollection + + //Add valid config for topic2 + var topic2AlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), AlterConfigOp.OpType.SET) + ).asJavaCollection + + var alterResult = client.incrementalAlterConfigs(Map( + topic1Resource -> topic1AlterConfigs, + topic2Resource -> topic2AlterConfigs + ).asJava) + assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) + + //InvalidRequestException error for topic1 + assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], + Some("Error due to duplicate config keys")) + + //operation should succeed for topic2 + alterResult.values().get(topic2Resource).get() + + // Verify that topic1 is not config not updated, and topic2 config is updated + val describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava) + val configs = describeResult.all.get + assertEquals(2, configs.size) + + 
assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topic1Resource).get(LogConfig.MinCleanableDirtyRatioProp).value) + assertEquals(Defaults.CompressionType.toString, configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value) + assertEquals("0.9", configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value) + + //check invalid use of append/subtract operation types + topic1AlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.APPEND) + ).asJavaCollection + + topic2AlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy"), AlterConfigOp.OpType.SUBTRACT) + ).asJavaCollection + + alterResult = client.incrementalAlterConfigs(Map( + topic1Resource -> topic1AlterConfigs, + topic2Resource -> topic2AlterConfigs + ).asJava) + assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) + + assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], + Some("Config value append is not allowed for config")) + + assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidRequestException], + Some("Config value subtract is not allowed for config")) + + + //try to add invalid config + topic1AlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), AlterConfigOp.OpType.SET) + ).asJavaCollection + + alterResult = client.incrementalAlterConfigs(Map( + topic1Resource -> topic1AlterConfigs + ).asJava) + assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet) + + assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], + Some("Invalid config value for resource")) + } } object AdminClientIntegrationTest { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 9b15389..0bc1045 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -25,7 +25,7 @@ import kafka.network.SocketServer import kafka.security.auth._ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp} import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.producer._ @@ -33,8 +33,9 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME -import org.apache.kafka.common.message.{ControlledShutdownRequestData, CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData} +import org.apache.kafka.common.message._ import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet} +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigSet} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord} @@ -148,8 +149,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse], ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse], ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse], - 
ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse] - ) + ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse], + ApiKeys.INCREMENTAL_ALTER_CONFIGS -> classOf[IncrementalAlterConfigsResponse] + ) val requestKeyToError = Map[ApiKeys, Nothing => Errors]( ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), @@ -194,7 +196,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED), ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error), ApiKeys.ELECT_PREFERRED_LEADERS -> ((resp: ElectPreferredLeadersResponse) => - ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error()) + ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error()), + ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => + IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error) ) val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( @@ -233,7 +237,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl, ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl, ApiKeys.CREATE_PARTITIONS -> topicAlterAcl, - ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl + ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl, + ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl ) @Before @@ -392,6 +397,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest { new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000") ))), true).build() + private def incrementalAlterConfigsRequest = { + val data = new IncrementalAlterConfigsRequestData + val alterableConfig = new AlterableConfig + 
alterableConfig.setName(LogConfig.MaxMessageBytesProp). + setValue("1000000").setConfigOperation(AlterConfigOp.OpType.SET.id()) + val alterableConfigSet = new AlterableConfigSet + alterableConfigSet.add(alterableConfig) + data.resources().add(new AlterConfigsResource(). + setResourceName(tp.topic).setResourceType(ConfigResource.Type.TOPIC.id()). + setConfigs(alterableConfigSet)) + new IncrementalAlterConfigsRequest.Builder(data).build() + } + private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build() private def createAclsRequest = new CreateAclsRequest.Builder( @@ -449,7 +467,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest, // Check StopReplica last since some APIs depend on replica availability ApiKeys.STOP_REPLICA -> stopReplicaRequest, - ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest + ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest, + ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest ) for ((key, request) <- requestKeyToRequest) { diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index fb7b539..760644e 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -39,6 +39,7 @@ import kafka.utils._ import kafka.utils.Implicits._ import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness} import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer} @@ -329,7 +330,7 
@@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet Files.copy(Paths.get(combinedStoreProps.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)), Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)), StandardCopyOption.REPLACE_EXISTING) - TestUtils.alterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get() + TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get() verifySslProduceConsume(sslProperties1, "alter-truststore-4") verifySslProduceConsume(sslProperties2, "alter-truststore-5") } @@ -509,7 +510,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet // Enable unclean leader election val newProps = new Properties newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true") - TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get + TestUtils.incrementalAlterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get waitForConfigOnServer(controller, KafkaConfig.UncleanLeaderElectionEnableProp, "true") // Verify that the old follower with missing records is elected as the new leader @@ -908,7 +909,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet val unknownConfig = "some.config" props.put(unknownConfig, "some.config.value") - TestUtils.alterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get + TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1), "Listener config not updated") @@ -971,11 +972,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet .mkString(",") val props = fetchBrokerConfigsFromZooKeeper(servers.head) - val listenerProps = 
props.asScala.keySet.filter(_.startsWith(listenerPrefix(listenerName))) - listenerProps.foreach(props.remove) + val deleteListenerProps = new Properties() + deleteListenerProps ++= props.asScala.filter(entry => entry._1.startsWith(listenerPrefix(listenerName))) + TestUtils.incrementalAlterConfigs(servers, adminClients.head, deleteListenerProps, perBrokerConfig = true, opType = OpType.DELETE).all.get + + props.clear() props.put(KafkaConfig.ListenersProp, listeners) props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap) - TestUtils.alterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get + TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1), "Listeners not updated") diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 05f4bcd..710ed57 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -22,9 +22,9 @@ import kafka.log.LogConfig import kafka.network.RequestChannel.Session import kafka.security.auth._ import kafka.utils.TestUtils -import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} +import org.apache.kafka.common.acl._ import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, InitProducerIdRequestData, JoinGroupRequestData, LeaveGroupRequestData, SaslAuthenticateRequestData, SaslHandshakeRequestData} +import org.apache.kafka.common.message._ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => 
AdminResourceType} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.message.ControlledShutdownRequestData @@ -402,6 +402,10 @@ class RequestQuotaTest extends BaseRequestTest { .setTimeoutMs(0) .setTopicPartitions(Collections.singletonList(partition))) + case ApiKeys.INCREMENTAL_ALTER_CONFIGS => + new IncrementalAlterConfigsRequest.Builder( + new IncrementalAlterConfigsRequestData()) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } @@ -501,6 +505,8 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs case ApiKeys.ELECT_PREFERRED_LEADERS => new ElectPreferredLeadersResponse(response).throttleTimeMs + case ApiKeys.INCREMENTAL_ALTER_CONFIGS => + new IncrementalAlterConfigsResponse(response, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion()).throttleTimeMs case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId") } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c84d75e..f1a5cca 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -26,8 +26,8 @@ import java.security.cert.X509Certificate import java.time.Duration import java.util.{Collections, Properties} import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit} -import javax.net.ssl.X509TrustManager +import javax.net.ssl.X509TrustManager import kafka.api._ import kafka.cluster.{Broker, EndPoint} import kafka.log._ @@ -38,7 +38,8 @@ import Implicits._ import kafka.controller.LeaderIsrAndControllerEpoch import kafka.zk._ import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry} +import 
org.apache.kafka.clients.admin.AlterConfigOp.OpType +import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.{KafkaFuture, TopicPartition} @@ -1410,6 +1411,20 @@ object TestUtils extends Logging { adminClient.alterConfigs(configs) } + def incrementalAlterConfigs(servers: Seq[KafkaServer], adminClient: AdminClient, props: Properties, + perBrokerConfig: Boolean, opType: OpType = OpType.SET): AlterConfigsResult = { + val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new ConfigEntry(k, v), opType) }.toList.asJavaCollection + val configs = if (perBrokerConfig) { + servers.map { server => + val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) + (resource, configEntries) + }.toMap.asJava + } else { + Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> configEntries).asJava + } + adminClient.incrementalAlterConfigs(configs) + } + def alterTopicConfigs(adminClient: AdminClient, topic: String, topicConfigs: Properties): AlterConfigsResult = { val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava val newConfig = new Config(configEntries) @@ -1451,15 +1466,18 @@ object TestUtils extends Logging { (out.toString, err.toString) } - def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = { + def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable], + expectedErrorMessage: Option[String] = None): Unit = { try { future.get() fail("Expected CompletableFuture.get to return an exception") } catch { case e: ExecutionException => - val cause = e.getCause() + val cause = e.getCause assertTrue("Expected an exception of type " + clazz.getName + "; got type " + - cause.getClass().getName, clazz.isInstance(cause)) + cause.getClass.getName, 
clazz.isInstance(cause)) + expectedErrorMessage.foreach(message => assertTrue(s"Received error message : ${cause.getMessage}" + + s" does not contain expected error message : $message", cause.getMessage.contains(message))) } }