DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1661004357


##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -69,6 +70,13 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
     }
   }
 
+  private def validateGroupName(
+    name: String): Unit = {

Review Comment:
   Done



##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -114,6 +122,22 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
         val properties = new Properties()
         config.forEach((key, value) => properties.setProperty(key, value))
         ClientMetricsConfigs.validate(resource.name(), properties)
+      case GROUP =>
+        validateGroupName(resource.name())
+        val properties = new Properties()
+        val nullGroupConfigs = new mutable.ArrayBuffer[String]()
+        config.entrySet().forEach(e => {

Review Comment:
   Done



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -615,7 +615,6 @@ class KafkaServer(
                                                            ConfigType.USER -> 
new UserConfigHandler(quotaManagers, credentialProvider),
                                                            ConfigType.BROKER 
-> new BrokerConfigHandler(config, quotaManagers),
                                                            ConfigType.IP -> 
new IpConfigHandler(socketServer.connectionQuotas))
-

Review Comment:
   Done



##########
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##########
@@ -479,7 +479,6 @@ class ZkAdminManager(val config: KafkaConfig,
 
     resource -> ApiError.NONE
   }
-

Review Comment:
   Done



##########
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##########
@@ -524,7 +523,6 @@ class ZkAdminManager(val config: KafkaConfig,
             val configProps = 
this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
             prepareIncrementalConfigs(alterConfigOps, configProps, 
KafkaConfig.configKeys)
             alterBrokerConfigs(resource, validateOnly, configProps, 
configEntriesMap)
-

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_DOC
+        = "The timeout to detect client failures when using the consumer group 
protocol.";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_DOC
+        = "The heartbeat interval given to the members of a consumer group.";
+
+    public static final int DEFAULT_CONSUMER_GROUP_SESSION_TIMEOUT_MS = 45 * 
1000;
+
+    public static final int DEFAULT_CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS = 5 * 
1000;

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_DOC
+        = "The timeout to detect client failures when using the consumer group 
protocol.";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_DOC
+        = "The heartbeat interval given to the members of a consumer group.";
+
+    public static final int DEFAULT_CONSUMER_GROUP_SESSION_TIMEOUT_MS = 45 * 
1000;
+
+    public static final int DEFAULT_CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS = 5 * 
1000;
+
+    private static final ConfigDef CONFIG = new ConfigDef();
+
+    static {
+        CONFIG
+            .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
DEFAULT_CONSUMER_GROUP_SESSION_TIMEOUT_MS, atLeast(1),
+                MEDIUM, CONSUMER_SESSION_TIMEOUT_MS_DOC)
+            .define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
DEFAULT_CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS,
+                atLeast(1),
+                MEDIUM, CONSUMER_HEARTBEAT_INTERVAL_MS_DOC);

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_DOC
+        = "The timeout to detect client failures when using the consumer group 
protocol.";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_DOC
+        = "The heartbeat interval given to the members of a consumer group.";
+
+    public static final int DEFAULT_CONSUMER_GROUP_SESSION_TIMEOUT_MS = 45 * 
1000;
+
+    public static final int DEFAULT_CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS = 5 * 
1000;
+
+    private static final ConfigDef CONFIG = new ConfigDef();
+
+    static {

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_DOC
+        = "The timeout to detect client failures when using the consumer group 
protocol.";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_DOC
+        = "The heartbeat interval given to the members of a consumer group.";
+
+    public static final int DEFAULT_CONSUMER_GROUP_SESSION_TIMEOUT_MS = 45 * 
1000;
+
+    public static final int DEFAULT_CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS = 5 * 
1000;
+
+    private static final ConfigDef CONFIG = new ConfigDef();
+
+    static {
+        CONFIG
+            .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
DEFAULT_CONSUMER_GROUP_SESSION_TIMEOUT_MS, atLeast(1),
+                MEDIUM, CONSUMER_SESSION_TIMEOUT_MS_DOC)
+            .define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
DEFAULT_CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS,
+                atLeast(1),
+                MEDIUM, CONSUMER_HEARTBEAT_INTERVAL_MS_DOC);
+    }
+
+    public GroupConfig(Map<?, ?> props) {
+        super(CONFIG, props, false);
+    }
+
+    public static List<String> configNames() {
+        return new ArrayList<>(CONFIG.names());
+    }
+
+    /**
+     * Check that property names are valid
+     */
+    public static void validateNames(Properties props) {
+        List<String> names = configNames();

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The group config manager is responsible for config modification and 
cleaning.
+ */
+public class GroupConfigManager {
+
+    private final GroupConfig defaultConfig;
+
+    private final Map<String, GroupConfig> configMap;
+
+    public GroupConfigManager(Map<?, ?>  defaultConfig) {
+        this.configMap = new ConcurrentHashMap<>();
+        this.defaultConfig = new GroupConfig(defaultConfig);
+    }
+
+    /**
+     * Update the configuration of the provided group.
+     *
+     * @param groupId                   The group id.
+     * @param newGroupConfig            The new group config.
+     */
+    public void updateGroupConfig(String groupId, Properties newGroupConfig) {
+        if (null == groupId || groupId.isEmpty()) {
+            throw new InvalidRequestException("Group name can't be empty.");
+        }
+
+        // Validate the configuration
+        GroupConfig.validate(newGroupConfig);
+
+        final GroupConfig newConfig = 
GroupConfig.fromProps(defaultConfig.originals(),
+            newGroupConfig);

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The group config manager is responsible for config modification and 
cleaning.
+ */
+public class GroupConfigManager {
+
+    private final GroupConfig defaultConfig;
+
+    private final Map<String, GroupConfig> configMap;
+
+    public GroupConfigManager(Map<?, ?>  defaultConfig) {
+        this.configMap = new ConcurrentHashMap<>();
+        this.defaultConfig = new GroupConfig(defaultConfig);
+    }
+
+    /**
+     * Update the configuration of the provided group.
+     *
+     * @param groupId                   The group id.
+     * @param newGroupConfig            The new group config.
+     */
+    public void updateGroupConfig(String groupId, Properties newGroupConfig) {
+        if (null == groupId || groupId.isEmpty()) {
+            throw new InvalidRequestException("Group name can't be empty.");
+        }
+
+        // Validate the configuration
+        GroupConfig.validate(newGroupConfig);
+
+        final GroupConfig newConfig = 
GroupConfig.fromProps(defaultConfig.originals(),
+            newGroupConfig);
+        configMap.put(groupId, newConfig);
+    }
+
+    /**
+     * Get the group config if it exists, otherwise return None.
+     *
+     * @param groupId  The group id.
+     * @return The group config.
+     */
+    public Optional<GroupConfig> getGroupConfig(String groupId) {

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The group config manager is responsible for config modification and 
cleaning.
+ */
+public class GroupConfigManager {
+
+    private final GroupConfig defaultConfig;
+
+    private final Map<String, GroupConfig> configMap;
+
+    public GroupConfigManager(Map<?, ?>  defaultConfig) {

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The group config manager is responsible for config modification and 
cleaning.
+ */
+public class GroupConfigManager {
+
+    private final GroupConfig defaultConfig;
+
+    private final Map<String, GroupConfig> configMap;
+
+    public GroupConfigManager(Map<?, ?>  defaultConfig) {
+        this.configMap = new ConcurrentHashMap<>();
+        this.defaultConfig = new GroupConfig(defaultConfig);
+    }
+
+    /**
+     * Update the configuration of the provided group.
+     *
+     * @param groupId                   The group id.
+     * @param newGroupConfig            The new group config.
+     */
+    public void updateGroupConfig(String groupId, Properties newGroupConfig) {
+        if (null == groupId || groupId.isEmpty()) {
+            throw new InvalidRequestException("Group name can't be empty.");
+        }
+
+        // Validate the configuration
+        GroupConfig.validate(newGroupConfig);
+
+        final GroupConfig newConfig = 
GroupConfig.fromProps(defaultConfig.originals(),
+            newGroupConfig);
+        configMap.put(groupId, newConfig);
+    }
+
+    /**
+     * Get the group config if it exists, otherwise return None.
+     *
+     * @param groupId  The group id.
+     * @return The group config.
+     */
+    public Optional<GroupConfig> getGroupConfig(String groupId) {
+        if (configMap.containsKey(groupId))
+            return Optional.of(configMap.get(groupId));
+
+        return Optional.empty();

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -158,6 +164,9 @@ public GroupCoordinatorService build() {
                 throw new IllegalArgumentException("CoordinatorRuntimeMetrics 
must be set.");
             if (groupCoordinatorMetrics == null)
                 throw new IllegalArgumentException("GroupCoordinatorMetrics 
must be set.");
+            if (groupConfigManager == null) {
+                throw new IllegalArgumentException("GroupConfigManager must be 
set.");
+            }

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4784,6 +4801,25 @@ public Set<String> groupIds() {
         return Collections.unmodifiableSet(this.groups.keySet());
     }
 
+
+    /**
+     * Get the session timeout of the provided group.
+     */
+    private int getConsumerGroupSessionTimeoutMs(String groupId) {

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4784,6 +4801,25 @@ public Set<String> groupIds() {
         return Collections.unmodifiableSet(this.groups.keySet());
     }
 
+
+    /**
+     * Get the session timeout of the provided group.
+     */
+    private int getConsumerGroupSessionTimeoutMs(String groupId) {
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.getGroupConfig(groupId);
+        return groupConfig.map(GroupConfig::sessionTimeoutMs)
+            .orElse(consumerGroupSessionTimeoutMs);
+    }
+
+    /**
+     * Get the heartbeat interval of the provided group.
+     */
+    private int getConsumerGroupHeartbeatIntervalMs(String groupId) {

Review Comment:
   Done



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##########
@@ -387,6 +387,8 @@ public static class Builder {
         private int classicGroupMinSessionTimeoutMs = 10;
         private int classicGroupMaxSessionTimeoutMs = 10 * 60 * 1000;
         private final GroupCoordinatorMetricsShard metrics = 
mock(GroupCoordinatorMetricsShard.class);
+

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -118,6 +119,7 @@ public static class Builder<S extends CoordinatorShard<U>, 
U> {
         private CoordinatorMetrics coordinatorMetrics;
         private Serializer<U> serializer;
         private Compression compression;
+        private GroupConfigManager groupConfigManager;

Review Comment:
   Done



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -103,6 +103,7 @@ public static class Builder {
             private MetadataImage metadataImage = null;

Review Comment:
   Done



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -333,6 +335,8 @@ class BrokerServer(
       tokenManager = new DelegationTokenManager(config, tokenCache, time)
       tokenManager.startup()
 
+      groupConfigManager = new GroupConfigManager(config.extractGroupConfigMap)

Review Comment:
   Done



##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -266,3 +267,12 @@ class ClientMetricsConfigHandler(private val 
clientMetricsManager: ClientMetrics
     clientMetricsManager.updateSubscription(subscriptionGroupId, properties)
   }
 }
+
+/**
+ * The GroupConfigHandler will process individual group config changes.
+ */
+class GroupConfigHandler(private val groupConfigManager: GroupConfigManager) 
extends ConfigHandler with Logging {

Review Comment:
   Done



##########
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##########
@@ -566,6 +567,44 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))

Review Comment:
   Done



##########
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##########
@@ -566,6 +567,44 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testDynamicGroupConfigChange(quorum: String): Unit = {
+    val newSessionTimeoutMs = 50000
+    val consumerGroupId = "group-foo"
+    val admin = createAdminClient()
+    try {
+      val resource = new ConfigResource(ConfigResource.Type.GROUP, 
consumerGroupId)
+      val op = new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
newSessionTimeoutMs.toString),
+        OpType.SET)
+      admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all.get
+    } finally {
+      admin.close()
+    }
+
+    TestUtils.retry(10000) {
+      val configOpt = 
brokerServers.head.groupConfigManager.getGroupConfig(consumerGroupId)
+      assertTrue(configOpt.isPresent)
+    }
+
+    val groupConfig = 
brokerServers.head.groupConfigManager.getGroupConfig(consumerGroupId).get()
+    assertEquals(newSessionTimeoutMs, groupConfig.sessionTimeoutMs)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_DOC
+        = "The timeout to detect client failures when using the consumer group 
protocol.";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_DOC
+        = "The heartbeat interval given to the members of a consumer group.";
+
+    public static final int DEFAULT_CONSUMER_GROUP_SESSION_TIMEOUT_MS = 45 * 
1000;
+
+    public static final int DEFAULT_CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS = 5 * 
1000;
+
+    private static final ConfigDef CONFIG = new ConfigDef();
+
+    static {
+        CONFIG
+            .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
DEFAULT_CONSUMER_GROUP_SESSION_TIMEOUT_MS, atLeast(1),
+                MEDIUM, CONSUMER_SESSION_TIMEOUT_MS_DOC)
+            .define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
DEFAULT_CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS,
+                atLeast(1),
+                MEDIUM, CONSUMER_HEARTBEAT_INTERVAL_MS_DOC);
+    }
+
+    public GroupConfig(Map<?, ?> props) {
+        super(CONFIG, props, false);
+    }
+
+    public static List<String> configNames() {
+        return new ArrayList<>(CONFIG.names());
+    }
+
+    /**
+     * Check that property names are valid
+     */
+    public static void validateNames(Properties props) {
+        List<String> names = configNames();
+        for (Object name : props.keySet()) {
+            if (!names.contains(name)) {
+                throw new InvalidConfigurationException("Unknown group config 
name: " + name);
+            }
+        }
+    }
+
+    public static void validate(Properties props) {
+        validateNames(props);
+        CONFIG.parse(props);
+    }

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to