Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-10 Thread via GitHub


dajac merged PR #15411:
URL: https://github.com/apache/kafka/pull/15411


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-10 Thread via GitHub


dajac commented on PR #15411:
URL: https://github.com/apache/kafka/pull/15411#issuecomment-2046712467

   Re-triggered a build as the last one had errors.





Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-09 Thread via GitHub


dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1557562821


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1046,6 +1054,7 @@ object KafkaConfig {
   .define(ConsumerGroupMaxHeartbeatIntervalMsProp, INT, 
Defaults.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, 
ConsumerGroupMaxHeartbeatIntervalMsDoc)
   .define(ConsumerGroupMaxSizeProp, INT, Defaults.CONSUMER_GROUP_MAX_SIZE, 
atLeast(1), MEDIUM, ConsumerGroupMaxSizeDoc)
   .define(ConsumerGroupAssignorsProp, LIST, 
Defaults.CONSUMER_GROUP_ASSIGNORS, null, MEDIUM, ConsumerGroupAssignorsDoc)
+  .defineInternal(ConsumerGroupMigrationPolicyProp, STRING, 
Defaults.CONSUMER_GROUP_MIGRATION_POLICY, 
in(Utils.enumOptions(classOf[ConsumerGroupMigrationPolicy]): _*), MEDIUM, 
ConsumerGroupMigrationPolicyDoc)

Review Comment:
   Could we use `ConfigDef.CaseInsensitiveValidString.in` and extend the test 
to ensure that it is case insensitive?
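
For illustration, the case-insensitive check requested here could behave like the following dependency-free Java sketch. `CaseInsensitiveValidatorSketch` and `ensureValid` are hypothetical names, not Kafka's `ConfigDef` API; the sketch only mirrors the idea of lower-casing both sides before comparing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical stand-in for a case-insensitive "valid string" validator.
final class CaseInsensitiveValidatorSketch {
    private final List<String> validLowerCase;

    CaseInsensitiveValidatorSketch(String... validValues) {
        // Normalize the allowed values once, using a fixed locale.
        this.validLowerCase = Arrays.stream(validValues)
            .map(v -> v.toLowerCase(Locale.ROOT))
            .collect(Collectors.toList());
    }

    // Throws IllegalArgumentException for null or unknown values.
    void ensureValid(String name, String value) {
        if (value == null || !validLowerCase.contains(value.toLowerCase(Locale.ROOT))) {
            throw new IllegalArgumentException(
                "Invalid value " + value + " for configuration " + name
                    + ": must be one of " + validLowerCase);
        }
    }
}
```

With this behaviour, `UPGRADE`, `Upgrade`, and `upgrade` would all be accepted while `foo` would still be rejected, which is roughly what the extended test would need to assert.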



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupMigrationPolicy {
+/** Both upgrade and downgrade are enabled.*/
+BIDIRECTIONAL("bidirectional"),
+
+/** Only upgrade is enabled.*/
+UPGRADE("upgrade"),
+
+/** Only downgrade is enabled.*/
+DOWNGRADE("downgrade"),
+
+/** Neither upgrade nor downgrade is enabled.*/
+DISABLED("disabled");
+
+private final String name;
+
+ConsumerGroupMigrationPolicy(String name) {
+this.name = name;
+}
+
+@Override
+public String toString() {
+return name;
+}
+
+private final static Map<String, ConsumerGroupMigrationPolicy> 
NAME_TO_ENUM = Arrays.stream(values())
+.collect(Collectors.toMap(policy -> 
policy.name.toLowerCase(Locale.ROOT), Function.identity()));
+
+/**
+ * Parse a string into the corresponding {@code 
ConsumerGroupMigrationPolicy} enum value, in a case-insensitive manner.
+ *
+ * @return The {@link ConsumerGroupMigrationPolicy} according to the 
string passed. {@code DISABLED} is returned if
+ * the string is null or doesn't correspond to a valid policy.
+ */
+public static ConsumerGroupMigrationPolicy parse(String name) {
+if (name == null) {
+return DISABLED;
+}
+ConsumerGroupMigrationPolicy policy = 
NAME_TO_ENUM.get(name.toLowerCase(Locale.ROOT));
+
+return policy == null ? DISABLED : policy;
+}
+

Review Comment:
   nit: This empty line could be removed.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -677,6 +679,12 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval 
for registered consumers."
   val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single 
consumer group can accommodate."
   val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full 
class names. The first one in the list is considered as the default assignor to 
be used in the case where the consumer does not specify an assignor."
+  val ConsumerGroupMigrationPolicyDoc = "The config that enables converting 
the non-empty classic group using the consumer embedded protocol to the 
non-empty consumer group using the consumer group protocol and vice versa; " +
+"conversions of empty groups in both directions are always enabled 
regardless of this policy. " +
+ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from classic 
group to consumer group and downgrade from consumer group to classic group are 
enabled, " +
+ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade is enabled, " +

Review Comment:
   nit: `upgrade` -> `upgrade from classic group to consumer group`.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -677,6 +679,12 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum 

Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-06 Thread via GitHub


dongnuo123 commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1554638676


##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1831,6 +1832,22 @@ class KafkaConfigTest {
 assertTrue(config.isNewGroupCoordinatorEnabled)
   }
 
+  @Test
+  def testGroupProtocolMigrationPolicy(): Unit = {
+val props = new Properties()
+props.putAll(kraftProps())
+
+// Invalid GroupProtocolMigrationPolicy value.
+props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, "foo")
+assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
+
+ConsumerGroupMigrationPolicy.values().foreach { policy =>
+  props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString)

Review Comment:
   Though `ConsumerGroupMigrationPolicy#parse` is case-insensitive, in this test 
`KafkaConfig.fromProps` is case-sensitive.
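
The mismatch described in this comment can be reproduced with a small standalone sketch. `MigrationPolicySketch` is a simplified copy of the enum under review, and `isExactValidValue` is a hypothetical helper standing in for an exact-match (case-sensitive) validator; neither is Kafka code.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Simplified copy of the enum under review, renamed to mark it as a sketch.
enum MigrationPolicySketch {
    BIDIRECTIONAL("bidirectional"),
    UPGRADE("upgrade"),
    DOWNGRADE("downgrade"),
    DISABLED("disabled");

    private final String name;

    MigrationPolicySketch(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }

    // Lower-cased policy name -> enum constant.
    private static final Map<String, MigrationPolicySketch> NAME_TO_ENUM =
        Arrays.stream(values())
            .collect(Collectors.toMap(p -> p.name.toLowerCase(Locale.ROOT), Function.identity()));

    // Case-insensitive; null and unknown names fall back to DISABLED.
    static MigrationPolicySketch parse(String name) {
        if (name == null) {
            return DISABLED;
        }
        MigrationPolicySketch policy = NAME_TO_ENUM.get(name.toLowerCase(Locale.ROOT));
        return policy == null ? DISABLED : policy;
    }

    // Hypothetical exact-match check, standing in for a case-sensitive validator.
    static boolean isExactValidValue(String value) {
        return Arrays.stream(values()).anyMatch(p -> p.name.equals(value));
    }
}
```

Here `parse("UPGRADE")` yields `UPGRADE`, but `isExactValidValue("UPGRADE")` is false, so an upper-case value would be rejected by validation before `parse` ever sees it.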






Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-05 Thread via GitHub


dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1553481594


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupMigrationPolicy {
+/** Both upgrade and downgrade are enabled.*/
+BIDIRECTIONAL("bidirectional"),
+
+/** Only upgrade is enabled.*/
+UPGRADE("upgrade"),
+
+/** Only downgrade is enabled.*/
+DOWNGRADE("downgrade"),
+
+/** Neither upgrade nor downgrade is enabled.*/
+DISABLED("disabled");
+
+private final String policy;
+
+ConsumerGroupMigrationPolicy(String config) {
+this.policy = config;
+}

Review Comment:
   nit: We use different names `config` and `policy`. This is confusing. How 
about using `name`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupMigrationPolicy {
+/** Both upgrade and downgrade are enabled.*/
+BIDIRECTIONAL("bidirectional"),
+
+/** Only upgrade is enabled.*/
+UPGRADE("upgrade"),
+
+/** Only downgrade is enabled.*/
+DOWNGRADE("downgrade"),
+
+/** Neither upgrade nor downgrade is enabled.*/
+DISABLED("disabled");
+
+private final String policy;
+
+ConsumerGroupMigrationPolicy(String config) {
+this.policy = config;
+}
+
+@Override
+public String toString() {
+return policy;
+}
+
+public static String validValuesDescription =
+BIDIRECTIONAL   + ": both upgrade from classic group to consumer group 
and downgrade from consumer group to classic group are enabled" + ", " +
+UPGRADE + ": only upgrade is enabled" + ", " +
+DOWNGRADE   + ": only downgrade is enabled" + ", " +

Review Comment:
   nit: Should we complement these with the from...to... wording, like you did for 
BIDIRECTIONAL? 



##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1831,6 +1832,22 @@ class KafkaConfigTest {
 assertTrue(config.isNewGroupCoordinatorEnabled)
   }
 
+  @Test
+  def testGroupProtocolMigrationPolicy(): Unit = {
+val props = new Properties()
+props.putAll(kraftProps())
+
+// Invalid GroupProtocolMigrationPolicy value.
+props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, "foo")
+assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
+
+ConsumerGroupMigrationPolicy.values().foreach { policy =>
+  props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString)

Review Comment:
   Is it case sensitive?



##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1831,6 +1832,22 @@ class KafkaConfigTest {
 assertTrue(config.isNewGroupCoordinatorEnabled)
   }
 
+  @Test
+  def testGroupProtocolMigrationPolicy(): Unit = {
+val props = new 

Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-05 Thread via GitHub


dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1553480909


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -677,6 +679,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval 
for registered consumers."
   val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single 
consumer group can accommodate."
   val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full 
class names. The first one in the list is considered as the default assignor to 
be used in the case where the consumer does not specify an assignor."
+  val ConsumerGroupMigrationPolicyDoc = "The config that enables converting 
the classic group using the consumer embedded protocol to the consumer group 
using the consumer group protocol and vice versa. " + 
ConsumerGroupMigrationPolicy.validValuesDescription

Review Comment:
   Thanks. I would prefer to keep all the documentation defined here. 
This is what we usually do.






Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-05 Thread via GitHub


dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1553477271


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -248,9 +249,10 @@ object KafkaConfig {
   val ConsumerGroupMaxSessionTimeoutMsProp = 
"group.consumer.max.session.timeout.ms"
   val ConsumerGroupHeartbeatIntervalMsProp = 
"group.consumer.heartbeat.interval.ms"
   val ConsumerGroupMinHeartbeatIntervalMsProp = 
"group.consumer.min.heartbeat.interval.ms"
-  val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"
+  val ConsumerGroupMaxHeartbeatIntervalMsProp = 
"group.consumer.max.heartbeat.interval.ms"
   val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
   val ConsumerGroupAssignorsProp = "group.consumer.assignors"
+  val ConsumerGroupMigrationPolicyProp = "consumer.group.migration.policy"

Review Comment:
   Sorry, I missed this one: `consumer.group` -> `group.consumer`.
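
As a rough sketch of how the renamed key would be read, assuming the rename yields `group.consumer.migration.policy` and that the default is `disabled` (as the `Defaults` hunk elsewhere in this thread suggests), plain `java.util.Properties` is enough. `MigrationPolicyPropsSketch` and `policyFrom` are hypothetical names, not part of `KafkaConfig`.

```java
import java.util.Properties;

// Hypothetical helper; not part of KafkaConfig.
final class MigrationPolicyPropsSketch {
    // Assumed key after applying the `consumer.group` -> `group.consumer` rename.
    static final String PROP = "group.consumer.migration.policy";
    // Assumed default, taken from the DISABLED default discussed in the thread.
    static final String DEFAULT = "disabled";

    // Returns the configured policy name, or the default when the key is absent.
    static String policyFrom(Properties props) {
        return props.getProperty(PROP, DEFAULT);
    }
}
```

The `group.consumer.` prefix keeps the key alongside the other consumer group settings quoted in the hunk, such as `group.consumer.max.size`.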






Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-04 Thread via GitHub


dongnuo123 commented on PR #15411:
URL: https://github.com/apache/kafka/pull/15411#issuecomment-2037835825

   Will need to add unit tests for the change when group upgrade/downgrade 
conversion is finished.





Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-04 Thread via GitHub


dongnuo123 commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1551865511


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"
   val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
   val ConsumerGroupAssignorsProp = "group.consumer.assignors"
+  val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy"

Review Comment:
   Let me change it to `consumer.group.migration.policy`






Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-04-04 Thread via GitHub


dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1551485397


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"
   val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
   val ConsumerGroupAssignorsProp = "group.consumer.assignors"
+  val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy"

Review Comment:
   I am still not fully satisfied with this property because 
`group.consumer.upgrade.policy = upgrade` reads weird. What could we use to 
replace `upgrade` here? `conversion`? `migration`?



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"

Review Comment:
   nit: While we are here, could you add the missing space after `=`?



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -677,6 +679,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval 
for registered consumers."
   val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single 
consumer group can accommodate."
   val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full 
class names. The first one in the list is considered as the default assignor to 
be used in the case where the consumer does not specify an assignor."
+  val GroupConsumerUpgradePolicyDoc = "The config that enables the group 
protocol upgrade/downgrade. The valid values are " + 
Utils.join(Utils.enumOptions(classOf[GroupConsumerUpgradePolicy]), ",") + "."

Review Comment:
   It would be great if we could extend the documentation here a little bit. I 
think that we need to call out that this is about converting a classic group 
using the consumer embedded protocol to the consumer group protocol and vice 
versa. We could also call out the various policies with a short description of each.
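
One possible shape for such a documentation string, sketched in plain Java. `MigrationPolicyDocSketch`, its `DESCRIPTIONS` map, and `doc()` are hypothetical; the wording of each description is adapted from the doc strings quoted in this thread, and the text is assembled in a config-side holder class rather than in the enum.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical holder for the config documentation; not Kafka code.
final class MigrationPolicyDocSketch {
    // Policy name -> short description; LinkedHashMap preserves insertion order.
    static final Map<String, String> DESCRIPTIONS = new LinkedHashMap<>();

    static {
        DESCRIPTIONS.put("bidirectional",
            "both upgrade from classic group to consumer group and downgrade "
                + "from consumer group to classic group are enabled");
        DESCRIPTIONS.put("upgrade",
            "only upgrade from classic group to consumer group is enabled");
        DESCRIPTIONS.put("downgrade",
            "only downgrade from consumer group to classic group is enabled");
        DESCRIPTIONS.put("disabled",
            "neither upgrade nor downgrade is enabled");
    }

    // Builds "intro. name: description, name: description, ...".
    static String doc() {
        String policies = DESCRIPTIONS.entrySet().stream()
            .map(e -> e.getKey() + ": " + e.getValue())
            .collect(Collectors.joining(", "));
        return "The config that enables converting the classic group using the consumer "
            + "embedded protocol to the consumer group using the consumer group protocol "
            + "and vice versa. " + policies + ".";
    }
}
```

Spelling out the direction for every policy avoids a reader having to guess what "upgrade" means on its own.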



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConsumerUpgradePolicy.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupConsumerUpgradePolicy {

Review Comment:
   nit: `GroupConsumer` to `ConsumerGroup`.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1700,6 +1704,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val consumerGroupMaxHeartbeatIntervalMs = 
getInt(KafkaConfig.ConsumerGroupMaxHeartbeatIntervalMsProp)
   val consumerGroupMaxSize = getInt(KafkaConfig.ConsumerGroupMaxSizeProp)
   val consumerGroupAssignors = 
getConfiguredInstances(KafkaConfig.ConsumerGroupAssignorsProp, 
classOf[PartitionAssignor])
+  val groupConsumerUpgradePolicy = 
GroupConsumerUpgradePolicy.parse(getString(KafkaConfig.GroupConsumerUpgradePolicyProp))

Review Comment:
   nit: `consumerGroup..`



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"
   val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
   val ConsumerGroupAssignorsProp = "group.consumer.assignors"
+  val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy"

Review Comment:
   nit: `ConsumerGroupUpgradePolicyProp`



##
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -151,6 +152,7 @@ public class Defaults {
 UniformAssignor.class.getName(),
 RangeAssignor.class.getName()
 );
+public static final String GROUP_CONSUMER_UPGRADE_POLICY = 
GroupConsumerUpgradePolicy.DISABLED.toString();

Review Comment:
   nit: `CONSUMER_GROUP_`



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -677,6 +679,7 @@ object 

Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-03-27 Thread via GitHub


dongnuo123 commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1542076157


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationPolicy.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupProtocolMigrationPolicy {
+/** Both upgrade and downgrade are enabled.*/
+BOTH("both"),

Review Comment:
   Hmm I can't think of a better name either... Maybe `bidirectional`?






Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-03-26 Thread via GitHub


dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1539248065


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -279,6 +280,7 @@ object KafkaConfig {
   val NewGroupCoordinatorEnableProp = "group.coordinator.new.enable"
   val GroupCoordinatorRebalanceProtocolsProp = 
"group.coordinator.rebalance.protocols"
   val GroupCoordinatorNumThreadsProp = "group.coordinator.threads"
+  val GroupProtocolMigrationPolicyProp = "group.protocol.migration.policy"

Review Comment:
   nit: It may be better to have it scoped as a "consumer" config. How about 
using `group.consumer.upgrade.policy`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationPolicy.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupProtocolMigrationPolicy {
+/** Both upgrade and downgrade are enabled.*/
+BOTH("both"),
+
+/** Only upgrade is enabled.*/
+UPGRADE("upgrade"),
+
+/** Only downgrade is enabled.*/
+DOWNGRADE("downgrade"),
+
+/** Neither upgrade nor downgrade is enabled.*/
+NONE("none");

Review Comment:
   nit: "Disabled" may be clearer.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1119,6 +1122,7 @@ object KafkaConfig {
   .define(GroupCoordinatorNumThreadsProp, INT, 
Defaults.GROUP_COORDINATOR_NUM_THREADS, atLeast(1), MEDIUM, 
GroupCoordinatorNumThreadsDoc)
   // Internal configuration used by integration and system tests.
   .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, 
Defaults.NEW_GROUP_COORDINATOR_ENABLE, null, MEDIUM, 
NewGroupCoordinatorEnableDoc)
+  .define(GroupProtocolMigrationPolicyProp, STRING, 
Defaults.GROUP_PROTOCOL_MIGRATION, 
in(Utils.enumOptions(classOf[GroupProtocolMigrationPolicy]):_*), MEDIUM, 
GroupProtocolMigrationPolicyDoc)

Review Comment:
   Let's move it to the consumer group section. I think that we should also 
keep it internal (with defineInternal) for now.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationPolicy.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupProtocolMigrationPolicy {
+/** Both upgrade and downgrade are enabled.*/
+BOTH("both"),

Review Comment:
   nit: I don't like "both" because "group.consumer.upgrade.policy=both" does 
not tell much when you read it. I wonder if we could come up with a better name 
for this one but I don't have a good idea at the moment.






Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-03-18 Thread via GitHub


dongnuo123 commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1529263591


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationConfig.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupProtocolMigrationConfig {

Review Comment:
   Yeah I agree. It does feel a bit weird.. Let me change it to migration 
policy.






Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-02-26 Thread via GitHub


jeffkbkim commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1503404075


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationConfig.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupProtocolMigrationConfig {

Review Comment:
   It doesn't seem like other configs in `KafkaConfig` are referred to as their 
own "Config", and the name made me think it would hold several configs. How about 
`GroupProtocolMigrationPolicy` and `group.protocol.migration.policy`? Or we can 
stick with `group.protocol.migration` and go with `GroupProtocolMigration`.






[PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-02-21 Thread via GitHub


dongnuo123 opened a new pull request, #15411:
URL: https://github.com/apache/kafka/pull/15411

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

