majialoong commented on code in PR #21633:
URL: https://github.com/apache/kafka/pull/21633#discussion_r2925700121
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -347,15 +352,182 @@ private static void validateValues(Map<?, ?> valueMaps,
GroupCoordinatorConfig g
}
/**
- * Check that the given properties contain only valid consumer group
config names and that all values can be
- * parsed and are valid.
+ * Check that the given properties contain only valid group config names
and that
+ * all values can be parsed and are valid. The provided properties are
merged with
+ * the broker-level defaults before validation.
*/
public static void validate(Properties props, GroupCoordinatorConfig
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
- validateNames(props);
- Map<?, ?> valueMaps = CONFIG.parse(props);
+ Properties combinedConfigs = new Properties();
+
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
+ combinedConfigs.putAll(props);
+
+ validateNames(combinedConfigs);
+ Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateValues(valueMaps, groupCoordinatorConfig, shareGroupConfig);
}
+ /**
+ * Evaluate group config values to their effective values within
broker-level bounds.
+ * Out-of-range values are capped and a WARN log is emitted.
+ *
+ * @param props The raw group config properties.
+ * @param groupId The group id.
+ * @param groupCoordinatorConfig The group coordinator config.
+ * @param shareGroupConfig The share group config.
+ * @return A new Properties with out-of-range values capped.
+ */
+ public static Properties evaluate(
+ Properties props,
+ String groupId,
+ GroupCoordinatorConfig groupCoordinatorConfig,
+ ShareGroupConfig shareGroupConfig
+ ) {
+ Properties effective = new Properties();
+ effective.putAll(props);
+ evaluateValues(effective, groupId, groupCoordinatorConfig,
shareGroupConfig);
+ return effective;
+ }
+
+ private static void evaluateValues(
+ Properties props,
+ String groupId,
+ GroupCoordinatorConfig groupCoordinatorConfig,
+ ShareGroupConfig shareGroupConfig
+ ) {
+ // Consumer group configs
+ clampToRange(props, groupId, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
+ groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs());
+ clampToRange(props, groupId, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
+ groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs());
+
+ // Share group configs
+ clampToRange(props, groupId, SHARE_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
+ groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs());
+ clampToRange(props, groupId, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
+ groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs());
+ clampToRange(props, groupId, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+ shareGroupConfig.shareGroupMinRecordLockDurationMs(),
+ shareGroupConfig.shareGroupMaxRecordLockDurationMs());
+ clampToRange(props, groupId, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+ shareGroupConfig.shareGroupMinDeliveryCountLimit(),
+ shareGroupConfig.shareGroupMaxDeliveryCountLimit());
+ clampToRange(props, groupId, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+ shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
+ shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks());
+
+ // Streams group configs
+ clampToRange(props, groupId, STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
+ groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs());
+ clampToRange(props, groupId, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
+ groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
+ clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
Review Comment:
Thanks for the suggestion! While the effect would be the same, I went with
`clampToMax` here because it more clearly communicates that this config only
has an upper bound, and avoids introducing a hardcoded 0 as the min.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]