jsancio commented on code in PR #21053:
URL: https://github.com/apache/kafka/pull/21053#discussion_r2921092903
##########
metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java:
##########
@@ -116,6 +120,69 @@ public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
+ @Test
+ public void testConfigurationDeltaFiltering() {
+ Set<String> validConfigs = Set.of("foo", "bar");
+ SupportedConfigChecker supportedConfigChecker = (resourceType, configName) -> validConfigs.contains(configName);
+
+ Map<String, String> initialConfigs = Map.of("foo", "value1"); // valid
+ ConfigurationImage image = new ConfigurationImage(new ConfigResource(BROKER, "0"), initialConfigs);
+
+ ConfigurationDelta delta = new ConfigurationDelta(image, supportedConfigChecker);
+ delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("bar").setValue("value2"));
+ delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("qux").setValue("value3"));
+
+ ConfigurationImage result = delta.apply();
+
+ assertTrue(result.data().containsKey("foo"));
+ assertTrue(result.data().containsKey("bar"));
+ assertFalse(result.data().containsKey("qux"));
+ }
+
+ @Test
+ public void testConfigurationDeltaWithoutFiltering() {
+ Map<String, String> initialConfigs = Map.of("foo", "value1", "bar", "value2");
+ ConfigurationImage image = new ConfigurationImage(new ConfigResource(BROKER, "0"), initialConfigs);
+
+ ConfigurationDelta delta = new ConfigurationDelta(image, SupportedConfigChecker.TRUE);
+ delta.replay(new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("baz").setValue("value3"));
+
+ ConfigurationImage result = delta.apply();
+
+ assertTrue(result.data().containsKey("foo"));
+ assertTrue(result.data().containsKey("bar"));
+ assertTrue(result.data().containsKey("baz"));
+ }
+
+ @Test
+ public void testConfigurationDeltaPreventsInvalidConfigsInResultingImage() {
+ Set<String> validConfigs = Set.of("foo", "bar");
+ SupportedConfigChecker supportedConfigChecker = (resourceType, configName) -> validConfigs.contains(configName);
+
+ Map<String, String> initialConfigs = Map.of(
+ "foo", "value1", // valid
+ "bar", "value2", // valid
+ "invalid", "value3" // invalid
+ );
+ ConfigurationImage image = new ConfigurationImage(new ConfigResource(BROKER, "0"), initialConfigs);
Review Comment:
Okay. This test is not realistic if `SupportedConfigChecker` is a pure function
(always returns the same value for the same input). If the checker filters out
a config, the configuration image will never contain that config in the first
place, so the initial image built here cannot occur.
I think we should remove this test to avoid confusion.
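
The argument above can be illustrated with a minimal sketch. It assumes, as the comment does, that an image is only ever produced by replaying records through the same deterministic checker. `PureCheckerInvariantSketch` and `replayAll` are hypothetical stand-ins for illustration, not the Kafka classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

final class PureCheckerInvariantSketch {
    // Hypothetical stand-in for ConfigurationDelta.replay() followed by apply():
    // every write into the resulting image passes through the same checker.
    static Map<String, String> replayAll(Map<String, String> image,
                                         List<Map.Entry<String, String>> records,
                                         Predicate<String> isSupported) {
        Map<String, String> result = new HashMap<>(image);
        for (Map.Entry<String, String> record : records) {
            if (isSupported.test(record.getKey())) {
                result.put(record.getKey(), record.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> valid = Set.of("foo", "bar");
        Predicate<String> isSupported = valid::contains;

        // Start from an empty image and build it by replay only (assumption).
        Map<String, String> image = replayAll(Map.of(), List.of(
            Map.entry("foo", "value1"),
            Map.entry("bar", "value2"),
            Map.entry("invalid", "value3")), isSupported);

        System.out.println(image.containsKey("invalid")); // prints false
    }
}
```

Under that assumption, an initial image that already holds `"invalid"` is a state the production code path would not reach, which is why the test setup reads as confusing.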
##########
metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java:
##########
@@ -45,6 +48,11 @@ public void finishSnapshot() {
}
public void replay(ConfigRecord record) {
+ if (!supportedConfigChecker.isSupported(image.resource().type(), record.name())) {
Review Comment:
I see. This works because `image.resource().type() ==
record.resourceType()`, right? Should we assert that?
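
A minimal sketch of such a check, using hypothetical stand-in types rather than the real `ConfigurationDelta`, `ConfigRecord`, and `ConfigResource.Type`. It assumes the record carries the resource type as a byte id (the tests in this PR set it with `BROKER.id()`), so the comparison is made on ids. The choice of `IllegalStateException` is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

final class ReplayTypeCheckSketch {
    // Hypothetical stand-in for ConfigResource.Type; the ids are illustrative.
    enum ResourceType {
        TOPIC((byte) 2), BROKER((byte) 4);

        private final byte id;

        ResourceType(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }
    }

    // Hypothetical stand-in for ConfigurationDelta, reduced to the replay path.
    static final class Delta {
        private final ResourceType imageType;
        private final BiPredicate<ResourceType, String> checker;
        private final Map<String, String> changes = new HashMap<>();

        Delta(ResourceType imageType, BiPredicate<ResourceType, String> checker) {
            this.imageType = imageType;
            this.checker = checker;
        }

        void replay(byte recordResourceType, String name, String value) {
            // Fail fast if the record targets a different resource type than the image.
            if (recordResourceType != imageType.id()) {
                throw new IllegalStateException("Record resource type " + recordResourceType
                    + " does not match image resource type " + imageType);
            }
            // Silently drop config names the checker does not support.
            if (!checker.test(imageType, name)) {
                return;
            }
            changes.put(name, value);
        }

        Map<String, String> changes() {
            return changes;
        }
    }
}
```

With the guard in place, passing `image.resource().type()` to the checker is safe by construction instead of relying on an unstated invariant.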
##########
core/src/main/scala/kafka/server/DefaultSupportedConfigChecker.scala:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
Review Comment:
Let's avoid adding new Scala types if it is not necessary. It looks like you
can implement this type in Java in the server module. You can put it in the
`org.apache.kafka.server.config` package.
##########
core/src/main/scala/kafka/server/DefaultSupportedConfigChecker.scala:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.coordinator.group.GroupConfig
+import org.apache.kafka.metadata.SupportedConfigChecker
+import org.apache.kafka.server.config.DynamicConfig
+import org.apache.kafka.server.metrics.ClientMetricsConfigs
+import org.apache.kafka.storage.internals.log.LogConfig
+import scala.jdk.CollectionConverters._
+
+/**
+ * Default implementation of SupportedConfigChecker that checks if a configuration name
+ * is supported for a given resource type based on the actual config definitions.
+ *
+ * This class maintains a whitelist of valid configuration names per resource type:
+ * - TOPIC: Configurations defined in LogConfig
+ * - BROKER: Configurations defined in DynamicConfig.Broker
+ * - CLIENT_METRICS: Configurations defined in ClientMetricsConfigs
+ * - GROUP: Configurations defined in GroupConfig
+ */
+class DefaultSupportedConfigChecker extends SupportedConfigChecker {
+ private val validConfigsByType: Map[ConfigResource.Type, util.Set[String]] = {
+ val topicConfigs = LogConfig.nonInternalConfigNames.asScala.toSet
+ val brokerConfigs = DynamicConfig.Broker.names.asScala.toSet
+ val clientMetricsConfigs = ClientMetricsConfigs.configDef().names.asScala.toSet
+ val groupConfigs = GroupConfig.configDef().names.asScala.toSet
+
+ Map(
+ ConfigResource.Type.TOPIC -> topicConfigs.asJava,
+ ConfigResource.Type.BROKER -> brokerConfigs.asJava,
+ ConfigResource.Type.CLIENT_METRICS -> clientMetricsConfigs.asJava,
+ ConfigResource.Type.GROUP -> groupConfigs.asJava
+ )
Review Comment:
Why are you converting to Scala then back to Java? E.g.
`asScala.toSet.asJava`
##########
core/src/main/scala/kafka/server/DefaultSupportedConfigChecker.scala:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.coordinator.group.GroupConfig
+import org.apache.kafka.metadata.SupportedConfigChecker
+import org.apache.kafka.server.config.DynamicConfig
+import org.apache.kafka.server.metrics.ClientMetricsConfigs
+import org.apache.kafka.storage.internals.log.LogConfig
+import scala.jdk.CollectionConverters._
+
+/**
+ * Default implementation of SupportedConfigChecker that checks if a configuration name
+ * is supported for a given resource type based on the actual config definitions.
+ *
+ * This class maintains a whitelist of valid configuration names per resource type:
+ * - TOPIC: Configurations defined in LogConfig
+ * - BROKER: Configurations defined in DynamicConfig.Broker
+ * - CLIENT_METRICS: Configurations defined in ClientMetricsConfigs
+ * - GROUP: Configurations defined in GroupConfig
+ */
+class DefaultSupportedConfigChecker extends SupportedConfigChecker {
Review Comment:
Make the type/class final and public when moving it to Java.
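
A rough sketch of the shape the comments on this file point toward: a final Java class that stores the Java sets directly, with no `asScala`/`asJava` round trip. Everything here is a hypothetical stand-in. The nested `ResourceType` and `SupportedConfigChecker` only mimic the real types, the constructor takes the name sets as parameters instead of reading `LogConfig`, `DynamicConfig.Broker`, `ClientMetricsConfigs`, and `GroupConfig`, and returning `false` for a resource type without an entry is an assumption. In the real module the class would be a top-level `public final class`.

```java
import java.util.Map;
import java.util.Set;

final class SupportedConfigCheckerSketch {
    // Hypothetical stand-in for ConfigResource.Type.
    enum ResourceType { TOPIC, BROKER, CLIENT_METRICS, GROUP, UNKNOWN }

    // Hypothetical stand-in for org.apache.kafka.metadata.SupportedConfigChecker.
    interface SupportedConfigChecker {
        boolean isSupported(ResourceType resourceType, String configName);
    }

    public static final class DefaultChecker implements SupportedConfigChecker {
        private final Map<ResourceType, Set<String>> validConfigsByType;

        public DefaultChecker(Set<String> topicConfigs,
                              Set<String> brokerConfigs,
                              Set<String> clientMetricsConfigs,
                              Set<String> groupConfigs) {
            // Set.copyOf takes an immutable snapshot of each Java set;
            // no intermediate Scala collection is needed.
            this.validConfigsByType = Map.of(
                ResourceType.TOPIC, Set.copyOf(topicConfigs),
                ResourceType.BROKER, Set.copyOf(brokerConfigs),
                ResourceType.CLIENT_METRICS, Set.copyOf(clientMetricsConfigs),
                ResourceType.GROUP, Set.copyOf(groupConfigs));
        }

        @Override
        public boolean isSupported(ResourceType resourceType, String configName) {
            // Assumption: resource types without an entry support no config names.
            return validConfigsByType.getOrDefault(resourceType, Set.of()).contains(configName);
        }
    }
}
```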
##########
core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala:
##########
@@ -211,4 +211,5 @@ class ControllerConfigurationValidatorTest {
assertThrows(classOf[InvalidConfigurationException], () => validator.validate(
new ConfigResource(GROUP, "group"), config, emptyMap())).getMessage)
}
+
Review Comment:
Extra line not needed.