chia7712 commented on code in PR #16524:
URL: https://github.com/apache/kafka/pull/16524#discussion_r1665921015


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -213,82 +214,151 @@ public class GroupCoordinatorConfig {
      */
     public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 
* 1000;
 
-    private final AbstractConfig config;
+    private final int numThreads;
+    private final int appendLingerMs;
+    private final int consumerGroupSessionTimeoutMs;
+    private final int consumerGroupHeartbeatIntervalMs;
+    private final int consumerGroupMaxSize;
+    private final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors;
+    private final int offsetsTopicSegmentBytes;
+    private final int offsetMetadataMaxSize;
+    private final int classicGroupMaxSize;
+    private final int classicGroupInitialRebalanceDelayMs;
+    private final int classicGroupMinSessionTimeoutMs;
+    private final int classicGroupMaxSessionTimeoutMs;
+    private final long offsetsRetentionCheckIntervalMs;
+    private final long offsetsRetentionMs;
+    private final int offsetCommitTimeoutMs;
+    private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;
+    private final CompressionType offsetTopicCompressionType;
+    private final int offsetsLoadBufferSize;
+    private final int offsetsTopicPartitions;
+    private final short offsetsTopicReplicationFactor;
+    private final short offsetCommitRequiredAcks;
+    private final int consumerGroupMinSessionTimeoutMs;
+    private final int consumerGroupMaxSessionTimeoutMs;
+    private final int consumerGroupMinHeartbeatIntervalMs;
+    private final int consumerGroupMaxHeartbeatIntervalMs;
 
     public GroupCoordinatorConfig(AbstractConfig config) {
-        this.config = config;
+        this.numThreads = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
+        this.appendLingerMs = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
+        this.consumerGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
+        this.consumerGroupHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.consumerGroupMaxSize = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG);
+        this.consumerGroupAssignors = 
config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
 ConsumerGroupPartitionAssignor.class);

Review Comment:
   Could you add a test to make sure an incorrect value causes an error, since 
`CONSUMER_GROUP_ASSIGNORS_CONFIG` has no validator?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -213,82 +214,151 @@ public class GroupCoordinatorConfig {
      */
     public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 
* 1000;
 
-    private final AbstractConfig config;
+    private final int numThreads;
+    private final int appendLingerMs;
+    private final int consumerGroupSessionTimeoutMs;
+    private final int consumerGroupHeartbeatIntervalMs;
+    private final int consumerGroupMaxSize;
+    private final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors;
+    private final int offsetsTopicSegmentBytes;
+    private final int offsetMetadataMaxSize;
+    private final int classicGroupMaxSize;
+    private final int classicGroupInitialRebalanceDelayMs;
+    private final int classicGroupMinSessionTimeoutMs;
+    private final int classicGroupMaxSessionTimeoutMs;
+    private final long offsetsRetentionCheckIntervalMs;
+    private final long offsetsRetentionMs;
+    private final int offsetCommitTimeoutMs;
+    private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;
+    private final CompressionType offsetTopicCompressionType;
+    private final int offsetsLoadBufferSize;
+    private final int offsetsTopicPartitions;
+    private final short offsetsTopicReplicationFactor;
+    private final short offsetCommitRequiredAcks;
+    private final int consumerGroupMinSessionTimeoutMs;
+    private final int consumerGroupMaxSessionTimeoutMs;
+    private final int consumerGroupMinHeartbeatIntervalMs;
+    private final int consumerGroupMaxHeartbeatIntervalMs;
 
     public GroupCoordinatorConfig(AbstractConfig config) {
-        this.config = config;
+        this.numThreads = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
+        this.appendLingerMs = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
+        this.consumerGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
+        this.consumerGroupHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.consumerGroupMaxSize = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG);
+        this.consumerGroupAssignors = 
config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
 ConsumerGroupPartitionAssignor.class);
+        this.offsetsTopicSegmentBytes = 
config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG);
+        this.offsetMetadataMaxSize = 
config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG);
+        this.classicGroupMaxSize = 
config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG);
+        this.classicGroupInitialRebalanceDelayMs = 
config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+        this.classicGroupMinSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+        this.classicGroupMaxSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+        this.offsetsRetentionCheckIntervalMs = 
config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG);
+        this.offsetsRetentionMs = 
config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 
1000L;
+        this.offsetCommitTimeoutMs = 
config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+        this.consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(
+                
config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG));
+        this.offsetTopicCompressionType = 
Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG))

Review Comment:
   Ditto, please add a test. Also, maybe we should add a validator to the 
definition of `OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -213,82 +214,151 @@ public class GroupCoordinatorConfig {
      */
     public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 
* 1000;
 
-    private final AbstractConfig config;
+    private final int numThreads;
+    private final int appendLingerMs;
+    private final int consumerGroupSessionTimeoutMs;
+    private final int consumerGroupHeartbeatIntervalMs;
+    private final int consumerGroupMaxSize;
+    private final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors;
+    private final int offsetsTopicSegmentBytes;
+    private final int offsetMetadataMaxSize;
+    private final int classicGroupMaxSize;
+    private final int classicGroupInitialRebalanceDelayMs;
+    private final int classicGroupMinSessionTimeoutMs;
+    private final int classicGroupMaxSessionTimeoutMs;
+    private final long offsetsRetentionCheckIntervalMs;
+    private final long offsetsRetentionMs;
+    private final int offsetCommitTimeoutMs;
+    private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;
+    private final CompressionType offsetTopicCompressionType;
+    private final int offsetsLoadBufferSize;
+    private final int offsetsTopicPartitions;
+    private final short offsetsTopicReplicationFactor;
+    private final short offsetCommitRequiredAcks;
+    private final int consumerGroupMinSessionTimeoutMs;
+    private final int consumerGroupMaxSessionTimeoutMs;
+    private final int consumerGroupMinHeartbeatIntervalMs;
+    private final int consumerGroupMaxHeartbeatIntervalMs;
 
     public GroupCoordinatorConfig(AbstractConfig config) {
-        this.config = config;
+        this.numThreads = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
+        this.appendLingerMs = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
+        this.consumerGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
+        this.consumerGroupHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.consumerGroupMaxSize = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG);
+        this.consumerGroupAssignors = 
config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
 ConsumerGroupPartitionAssignor.class);
+        this.offsetsTopicSegmentBytes = 
config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG);
+        this.offsetMetadataMaxSize = 
config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG);
+        this.classicGroupMaxSize = 
config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG);
+        this.classicGroupInitialRebalanceDelayMs = 
config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+        this.classicGroupMinSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+        this.classicGroupMaxSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+        this.offsetsRetentionCheckIntervalMs = 
config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG);
+        this.offsetsRetentionMs = 
config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 
1000L;
+        this.offsetCommitTimeoutMs = 
config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+        this.consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(

Review Comment:
   Ditto: please add a test for an invalid value here as well.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -39,6 +39,7 @@
 import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
 import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
 import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+import static org.apache.kafka.common.utils.Utils.require;
 
 /**
  * The group coordinator configurations.

Review Comment:
   Please add a comment explaining why we create local attributes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to