chia7712 commented on code in PR #21633:
URL: https://github.com/apache/kafka/pull/21633#discussion_r2919199771
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -347,15 +352,182 @@ private static void validateValues(Map<?, ?> valueMaps,
GroupCoordinatorConfig g
}
/**
- * Check that the given properties contain only valid consumer group
config names and that all values can be
- * parsed and are valid.
+ * Check that the given properties contain only valid group config names
and that
+ * all values can be parsed and are valid. The provided properties are
merged with
+ * the broker-level defaults before validation.
*/
public static void validate(Properties props, GroupCoordinatorConfig
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
- validateNames(props);
- Map<?, ?> valueMaps = CONFIG.parse(props);
+ Properties combinedConfigs = new Properties();
+
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
+ combinedConfigs.putAll(props);
+
+ validateNames(combinedConfigs);
+ Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateValues(valueMaps, groupCoordinatorConfig, shareGroupConfig);
}
+ /**
+ * Evaluate group config values to their effective values within
broker-level bounds.
+ * Out-of-range values are capped and a WARN log is emitted.
+ *
+ * @param props The raw group config properties.
+ * @param groupId The group id.
+ * @param groupCoordinatorConfig The group coordinator config.
+ * @param shareGroupConfig The share group config.
+ * @return A new Properties with out-of-range values capped.
+ */
+ public static Properties evaluate(
+ Properties props,
+ String groupId,
+ GroupCoordinatorConfig groupCoordinatorConfig,
+ ShareGroupConfig shareGroupConfig
+ ) {
+ Properties effective = new Properties();
+ effective.putAll(props);
+ evaluateValues(effective, groupId, groupCoordinatorConfig,
shareGroupConfig);
+ return effective;
+ }
+
+ private static void evaluateValues(
Review Comment:
If we rely on this function to auto-adjust dynamic configs, the static
validators in ConfigDef do not seem to be necessary. Keeping them is a bit
awkward and error-prone. For example, if we decrease the lower bound of
`group.share.record.lock.duration.ms` but forget to update the lower bound of
`share.record.lock.duration.ms` in the definition, users will be blocked from
lowering the value.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -347,15 +352,182 @@ private static void validateValues(Map<?, ?> valueMaps,
GroupCoordinatorConfig g
}
/**
- * Check that the given properties contain only valid consumer group
config names and that all values can be
- * parsed and are valid.
+ * Check that the given properties contain only valid group config names
and that
+ * all values can be parsed and are valid. The provided properties are
merged with
+ * the broker-level defaults before validation.
*/
public static void validate(Properties props, GroupCoordinatorConfig
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
- validateNames(props);
- Map<?, ?> valueMaps = CONFIG.parse(props);
+ Properties combinedConfigs = new Properties();
+
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
+ combinedConfigs.putAll(props);
+
+ validateNames(combinedConfigs);
+ Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateValues(valueMaps, groupCoordinatorConfig, shareGroupConfig);
}
+ /**
+ * Evaluate group config values to their effective values within
broker-level bounds.
+ * Out-of-range values are capped and a WARN log is emitted.
+ *
+ * @param props The raw group config properties.
+ * @param groupId The group id.
+ * @param groupCoordinatorConfig The group coordinator config.
+ * @param shareGroupConfig The share group config.
+ * @return A new Properties with out-of-range values capped.
+ */
+ public static Properties evaluate(
+ Properties props,
+ String groupId,
+ GroupCoordinatorConfig groupCoordinatorConfig,
+ ShareGroupConfig shareGroupConfig
+ ) {
+ Properties effective = new Properties();
+ effective.putAll(props);
+ evaluateValues(effective, groupId, groupCoordinatorConfig,
shareGroupConfig);
+ return effective;
+ }
+
+ private static void evaluateValues(
+ Properties props,
+ String groupId,
+ GroupCoordinatorConfig groupCoordinatorConfig,
+ ShareGroupConfig shareGroupConfig
+ ) {
+ // Consumer group configs
+ clampToRange(props, groupId, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
+ groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs());
+ clampToRange(props, groupId, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
+ groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs());
+
+ // Share group configs
+ clampToRange(props, groupId, SHARE_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
+ groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs());
+ clampToRange(props, groupId, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
+ groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs());
+ clampToRange(props, groupId, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+ shareGroupConfig.shareGroupMinRecordLockDurationMs(),
+ shareGroupConfig.shareGroupMaxRecordLockDurationMs());
+ clampToRange(props, groupId, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+ shareGroupConfig.shareGroupMinDeliveryCountLimit(),
+ shareGroupConfig.shareGroupMaxDeliveryCountLimit());
+ clampToRange(props, groupId, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+ shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
+ shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks());
+
+ // Streams group configs
+ clampToRange(props, groupId, STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
+ groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs());
+ clampToRange(props, groupId, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
+ groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
+ clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
Review Comment:
Should we use `clampToRange` instead to avoid duplicate code?
```java
clampToRange(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
0,
groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]