Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac merged PR #15411: URL: https://github.com/apache/kafka/pull/15411 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on PR #15411: URL: https://github.com/apache/kafka/pull/15411#issuecomment-2046712467 Re-triggered a build as last one had errors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1557562821 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1046,6 +1054,7 @@ object KafkaConfig { .define(ConsumerGroupMaxHeartbeatIntervalMsProp, INT, Defaults.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, ConsumerGroupMaxHeartbeatIntervalMsDoc) .define(ConsumerGroupMaxSizeProp, INT, Defaults.CONSUMER_GROUP_MAX_SIZE, atLeast(1), MEDIUM, ConsumerGroupMaxSizeDoc) .define(ConsumerGroupAssignorsProp, LIST, Defaults.CONSUMER_GROUP_ASSIGNORS, null, MEDIUM, ConsumerGroupAssignorsDoc) + .defineInternal(ConsumerGroupMigrationPolicyProp, STRING, Defaults.CONSUMER_GROUP_MIGRATION_POLICY, in(Utils.enumOptions(classOf[ConsumerGroupMigrationPolicy]): _*), MEDIUM, ConsumerGroupMigrationPolicyDoc) Review Comment: Could we use `ConfigDef.CaseInsensitiveValidString.in` and extend the test to ensure that it is case insensitive? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupMigrationPolicy { +/** Both upgrade and downgrade are enabled.*/ +BIDIRECTIONAL("bidirectional"), + +/** Only upgrade is enabled.*/ +UPGRADE("upgrade"), + +/** Only downgrade is enabled.*/ +DOWNGRADE("downgrade"), + +/** Neither upgrade nor downgrade is enabled.*/ +DISABLED("disabled"); + +private final String name; + +ConsumerGroupMigrationPolicy(String name) { +this.name = name; +} + +@Override +public String toString() { +return name; +} + +private final static Map NAME_TO_ENUM = Arrays.stream(values()) +.collect(Collectors.toMap(policy -> policy.name.toLowerCase(Locale.ROOT), Function.identity())); + +/** + * Parse a string into the corresponding {@code GroupProtocolMigrationPolicy} enum value, in a case-insensitive manner. + * + * @return The {{@link ConsumerGroupMigrationPolicy}} according to the string passed. None is returned if + * the string doesn't correspond to a valid policy. + */ +public static ConsumerGroupMigrationPolicy parse(String name) { +if (name == null) { +return DISABLED; +} +ConsumerGroupMigrationPolicy policy = NAME_TO_ENUM.get(name.toLowerCase(Locale.ROOT)); + +return policy == null ? DISABLED : policy; +} + Review Comment: nit: This empty line could be removed. ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -677,6 +679,12 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval for registered consumers." val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single consumer group can accommodate." val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor." + val ConsumerGroupMigrationPolicyDoc = "The config that enables converting the non-empty classic group using the consumer embedded protocol to the non-empty consumer group using the consumer group protocol and vice versa; " + +"conversions of empty groups in both directions are always enabled regardless of this policy. " + +ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled, " + +ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade is enabled, " + Review Comment: nit: `upgrade` -> `upgrade from classic group to consumer group`. ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -677,6 +679,12 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dongnuo123 commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1554638676 ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1831,6 +1832,22 @@ class KafkaConfigTest { assertTrue(config.isNewGroupCoordinatorEnabled) } + @Test + def testGroupProtocolMigrationPolicy(): Unit = { +val props = new Properties() +props.putAll(kraftProps()) + +// Invalid GroupProtocolMigrationPolicy value. +props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, "foo") +assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + +ConsumerGroupMigrationPolicy.values().foreach { policy => + props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString) Review Comment: Though it's not in `ConsumerGroupMigrationPolicy#parse`, in this test `KafkaConfig.fromProps` requires case sensitivity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1553481594 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupMigrationPolicy { +/** Both upgrade and downgrade are enabled.*/ +BIDIRECTIONAL("bidirectional"), + +/** Only upgrade is enabled.*/ +UPGRADE("upgrade"), + +/** Only downgrade is enabled.*/ +DOWNGRADE("downgrade"), + +/** Neither upgrade nor downgrade is enabled.*/ +DISABLED("disabled"); + +private final String policy; + +ConsumerGroupMigrationPolicy(String config) { +this.policy = config; +} Review Comment: nit: We use different names `config` and `policy`. This is confusing. How about using `name`? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupMigrationPolicy { +/** Both upgrade and downgrade are enabled.*/ +BIDIRECTIONAL("bidirectional"), + +/** Only upgrade is enabled.*/ +UPGRADE("upgrade"), + +/** Only downgrade is enabled.*/ +DOWNGRADE("downgrade"), + +/** Neither upgrade nor downgrade is enabled.*/ +DISABLED("disabled"); + +private final String policy; + +ConsumerGroupMigrationPolicy(String config) { +this.policy = config; +} + +@Override +public String toString() { +return policy; +} + +public static String validValuesDescription = +BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled" + ", " + +UPGRADE + ": only upgrade is enabled" + ", " + +DOWNGRADE + ": only downgrade is enabled" + ", " + Review Comment: nit: Should we complement with the from...to... like you side for BIDIRECTIONAL? ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1831,6 +1832,22 @@ class KafkaConfigTest { assertTrue(config.isNewGroupCoordinatorEnabled) } + @Test + def testGroupProtocolMigrationPolicy(): Unit = { +val props = new Properties() +props.putAll(kraftProps()) + +// Invalid GroupProtocolMigrationPolicy value. +props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, "foo") +assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + +ConsumerGroupMigrationPolicy.values().foreach { policy => + props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString) Review Comment: Is it case sensitive? ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1831,6 +1832,22 @@ class KafkaConfigTest { assertTrue(config.isNewGroupCoordinatorEnabled) } + @Test + def testGroupProtocolMigrationPolicy(): Unit = { +val props = new
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1553480909 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -677,6 +679,7 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval for registered consumers." val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single consumer group can accommodate." val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor." + val ConsumerGroupMigrationPolicyDoc = "The config that enables converting the classic group using the consumer embedded protocol to the consumer group using the consumer group protocol and vice versa. " + ConsumerGroupMigrationPolicy.validValuesDescription Review Comment: Thanks. I would rather prefer to keep all the documentation defined here. This is what we usually do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1553477271 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -248,9 +249,10 @@ object KafkaConfig { val ConsumerGroupMaxSessionTimeoutMsProp = "group.consumer.max.session.timeout.ms" val ConsumerGroupHeartbeatIntervalMsProp = "group.consumer.heartbeat.interval.ms" val ConsumerGroupMinHeartbeatIntervalMsProp = "group.consumer.min.heartbeat.interval.ms" - val ConsumerGroupMaxHeartbeatIntervalMsProp ="group.consumer.max.heartbeat.interval.ms" + val ConsumerGroupMaxHeartbeatIntervalMsProp = "group.consumer.max.heartbeat.interval.ms" val ConsumerGroupMaxSizeProp = "group.consumer.max.size" val ConsumerGroupAssignorsProp = "group.consumer.assignors" + val ConsumerGroupMigrationPolicyProp = "consumer.group.migration.policy" Review Comment: Sorry, I missed this one: `consumer.group` -> `group.consumer`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dongnuo123 commented on PR #15411: URL: https://github.com/apache/kafka/pull/15411#issuecomment-2037835825 Will need to add unit tests for the change when group upgrade/downgrade conversion is finished. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dongnuo123 commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1551865511 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -251,6 +252,7 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsProp ="group.consumer.max.heartbeat.interval.ms" val ConsumerGroupMaxSizeProp = "group.consumer.max.size" val ConsumerGroupAssignorsProp = "group.consumer.assignors" + val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy" Review Comment: Let me change it to `consumer.group.migration.policy` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1551485397 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -251,6 +252,7 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsProp ="group.consumer.max.heartbeat.interval.ms" val ConsumerGroupMaxSizeProp = "group.consumer.max.size" val ConsumerGroupAssignorsProp = "group.consumer.assignors" + val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy" Review Comment: I am still not fully satisfied with this property because `group.consumer.upgrade.policy = upgrade` reads weird. What could we use to replace `upgrade` here? `conversion`? `migration`? ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -251,6 +252,7 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsProp ="group.consumer.max.heartbeat.interval.ms" Review Comment: nit: While we are here, could you add the missing space after `=`? ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -677,6 +679,7 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval for registered consumers." val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single consumer group can accommodate." val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor." + val GroupConsumerUpgradePolicyDoc = "The config that enables the group protocol upgrade/downgrade. The valid values are " + Utils.join(Utils.enumOptions(classOf[GroupConsumerUpgradePolicy]), ",") + "." Review Comment: It would be great if we could extend the documentation here a little bit. I think that we need to call out that this is about converting classic group using the consumer embedded protocol to the consumer group protocol and vice versa. We could also call out the various policies with a small descriptions. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConsumerUpgradePolicy.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum GroupConsumerUpgradePolicy { Review Comment: nit: `GroupConsumer` to `ConsumerGroup`. ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1700,6 +1704,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val consumerGroupMaxHeartbeatIntervalMs = getInt(KafkaConfig.ConsumerGroupMaxHeartbeatIntervalMsProp) val consumerGroupMaxSize = getInt(KafkaConfig.ConsumerGroupMaxSizeProp) val consumerGroupAssignors = getConfiguredInstances(KafkaConfig.ConsumerGroupAssignorsProp, classOf[PartitionAssignor]) + val groupConsumerUpgradePolicy = GroupConsumerUpgradePolicy.parse(getString(KafkaConfig.GroupConsumerUpgradePolicyProp)) Review Comment: nit: `consumerGroup..` ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -251,6 +252,7 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsProp ="group.consumer.max.heartbeat.interval.ms" val ConsumerGroupMaxSizeProp = "group.consumer.max.size" val ConsumerGroupAssignorsProp = "group.consumer.assignors" + val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy" Review Comment: nit: `ConsumerGroupUpgradePolicyProp` ## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ## @@ -151,6 +152,7 @@ public class Defaults { UniformAssignor.class.getName(), RangeAssignor.class.getName() ); +public static final String GROUP_CONSUMER_UPGRADE_POLICY = GroupConsumerUpgradePolicy.DISABLED.toString(); Review Comment: nit: `CONSUMER_GROUP_` ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -677,6 +679,7 @@ object
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dongnuo123 commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1542076157 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationPolicy.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum GroupProtocolMigrationPolicy { +/** Both upgrade and downgrade are enabled.*/ +BOTH("both"), Review Comment: Hmm I can't think of a better name either... Maybe `bidirectional`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1539248065 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -279,6 +280,7 @@ object KafkaConfig { val NewGroupCoordinatorEnableProp = "group.coordinator.new.enable" val GroupCoordinatorRebalanceProtocolsProp = "group.coordinator.rebalance.protocols" val GroupCoordinatorNumThreadsProp = "group.coordinator.threads" + val GroupProtocolMigrationPolicyProp = "group.protocol.migration.policy" Review Comment: nit: It may be better to have it scope as a "consumer" config. How about using `group.consumer.upgrade.policy`? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationPolicy.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum GroupProtocolMigrationPolicy { +/** Both upgrade and downgrade are enabled.*/ +BOTH("both"), + +/** Only upgrade is enabled.*/ +UPGRADE("upgrade"), + +/** Only downgrade is enabled.*/ +DOWNGRADE("downgrade"), + +/** Neither upgrade nor downgrade is enabled.*/ +NONE("none"); Review Comment: nit: "Disabled" may be clearer. ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1119,6 +1122,7 @@ object KafkaConfig { .define(GroupCoordinatorNumThreadsProp, INT, Defaults.GROUP_COORDINATOR_NUM_THREADS, atLeast(1), MEDIUM, GroupCoordinatorNumThreadsDoc) // Internal configuration used by integration and system tests. .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, Defaults.NEW_GROUP_COORDINATOR_ENABLE, null, MEDIUM, NewGroupCoordinatorEnableDoc) + .define(GroupProtocolMigrationPolicyProp, STRING, Defaults.GROUP_PROTOCOL_MIGRATION, in(Utils.enumOptions(classOf[GroupProtocolMigrationPolicy]):_*), MEDIUM, GroupProtocolMigrationPolicyDoc) Review Comment: Let's move it to the consumer group section. I think that we should also keep it internal (with defineInternal) for now. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationPolicy.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum GroupProtocolMigrationPolicy { +/** Both upgrade and downgrade are enabled.*/ +BOTH("both"), Review Comment: nit: I don't like "both" because "group.consumer.upgrade.policy=both" does not tell much when you read it. I wonder if we could come up with a better name for this one but I don't have a good idea at the moment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dongnuo123 commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1529263591 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationConfig.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum GroupProtocolMigrationConfig { Review Comment: Yeah I agree. It does feel a bit weird.. Let me change it to migration policy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
jeffkbkim commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1503404075 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationConfig.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum GroupProtocolMigrationConfig { Review Comment: it doesn't seem like other configs in KafkaConfigs are referred to as their own "Config" and the name made me think it would have several configs. How's `GroupProtocolMigrationPolicy` and `group.protocol.migration.policy`? Or we can stick with `group.protocol.migration` and go with `GroupProtocolMigration` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dongnuo123 opened a new pull request, #15411: URL: https://github.com/apache/kafka/pull/15411 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org