majialoong commented on code in PR #21633:
URL: https://github.com/apache/kafka/pull/21633#discussion_r2925700121


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -347,15 +352,182 @@ private static void validateValues(Map<?, ?> valueMaps, 
GroupCoordinatorConfig g
     }
 
     /**
-     * Check that the given properties contain only valid consumer group 
config names and that all values can be
-     * parsed and are valid.
+     * Check that the given properties contain only valid group config names 
and that
+     * all values can be parsed and are valid. The provided properties are 
merged with
+     * the broker-level defaults before validation.
      */
     public static void validate(Properties props, GroupCoordinatorConfig 
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
-        validateNames(props);
-        Map<?, ?> valueMaps = CONFIG.parse(props);
+        Properties combinedConfigs = new Properties();
+        
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
+        combinedConfigs.putAll(props);
+
+        validateNames(combinedConfigs);
+        Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
         validateValues(valueMaps, groupCoordinatorConfig, shareGroupConfig);
     }
 
+    /**
+     * Evaluate group config values to their effective values within 
broker-level bounds.
+     * Out-of-range values are capped and a WARN log is emitted.
+     *
+     * @param props                  The raw group config properties.
+     * @param groupId                The group id.
+     * @param groupCoordinatorConfig The group coordinator config.
+     * @param shareGroupConfig       The share group config.
+     * @return A new Properties with out-of-range values capped.
+     */
+    public static Properties evaluate(
+        Properties props,
+        String groupId,
+        GroupCoordinatorConfig groupCoordinatorConfig,
+        ShareGroupConfig shareGroupConfig
+    ) {
+        Properties effective = new Properties();
+        effective.putAll(props);
+        evaluateValues(effective, groupId, groupCoordinatorConfig, 
shareGroupConfig);
+        return effective;
+    }
+
+    private static void evaluateValues(
+        Properties props,
+        String groupId,
+        GroupCoordinatorConfig groupCoordinatorConfig,
+        ShareGroupConfig shareGroupConfig
+    ) {
+        // Consumer group configs
+        clampToRange(props, groupId, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
+            groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs());
+        clampToRange(props, groupId, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
+            groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs());
+
+        // Share group configs
+        clampToRange(props, groupId, SHARE_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
+            groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs());
+        clampToRange(props, groupId, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
+            groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs());
+        clampToRange(props, groupId, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+            shareGroupConfig.shareGroupMinRecordLockDurationMs(),
+            shareGroupConfig.shareGroupMaxRecordLockDurationMs());
+        clampToRange(props, groupId, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+            shareGroupConfig.shareGroupMinDeliveryCountLimit(),
+            shareGroupConfig.shareGroupMaxDeliveryCountLimit());
+        clampToRange(props, groupId, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+            shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
+            shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks());
+
+        // Streams group configs
+        clampToRange(props, groupId, STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
+            groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs());
+        clampToRange(props, groupId, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
+            groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
+        clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,

Review Comment:
   Thanks for the suggestion! While the effect would be the same, I went with 
`clampToMax` here because it more clearly communicates that this config only 
has an upper bound, and avoids introducing a hardcoded 0 as the min. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to