AndrewJSchofield commented on code in PR #17070: URL: https://github.com/apache/kafka/pull/17070#discussion_r1741670918
########## core/src/main/scala/kafka/server/BrokerServer.scala: ########## @@ -412,11 +414,12 @@ class BrokerServer( replicaManager, time, shareFetchSessionCache, - config.shareGroupConfig.shareGroupRecordLockDurationMs, Review Comment: I don't quite see the point of this change. The purpose of the parameter has changed - it was the lock duration, but now its the default lock duration if the group does not override it. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java: ########## @@ -41,10 +41,14 @@ public class GroupConfig extends AbstractConfig { public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = "consumer.heartbeat.interval.ms"; + public static final String SHARE_RECORD_LOCK_DURATION_MS = "share.record.lock.duration.ms"; + public final int consumerSessionTimeoutMs; public final int consumerHeartbeatIntervalMs; + public final int shareRecordLockDurationMs; + private static final ConfigDef CONFIG = new ConfigDef() Review Comment: The way the KIP is written, if the group config is not set, it takes the default value from the broker config. I don't think this quite says that. It says that the group config's default is the broker config's default, so if I modified the broker config with the intention of affecting all share groups, it would not work. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java: ########## @@ -147,4 +167,11 @@ public int sessionTimeoutMs() { public int heartbeatIntervalMs() { return consumerHeartbeatIntervalMs; } + + /** + * The share group record lock duration in milliseconds + */ + public int recordLockDurationMs() { Review Comment: Really, calling this `shareRecordLockDurationMs` would be preferable, and you should rename the consumer group ones to start with `consumer` such as `consumerHeartbeatIntervalMs`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java: ########## @@ -67,6 +67,13 @@ public Optional<GroupConfig> groupConfig(String groupId) { return Optional.ofNullable(configMap.get(groupId)); } + public Optional<Integer> getShareGroupRecordLockDurationMs(String groupId) { Review Comment: Javadoc please. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ########## @@ -357,6 +378,12 @@ public GroupCoordinatorConfig(AbstractConfig config) { require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs, String.format("%s must be less than or equals to %s", SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); + require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs, + String.format("%s must be greater than or equals to %s", Review Comment: As @smjn mentioned, indentation. At the least, the indentation should match the surrounding code. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ########## @@ -162,6 +162,18 @@ public class GroupCoordinatorConfig { public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000; public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members."; + public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG = "group.share.record.lock.duration.ms"; Review Comment: Indeed. I don't think these record lock configs belong in the GroupCoordinatorConfig. I would say that ShareGroupConfig is best. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java: ########## @@ -67,6 +67,13 @@ public Optional<GroupConfig> groupConfig(String groupId) { return Optional.ofNullable(configMap.get(groupId)); } + public Optional<Integer> getShareGroupRecordLockDurationMs(String groupId) { + if (!groupConfig(groupId).isPresent()) { Review Comment: Also, the optionality here is only partially correct. It there is no config set for the group, it returns empty. But, if I have set any configs for the group, regardless of whether it was the share group record lock duration, a value will be returned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org