dajac commented on code in PR #14911:
URL: https://github.com/apache/kafka/pull/14911#discussion_r1415823136
##########
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##########
@@ -251,6 +251,112 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
}
+ @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "5000"),
+ new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms",
value = "5000")
+ ))
+ def
testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled():
Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+ val instanceId = "instanceId"
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers =
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+ controllers = raftCluster.controllerServers().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
+
+ // Create the topic.
+ val topicId = TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
+
+ // This is the expected assignment.
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+ // Heartbeats until the partitions are assigned.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+
+ // A new static member tries to join the group with an inuse instanceid
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // The new static member join group will keep failing with an
UnreleasedInstanceIdException
+ // exception until eventually it gets through because the existing member
will be kicked out
+ // because of not sending a heartbeat till session timeout expiry.
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
Review Comment:
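nit: This message duplicates the one used for the earlier partition-assignment wait. Should it be `Could not re-join the group successfully. Last response ...` instead?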
##########
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##########
@@ -251,6 +251,112 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
}
+ @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "5000"),
+ new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms",
value = "5000")
+ ))
+ def
testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled():
Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+ val instanceId = "instanceId"
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers =
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+ controllers = raftCluster.controllerServers().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
+
+ // Create the topic.
+ val topicId = TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
+
+ // This is the expected assignment.
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+ // Heartbeats until the partitions are assigned.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+
+ // A new static member tries to join the group with an inuse instanceid
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // The new static member join group will keep failing with an
UnreleasedInstanceIdException
+ // exception until eventually it gets through because the existing member
will be kicked out
+ // because of not sending a heartbeat till session timeout expiry.
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+
+ print(consumerGroupHeartbeatRequest)
+ // Verify the response. The group epoch bumps upto 4 which eventually
reflects in the new member epoch
+ assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+
Review Comment:
nit: Let's remove this empty line.
##########
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##########
@@ -251,6 +251,112 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
}
+ @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "5000"),
+ new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms",
value = "5000")
+ ))
+ def
testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled():
Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+ val instanceId = "instanceId"
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers =
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+ controllers = raftCluster.controllerServers().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
+
+ // Create the topic.
+ val topicId = TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
+
+ // This is the expected assignment.
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+ // Heartbeats until the partitions are assigned.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+
+ // A new static member tries to join the group with an inuse instanceid
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // The new static member join group will keep failing with an
UnreleasedInstanceIdException
+ // exception until eventually it gets through because the existing member
will be kicked out
+ // because of not sending a heartbeat till session timeout expiry.
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+
+ print(consumerGroupHeartbeatRequest)
Review Comment:
nit: Let's remove this leftover debug `print(consumerGroupHeartbeatRequest)` call.
##########
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##########
@@ -251,6 +251,112 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
}
+ @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "5000"),
+ new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms",
value = "5000")
+ ))
+ def
testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled():
Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+ val instanceId = "instanceId"
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers =
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+ controllers = raftCluster.controllerServers().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
+
+ // Create the topic.
+ val topicId = TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
+
+ // This is the expected assignment.
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+ // Heartbeats until the partitions are assigned.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+
+ // A new static member tries to join the group with an inuse instanceid
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // The new static member join group will keep failing with an
UnreleasedInstanceIdException
+ // exception until eventually it gets through because the existing member
will be kicked out
+ // because of not sending a heartbeat till session timeout expiry.
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+
+ print(consumerGroupHeartbeatRequest)
+ // Verify the response. The group epoch bumps upto 4 which eventually
reflects in the new member epoch
Review Comment:
nit: Please end this code comment with a period (`.`).
##########
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##########
@@ -251,6 +251,112 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
}
+ @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "5000"),
+ new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms",
value = "5000")
+ ))
+ def
testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled():
Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+ val instanceId = "instanceId"
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers =
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+ controllers = raftCluster.controllerServers().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
+
+ // Create the topic.
+ val topicId = TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
+
+ // This is the expected assignment.
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+ // Heartbeats until the partitions are assigned.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+
+ // A new static member tries to join the group with an inuse instanceid
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // The new static member join group will keep failing with an
UnreleasedInstanceIdException
Review Comment:
I am wondering whether we should actually check that
`UnreleasedInstanceIdException` was received at least once. Have you considered
this?
##########
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##########
@@ -251,6 +251,112 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
}
+ @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value
= "5000"),
+ new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms",
value = "5000")
+ ))
+ def
testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled():
Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+ val instanceId = "instanceId"
+
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers =
raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+ controllers = raftCluster.controllerServers().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to an nonexistent topic.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
+
+ // Create the topic.
+ val topicId = TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setInstanceId(instanceId)
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
+
+ // This is the expected assignment.
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+ // Heartbeats until the partitions are assigned.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+
+ // A new static member tries to join the group with an inuse instanceid
Review Comment:
nit: Please end this code comment with a period (`.`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]