dajac commented on code in PR #17503:
URL: https://github.com/apache/kafka/pull/17503#discussion_r1800833758


##########
tests/kafkatest/tests/client/consumer_protocol_migration_test.py:
##########
@@ -0,0 +1,407 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+    LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, 
LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+    LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, 
LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+    LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+
+class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+    STICKY = "org.apache.kafka.clients.consumer.StickyAssignor"
+    COOPERATIVE_STICKEY = 
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+    all_assignment_strategies = [RANGE, ROUND_ROBIN, COOPERATIVE_STICKEY, 
STICKY]
+
+    all_consumer_versions = [LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+                             LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, 
LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+                             LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, 
LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+                             LATEST_3_7, LATEST_3_8, DEV_BRANCH]
+    consumer_versions_supporting_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_0_11_0]
+    consumer_versions_supporting_cooperative_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_2_4]
+    consumer_versions_supporting_static_membership = [v for v in 
all_consumer_versions if v >= LATEST_2_3]
+
+    def __init__(self, test_context):
+        super(ConsumerProtocolMigrationTest, self).__init__(test_context, 
num_consumers=5, num_producers=1,
+                                                            num_zk=0, 
num_brokers=1,
+                                                            
use_new_coordinator=True, topics={
+                self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 
'replication-factor': 1 }
+            })
+
+    def bounce_all_consumers(self, consumer, clean_shutdown=True):
+        for node in consumer.nodes:
+            consumer.stop_node(node, clean_shutdown)
+
+        wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, 
timeout_sec=10,
+                   err_msg="Timed out waiting for the consumers to shutdown")
+
+        # Wait until the group becomes empty. We use the 50-second timeout 
because the
+        # consumer session timeout is 45 seconds.
+        wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(state="empty"),
+                   timeout_sec=50,
+                   err_msg="Timed out waiting for the group to become empty.")
+
+        for node in consumer.nodes:
+            consumer.start_node(node)
+
+        self.await_all_members(consumer)
+        self.await_consumed_messages(consumer)
+
+    def rolling_bounce_consumers(self, consumer, clean_shutdown=True):
+        for node in consumer.nodes:
+            consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == 1,
+                       timeout_sec=self.session_timeout_sec+5,
+                       err_msg="Timed out waiting for the consumer to 
shutdown")
+
+            consumer.start_node(node)
+
+            self.await_all_members(consumer)
+            self.await_consumed_messages(consumer)
+
+    def set_group_instance_id(self, consumer):
+        consumer.static_membership = True
+        for ind, node in enumerate(consumer.nodes):
+            node.group_instance_id = "migration-test-member-%d" % ind
+
+    def set_consumer_version(self, consumer, version):
+        for node in consumer.nodes:
+            node.version = version
+
+    def assert_group_type(self, type, timeout_sec=10):
+        wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(type=type), timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting to list expected %s group." % 
type)
+
+    @cluster(num_nodes=8)
+    @matrix(
+        enable_autocommit=[True, False],

Review Comment:
   I wonder we could also drop this. Testing with auto-commit enabled seems 
reasonable to me. Or, is there something that you wanted to test in particular?



##########
tests/kafkatest/tests/client/consumer_protocol_migration_test.py:
##########
@@ -0,0 +1,407 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+    LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, 
LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+    LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, 
LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+    LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+
+class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+    STICKY = "org.apache.kafka.clients.consumer.StickyAssignor"
+    COOPERATIVE_STICKEY = 
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+    all_assignment_strategies = [RANGE, ROUND_ROBIN, COOPERATIVE_STICKEY, 
STICKY]
+
+    all_consumer_versions = [LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+                             LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, 
LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+                             LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, 
LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+                             LATEST_3_7, LATEST_3_8, DEV_BRANCH]
+    consumer_versions_supporting_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_0_11_0]
+    consumer_versions_supporting_cooperative_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_2_4]
+    consumer_versions_supporting_static_membership = [v for v in 
all_consumer_versions if v >= LATEST_2_3]
+
+    def __init__(self, test_context):
+        super(ConsumerProtocolMigrationTest, self).__init__(test_context, 
num_consumers=5, num_producers=1,
+                                                            num_zk=0, 
num_brokers=1,
+                                                            
use_new_coordinator=True, topics={
+                self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 
'replication-factor': 1 }
+            })
+
+    def bounce_all_consumers(self, consumer, clean_shutdown=True):
+        for node in consumer.nodes:
+            consumer.stop_node(node, clean_shutdown)
+
+        wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, 
timeout_sec=10,
+                   err_msg="Timed out waiting for the consumers to shutdown")
+
+        # Wait until the group becomes empty. We use the 50-second timeout 
because the
+        # consumer session timeout is 45 seconds.
+        wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(state="empty"),
+                   timeout_sec=50,
+                   err_msg="Timed out waiting for the group to become empty.")
+
+        for node in consumer.nodes:
+            consumer.start_node(node)
+
+        self.await_all_members(consumer)
+        self.await_consumed_messages(consumer)
+
+    def rolling_bounce_consumers(self, consumer, clean_shutdown=True):
+        for node in consumer.nodes:
+            consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == 1,
+                       timeout_sec=self.session_timeout_sec+5,
+                       err_msg="Timed out waiting for the consumer to 
shutdown")
+
+            consumer.start_node(node)
+
+            self.await_all_members(consumer)
+            self.await_consumed_messages(consumer)
+
+    def set_group_instance_id(self, consumer):
+        consumer.static_membership = True
+        for ind, node in enumerate(consumer.nodes):
+            node.group_instance_id = "migration-test-member-%d" % ind
+
+    def set_consumer_version(self, consumer, version):
+        for node in consumer.nodes:
+            node.version = version
+
+    def assert_group_type(self, type, timeout_sec=10):
+        wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(type=type), timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting to list expected %s group." % 
type)
+
+    @cluster(num_nodes=8)
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in all_consumer_versions],
+        assignment_strategy=[RANGE, ROUND_ROBIN]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_sticky_assignor],
+        assignment_strategy=[STICKY]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[True],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_static_membership],
+        assignment_strategy=[RANGE, ROUND_ROBIN, STICKY]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_cooperative_sticky_assignor],
+        assignment_strategy=[COOPERATIVE_STICKEY]
+    )
+    def test_consumer_offline_migration(self, enable_autocommit, 
static_membership, metadata_quorum,
+                                        consumer_group_migration_policy, 
consumer_version, assignment_strategy):
+        """
+        Verify correct consumer behavior when the consumers in the group are 
restarted to perform
+        offline upgrade/downgrade.
+
+        Setup: single Kafka cluster with one producer and a set of consumers 
in one group.
+
+        - Start a producer which continues producing new messages throughout 
the test.
+        - Start up the consumers with classic protocols and wait until they've 
joined the group.
+        - Offline upgrade: restart the consumers with consumer protocols, 
waiting for all consumers
+          to finish shutting down before starting up them.
+        - Offline downgrade: Restart the consumers with classic protocols, 
waiting for all consumers
+          to finish shutting down before starting up them.
+        - Verify delivery semantics according to the failure type.

Review Comment:
   I don't understand the last point. Do we have failure type?



##########
tests/kafkatest/tests/client/consumer_protocol_migration_test.py:
##########
@@ -0,0 +1,407 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+    LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, 
LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+    LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, 
LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+    LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+
+class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+    STICKY = "org.apache.kafka.clients.consumer.StickyAssignor"
+    COOPERATIVE_STICKEY = 
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+    all_assignment_strategies = [RANGE, ROUND_ROBIN, COOPERATIVE_STICKEY, 
STICKY]
+
+    all_consumer_versions = [LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+                             LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, 
LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+                             LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, 
LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+                             LATEST_3_7, LATEST_3_8, DEV_BRANCH]
+    consumer_versions_supporting_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_0_11_0]
+    consumer_versions_supporting_cooperative_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_2_4]
+    consumer_versions_supporting_static_membership = [v for v in 
all_consumer_versions if v >= LATEST_2_3]
+
+    def __init__(self, test_context):
+        super(ConsumerProtocolMigrationTest, self).__init__(test_context, 
num_consumers=5, num_producers=1,
+                                                            num_zk=0, 
num_brokers=1,
+                                                            
use_new_coordinator=True, topics={

Review Comment:
   The new group coordinator is enabled by default. We may be able to remove 
`use_new_coordinator`.



##########
tests/kafkatest/tests/client/consumer_protocol_migration_test.py:
##########
@@ -0,0 +1,407 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+    LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, 
LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+    LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, 
LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+    LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+
+class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+    STICKY = "org.apache.kafka.clients.consumer.StickyAssignor"
+    COOPERATIVE_STICKEY = 
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+    all_assignment_strategies = [RANGE, ROUND_ROBIN, COOPERATIVE_STICKEY, 
STICKY]
+
+    all_consumer_versions = [LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+                             LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, 
LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+                             LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, 
LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+                             LATEST_3_7, LATEST_3_8, DEV_BRANCH]
+    consumer_versions_supporting_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_0_11_0]
+    consumer_versions_supporting_cooperative_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_2_4]
+    consumer_versions_supporting_static_membership = [v for v in 
all_consumer_versions if v >= LATEST_2_3]
+
+    def __init__(self, test_context):
+        super(ConsumerProtocolMigrationTest, self).__init__(test_context, 
num_consumers=5, num_producers=1,
+                                                            num_zk=0, 
num_brokers=1,
+                                                            
use_new_coordinator=True, topics={
+                self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 
'replication-factor': 1 }
+            })
+
+    def bounce_all_consumers(self, consumer, clean_shutdown=True):
+        for node in consumer.nodes:
+            consumer.stop_node(node, clean_shutdown)
+
+        wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, 
timeout_sec=10,
+                   err_msg="Timed out waiting for the consumers to shutdown")
+
+        # Wait until the group becomes empty. We use the 50-second timeout 
because the
+        # consumer session timeout is 45 seconds.
+        wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(state="empty"),
+                   timeout_sec=50,
+                   err_msg="Timed out waiting for the group to become empty.")
+
+        for node in consumer.nodes:
+            consumer.start_node(node)
+
+        self.await_all_members(consumer)
+        self.await_consumed_messages(consumer)
+
+    def rolling_bounce_consumers(self, consumer, clean_shutdown=True):
+        for node in consumer.nodes:
+            consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == 1,
+                       timeout_sec=self.session_timeout_sec+5,
+                       err_msg="Timed out waiting for the consumer to 
shutdown")
+
+            consumer.start_node(node)
+
+            self.await_all_members(consumer)
+            self.await_consumed_messages(consumer)
+
+    def set_group_instance_id(self, consumer):
+        consumer.static_membership = True
+        for ind, node in enumerate(consumer.nodes):
+            node.group_instance_id = "migration-test-member-%d" % ind
+
+    def set_consumer_version(self, consumer, version):
+        for node in consumer.nodes:
+            node.version = version
+
+    def assert_group_type(self, type, timeout_sec=10):
+        wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(type=type), timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting to list expected %s group." % 
type)
+
+    @cluster(num_nodes=8)
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in all_consumer_versions],
+        assignment_strategy=[RANGE, ROUND_ROBIN]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_sticky_assignor],
+        assignment_strategy=[STICKY]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[True],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_static_membership],
+        assignment_strategy=[RANGE, ROUND_ROBIN, STICKY]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_cooperative_sticky_assignor],
+        assignment_strategy=[COOPERATIVE_STICKEY]
+    )
+    def test_consumer_offline_migration(self, enable_autocommit, 
static_membership, metadata_quorum,
+                                        consumer_group_migration_policy, 
consumer_version, assignment_strategy):
+        """
+        Verify correct consumer behavior when the consumers in the group are 
restarted to perform
+        offline upgrade/downgrade.
+
+        Setup: single Kafka cluster with one producer and a set of consumers 
in one group.
+
+        - Start a producer which continues producing new messages throughout 
the test.
+        - Start up the consumers with classic protocols and wait until they've 
joined the group.
+        - Offline upgrade: restart the consumers with consumer protocols, 
waiting for all consumers
+          to finish shutting down before starting up them.
+        - Offline downgrade: Restart the consumers with classic protocols, 
waiting for all consumers
+          to finish shutting down before starting up them.
+        - Verify delivery semantics according to the failure type.
+        """
+        producer = self.setup_producer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC, 
group_protocol=consumer_group.classic_group_protocol,
+                                       version=consumer_version, 
assignment_strategy=assignment_strategy,
+                                       enable_autocommit=enable_autocommit)
+
+        kafka_version = KafkaVersion(consumer_version)
+        if kafka_version == LATEST_2_3 or kafka_version == LATEST_2_4 or 
(static_membership and kafka_version > LATEST_2_4):
+            # group-instance-id is required in 2.3.
+            self.set_group_instance_id(consumer)
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+        self.await_consumed_messages(consumer)
+        self.assert_group_type("classic")
+
+        # Upgrade the group protocol and restart all consumers.
+        consumer.group_protocol = consumer_group.consumer_group_protocol
+        self.set_consumer_version(consumer, DEV_BRANCH)
+
+        self.bounce_all_consumers(consumer)
+        self.assert_group_type("consumer")
+
+        # Downgrade the group protocol and restart all consumers.
+        consumer.group_protocol = consumer_group.classic_group_protocol
+        self.set_consumer_version(consumer, kafka_version)
+        self.bounce_all_consumers(consumer)
+        self.assert_group_type("classic")
+
+        consumer.stop_all()
+
+    @cluster(num_nodes=8)
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional"],
+        consumer_version=[str(v) for v in all_consumer_versions],
+        assignment_strategy=[RANGE, ROUND_ROBIN]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_sticky_assignor],
+        assignment_strategy=[STICKY]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[True],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_static_membership],
+        assignment_strategy=[RANGE, ROUND_ROBIN, STICKY]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_cooperative_sticky_assignor],
+        assignment_strategy=[COOPERATIVE_STICKEY]
+    )
+    def test_consumer_rolling_migration(self, enable_autocommit, 
static_membership, metadata_quorum,

Review Comment:
   I wonder if this one is needed given that we also have two other tests to 
test the upgrade and the downgrade. They basically test the same thing, except 
for the configuration.



##########
tests/kafkatest/tests/client/consumer_protocol_migration_test.py:
##########
@@ -0,0 +1,407 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+    LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, 
LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+    LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, 
LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+    LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+
+class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+    STICKY = "org.apache.kafka.clients.consumer.StickyAssignor"
+    COOPERATIVE_STICKEY = 
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+    all_assignment_strategies = [RANGE, ROUND_ROBIN, COOPERATIVE_STICKEY, 
STICKY]
+
+    all_consumer_versions = [LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+                             LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, 
LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+                             LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, 
LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+                             LATEST_3_7, LATEST_3_8, DEV_BRANCH]
+    consumer_versions_supporting_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_0_11_0]
+    consumer_versions_supporting_cooperative_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_2_4]
+    consumer_versions_supporting_static_membership = [v for v in 
all_consumer_versions if v >= LATEST_2_3]
+
+    def __init__(self, test_context):
+        super(ConsumerProtocolMigrationTest, self).__init__(test_context, 
num_consumers=5, num_producers=1,
+                                                            num_zk=0, 
num_brokers=1,
+                                                            
use_new_coordinator=True, topics={
+                self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 
'replication-factor': 1 }
+            })
+
+    def bounce_all_consumers(self, consumer, clean_shutdown=True):
+        for node in consumer.nodes:
+            consumer.stop_node(node, clean_shutdown)
+
+        wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, 
timeout_sec=10,
+                   err_msg="Timed out waiting for the consumers to shutdown")
+
+        # Wait until the group becomes empty. We use the 50-second timeout 
because the
+        # consumer session timeout is 45 seconds.
+        wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(state="empty"),
+                   timeout_sec=50,
+                   err_msg="Timed out waiting for the group to become empty.")
+
+        for node in consumer.nodes:
+            consumer.start_node(node)
+
+        self.await_all_members(consumer)
+        self.await_consumed_messages(consumer)
+
+    def rolling_bounce_consumers(self, consumer, clean_shutdown=True):
+        for node in consumer.nodes:
+            consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == 1,
+                       timeout_sec=self.session_timeout_sec+5,
+                       err_msg="Timed out waiting for the consumer to 
shutdown")
+
+            consumer.start_node(node)
+
+            self.await_all_members(consumer)
+            self.await_consumed_messages(consumer)
+
+    def set_group_instance_id(self, consumer):
+        consumer.static_membership = True
+        for ind, node in enumerate(consumer.nodes):
+            node.group_instance_id = "migration-test-member-%d" % ind
+
+    def set_consumer_version(self, consumer, version):
+        for node in consumer.nodes:
+            node.version = version
+
+    def assert_group_type(self, type, timeout_sec=10):
+        wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(type=type), timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting to list expected %s group." % 
type)
+
+    @cluster(num_nodes=8)
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in all_consumer_versions],
+        assignment_strategy=[RANGE, ROUND_ROBIN]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_sticky_assignor],
+        assignment_strategy=[STICKY]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[True],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_static_membership],
+        assignment_strategy=[RANGE, ROUND_ROBIN, STICKY]
+    )
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],
+        consumer_version=[str(v) for v in 
consumer_versions_supporting_cooperative_sticky_assignor],
+        assignment_strategy=[COOPERATIVE_STICKEY]
+    )
+    def test_consumer_offline_migration(self, enable_autocommit, 
static_membership, metadata_quorum,
+                                        consumer_group_migration_policy, 
consumer_version, assignment_strategy):
+        """
+        Verify correct consumer behavior when the consumers in the group are 
restarted to perform
+        offline upgrade/downgrade.
+
+        Setup: single Kafka cluster with one producer and a set of consumers 
in one group.
+
+        - Start a producer which continues producing new messages throughout 
the test.
+        - Start up the consumers with classic protocols and wait until they've 
joined the group.
+        - Offline upgrade: restart the consumers with consumer protocols, 
waiting for all consumers
+          to finish shutting down before starting up them.
+        - Offline downgrade: Restart the consumers with classic protocols, 
waiting for all consumers
+          to finish shutting down before starting up them.
+        - Verify delivery semantics according to the failure type.
+        """
+        producer = self.setup_producer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC, 
group_protocol=consumer_group.classic_group_protocol,
+                                       version=consumer_version, 
assignment_strategy=assignment_strategy,
+                                       enable_autocommit=enable_autocommit)
+
+        kafka_version = KafkaVersion(consumer_version)
+        if kafka_version == LATEST_2_3 or kafka_version == LATEST_2_4 or 
(static_membership and kafka_version > LATEST_2_4):

Review Comment:
   I don't fully understand the versions here. Could you please explain?



##########
tests/kafkatest/tests/kafka_test.py:
##########
@@ -28,18 +28,20 @@ class KafkaTest(Test):
     setUp. The Zookeeper and Kafka services are available as the fields
     KafkaTest.zk and KafkaTest.kafka.
     """
-    def __init__(self, test_context, num_zk, num_brokers, topics=None):
+    def __init__(self, test_context, num_zk, num_brokers, topics=None, 
use_new_coordinator=None):

Review Comment:
   Based on my previous comment, I wonder if we need this change.



##########
tests/kafkatest/tests/client/consumer_protocol_migration_test.py:
##########
@@ -0,0 +1,407 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+    LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, 
LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+    LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, 
LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+    LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+
+class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+    STICKY = "org.apache.kafka.clients.consumer.StickyAssignor"
+    COOPERATIVE_STICKEY = 
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+    all_assignment_strategies = [RANGE, ROUND_ROBIN, COOPERATIVE_STICKEY, 
STICKY]
+
+    all_consumer_versions = [LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+                             LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, 
LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+                             LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, 
LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+                             LATEST_3_7, LATEST_3_8, DEV_BRANCH]
+    consumer_versions_supporting_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_0_11_0]
+    consumer_versions_supporting_cooperative_sticky_assignor = [v for v in 
all_consumer_versions if v >= LATEST_2_4]
+    consumer_versions_supporting_static_membership = [v for v in 
all_consumer_versions if v >= LATEST_2_3]
+
+    def __init__(self, test_context):
+        super(ConsumerProtocolMigrationTest, self).__init__(test_context, 
num_consumers=5, num_producers=1,
+                                                            num_zk=0, 
num_brokers=1,
+                                                            
use_new_coordinator=True, topics={
+                self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 
'replication-factor': 1 }
+            })
+
+    def bounce_all_consumers(self, consumer, clean_shutdown=True):
+        for node in consumer.nodes:
+            consumer.stop_node(node, clean_shutdown)
+
+        wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, 
timeout_sec=10,
+                   err_msg="Timed out waiting for the consumers to shutdown")
+
+        # Wait until the group becomes empty. We use the 50-second timeout 
because the
+        # consumer session timeout is 45 seconds.
+        wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(state="empty"),
+                   timeout_sec=50,
+                   err_msg="Timed out waiting for the group to become empty.")
+
+        for node in consumer.nodes:
+            consumer.start_node(node)
+
+        self.await_all_members(consumer)
+        self.await_consumed_messages(consumer)
+
+    def rolling_bounce_consumers(self, consumer, clean_shutdown=True):
+        for node in consumer.nodes:
+            consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == 1,
+                       timeout_sec=self.session_timeout_sec+5,
+                       err_msg="Timed out waiting for the consumer to 
shutdown")
+
+            consumer.start_node(node)
+
+            self.await_all_members(consumer)
+            self.await_consumed_messages(consumer)
+
+    def set_group_instance_id(self, consumer):
+        consumer.static_membership = True
+        for ind, node in enumerate(consumer.nodes):
+            node.group_instance_id = "migration-test-member-%d" % ind
+
+    def set_consumer_version(self, consumer, version):
+        for node in consumer.nodes:
+            node.version = version
+
+    def assert_group_type(self, type, timeout_sec=10):
+        wait_until(lambda: self.group_id in 
self.kafka.list_consumer_groups(type=type), timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting to list expected %s group." % 
type)
+
+    @cluster(num_nodes=8)
+    @matrix(
+        enable_autocommit=[True, False],
+        static_membership=[False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade", 
"downgrade", "disabled"],

Review Comment:
   For the offline upgrade/downgrade, my understanding is that it should work 
all the time. Hence I wonder if we could just test it with `disabled`.



##########
tests/kafkatest/tests/client/consumer_protocol_migration_test.py:
##########
@@ -0,0 +1,407 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+    LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, 
LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+    LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, 
LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+    LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+
+class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+    STICKY = "org.apache.kafka.clients.consumer.StickyAssignor"
+    COOPERATIVE_STICKEY = 
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+    all_assignment_strategies = [RANGE, ROUND_ROBIN, COOPERATIVE_STICKEY, 
STICKY]
+
+    all_consumer_versions = [LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+                             LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, 
LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+                             LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, 
LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+                             LATEST_3_7, LATEST_3_8, DEV_BRANCH]

Review Comment:
   I am also fine with keeping only range and cooperative_sticky. In the end, 
the important part is to ensure that we test the eager and the cooperative 
protocols.



##########
tests/kafkatest/tests/client/consumer_protocol_migration_test.py:
##########
@@ -0,0 +1,407 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from ducktape.mark.resource import cluster
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+    LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, 
LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+    LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, 
LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+    LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+
+class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 6
+
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+    STICKY = "org.apache.kafka.clients.consumer.StickyAssignor"
+    COOPERATIVE_STICKEY = 
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+    all_assignment_strategies = [RANGE, ROUND_ROBIN, COOPERATIVE_STICKEY, 
STICKY]
+
+    all_consumer_versions = [LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, \
+                             LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, 
LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+                             LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, 
LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
+                             LATEST_3_7, LATEST_3_8, DEV_BRANCH]

Review Comment:
   I would add the following versions:
   * 3.5.0 (add rack to embedded protocol)
   * 3.4.0 (add generation id to embedded protocol)
   * 2.4.0 (add group instance id to embedded protocol)
   
   Otherwise, I agree that we can drop versions prior to 2.0. I would also add 
3.6, 3.7 and 3.8 in order to have a recent versions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to