dajac commented on code in PR #18034:
URL: https://github.com/apache/kafka/pull/18034#discussion_r1872827341
##########
clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java:
##########
@@ -45,8 +49,37 @@ public MemberDescription(String memberId,
this.assignment = assignment == null ?
new MemberAssignment(Collections.emptySet()) : assignment;
this.targetAssignment = targetAssignment;
+ this.memberEpoch = memberEpoch;
+ this.upgraded = upgraded;
}
+ /**
+ * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional,
String, String, MemberAssignment, Optional, Optional, Optional)}.
+ */
+ @Deprecated
+ public MemberDescription(String memberId,
+ Optional<String> groupInstanceId,
+ String clientId,
+ String host,
+ MemberAssignment assignment,
+ Optional<MemberAssignment> targetAssignment
+ ) {
Review Comment:
nit: I think that we are mixing two code styles here. We would usually use:
```
public MemberDescription(
String memberId,
Optional<String> groupInstanceId,
String clientId,
String host,
MemberAssignment assignment,
Optional<MemberAssignment> targetAssignment
) {
```
or
```
public MemberDescription(String memberId,
Optional<String> groupInstanceId,
String clientId,
String host,
MemberAssignment assignment,
Optional<MemberAssignment> targetAssignment) {
```
I personally prefer the first one but it is up to you.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1921,12 +1921,17 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Test that we can get information about the test consumer group.
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
var testGroupDescription =
describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
+ assertEquals(groupType == GroupType.CLASSIC,
testGroupDescription.groupEpoch.isEmpty)
+ assertEquals(groupType == GroupType.CLASSIC,
testGroupDescription.targetAssignmentEpoch.isEmpty)
assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(groupInstanceSet.size,
testGroupDescription.members().size())
val members = testGroupDescription.members()
- members.asScala.foreach(member => assertEquals(testClientId,
member.clientId()))
+ members.asScala.foreach(member => {
+ assertEquals(testClientId, member.clientId)
+ assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty
else Optional.of(true), member.upgraded)
+ })
Review Comment:
nit: `foreach(member => {` -> `foreach { member => `
##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java:
##########
@@ -164,17 +165,21 @@ public void testSuccessfulHandleConsumerGroupResponse() {
Optional.of(new MemberAssignment(Set.of(
new TopicPartition("foo", 1),
new TopicPartition("bar", 2)
- )))
+ ))),
+ Optional.of(10),
+ Optional.of(true)
Review Comment:
Should we cover the unknown and the classic cases in the tests too?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2062,6 +2067,86 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ /**
+ * Test the consumer group APIs.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupWithMemberMigration(quorum: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups
+ assertEquals(0, list1.all.get.size)
+ assertEquals(0, list1.errors.get.size)
+ assertEquals(0, list1.valid.get.size)
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+
+ client.createTopics(util.Arrays.asList(
+ new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+ )).all.get
+ waitForTopics(client, List(testTopicName), List())
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null))
+ producer.send(new ProducerRecord(testTopicName, 1, null, null))
+ producer.flush()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+
+ val testGroupId = "test_group_id"
+ val testClassicClientId = "test_classic_client_id"
+ val testConsumerClientId = "test_consumer_client_id"
+
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testClassicClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name)
+
+ classicConsumer = createConsumer(configOverrides = newConsumerConfig)
+ classicConsumer.subscribe(List(testTopicName).asJava)
+ classicConsumer.poll(JDuration.ofMillis(1000))
+
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testConsumerClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name)
+ consumerConsumer = createConsumer(configOverrides = newConsumerConfig)
+ consumerConsumer.subscribe(List(testTopicName).asJava)
+ consumerConsumer.poll(JDuration.ofMillis(1000))
+
+ TestUtils.waitUntilTrue(() => {
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ describeConsumerGroupResult.containsKey(testGroupId) &&
+ describeConsumerGroupResult.get(testGroupId).members.size == 2
+ }, s"Expected the offset for partition 0 to eventually become 1.")
+
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ val group = describeConsumerGroupResult.get(testGroupId)
+ assertNotNull(group)
+ assertTrue(group.groupEpoch.isPresent && group.groupEpoch.get > 0)
+ assertTrue(group.targetAssignmentEpoch.isPresent &&
group.targetAssignmentEpoch.get > 0)
Review Comment:
ditto.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2062,6 +2067,86 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ /**
+ * Test the consumer group APIs.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupWithMemberMigration(quorum: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups
+ assertEquals(0, list1.all.get.size)
+ assertEquals(0, list1.errors.get.size)
+ assertEquals(0, list1.valid.get.size)
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+
+ client.createTopics(util.Arrays.asList(
+ new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+ )).all.get
+ waitForTopics(client, List(testTopicName), List())
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null))
+ producer.send(new ProducerRecord(testTopicName, 1, null, null))
+ producer.flush()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+
+ val testGroupId = "test_group_id"
+ val testClassicClientId = "test_classic_client_id"
+ val testConsumerClientId = "test_consumer_client_id"
+
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testClassicClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name)
+
+ classicConsumer = createConsumer(configOverrides = newConsumerConfig)
+ classicConsumer.subscribe(List(testTopicName).asJava)
+ classicConsumer.poll(JDuration.ofMillis(1000))
+
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testConsumerClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name)
+ consumerConsumer = createConsumer(configOverrides = newConsumerConfig)
+ consumerConsumer.subscribe(List(testTopicName).asJava)
+ consumerConsumer.poll(JDuration.ofMillis(1000))
+
+ TestUtils.waitUntilTrue(() => {
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ describeConsumerGroupResult.containsKey(testGroupId) &&
+ describeConsumerGroupResult.get(testGroupId).members.size == 2
Review Comment:
Here you could also wait until the group becomes STABLE. It will ease the
assertions afterwards.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1921,12 +1921,17 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Test that we can get information about the test consumer group.
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
var testGroupDescription =
describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
+ assertEquals(groupType == GroupType.CLASSIC,
testGroupDescription.groupEpoch.isEmpty)
+ assertEquals(groupType == GroupType.CLASSIC,
testGroupDescription.targetAssignmentEpoch.isEmpty)
Review Comment:
Would it be possible to actually verify the values too?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2062,6 +2067,86 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ /**
+ * Test the consumer group APIs.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupWithMemberMigration(quorum: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups
+ assertEquals(0, list1.all.get.size)
+ assertEquals(0, list1.errors.get.size)
+ assertEquals(0, list1.valid.get.size)
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+
+ client.createTopics(util.Arrays.asList(
+ new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+ )).all.get
+ waitForTopics(client, List(testTopicName), List())
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null))
+ producer.send(new ProducerRecord(testTopicName, 1, null, null))
+ producer.flush()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+
+ val testGroupId = "test_group_id"
+ val testClassicClientId = "test_classic_client_id"
+ val testConsumerClientId = "test_consumer_client_id"
+
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testClassicClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name)
+
+ classicConsumer = createConsumer(configOverrides = newConsumerConfig)
+ classicConsumer.subscribe(List(testTopicName).asJava)
+ classicConsumer.poll(JDuration.ofMillis(1000))
+
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testConsumerClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name)
+ consumerConsumer = createConsumer(configOverrides = newConsumerConfig)
+ consumerConsumer.subscribe(List(testTopicName).asJava)
+ consumerConsumer.poll(JDuration.ofMillis(1000))
+
+ TestUtils.waitUntilTrue(() => {
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ describeConsumerGroupResult.containsKey(testGroupId) &&
+ describeConsumerGroupResult.get(testGroupId).members.size == 2
+ }, s"Expected the offset for partition 0 to eventually become 1.")
Review Comment:
This comment seems to be off.
##########
clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java:
##########
@@ -131,13 +170,33 @@ public Optional<MemberAssignment> targetAssignment() {
return targetAssignment;
}
+ /**
+ * The epoch of the group member.
+ */
+ public Optional<Integer> memberEpoch() {
+ return memberEpoch;
+ }
+
+ /**
+ * The flag indicating whether a member with a consumer group uses the
consumer protocol.
+ * True for consumer member.
+ * False for classic member
+ * Empty for unknown or members in classic group.
Review Comment:
nit: I suggest to reference the group type in the java doc in order to be
crystal clear. e.g.
```
The flag indicating whether a member within a {CONSUMER} group uses the
{CONSUMER} protocol.
The optional is set to true if it does, to false if it does not, and to
empty if it is unknown or if the group
is a {CLASSIC} group.
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java:
##########
@@ -215,6 +208,20 @@ public Set<AclOperation> authorizedOperations() {
return authorizedOperations;
}
+ /**
+ * The epoch of the consumer group.
+ */
+ public Optional<Integer> groupEpoch() {
+ return groupEpoch;
+ }
+
+ /**
+ * The epoch of the target assignment.
+ */
+ public Optional<Integer> targetAssignmentEpoch() {
+ return targetAssignmentEpoch;
+ }
Review Comment:
Let's also mention here that they are only provided when the type is
consumer.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2062,6 +2067,86 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ /**
+ * Test the consumer group APIs.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupWithMemberMigration(quorum: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups
+ assertEquals(0, list1.all.get.size)
+ assertEquals(0, list1.errors.get.size)
+ assertEquals(0, list1.valid.get.size)
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+
+ client.createTopics(util.Arrays.asList(
+ new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+ )).all.get
+ waitForTopics(client, List(testTopicName), List())
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null))
+ producer.send(new ProducerRecord(testTopicName, 1, null, null))
+ producer.flush()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+
+ val testGroupId = "test_group_id"
+ val testClassicClientId = "test_classic_client_id"
+ val testConsumerClientId = "test_consumer_client_id"
+
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testClassicClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name)
+
+ classicConsumer = createConsumer(configOverrides = newConsumerConfig)
+ classicConsumer.subscribe(List(testTopicName).asJava)
+ classicConsumer.poll(JDuration.ofMillis(1000))
+
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testConsumerClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name)
+ consumerConsumer = createConsumer(configOverrides = newConsumerConfig)
+ consumerConsumer.subscribe(List(testTopicName).asJava)
+ consumerConsumer.poll(JDuration.ofMillis(1000))
+
+ TestUtils.waitUntilTrue(() => {
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ describeConsumerGroupResult.containsKey(testGroupId) &&
+ describeConsumerGroupResult.get(testGroupId).members.size == 2
+ }, s"Expected the offset for partition 0 to eventually become 1.")
+
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ val group = describeConsumerGroupResult.get(testGroupId)
+ assertNotNull(group)
+ assertTrue(group.groupEpoch.isPresent && group.groupEpoch.get > 0)
+ assertTrue(group.targetAssignmentEpoch.isPresent &&
group.targetAssignmentEpoch.get > 0)
+
+ val classicMember = group.members.asScala.find(_.clientId ==
testClassicClientId)
+ assertTrue(classicMember.isDefined)
+ assertTrue(classicMember.get.memberEpoch.isPresent &&
classicMember.get.memberEpoch.get > 0)
+ assertTrue(classicMember.get.upgraded.isPresent &&
!classicMember.get.upgraded.get)
Review Comment:
`assertEquals(Optional.of(false), classicMember.get.upgraded)`.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2062,6 +2067,86 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ /**
+ * Test the consumer group APIs.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupWithMemberMigration(quorum: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups
+ assertEquals(0, list1.all.get.size)
+ assertEquals(0, list1.errors.get.size)
+ assertEquals(0, list1.valid.get.size)
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+
+ client.createTopics(util.Arrays.asList(
+ new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+ )).all.get
+ waitForTopics(client, List(testTopicName), List())
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null))
+ producer.send(new ProducerRecord(testTopicName, 1, null, null))
+ producer.flush()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+
+ val testGroupId = "test_group_id"
+ val testClassicClientId = "test_classic_client_id"
+ val testConsumerClientId = "test_consumer_client_id"
+
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testClassicClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name)
+
+ classicConsumer = createConsumer(configOverrides = newConsumerConfig)
+ classicConsumer.subscribe(List(testTopicName).asJava)
+ classicConsumer.poll(JDuration.ofMillis(1000))
+
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testConsumerClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name)
+ consumerConsumer = createConsumer(configOverrides = newConsumerConfig)
+ consumerConsumer.subscribe(List(testTopicName).asJava)
+ consumerConsumer.poll(JDuration.ofMillis(1000))
+
+ TestUtils.waitUntilTrue(() => {
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ describeConsumerGroupResult.containsKey(testGroupId) &&
+ describeConsumerGroupResult.get(testGroupId).members.size == 2
+ }, s"Expected the offset for partition 0 to eventually become 1.")
+
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ val group = describeConsumerGroupResult.get(testGroupId)
+ assertNotNull(group)
+ assertTrue(group.groupEpoch.isPresent && group.groupEpoch.get > 0)
+ assertTrue(group.targetAssignmentEpoch.isPresent &&
group.targetAssignmentEpoch.get > 0)
+
+ val classicMember = group.members.asScala.find(_.clientId ==
testClassicClientId)
+ assertTrue(classicMember.isDefined)
+ assertTrue(classicMember.get.memberEpoch.isPresent &&
classicMember.get.memberEpoch.get > 0)
+ assertTrue(classicMember.get.upgraded.isPresent &&
!classicMember.get.upgraded.get)
+
+ val consumerMember = group.members.asScala.find(_.clientId ==
testConsumerClientId)
+ assertTrue(consumerMember.isDefined)
+ assertTrue(classicMember.get.memberEpoch.isPresent &&
classicMember.get.memberEpoch.get > 0)
+ assertTrue(consumerMember.get.upgraded.isPresent &&
consumerMember.get.upgraded.get)
Review Comment:
ditto.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2062,6 +2067,86 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ /**
+ * Test the consumer group APIs.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupWithMemberMigration(quorum: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups
+ assertEquals(0, list1.all.get.size)
+ assertEquals(0, list1.errors.get.size)
+ assertEquals(0, list1.valid.get.size)
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+
+ client.createTopics(util.Arrays.asList(
+ new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+ )).all.get
+ waitForTopics(client, List(testTopicName), List())
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null))
+ producer.send(new ProducerRecord(testTopicName, 1, null, null))
+ producer.flush()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+
+ val testGroupId = "test_group_id"
+ val testClassicClientId = "test_classic_client_id"
+ val testConsumerClientId = "test_consumer_client_id"
+
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testClassicClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name)
+
+ classicConsumer = createConsumer(configOverrides = newConsumerConfig)
+ classicConsumer.subscribe(List(testTopicName).asJava)
+ classicConsumer.poll(JDuration.ofMillis(1000))
+
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testConsumerClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name)
+ consumerConsumer = createConsumer(configOverrides = newConsumerConfig)
+ consumerConsumer.subscribe(List(testTopicName).asJava)
+ consumerConsumer.poll(JDuration.ofMillis(1000))
+
+ TestUtils.waitUntilTrue(() => {
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ describeConsumerGroupResult.containsKey(testGroupId) &&
+ describeConsumerGroupResult.get(testGroupId).members.size == 2
+ }, s"Expected the offset for partition 0 to eventually become 1.")
+
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ val group = describeConsumerGroupResult.get(testGroupId)
+ assertNotNull(group)
+ assertTrue(group.groupEpoch.isPresent && group.groupEpoch.get > 0)
+ assertTrue(group.targetAssignmentEpoch.isPresent &&
group.targetAssignmentEpoch.get > 0)
+
+ val classicMember = group.members.asScala.find(_.clientId ==
testClassicClientId)
+ assertTrue(classicMember.isDefined)
+ assertTrue(classicMember.get.memberEpoch.isPresent &&
classicMember.get.memberEpoch.get > 0)
Review Comment:
`assertEquals(Optional.of(2), classicMember.get.memberEpoch)`.
##########
clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java:
##########
@@ -131,13 +170,33 @@ public Optional<MemberAssignment> targetAssignment() {
return targetAssignment;
}
+ /**
+ * The epoch of the group member.
Review Comment:
nit: Let's also mention that this is only provided in the CONSUMER group
case.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2062,6 +2067,86 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ /**
+ * Test the consumer group APIs.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupWithMemberMigration(quorum: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups
+ assertEquals(0, list1.all.get.size)
+ assertEquals(0, list1.errors.get.size)
+ assertEquals(0, list1.valid.get.size)
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+
+ client.createTopics(util.Arrays.asList(
+ new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+ )).all.get
+ waitForTopics(client, List(testTopicName), List())
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null))
+ producer.send(new ProducerRecord(testTopicName, 1, null, null))
+ producer.flush()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+
+ val testGroupId = "test_group_id"
+ val testClassicClientId = "test_classic_client_id"
+ val testConsumerClientId = "test_consumer_client_id"
+
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testClassicClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name)
+
+ classicConsumer = createConsumer(configOverrides = newConsumerConfig)
+ classicConsumer.subscribe(List(testTopicName).asJava)
+ classicConsumer.poll(JDuration.ofMillis(1000))
+
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testConsumerClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name)
+ consumerConsumer = createConsumer(configOverrides = newConsumerConfig)
+ consumerConsumer.subscribe(List(testTopicName).asJava)
+ consumerConsumer.poll(JDuration.ofMillis(1000))
+
+ TestUtils.waitUntilTrue(() => {
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ describeConsumerGroupResult.containsKey(testGroupId) &&
+ describeConsumerGroupResult.get(testGroupId).members.size == 2
+ }, s"Expected the offset for partition 0 to eventually become 1.")
+
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ val group = describeConsumerGroupResult.get(testGroupId)
+ assertNotNull(group)
+ assertTrue(group.groupEpoch.isPresent && group.groupEpoch.get > 0)
Review Comment:
It should be 2 if you implement my previous suggestions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]