0xffff-zhiyan commented on code in PR #21053:
URL: https://github.com/apache/kafka/pull/21053#discussion_r2922361289
##########
metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java:
##########
@@ -45,6 +48,11 @@ public void finishSnapshot() {
}
public void replay(ConfigRecord record) {
+ if (!supportedConfigChecker.isSupported(image.resource().type(),
record.name())) {
Review Comment:
Yes, you're right. Changed to use
`ConfigResource.Type.forId(record.resourceType())`, which is more explicit, so we
don't require an assertion.
##########
metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java:
##########
@@ -116,6 +120,69 @@ public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
+ @Test
+ public void testConfigurationDeltaFiltering() {
+ Set<String> validConfigs = Set.of("foo", "bar");
+ SupportedConfigChecker supportedConfigChecker = (resourceType,
configName) -> validConfigs.contains(configName);
+
+ Map<String, String> initialConfigs = Map.of("foo", "value1"); // valid
+ ConfigurationImage image = new ConfigurationImage(new
ConfigResource(BROKER, "0"), initialConfigs);
+
+ ConfigurationDelta delta = new ConfigurationDelta(image,
supportedConfigChecker);
+ delta.replay(new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("bar").setValue("value2"));
+ delta.replay(new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("qux").setValue("value3"));
+
+ ConfigurationImage result = delta.apply();
+
+ assertTrue(result.data().containsKey("foo"));
+ assertTrue(result.data().containsKey("bar"));
+ assertFalse(result.data().containsKey("qux"));
+ }
+
+ @Test
+ public void testConfigurationDeltaWithoutFiltering() {
+ Map<String, String> initialConfigs = Map.of("foo", "value1", "bar",
"value2");
+ ConfigurationImage image = new ConfigurationImage(new
ConfigResource(BROKER, "0"), initialConfigs);
+
+ ConfigurationDelta delta = new ConfigurationDelta(image,
SupportedConfigChecker.TRUE);
+ delta.replay(new
ConfigRecord().setResourceType(BROKER.id()).setResourceName("0")
+ .setName("baz").setValue("value3"));
+
+ ConfigurationImage result = delta.apply();
+
+ assertTrue(result.data().containsKey("foo"));
+ assertTrue(result.data().containsKey("bar"));
+ assertTrue(result.data().containsKey("baz"));
+ }
+
+ @Test
+ public void testConfigurationDeltaPreventsInvalidConfigsInResultingImage()
{
+ Set<String> validConfigs = Set.of("foo", "bar");
+ SupportedConfigChecker supportedConfigChecker = (resourceType,
configName) -> validConfigs.contains(configName);
+
+ Map<String, String> initialConfigs = Map.of(
+ "foo", "value1", // valid
+ "bar", "value2", // valid
+ "invalid", "value3" // invalid
+ );
+ ConfigurationImage image = new ConfigurationImage(new
ConfigResource(BROKER, "0"), initialConfigs);
Review Comment:
Removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]