jsancio commented on code in PR #21053:
URL: https://github.com/apache/kafka/pull/21053#discussion_r2742871184
##########
shell/src/main/java/org/apache/kafka/shell/MetadataShell.java:
##########
@@ -148,10 +152,13 @@ public MetadataShell(
private void initializeWithSnapshotFileReader() throws Exception {
this.fileLock = takeDirectoryLockIfExists(parentParent(new File(snapshotPath)));
+ DynamicConfigValidator dynamicConfigValidator = new ControllerConfigurationValidator(new KafkaConfig(new Properties(), false));
+
this.loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setNodeId(-1).
setHighWaterMarkAccessor(() -> snapshotFileReader.highWaterMark()).
+ setConfigValidator(dynamicConfigValidator).
Review Comment:
Why isn't this the default implementation in the builder? That way, users of
the metadata loader only need to override this predicate when necessary.
##########
metadata/src/main/java/org/apache/kafka/metadata/DynamicConfigValidator.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.config.ConfigResource;
+
+/**
+ * Validator interface for checking if a configuration name is valid for a given resource type.
+ */
+@FunctionalInterface
+public interface DynamicConfigValidator {
Review Comment:
I don't think we should use the word "valid." I think that most readers will
assume that this means that it is a supported config AND that the value of the
config is valid. How about `SupportedConfigChecker` for the type and
`isSupported` for the verb/method?
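
For illustration, a minimal sketch of what the suggested naming could look
like. The `ResourceType` enum is a hypothetical stand-in for the resource type
passed to the checker, and the set of topic config names is invented for the
example; neither is the actual Kafka code.

```java
import java.util.Set;

class SupportedConfigSketch {
    // Hypothetical stand-in for the real resource type, so the sketch compiles on its own.
    enum ResourceType { BROKER, TOPIC }

    /** Answers only "is this config name supported?"; it never inspects the value. */
    @FunctionalInterface
    interface SupportedConfigChecker {
        boolean isSupported(ResourceType type, String configName);
    }

    // Hypothetical set of known topic configs, for illustration only.
    static final Set<String> TOPIC_CONFIGS = Set.of("retention.ms", "cleanup.policy");

    // Topic configs must be in the known set; other resource types are accepted as-is.
    static final SupportedConfigChecker CHECKER =
        (type, name) -> type != ResourceType.TOPIC || TOPIC_CONFIGS.contains(name);

    public static void main(String[] args) {
        System.out.println(CHECKER.isSupported(ResourceType.TOPIC, "retention.ms"));
        System.out.println(CHECKER.isSupported(ResourceType.TOPIC, "no.longer.exists"));
    }
}
```

With this naming, a caller reading `isSupported(type, name)` has no reason to
expect that the config value was checked as well.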
##########
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java:
##########
@@ -51,19 +52,27 @@
public final class MetadataDelta {
public static class Builder {
private MetadataImage image = MetadataImage.EMPTY;
+ private DynamicConfigValidator dynamicConfigValidator = null;
Review Comment:
Please avoid using `null`. Either use `Optional` or make sure that the type
has a default implementation. In this case the default implementation of
`DynamicConfigValidator` can be a function that always returns `true`.
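
A rough sketch of the default-implementation option, under simplifying
assumptions: the `ResourceType` enum, the `ACCEPT_ALL` constant, the `Delta`
class, and the setter name are hypothetical and only mirror the shape of the
builder in this diff.

```java
import java.util.Objects;

class BuilderDefaultSketch {
    // Hypothetical stand-in for the real resource type.
    enum ResourceType { BROKER, TOPIC }

    @FunctionalInterface
    interface DynamicConfigValidator {
        // Assumed default: accept every config name, so no null check is ever needed.
        DynamicConfigValidator ACCEPT_ALL = (type, name) -> true;

        boolean isValidConfig(ResourceType type, String name);
    }

    // Simplified placeholder for the object the builder produces.
    static final class Delta {
        final DynamicConfigValidator validator;

        Delta(DynamicConfigValidator validator) {
            this.validator = validator;
        }
    }

    static final class Builder {
        // Starts out with the always-true default instead of null.
        private DynamicConfigValidator validator = DynamicConfigValidator.ACCEPT_ALL;

        Builder setConfigValidator(DynamicConfigValidator validator) {
            // Reject null at the boundary so the field is never null afterwards.
            this.validator = Objects.requireNonNull(validator, "validator");
            return this;
        }

        Delta build() {
            return new Delta(validator);
        }
    }
}
```

Callers that do not care about filtering simply skip the setter, and code that
uses the validator can call it unconditionally.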
##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -81,6 +82,7 @@ public static class Builder {
private FaultHandler faultHandler = FaultHandlerException::new;
private MetadataLoaderMetrics metrics = null;
private Supplier<OptionalLong> highWaterMarkAccessor = null;
+ private DynamicConfigValidator dynamicConfigValidator = null;
Review Comment:
Again, try to avoid using `null`. In this case you can implement and use a
default implementation that always returns `true`, no?
##########
metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java:
##########
@@ -34,9 +35,15 @@
public final class ConfigurationsDelta {
private final ConfigurationsImage image;
private final Map<ConfigResource, ConfigurationDelta> changes = new HashMap<>();
+ private final DynamicConfigValidator dynamicConfigValidator;
public ConfigurationsDelta(ConfigurationsImage image) {
+ this(image, null);
+ }
Review Comment:
Please avoid overloading constructors/methods.
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -327,6 +338,13 @@ private ApiError validateAlterConfig(
Map<String, String> alteredConfigsForAlterConfigPolicyCheck = new HashMap<>();
TimelineHashMap<String, String> existingConfigsSnapshot = configData.get(configResource);
if (existingConfigsSnapshot != null) {
+ if (configValidator != null) {
+ for (String name : existingConfigsSnapshot.keySet()) {
+ if (!configValidator.isValidConfig(configResource.type(), name)) {
+ existingConfigsSnapshot.remove(name);
Review Comment:
@0xffff-zhiyan, you don't need this change now that you are skipping removed
configs during the replay phase.
##########
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java:
##########
@@ -83,7 +92,12 @@ public MetadataDelta build() {
private DelegationTokenDelta delegationTokenDelta = null;
public MetadataDelta(MetadataImage image) {
+ this(image, null);
+ }
Review Comment:
Let's try to avoid overloaded constructors or functions. I suggest removing
this constructor and maybe even making the remaining one private. Ideally,
users should use the Builder for this type.
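
One possible shape for this, sketched with placeholder types: a single private
constructor that only the nested Builder can call. `Image`, `Delta`, the
`String` resource type, and the setter names are hypothetical simplifications,
not the real `MetadataDelta` API.

```java
import java.util.Objects;

class SingleConstructorSketch {
    // Placeholder for the real image type.
    static final class Image {
        static final Image EMPTY = new Image();
    }

    @FunctionalInterface
    interface DynamicConfigValidator {
        boolean isValidConfig(String resourceType, String name);
    }

    static final class Delta {
        private final Image image;
        private final DynamicConfigValidator validator;

        // The only constructor; private, so every caller goes through the Builder.
        private Delta(Image image, DynamicConfigValidator validator) {
            this.image = image;
            this.validator = validator;
        }

        Image image() {
            return image;
        }

        DynamicConfigValidator validator() {
            return validator;
        }

        static final class Builder {
            private Image image = Image.EMPTY;
            // Assumed default that accepts everything.
            private DynamicConfigValidator validator = (type, name) -> true;

            Builder setImage(Image image) {
                this.image = Objects.requireNonNull(image, "image");
                return this;
            }

            Builder setConfigValidator(DynamicConfigValidator validator) {
                this.validator = Objects.requireNonNull(validator, "validator");
                return this;
            }

            Delta build() {
                return new Delta(image, validator);
            }
        }
    }
}
```

Optional parameters then live in the Builder as setters with defaults, so no
second constructor is needed when a new dependency is added.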
##########
metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java:
##########
@@ -45,6 +48,9 @@ public void finishSnapshot() {
}
public void replay(ConfigRecord record) {
+ if (dynamicConfigValidator != null &&
!dynamicConfigValidator.isValidConfig(image.resource().type(), record.name())) {
+ return;
+ }
Review Comment:
Do not allow this object to be null; instead, require callers to provide an
implementation, such as a default one that always returns true.
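
As a sketch of how `replay` might read once the validator can never be null:
the `ResourceType` enum, the `ConfigRecord` class, and the `changes` map below
are simplified, hypothetical stand-ins for the real types in this diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

class ReplaySketch {
    // Hypothetical stand-in for the real resource type.
    enum ResourceType { BROKER, TOPIC }

    @FunctionalInterface
    interface DynamicConfigValidator {
        boolean isValidConfig(ResourceType type, String name);
    }

    // Simplified stand-in for the metadata record carrying one config change.
    static final class ConfigRecord {
        private final String name;
        private final String value;

        ConfigRecord(String name, String value) {
            this.name = name;
            this.value = value;
        }

        String name() { return name; }
        String value() { return value; }
    }

    static final class ConfigurationDelta {
        private final ResourceType type;
        private final DynamicConfigValidator validator;
        private final Map<String, Optional<String>> changes = new HashMap<>();

        ConfigurationDelta(ResourceType type, DynamicConfigValidator validator) {
            this.type = type;
            // Null is rejected here; callers pass an always-true validator if they want no filtering.
            this.validator = Objects.requireNonNull(validator, "validator");
        }

        void replay(ConfigRecord record) {
            // No null check needed: skip records the validator rejects.
            if (!validator.isValidConfig(type, record.name())) {
                return;
            }
            changes.put(record.name(), Optional.ofNullable(record.value()));
        }

        Map<String, Optional<String>> changes() {
            return changes;
        }
    }
}
```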