lucasbru commented on code in PR #21724:
URL: https://github.com/apache/kafka/pull/21724#discussion_r2930512225
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1664,29 +1664,60 @@ private void validateRackAwarenessConfiguration() {
final Map<String, String> clientTags = getClientTags();
if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
- throw new ConfigException("At most " + MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
- "can be specified using " + CLIENT_TAG_PREFIX + " prefix.");
+ throw new ConfigException(
+ String.format(
+ "At most %s client tags can be specified using %s prefix.",
+ MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+ CLIENT_TAG_PREFIX
+ )
+ );
}
for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
+ if (rackAwareAssignmentTag.isEmpty()) {
+ throw new ConfigException(
+ RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+ rackAwareAssignmentTags,
+ "Contains invalid value []. Tag key cannot be empty."
+ );
+ }
if (!clientTags.containsKey(rackAwareAssignmentTag)) {
- throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
- rackAwareAssignmentTags,
- "Contains invalid value [" + rackAwareAssignmentTag + "] " +
- "which doesn't have corresponding tag set via [" + CLIENT_TAG_PREFIX + "] prefix.");
+ throw new ConfigException(
+ RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+ rackAwareAssignmentTags,
+ String.format(
+ "Contains invalid value [%s] which doesn't have corresponding tag set via [%s] prefix.",
+ rackAwareAssignmentTag,
+ CLIENT_TAG_PREFIX
+ )
+ );
}
}
clientTags.forEach((tagKey, tagValue) -> {
+ if (tagKey.trim().isEmpty()) {
+ throw new ConfigException("Invalid config `client.tag.` (missing client tag key).");
+ }
+ if (tagValue.trim().isEmpty()) {
+ throw new ConfigException(
+ CLIENT_TAG_PREFIX + tagKey,
+ "[]",
Review Comment:
The second argument to `ConfigException` is supposed to be the actual
configured value. Passing a hardcoded `"[]"` is misleading — the user set `" "`
(whitespace), not `[]`. Consider passing `tagValue` instead so the error
message reflects what was actually configured.
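
A minimal sketch of this suggestion. The diff is cut off before the third constructor argument, so the message text `"Tag value cannot be empty."` is an assumption, and `SketchConfigException` and `ClientTagValueCheck` are hypothetical stand-ins (not Kafka classes) so that the example compiles on its own. The stand-in mirrors the `Invalid value <value> for configuration <name>: <message>` format that the test assertion in this PR shows for the three-argument constructor.

```java
// Hypothetical stand-in for org.apache.kafka.common.config.ConfigException,
// reproducing only the three-argument (name, value, message) message format.
class SketchConfigException extends RuntimeException {
    SketchConfigException(final String name, final Object value, final String message) {
        super("Invalid value " + value + " for configuration " + name
            + (message == null ? "" : ": " + message));
    }
}

final class ClientTagValueCheck {
    static final String CLIENT_TAG_PREFIX = "client.tag.";

    // Rejects blank tag values and reports the value the user actually configured.
    static void validate(final String tagKey, final String tagValue) {
        if (tagValue.trim().isEmpty()) {
            throw new SketchConfigException(
                CLIENT_TAG_PREFIX + tagKey,
                tagValue,                      // the real configured value, not a hardcoded "[]"
                "Tag value cannot be empty."   // assumed wording, not taken from the PR
            );
        }
    }
}
```

Since the offending value is whitespace, it shows up as blank space in the message; whether to quote it for readability is a separate choice.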
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1664,29 +1664,60 @@ private void validateRackAwarenessConfiguration() {
final Map<String, String> clientTags = getClientTags();
if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
- throw new ConfigException("At most " + MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
- "can be specified using " + CLIENT_TAG_PREFIX + " prefix.");
+ throw new ConfigException(
+ String.format(
+ "At most %s client tags can be specified using %s prefix.",
+ MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+ CLIENT_TAG_PREFIX
+ )
+ );
}
for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
+ if (rackAwareAssignmentTag.isEmpty()) {
+ throw new ConfigException(
+ RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+ rackAwareAssignmentTags,
+ "Contains invalid value []. Tag key cannot be empty."
+ );
+ }
if (!clientTags.containsKey(rackAwareAssignmentTag)) {
- throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
- rackAwareAssignmentTags,
- "Contains invalid value [" + rackAwareAssignmentTag + "] " +
- "which doesn't have corresponding tag set via [" + CLIENT_TAG_PREFIX + "] prefix.");
+ throw new ConfigException(
+ RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+ rackAwareAssignmentTags,
+ String.format(
+ "Contains invalid value [%s] which doesn't have corresponding tag set via [%s] prefix.",
+ rackAwareAssignmentTag,
+ CLIENT_TAG_PREFIX
+ )
+ );
}
}
clientTags.forEach((tagKey, tagValue) -> {
+ if (tagKey.trim().isEmpty()) {
+ throw new ConfigException("Invalid config `client.tag.` (missing client tag key).");
Review Comment:
This uses the single-argument `ConfigException` constructor with a fully
custom message, while all other errors in this method use the three-argument
constructor (`name, value, message`). For consistency, consider using the
three-argument form here too, e.g. `new ConfigException(CLIENT_TAG_PREFIX,
tagKey, "Tag key cannot be empty.")`.
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1664,29 +1664,60 @@ private void validateRackAwarenessConfiguration() {
final Map<String, String> clientTags = getClientTags();
if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
- throw new ConfigException("At most " + MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
- "can be specified using " + CLIENT_TAG_PREFIX + " prefix.");
+ throw new ConfigException(
+ String.format(
+ "At most %s client tags can be specified using %s prefix.",
+ MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+ CLIENT_TAG_PREFIX
+ )
+ );
}
for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
+ if (rackAwareAssignmentTag.isEmpty()) {
Review Comment:
This checks `isEmpty()` without `trim()`, while below at line 1698 you check
`tagKey.trim().isEmpty()`. This is technically correct because
`COMMA_WITH_WHITESPACE` already strips whitespace during list parsing, but the
inconsistency could confuse future readers. Maybe worth a brief comment
explaining why `trim()` isn't needed here.
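
A small sketch of why list elements never arrive as whitespace-only strings. It assumes the list parser trims the whole string and then splits on a `\s*,\s*` pattern, which is my understanding of how `ConfigDef` handles `LIST` values; `ListParsingSketch` and `parseList` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

final class ListParsingSketch {
    // Assumed to match the pattern ConfigDef uses for comma-separated lists.
    static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");

    // Trims the raw string, then splits on commas together with any surrounding whitespace.
    static List<String> parseList(final String raw) {
        final String trimmed = raw.trim();
        if (trimmed.isEmpty()) {
            return Collections.emptyList();
        }
        // Limit -1 keeps trailing empty elements instead of silently dropping them.
        return Arrays.asList(COMMA_WITH_WHITESPACE.split(trimmed, -1));
    }
}
```

Under that assumption, `" zone , cluster "` parses to `zone` and `cluster`, and an input such as `"zone, ,cluster"` yields a truly empty middle element rather than `" "`, so a plain `isEmpty()` is enough for entries of this list.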
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1182,7 +1182,34 @@ public void shouldGetClientTagsMapWhenSet() {
@Test
public void shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithUnknownTags() {
props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "cluster");
- assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertEquals(
+ "Invalid value [cluster] for configuration rack.aware.assignment.tags: Contains invalid value [cluster] which doesn't have corresponding tag set via [client.tag.] prefix.",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void shouldAllowWhitespacesInRackAwareAssignmentTagsList() {
+ props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, " zone , cluster ");
+ props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
+ props.put(StreamsConfig.clientTagPrefix("cluster"), "cluster-1");
+ final StreamsConfig config = new StreamsConfig(props);
+ final Map<String, String> clientTags = config.getClientTags();
+ assertEquals(2, clientTags.size());
+ assertEquals("eu-central-1a", clientTags.get("zone"));
+ assertEquals("cluster-1", clientTags.get("cluster"));
Review Comment:
This test validates existing behavior (the `COMMA_WITH_WHITESPACE` pattern
in `ConfigDef` already handles whitespace around commas) rather than new code
from this PR. Fine to keep as a regression test, but maybe rename to make clear
it's documenting existing behavior rather than testing new functionality.