dengziming commented on code in PR #15304:
URL: https://github.com/apache/kafka/pull/15304#discussion_r1862887011


##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -80,6 +82,11 @@ object ConfigCommand extends Logging {
         System.err.println(e.getMessage)
         Exit.exit(1)
 
+      case e: UnsupportedVersionException =>
+        logger.debug(s"Unsupported API encountered in server when executing 
config command with args '${args.mkString(" ")}'")
+        e.printStackTrace(System.err)

Review Comment:
   Yes, since it's an expected error, it's unnecessary to print the full stack 
trace. 



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -172,27 +178,25 @@ object ConfigCommand extends Logging {
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
-      case ConfigType.TOPIC =>
-        alterResourceConfig(adminClient, entityTypeHead, entityNameHead, 
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
-      case ConfigType.BROKER =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
-
-        val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-        val sensitiveEntries = newEntries.filter(_._2.value == null)
-        if (sensitiveEntries.nonEmpty)
-          throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-        val newConfig = new JConfig(newEntries.asJava.values)
-
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+      case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER | 
ConfigType.GROUP =>
+        val configResourceType = entityTypeHead match {
+          case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+          case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
+          case ConfigType.BROKER => ConfigResource.Type.BROKER
+          case ConfigType.GROUP => ConfigResource.Type.GROUP
+        }
+        try {
+          alterResourceConfig(adminClient, entityTypeHead, entityNameHead, 
configsToBeDeleted, configsToBeAdded, configResourceType)
+        } catch {
+          case e: ExecutionException =>
+            e.getCause match {
+              case cause: UnsupportedVersionException if entityTypeHead == 
ConfigType.BROKER =>

Review Comment:
   I was originally trying to fall back to the old API, which is why only BROKER 
was supported; now it's fine to cover all types.



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -172,27 +178,25 @@ object ConfigCommand extends Logging {
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
-      case ConfigType.TOPIC =>
-        alterResourceConfig(adminClient, entityTypeHead, entityNameHead, 
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
-      case ConfigType.BROKER =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
-
-        val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-        val sensitiveEntries = newEntries.filter(_._2.value == null)
-        if (sensitiveEntries.nonEmpty)
-          throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-        val newConfig = new JConfig(newEntries.asJava.values)
-
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+      case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER | 
ConfigType.GROUP =>
+        val configResourceType = entityTypeHead match {
+          case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+          case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
+          case ConfigType.BROKER => ConfigResource.Type.BROKER
+          case ConfigType.GROUP => ConfigResource.Type.GROUP
+        }
+        try {
+          alterResourceConfig(adminClient, entityTypeHead, entityNameHead, 
configsToBeDeleted, configsToBeAdded, configResourceType)
+        } catch {
+          case e: ExecutionException =>
+            e.getCause match {
+              case cause: UnsupportedVersionException if entityTypeHead == 
ConfigType.BROKER =>
+                throw new UnsupportedVersionException(s"Could not update 
broker config $entityNameHead, because brokers don't support api 
${ApiKeys.INCREMENTAL_ALTER_CONFIGS},"
+                + " You can upgrade your brokers to version 2.3.0 or newer to 
avoid this error.", cause)

Review Comment:
   I reworded it to first suggest using an older version of the client, and I 
still kept the suggestion of upgrading the server.



##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -290,17 +303,125 @@ public void 
testUpdatePerBrokerConfigInKRaftThenShouldFail() {
 
         try (Admin client = cluster.admin()) {
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, 
Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG, 
"PKCS12"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, 
Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG, 
"/temp/test.jks"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, 
Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG, 
"password"), alterOpts));
         }
     }
 
+    @ClusterTest
+    public void testUpdateInvalidBrokerConfigs() {
+        updateAndCheckInvalidBrokerConfig(Optional.empty());
+        
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId()
 + ""));
+    }
+
+    private void updateAndCheckInvalidBrokerConfig(Optional<String> 
brokerIdOrDefault) {
+        List<String> alterOpts = 
generateDefaultAlterOpts(cluster.bootstrapServers());
+        try (Admin client = cluster.admin()) {
+            alterConfigWithAdmin(client, brokerIdOrDefault, 
Collections.singletonMap("invalid", "2"), alterOpts);
+
+            Stream<String> describeCommand = Stream.concat(
+                    Stream.concat(
+                            Stream.of("--bootstrap-server", 
cluster.bootstrapServers()),
+                            Stream.of(entityOp(brokerIdOrDefault).toArray(new 
String[0]))),
+                    Stream.of("--entity-type", "brokers", "--describe"));
+            String describeResult = captureStandardStream(false, 
run(describeCommand));
+
+            // We will treat unknown config as sensitive
+            assertTrue(describeResult.contains("sensitive=true"), 
describeResult);
+            // Sensitive config will not return
+            assertTrue(describeResult.contains("invalid=null"), 
describeResult);
+        }
+    }
+
+    @ClusterTest
+    public void testUpdateInvalidTopicConfigs() throws ExecutionException, 
InterruptedException {
+        List<String> alterOpts = asList("--bootstrap-server", 
cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+        try (Admin client = cluster.admin()) {
+            client.createTopics(Collections.singletonList(new 
NewTopic("test-config-topic", 1, (short) 1))).all().get();
+            assertInstanceOf(
+                    InvalidConfigurationException.class,
+                    assertThrows(
+                            ExecutionException.class,
+                            () -> ConfigCommand.alterConfig(
+                                    client,
+                                    new ConfigCommand.ConfigCommandOptions(
+                                            toArray(alterOpts,
+                                                    asList("--add-config", 
"invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
+                    ).getCause()
+            );
+        }
+    }
+
+    // Test case from KAFKA-13788
+    @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", 
value = "2097154"),
+    })
+    public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
+        try (Admin client = cluster.admin()) {
+            ConfigCommand.alterConfig(client, new 
ConfigCommand.ConfigCommandOptions(
+                    toArray(asList("--bootstrap-server", 
cluster.bootstrapServers(),
+                            "--alter",
+                            "--add-config", "log.cleaner.threadzz=2",
+                            "--entity-type", "brokers",
+                            "--entity-default"))));
+
+            ConfigCommand.alterConfig(client, new 
ConfigCommand.ConfigCommandOptions(
+                    toArray(asList("--bootstrap-server", 
cluster.bootstrapServers(),
+                            "--alter",
+                            "--add-config", "log.cleaner.threads=2",
+                            "--entity-type", "brokers",
+                            "--entity-default"))));
+            kafka.utils.TestUtils.waitUntilTrue(
+                    () -> 
cluster.brokerSocketServers().stream().allMatch(broker -> 
broker.config().getInt("log.cleaner.threads") == 2),
+                    () -> "Timeout waiting for topic config propagating to 
broker",
+                    org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+                    100L);
+        }
+    }
+
+    @ClusterTest(
+            serverProperties = {@ClusterConfigProperty(key = 
"log.cleaner.dedupe.buffer.size", value = "2097154")},
+            // Zk code has been removed, use kraft and mockito to mock this 
situation
+            metadataVersion = MetadataVersion.IBP_3_3_IV0
+    )
+    public void testFallbackToDeprecatedAlterConfigs() throws 
NoSuchMethodException, InvocationTargetException, InstantiationException, 
IllegalAccessException {
+        try (Admin client = cluster.admin()) {
+            Admin spyAdmin = Mockito.spy(client);
+
+            AlterConfigsResult mockResult;
+            {
+                // Create a mock result of unsupported version error
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new 
UnsupportedVersionException("simulated error"));
+                Constructor<AlterConfigsResult> constructor = 
AlterConfigsResult.class.getDeclaredConstructor(java.util.Map.class);
+                constructor.setAccessible(true);
+                mockResult = 
constructor.newInstance(Collections.singletonMap(new 
ConfigResource(ConfigResource.Type.BROKER, ""), future));
+                constructor.setAccessible(false);
+            }

Review Comment:
   Good suggestions.



##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -290,17 +303,125 @@ public void 
testUpdatePerBrokerConfigInKRaftThenShouldFail() {
 
         try (Admin client = cluster.admin()) {
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, 
Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG, 
"PKCS12"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, 
Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG, 
"/temp/test.jks"), alterOpts));
             assertThrows(ExecutionException.class,
-                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                    () -> alterConfigWithAdmin(client, 
Optional.of(defaultBrokerId),
                             singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG, 
"password"), alterOpts));
         }
     }
 
+    @ClusterTest
+    public void testUpdateInvalidBrokerConfigs() {
+        updateAndCheckInvalidBrokerConfig(Optional.empty());
+        
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId()
 + ""));
+    }
+
+    private void updateAndCheckInvalidBrokerConfig(Optional<String> 
brokerIdOrDefault) {
+        List<String> alterOpts = 
generateDefaultAlterOpts(cluster.bootstrapServers());
+        try (Admin client = cluster.admin()) {
+            alterConfigWithAdmin(client, brokerIdOrDefault, 
Collections.singletonMap("invalid", "2"), alterOpts);
+
+            Stream<String> describeCommand = Stream.concat(
+                    Stream.concat(
+                            Stream.of("--bootstrap-server", 
cluster.bootstrapServers()),
+                            Stream.of(entityOp(brokerIdOrDefault).toArray(new 
String[0]))),
+                    Stream.of("--entity-type", "brokers", "--describe"));
+            String describeResult = captureStandardStream(false, 
run(describeCommand));
+
+            // We will treat unknown config as sensitive
+            assertTrue(describeResult.contains("sensitive=true"), 
describeResult);
+            // Sensitive config will not return
+            assertTrue(describeResult.contains("invalid=null"), 
describeResult);
+        }
+    }
+
+    @ClusterTest
+    public void testUpdateInvalidTopicConfigs() throws ExecutionException, 
InterruptedException {
+        List<String> alterOpts = asList("--bootstrap-server", 
cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+        try (Admin client = cluster.admin()) {
+            client.createTopics(Collections.singletonList(new 
NewTopic("test-config-topic", 1, (short) 1))).all().get();
+            assertInstanceOf(
+                    InvalidConfigurationException.class,
+                    assertThrows(
+                            ExecutionException.class,
+                            () -> ConfigCommand.alterConfig(
+                                    client,
+                                    new ConfigCommand.ConfigCommandOptions(
+                                            toArray(alterOpts,
+                                                    asList("--add-config", 
"invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
+                    ).getCause()
+            );
+        }
+    }
+
+    // Test case from KAFKA-13788
+    @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", 
value = "2097154"),

Review Comment:
   A comment has been added to explain this config, and the indentation has been 
optimized. (We don't enforce it in checkstyle.xml and there is no unified 
config for us; the default value in IDEA is 8. Maybe we can set it in the 
future, but that would change too much code.)
   
   
![image](https://github.com/user-attachments/assets/e25325fb-82ab-4dcf-a7ab-7810d4d03cdd)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to