dajac commented on code in PR #19904: URL: https://github.com/apache/kafka/pull/19904#discussion_r2131654014
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -204,6 +205,11 @@ public class GroupCoordinatorConfig {
         ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " +
         ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled.";
 
+    public static final String CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_CONFIG = "group.consumer.regex.batch.refresh.max.interval.ms";
+    public static final String CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DOC = "The interval at which the group coordinator will refresh " +
+        "the topics matching the group subscribed regexes. This is only applicable to consumer groups using the consumer group protocol. ";
+    public static final int CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DEFAULT = 10 * 60 * 1000; // 10 minutes
+

Review Comment:
   We cannot add a new config without a KIP now. We will soon do a KIP to improve regular expressions, and I suggest including this config in that one. In the meantime, we can keep it internal, mainly for testing purposes. What do you think? If you agree, let's add a comment stating that it is internal and intended for testing.

   Regarding the name, I would go with something simpler: `group.consumer.regex.refresh.interval.ms`. What do you think?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -308,6 +314,7 @@ public class GroupCoordinatorConfig {
         .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC)
         .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
         .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
+        .define(CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DEFAULT, atLeast(REGEX_BATCH_REFRESH_MIN_INTERVAL_MS + 1), MEDIUM, CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DOC)

Review Comment:
   nit: Should we require at least 1 minute to avoid having too many refreshes?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3134,11 +3137,11 @@ private boolean maybeUpdateRegularExpressions(
         // 2. The last refresh is older than 10s. If the group does not have any regular
         //    expressions but the current member just brought a new one, we should continue.
         long lastRefreshTimeMs = group.lastResolvedRegularExpressionRefreshTimeMs();
-        if (time.milliseconds() <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_INTERVAL_MS) {
+        if (time.milliseconds() <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {

Review Comment:
   nit: Let's extract `time.milliseconds()` into a variable (e.g. `currentTimeMs`), since we now use it more than once.

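To illustrate the `currentTimeMs` suggestion, here is a minimal, self-contained sketch. It is not the actual `GroupMetadataManager` code: the class `RegexRefreshSketch`, the method `shouldRefresh`, the `LongSupplier` clock, and the `regexesChanged` flag are all hypothetical. The 10s minimum comes from the code comment in the diff and the 10-minute maximum from the proposed default. How the maximum interval is used here is an assumption.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for the refresh check; not the actual GroupMetadataManager code.
final class RegexRefreshSketch {
    // 10s minimum, taken from the code comment in the diff.
    static final long MIN_INTERVAL_MS = 10_000L;
    // 10 minutes, the proposed default of the new config.
    static final long MAX_INTERVAL_MS = 10 * 60 * 1000L;

    private RegexRefreshSketch() { }

    static boolean shouldRefresh(LongSupplier clock, long lastRefreshTimeMs, boolean regexesChanged) {
        // Read the clock once so that both comparisons below see the same instant.
        final long currentTimeMs = clock.getAsLong();

        // Throttle: skip if the last refresh is not older than the minimum interval.
        if (currentTimeMs <= lastRefreshTimeMs + MIN_INTERVAL_MS) {
            return false;
        }

        // Assumption: refresh when the regexes changed or the maximum interval has elapsed.
        return regexesChanged || currentTimeMs > lastRefreshTimeMs + MAX_INTERVAL_MS;
    }
}
```

Reading the clock once keeps the two interval checks consistent with each other and makes the method easy to drive from a test with a fixed clock, e.g. `RegexRefreshSketch.shouldRefresh(() -> 600_001L, 0L, false)`.
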
##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -3076,6 +3076,23 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
     sendAndReceiveRegexHeartbeat(response, listenerName, None)
   }
 
+  @Test
+  def testConsumerGroupHeartbeatWithRegexWithGrantedTopicDescribeAcl(): Unit = {

Review Comment:
   Should we add a test in which we start with an ACL, subscribe to the topic, get it, remove the ACL, and finally verify that the topic is removed after the refresh interval?