kevin-wu24 commented on code in PR #21053:
URL: https://github.com/apache/kafka/pull/21053#discussion_r2604186827
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -508,6 +509,16 @@ private List<String> getParts(String value, String key, ConfigResource configRes
*/
public void replay(ConfigRecord record) {
Type type = Type.forId(record.resourceType());
+ // Filter out invalid configs
+ if (type != Type.UNKNOWN) {
+ Set<String> validConfigNames = configSchema.validConfigNames(type);
+ if (!validConfigNames.isEmpty() && !validConfigNames.contains(record.name())) {
+ // Ignore the record if it's a removed/invalid config
+ log.debug("Ignoring ConfigRecord for {} with invalid/removed config name: {}",
+ new ConfigResource(type, record.resourceName()), record.name());
+ return;
+ }
+ }
Review Comment:
We do not need this change. When upgrading the software version, the
controller that becomes active must load its most recent snapshot from disk,
and that will clean up the MetadataImage.
##########
metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java:
##########
@@ -281,4 +283,26 @@ public int getStaticallyConfiguredMinInsyncReplicas(Map<String, ?> staticNodeCon
minInsyncReplicasString,
ConfigDef.Type.INT);
}
+
Review Comment:
Please remove the changes to this file. Look at
`ControllerConfigurationValidator` to see how dynamic config changes are
validated.
##########
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##########
@@ -189,5 +191,7 @@ object KafkaRaftServer {
val configSchema = new KafkaConfigSchema(Map(
ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef),
ConfigResource.Type.TOPIC -> LogConfig.configDefCopy,
+ ConfigResource.Type.GROUP -> GroupConfig.configDef(),
Review Comment:
Why is `DynamicBrokerConfig#AllDynamicConfigs` not used as the whitelist?
##########
metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java:
##########
@@ -57,7 +59,17 @@ public void deleteAll() {
public ConfigurationImage apply() {
Map<String, String> newData = new HashMap<>(image.data().size());
+ Type resourceType = image.resource().type();
+ Set<String> validConfigNames = resourceType != Type.UNKNOWN ?
+ ConfigurationsDelta.getValidConfigNames(resourceType) : Set.of();
Review Comment:
We should use `DynamicBrokerConfig#AllDynamicConfigs` as the whitelist.
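To illustrate the shape of what I mean, here is a minimal, self-contained sketch of filtering stored configs against a fixed whitelist. It is not Kafka code: `WhitelistFilterSketch`, `filterByWhitelist`, `ALLOWED_NAMES`, and the config names are hypothetical, and `ALLOWED_NAMES` only stands in for whatever set `DynamicBrokerConfig#AllDynamicConfigs` exposes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class WhitelistFilterSketch {
    // Hypothetical stand-in for the real whitelist; these names are made up.
    static final Set<String> ALLOWED_NAMES = Set.of("example.kept.config");

    // Returns a copy of configs containing only entries whose key is whitelisted.
    static Map<String, String> filterByWhitelist(Map<String, String> configs,
                                                 Set<String> allowed) {
        Map<String, String> kept = new HashMap<>(configs.size());
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            if (allowed.contains(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, String> stored = new HashMap<>();
        stored.put("example.kept.config", "1000");
        stored.put("example.removed.config", "x");
        // Only the whitelisted entry survives the filter.
        System.out.println(filterByWhitelist(stored, ALLOWED_NAMES));
    }
}
```

The point is that the whitelist is one existing set rather than a new per-type schema lookup.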
##########
metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java:
##########
@@ -21,17 +21,48 @@
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.function.Supplier;
/**
* Represents changes to the configurations in the metadata image.
*/
public final class ConfigurationsDelta {
+ /**
Review Comment:
Please remove the changes to this file.
--
This is an automated message from the Apache Git Service.