jsancio commented on code in PR #21053:
URL: https://github.com/apache/kafka/pull/21053#discussion_r2770513959
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -87,6 +89,7 @@ static class Builder {
private Map<String, Object> staticConfig = Map.of();
private int nodeId = 0;
private FeatureControlManager featureControl = null;
+ private SupportedConfigChecker supportedConfigChecker = (resourceType, configName) -> true;
Review Comment:
Instead of defining this default value 3 times (at least), why not define it once
in SupportedConfigChecker? E.g.
```java
private static final SupportedConfigChecker TRUE = ...;
SupportedConfigChecker alwaysTrue() {
return TRUE;
}
```
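Fleshed out, the suggestion might look like the sketch below. The interface shape is inferred from the lambda in the diff above (the real checker takes a `ConfigResource.Type`, replaced with `String` here to keep the example self-contained), and since `true` is a reserved word in Java the accessor needs another name, e.g. `alwaysTrue()`:

```java
// Hedged sketch, not the actual Kafka interface: one shared permissive
// instance replaces repeating "(resourceType, configName) -> true" at
// every call site.
@FunctionalInterface
interface SupportedConfigChecker {
    boolean isSupported(String resourceType, String configName);

    // Single shared instance that accepts every config.
    SupportedConfigChecker ALWAYS_TRUE = (resourceType, configName) -> true;

    static SupportedConfigChecker alwaysTrue() {
        return ALWAYS_TRUE;
    }
}

public class SupportedConfigCheckerDemo {
    public static void main(String[] args) {
        SupportedConfigChecker checker = SupportedConfigChecker.alwaysTrue();
        System.out.println(checker.isSupported("TOPIC", "retention.ms")); // prints true
        // Every builder default now refers to the same instance.
        System.out.println(checker == SupportedConfigChecker.alwaysTrue()); // prints true
    }
}
```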
##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java:
##########
@@ -76,7 +76,9 @@ public Optional<TopicMetadata> topicMetadata(String topicName) {
@Override
public CoordinatorMetadataDelta emptyDelta() {
- return new KRaftCoordinatorMetadataDelta(new MetadataDelta(metadataImage));
+ return new KRaftCoordinatorMetadataDelta(new MetadataDelta.Builder().
+     setImage(metadataImage).
+     build());
Review Comment:
We tend to use this formatting in this module:
```java
return new KRaftCoordinatorMetadataDelta(
new MetadataDelta.Builder()
.setImage(metadataImage)
.build()
);
```
##########
metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java:
##########
@@ -45,6 +48,9 @@ public void finishSnapshot() {
}
public void replay(ConfigRecord record) {
+ if (!supportedConfigChecker.isSupported(image.resource().type(), record.name())) {
+ return;
+ }
Review Comment:
Same here. I suggest writing a comment before the "return" explaining why
Kafka skips these records.
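As an illustration of the requested change, a hedged sketch with stand-in types (the real code uses Kafka's `ConfigRecord` and the image's `ConfigResource`; the comment text itself is a placeholder for the actual rationale the reviewer is asking for):

```java
import java.util.function.BiPredicate;

// Stand-in record so the pattern compiles on its own.
record ConfigRecord(String resourceType, String name) {}

class ConfigurationDeltaSketch {
    private final BiPredicate<String, String> supportedConfigChecker;
    private int appliedRecords = 0;

    ConfigurationDeltaSketch(BiPredicate<String, String> supportedConfigChecker) {
        this.supportedConfigChecker = supportedConfigChecker;
    }

    void replay(ConfigRecord record) {
        // Unsupported configs are skipped rather than treated as an error:
        // the metadata log may contain config records this code does not
        // recognize, and replay must still make progress past them.
        if (!supportedConfigChecker.test(record.resourceType(), record.name())) {
            return;
        }
        appliedRecords++;
    }

    int appliedRecords() {
        return appliedRecords;
    }
}

public class ReplaySkipDemo {
    public static void main(String[] args) {
        ConfigurationDeltaSketch delta = new ConfigurationDeltaSketch(
            (type, name) -> name.equals("retention.ms"));
        delta.replay(new ConfigRecord("TOPIC", "retention.ms"));
        delta.replay(new ConfigRecord("TOPIC", "unknown.config"));
        System.out.println(delta.appliedRecords()); // prints 1
    }
}
```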
##########
core/src/main/scala/kafka/server/SharedServer.scala:
##########
@@ -112,6 +111,7 @@ class SharedServer(
private var usedByController: Boolean = false
val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
+ val supportedConfigChecker: SupportedConfigChecker = new ControllerConfigurationValidator(brokerConfig)
Review Comment:
Btw, it is a code smell that this type is called a controller configuration
validator yet takes a broker config and is used in the shared server. For
example, before your change the controller configuration validator was only
created in the controller server and used by the quorum controller.
##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -44,7 +47,33 @@ import scala.collection.mutable
 * in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense
 * as the others. It is not persisted to the metadata log.
 */
-class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends ConfigurationValidator {
+class ControllerConfigurationValidator(private val kafkaConfig: KafkaConfig)
+  extends ConfigurationValidator with SupportedConfigChecker {
+  private val validConfigsByType: Map[ConfigResource.Type, util.Set[String]] = {
+    val topicConfigs = LogConfig.nonInternalConfigNames.asScala.toSet
+    val brokerConfigs = DynamicConfig.Broker.names.asScala.toSet
+    val clientMetricsConfigs = ClientMetricsConfigs.configDef().names.asScala.toSet
+    val groupConfigs = GroupConfig.configDef().names.asScala.toSet
+    // Quota configs can be used with different resource types, so we include them for all types
+    val allQuotaConfigs = QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs().names.asScala ++
+      QuotaConfig.userAndClientQuotaConfigs().names.asScala ++
+      QuotaConfig.ipConfigs.names.asScala
+
+    Map(
+      ConfigResource.Type.TOPIC -> (topicConfigs ++ allQuotaConfigs).asJava,
+      ConfigResource.Type.BROKER -> (brokerConfigs ++ allQuotaConfigs).asJava,
+      ConfigResource.Type.CLIENT_METRICS -> (clientMetricsConfigs ++ allQuotaConfigs).asJava,
+      ConfigResource.Type.GROUP -> (groupConfigs ++ allQuotaConfigs).asJava
+    )
Review Comment:
Why did you have to add all quota configs for all of the resources? That
doesn't seem right. Each resource should define which quota configs are
valid. For example, I see the following definition in LogConfig:
```java
.define(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, LIST,
    QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_DEFAULT,
    ThrottledReplicaListValidator.INSTANCE, MEDIUM,
    QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_DOC)
.define(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, LIST,
    QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_DEFAULT,
    ThrottledReplicaListValidator.INSTANCE, MEDIUM,
    QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_DOC)
```
Looking at the validation for the broker resource, it looks like all keys are
allowed:
```java
case BROKER => validateBrokerName(resource.name())
```
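The per-resource alternative the reviewer describes can be sketched as each resource type contributing only the quota configs its own definition declares, instead of unioning every quota config into every type. The map below is an illustration with hand-picked config names, not the real `ConfigDef` lookups:

```java
import java.util.Map;
import java.util.Set;

// Hedged sketch: quota configs scoped per resource type. The TOPIC entries
// mirror the LogConfig .define(...) calls quoted above; the BROKER entries
// are illustrative dynamic broker throttle configs; the remaining types are
// assumed to define no quota configs of their own.
public class PerResourceQuotaConfigs {
    static final Map<String, Set<String>> QUOTA_CONFIGS_BY_TYPE = Map.of(
        "TOPIC", Set.of(
            "leader.replication.throttled.replicas",
            "follower.replication.throttled.replicas"),
        "BROKER", Set.of(
            "leader.replication.throttled.rate",
            "follower.replication.throttled.rate"),
        "CLIENT_METRICS", Set.of(),
        "GROUP", Set.of());

    public static void main(String[] args) {
        // Only TOPIC accepts the throttled-replica list configs.
        System.out.println(QUOTA_CONFIGS_BY_TYPE.get("TOPIC")
            .contains("leader.replication.throttled.replicas")); // prints true
        System.out.println(QUOTA_CONFIGS_BY_TYPE.get("GROUP").isEmpty()); // prints true
    }
}
```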
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -2059,7 +2059,8 @@ public void testOnMetadataUpdate() {
verify(coordinator0).onLoaded(CoordinatorMetadataImage.EMPTY);
// Publish a new image.
- CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(new MetadataDelta(MetadataImage.EMPTY));
+ CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(
+     new MetadataDelta.Builder().setImage(MetadataImage.EMPTY).build());
Review Comment:
Same comment here. See my other comment about how to format this change.
##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -44,7 +47,33 @@ import scala.collection.mutable
 * in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense
 * as the others. It is not persisted to the metadata log.
 */
-class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends ConfigurationValidator {
+class ControllerConfigurationValidator(private val kafkaConfig: KafkaConfig)
Review Comment:
Why did you decide to extend this type? The implementation of `isSupported`
is completely disjoint from the rest of the type. It looks like you should be
able to implement this in the server module. The server module should have
access to all of the config definitions that you need.
##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -44,7 +47,33 @@ import scala.collection.mutable
 * in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense
 * as the others. It is not persisted to the metadata log.
 */
-class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends ConfigurationValidator {
+class ControllerConfigurationValidator(private val kafkaConfig: KafkaConfig)
+  extends ConfigurationValidator with SupportedConfigChecker {
+  private val validConfigsByType: Map[ConfigResource.Type, util.Set[String]] = {
Review Comment:
The formatting looks off. How about:
```scala
class ControllerConfigurationValidator(
private val kafkaConfig: KafkaConfig
) extends ConfigurationValidator with SupportedConfigChecker {
  private val validConfigsByType: Map[ConfigResource.Type, util.Set[String]] = {
```
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -509,6 +520,11 @@ private List<String> getParts(String value, String key, ConfigResource configRes
 public void replay(ConfigRecord record) {
 Type type = Type.forId(record.resourceType());
 ConfigResource configResource = new ConfigResource(type, record.resourceName());
+
+ if (!supportedConfigChecker.isSupported(configResource.type(), record.name())) {
+     return;
+ }
Review Comment:
I suggest writing a comment before the return explaining why the controller
skips records that are not "supported."
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]