dajac commented on code in PR #19904:
URL: https://github.com/apache/kafka/pull/19904#discussion_r2131654014


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -204,6 +205,11 @@ public class GroupCoordinatorConfig {
         ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " +
         ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled.";
 
+    public static final String CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_CONFIG = "group.consumer.regex.batch.refresh.max.interval.ms";
+    public static final String CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DOC = "The interval at which the group coordinator will refresh " +
+        "the topics matching the group subscribed regexes. This is only applicable to consumer groups using the consumer group protocol. ";
+    public static final int CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DEFAULT = 10 * 60 * 1000; // 10 minutes
+

Review Comment:
   We cannot add a new config without a KIP now. We will soon do a KIP to improve regular expressions; I suggest including it in that one. In the meantime, we can keep it internal, mainly for testing purposes. What do you think? If you agree, let's add a comment stating that it is internal for testing.
   
   Regarding the name, I would go with something simpler: `group.consumer.regex.refresh.interval.ms`. What do you think?
   



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -308,6 +314,7 @@ public class GroupCoordinatorConfig {
         .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC)
         .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
         .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
+        .define(CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DEFAULT, atLeast(REGEX_BATCH_REFRESH_MIN_INTERVAL_MS + 1), MEDIUM, CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DOC)

Review Comment:
   nit: Should we require at least 1 minute to avoid having too many refreshes?
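
To make the suggested floor concrete, here is a minimal stand-alone sketch of a one-minute lower-bound check. It deliberately does not use Kafka's `ConfigDef`, so it runs on its own; `RefreshIntervalBound`, `MIN_REFRESH_INTERVAL_MS`, and `requireAtLeast` are hypothetical names introduced only for illustration, and the config key is the simplified name proposed earlier in this review.

```java
import java.util.concurrent.TimeUnit;

public class RefreshIntervalBound {
    // Hypothetical lower bound: the one minute floated in the review comment.
    static final int MIN_REFRESH_INTERVAL_MS = (int) TimeUnit.MINUTES.toMillis(1);

    // Stand-in for a lower-bound validator: rejects values below the minimum.
    static int requireAtLeast(String name, int value, int min) {
        if (value < min) {
            throw new IllegalArgumentException(
                "Invalid value " + value + " for " + name + ": must be at least " + min);
        }
        return value;
    }

    public static void main(String[] args) {
        String name = "group.consumer.regex.refresh.interval.ms";
        // The 10-minute default from the diff passes the check.
        System.out.println(requireAtLeast(name, 10 * 60 * 1000, MIN_REFRESH_INTERVAL_MS));
        try {
            // A 30-second interval would be rejected under a one-minute floor.
            requireAtLeast(name, 30_000, MIN_REFRESH_INTERVAL_MS);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the actual config definition the same effect would presumably come from changing the validator argument in the `.define(...)` call rather than from a separate helper.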



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3134,11 +3137,11 @@ private boolean maybeUpdateRegularExpressions(
         // 2. The last refresh is older than 10s. If the group does not have any regular
         //    expressions but the current member just brought a new one, we should continue.
         long lastRefreshTimeMs = group.lastResolvedRegularExpressionRefreshTimeMs();
-        if (time.milliseconds() <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_INTERVAL_MS) {
+        if (time.milliseconds() <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {

Review Comment:
   nit: Let's extract time into a var (e.g. currentTimeMs) as we reuse it now.
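
A minimal sketch of the extraction, assuming the current time is compared against both a minimum and a maximum interval. The class name, the `LongSupplier` clock, the interval values, and the second comparison are all assumptions made for illustration; only the first comparison and the `currentTimeMs` name come from this review.

```java
import java.util.function.LongSupplier;

public class RegexRefreshCheck {
    // Illustrative values only; not taken from the PR.
    static final long MIN_INTERVAL_MS = 10_000L;
    static final long MAX_INTERVAL_MS = 600_000L;

    static boolean shouldRefresh(LongSupplier clock, long lastRefreshTimeMs, boolean hasNewRegex) {
        // Read the clock once so that both comparisons see the same instant.
        long currentTimeMs = clock.getAsLong();

        // Too soon after the last refresh: skip.
        if (currentTimeMs <= lastRefreshTimeMs + MIN_INTERVAL_MS) {
            return false;
        }
        // Assumed second use: refresh for a new regex or once the max interval has elapsed.
        return hasNewRegex || currentTimeMs > lastRefreshTimeMs + MAX_INTERVAL_MS;
    }
}
```

Reading the clock once also keeps the two checks consistent if the clock advances between calls.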



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -3076,6 +3076,23 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
     sendAndReceiveRegexHeartbeat(response, listenerName, None)
   }
 
+  @Test
+  def testConsumerGroupHeartbeatWithRegexWithGrantedTopicDescribeAcl(): Unit = {

Review Comment:
   Should we add a test in which we start with an ACL, subscribe to the topic, get it assigned, remove the ACL, and finally verify that the topic is removed after the refresh timeout?
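
The scenario can be sketched with a toy in-memory model; this is not the integration test itself and uses none of the Kafka test harness. `AclRefreshScenario`, its fields, the manually advanced clock, and the 60-second interval are all hypothetical and exist only to show the ordering of the steps and the expected outcome.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

public class AclRefreshScenario {
    final Set<String> topics = new TreeSet<>();       // all topics in the toy cluster
    final Set<String> describable = new HashSet<>();  // topics the principal may describe
    final long refreshIntervalMs;
    long nowMs = 0L;                                  // manually advanced clock

    private boolean resolvedOnce = false;
    private long lastRefreshMs = 0L;
    private Set<String> resolved = new TreeSet<>();

    AclRefreshScenario(long refreshIntervalMs) {
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Returns the cached resolution unless it is missing or older than the interval.
    Set<String> heartbeat(String regex) {
        if (!resolvedOnce || nowMs > lastRefreshMs + refreshIntervalMs) {
            Pattern pattern = Pattern.compile(regex);
            Set<String> next = new TreeSet<>();
            for (String topic : topics) {
                if (describable.contains(topic) && pattern.matcher(topic).matches()) {
                    next.add(topic);
                }
            }
            resolved = next;
            lastRefreshMs = nowMs;
            resolvedOnce = true;
        }
        return resolved;
    }

    public static void main(String[] args) {
        AclRefreshScenario s = new AclRefreshScenario(60_000L);
        s.topics.add("topic-a");
        s.describable.add("topic-a");                 // 1. start with the ACL
        System.out.println(s.heartbeat("topic-.*"));  // 2. subscribe and get the topic
        s.describable.remove("topic-a");              // 3. remove the ACL
        s.nowMs = 60_001L;                            // 4. move past the refresh interval
        System.out.println(s.heartbeat("topic-.*"));  // 5. the topic should no longer resolve
    }
}
```

A real test would additionally need to wait for, or shorten, the configured refresh interval, which is one reason to keep that config available internally for testing.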


