dajac commented on code in PR #15304:
URL: https://github.com/apache/kafka/pull/15304#discussion_r1858213566


##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -290,17 +293,87 @@ public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
 
         try (Admin client = cluster.admin()) {
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG, "/temp/test.jks"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"), alterOpts));
         }
     }
 
+    @ClusterTest
+    public void testUpdateInvalidBrokerConfigs() {
+        updateAndCheckInvalidBrokerConfig(Optional.empty());
+        updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId() + ""));
+    }
+
+    private void updateAndCheckInvalidBrokerConfig(Optional<String> brokerIdOrDefault) {
+        List<String> alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+        try (Admin client = cluster.admin()) {
+            alterConfigWithAdmin(client, brokerIdOrDefault, Collections.singletonMap("invalid", "2"), alterOpts);
+
+            Stream<String> describeCommand = Stream.concat(
+                    Stream.concat(
+                            Stream.of("--bootstrap-server", cluster.bootstrapServers()),
+                            Stream.of(entityOp(brokerIdOrDefault).toArray(new String[0]))),
+                    Stream.of("--entity-type", "brokers", "--describe"));
+            String describeResult = captureStandardStream(false, run(describeCommand));
+
+            // Unknown configs are treated as sensitive
+            assertTrue(describeResult.contains("sensitive=true"), describeResult);
+            // Sensitive config values are not returned
+            assertTrue(describeResult.contains("invalid=null"), describeResult);
+        }
+    }
+
+    @ClusterTest
+    public void testUpdateInvalidTopicConfigs() throws ExecutionException, InterruptedException {
+        List<String> alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+        try (Admin client = cluster.admin()) {
+            client.createTopics(Collections.singletonList(new NewTopic("test-config-topic", 1, (short) 1))).all().get();
+            assertInstanceOf(
+                    InvalidConfigurationException.class,
+                    assertThrows(
+                            ExecutionException.class,
+                            () -> ConfigCommand.alterConfig(
+                                    client,
+                                    new ConfigCommand.ConfigCommandOptions(
+                                            toArray(alterOpts,
+                                                    asList("--add-config", "invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
+                    ).getCause()
+            );
+        }
+    }
+
+    // Test case from KAFKA-13788
+    @ClusterTest
+    public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
+        try (Admin client = cluster.admin()) {
+            ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(
+                    toArray(asList("--bootstrap-server", cluster.bootstrapServers(),
+                            "--alter",
+                            "--add-config", "log.cleaner.threadzz=2",

Review Comment:
   For my understanding, it seems that we don't validate broker configs. I 
suppose that we don't because plugins could rely on unknown configs. Is my 
understanding correct?
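
   To make the behaviour in question concrete, here is a minimal sketch (not from the PR; the bootstrap address, node id "0", and the key "some.plugin.config" are made-up values) showing that altering an unknown broker config succeeds over the public `Admin` API, which would be consistent with the plugin explanation:

   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.Properties;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;

   public class UnknownBrokerConfigSketch {
       public static void main(String[] args) throws Exception {
           Properties conf = new Properties();
           conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(conf)) {
               ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
               AlterConfigOp setUnknown = new AlterConfigOp(
                       new ConfigEntry("some.plugin.config", "2"), AlterConfigOp.OpType.SET);
               // Completes without an InvalidConfigurationException: the broker accepts the
               // unknown entry as-is, matching what the test above observes.
               admin.incrementalAlterConfigs(Map.of(broker, Collections.singleton(setUnknown))).all().get();
           }
       }
   }
   ```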



##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -290,17 +293,87 @@ public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
 
         try (Admin client = cluster.admin()) {
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG, "/temp/test.jks"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"), alterOpts));
         }
     }
 
+    @ClusterTest
+    public void testUpdateInvalidBrokerConfigs() {
+        updateAndCheckInvalidBrokerConfig(Optional.empty());
+        updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId() + ""));
+    }
+
+    private void updateAndCheckInvalidBrokerConfig(Optional<String> brokerIdOrDefault) {
+        List<String> alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+        try (Admin client = cluster.admin()) {
+            alterConfigWithAdmin(client, brokerIdOrDefault, Collections.singletonMap("invalid", "2"), alterOpts);
+
+            Stream<String> describeCommand = Stream.concat(
+                    Stream.concat(
+                            Stream.of("--bootstrap-server", cluster.bootstrapServers()),
+                            Stream.of(entityOp(brokerIdOrDefault).toArray(new String[0]))),
+                    Stream.of("--entity-type", "brokers", "--describe"));
+            String describeResult = captureStandardStream(false, run(describeCommand));
+
+            // Unknown configs are treated as sensitive
+            assertTrue(describeResult.contains("sensitive=true"), describeResult);
+            // Sensitive config values are not returned
+            assertTrue(describeResult.contains("invalid=null"), describeResult);
+        }
+    }
+
+    @ClusterTest
+    public void testUpdateInvalidTopicConfigs() throws ExecutionException, InterruptedException {
+        List<String> alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+        try (Admin client = cluster.admin()) {
+            client.createTopics(Collections.singletonList(new NewTopic("test-config-topic", 1, (short) 1))).all().get();
+            assertInstanceOf(

Review Comment:
   nit: You may be able to use `assertFutureThrows`.
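
   For illustration, a sketch of that suggestion, assuming the helper is `org.apache.kafka.test.TestUtils.assertFutureThrows(Future<?>, Class<T>)`, which unwraps the `ExecutionException` and returns the typed cause. Note that `ConfigCommand.alterConfig(...)` is synchronous, so the helper would apply to the underlying admin future rather than the command call itself:

   ```java
   import static java.util.Collections.singleton;
   import static java.util.Collections.singletonMap;
   import static org.apache.kafka.test.TestUtils.assertFutureThrows;

   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;
   import org.apache.kafka.common.errors.InvalidConfigurationException;

   // Given the Admin `client` from the try block above:
   ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-config-topic");
   AlterConfigOp addInvalid = new AlterConfigOp(new ConfigEntry("invalid", "2"), AlterConfigOp.OpType.SET);
   InvalidConfigurationException cause = assertFutureThrows(
           client.incrementalAlterConfigs(singletonMap(topic, singleton(addInvalid))).all(),
           InvalidConfigurationException.class);
   ```

   That would collapse the `assertInstanceOf`/`assertThrows`/`getCause` nesting into a single call.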



##########
test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java:
##########
@@ -174,8 +174,9 @@ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
             }
             props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
 
-            // reduce log cleaner offset map memory usage
-            props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
+            // Reduce log cleaner offset map memory usage. It must be strictly greater than 1MB per cleaner
+            // thread (2097152 / 2 is exactly 1MB), so use 2097154 (2MB + 2 bytes) to allow 2 cleaner threads.
+            props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097154");

Review Comment:
   Is this change related to the PR?



##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -333,6 +333,9 @@ object TestUtils extends Logging {
     if (!props.containsKey(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG))
       props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
     rack.foreach(props.put(ServerConfigs.BROKER_RACK_CONFIG, _))
+    // Reduce log cleaner offset map memory usage. It must be strictly greater than 1MB per cleaner
+    // thread (2097152 / 2 is exactly 1MB), so use 2097154 (2MB + 2 bytes) to allow 2 cleaner threads.
+    props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097154")

Review Comment:
   Is this change related to the PR?



##########
core/src/test/java/kafka/admin/ConfigCommandTest.java:
##########
@@ -1019,13 +1019,13 @@ public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResou
 
             @SuppressWarnings("deprecation")

Review Comment:
   I suppose that it does not apply any more.



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -172,27 +171,14 @@ object ConfigCommand extends Logging {
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
-      case ConfigType.TOPIC =>
-        alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
-      case ConfigType.BROKER =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
-
-        val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-        val sensitiveEntries = newEntries.filter(_._2.value == null)
-        if (sensitiveEntries.nonEmpty)
-          throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
-        val newConfig = new JConfig(newEntries.asJava.values)
-
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityNameHead)
-        val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+      case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER | ConfigType.GROUP =>
+        val configResourceType = entityTypeHead match {
+          case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+          case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
+          case ConfigType.BROKER => ConfigResource.Type.BROKER
+          case ConfigType.GROUP => ConfigResource.Type.GROUP
+        }
+        alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)

Review Comment:
   What happens if the unsupported exception is thrown here? I wonder whether we should catch it in the main method in order to provide a meaningful error message to the user. What do you think?
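
   To make the suggestion concrete, a minimal Scala sketch of the shape this could take (names like `runCommand` are illustrative, not the actual ConfigCommand structure):

   ```scala
   import org.apache.kafka.common.errors.UnsupportedVersionException

   object ConfigCommandSketch {
     def main(args: Array[String]): Unit = {
       try {
         runCommand(args) // stands in for the existing alter/describe dispatch
       } catch {
         case e: UnsupportedVersionException =>
           // e.g. an older broker that does not support the requested resource type
           System.err.println(s"The target cluster does not support this operation: ${e.getMessage}")
           sys.exit(1)
       }
     }

     private def runCommand(args: Array[String]): Unit = ()
   }
   ```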



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -405,7 +420,7 @@ object ConfigCommand extends Logging {
         val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
         val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
           ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
+          ).asJavaCollection

Review Comment:
   I find this line pretty hard to read. While we are changing this code anyway, I wonder if we could refactor it a bit more, along the lines of the sketch below. What do you think?
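
   For example (a sketch based only on the snippet above; it assumes the existing `configsToBeAdded`, `configsToBeDeleted`, and `scala.jdk.CollectionConverters` imports stay in scope):

   ```scala
   // Naming the two halves keeps the closing parenthesis and .asJavaCollection
   // on a single, readable line.
   val setOps = configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
   val deleteOps = configsToBeDeleted.map { name =>
     new AlterConfigOp(new ConfigEntry(name, ""), AlterConfigOp.OpType.DELETE)
   }
   val alterLogLevelEntries = (setOps ++ deleteOps).asJavaCollection
   ```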



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
