AndrewJSchofield commented on code in PR #17070:
URL: https://github.com/apache/kafka/pull/17070#discussion_r1741670918


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -412,11 +414,12 @@ class BrokerServer(
         replicaManager,
         time,
         shareFetchSessionCache,
-        config.shareGroupConfig.shareGroupRecordLockDurationMs,

Review Comment:
   I don't quite see the point of this change. The purpose of the parameter has 
changed - it was the lock duration, but now it's the default lock duration if 
the group does not override it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -41,10 +41,14 @@ public class GroupConfig extends AbstractConfig {
 
     public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = "consumer.heartbeat.interval.ms";
 
+    public static final String SHARE_RECORD_LOCK_DURATION_MS = "share.record.lock.duration.ms";
+
     public final int consumerSessionTimeoutMs;
 
     public final int consumerHeartbeatIntervalMs;
 
+    public final int shareRecordLockDurationMs;
+
     private static final ConfigDef CONFIG = new ConfigDef()

Review Comment:
   The way the KIP is written, if the group config is not set, it takes the 
default value from the broker config. I don't think this quite says that. It 
says that the group config's default is the broker config's default, so if I 
modified the broker config with the intention of affecting all share groups, it 
would not work.
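
   A minimal sketch of the fallback described above, assuming hypothetical 
names (`LockDurationResolver`, `setGroupOverride`, `effectiveLockDurationMs`) 
that are not part of the PR or of Kafka. The broker value is read at lookup 
time rather than copied into the group config's default, so a later change to 
the broker config reaches every group that has no override.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntSupplier;

// Hypothetical helper, for illustration only; not the PR's implementation.
final class LockDurationResolver {
    // Only values that were explicitly set for a group are stored here.
    private final Map<String, Integer> groupOverrides = new ConcurrentHashMap<>();
    // Supplies the current broker-level value each time it is asked.
    private final IntSupplier brokerLockDurationMs;

    LockDurationResolver(IntSupplier brokerLockDurationMs) {
        this.brokerLockDurationMs = brokerLockDurationMs;
    }

    void setGroupOverride(String groupId, int lockDurationMs) {
        groupOverrides.put(groupId, lockDurationMs);
    }

    // Group override if present, otherwise the broker value as of now.
    int effectiveLockDurationMs(String groupId) {
        Integer override = groupOverrides.get(groupId);
        return override != null ? override : brokerLockDurationMs.getAsInt();
    }
}
```

   With this shape, the group config carries no default of its own for the 
lock duration; the broker config stays the single source of the fallback.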



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -147,4 +167,11 @@ public int sessionTimeoutMs() {
     public int heartbeatIntervalMs() {
         return consumerHeartbeatIntervalMs;
     }
+
+    /**
+     * The share group record lock duration in milliseconds
+     */
+    public int recordLockDurationMs() {

Review Comment:
   Really, calling this `shareRecordLockDurationMs` would be preferable, and 
you should rename the consumer group ones to start with `consumer` such as 
`consumerHeartbeatIntervalMs`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -67,6 +67,13 @@ public Optional<GroupConfig> groupConfig(String groupId) {
         return Optional.ofNullable(configMap.get(groupId));
     }
 
+    public Optional<Integer> getShareGroupRecordLockDurationMs(String groupId) {

Review Comment:
   Javadoc please.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -357,6 +378,12 @@ public GroupCoordinatorConfig(AbstractConfig config) {
         require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs,
             String.format("%s must be less than or equals to %s",
                 SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
+        require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs,
+                String.format("%s must be greater than or equals to %s",

Review Comment:
   As @smjn mentioned, indentation. At the least, the indentation should match 
the surrounding code.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -162,6 +162,18 @@ public class GroupCoordinatorConfig {
     public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
     public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members.";
 
+    public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG = "group.share.record.lock.duration.ms";

Review Comment:
   Indeed. I don't think these record lock configs belong in the 
GroupCoordinatorConfig. I would say that ShareGroupConfig is best.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -67,6 +67,13 @@ public Optional<GroupConfig> groupConfig(String groupId) {
         return Optional.ofNullable(configMap.get(groupId));
     }
 
+    public Optional<Integer> getShareGroupRecordLockDurationMs(String groupId) {
+        if (!groupConfig(groupId).isPresent()) {

Review Comment:
   Also, the optionality here is only partially correct. If there is no config 
set for the group, it returns empty. But if I have set any configs for the 
group, regardless of whether one of them was the share group record lock 
duration, a value will be returned.
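
   One way to make the optionality exact, sketched under the assumption that 
only the explicitly set properties are kept per group. `ExplicitGroupConfigs` 
and its methods are hypothetical names for illustration, not the PR's or 
Kafka's API. The result is empty both when the group has no config at all and 
when it has configs that do not include the lock duration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder of per-group properties that were explicitly set.
final class ExplicitGroupConfigs {
    static final String SHARE_RECORD_LOCK_DURATION_MS = "share.record.lock.duration.ms";

    private final Map<String, Map<String, String>> explicitProps = new ConcurrentHashMap<>();

    // Records exactly the properties supplied for the group, no defaults filled in.
    void update(String groupId, Map<String, String> props) {
        explicitProps.put(groupId, new HashMap<>(props));
    }

    // Empty unless this specific key was set for the group.
    Optional<Integer> shareRecordLockDurationMs(String groupId) {
        return Optional.ofNullable(explicitProps.get(groupId))
            .map(props -> props.get(SHARE_RECORD_LOCK_DURATION_MS))
            .map(Integer::valueOf);
    }
}
```

   `Optional.map` yields an empty `Optional` when the mapping function returns 
null, which is what keeps an unrelated group config from producing a value.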


