m1a2st commented on code in PR #16317:
URL: https://github.com/apache/kafka/pull/16317#discussion_r1642099195


##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -493,36 +496,48 @@ private void alterConfigWithKraft(Admin client, 
Optional<String> brokerId, Map<S
     }
 
     private void verifyConfig(Admin client, Optional<String> brokerId, 
Map<String, String> config) throws Exception {
-        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(""));
         TestUtils.waitForCondition(() -> {
-            Map<String, String> current = 
client.describeConfigs(singletonList(configResource))
-                    .all()
-                    .get()
-                    .values()
-                    .stream()
-                    .flatMap(e -> e.entries().stream())
-                    .collect(HashMap::new, (map, entry) -> 
map.put(entry.name(), entry.value()), HashMap::putAll);
+            Map<String, String> current = getConfigEntryStream(client, 
configResource)
+                    .filter(configEntry -> 
Objects.nonNull(configEntry.value()))
+                    .collect(Collectors.toMap(ConfigEntry::name, 
ConfigEntry::value));
             return config.entrySet().stream().allMatch(e -> 
e.getValue().equals(current.get(e.getKey())));
         }, 10000, config + " are not updated");
     }
 
+    private Stream<ConfigEntry> getConfigEntryStream(Admin client,
+                                                     ConfigResource 
configResource) throws InterruptedException, ExecutionException {
+        return client.describeConfigs(singletonList(configResource))
+                .all()
+                .get()
+                .values()
+                .stream()
+                .flatMap(e -> e.entries().stream());
+    }
+
     private void deleteAndVerifyConfig(Admin client, Optional<String> 
brokerId, Set<String> config) throws Exception {
         ConfigCommand.ConfigCommandOptions deleteOpts =
                 new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, 
entityOp(brokerId),
                         asList("--delete-config", String.join(",", config))));
         ConfigCommand.alterConfig(client, deleteOpts);
-        verifyConfigDefaultValue(client, brokerId, config);
+        verifyPerBrokerConfigValue(client, brokerId, config);

Review Comment:
   Pardon me, we should emphasis two point in this method, One is that it get 
broker level config, and another is that it get config default value. It's 
right?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to