chenyulin0719 commented on code in PR #18513:
URL: https://github.com/apache/kafka/pull/18513#discussion_r1926583011
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2027,26 +2033,30 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
classOf[GroupNotEmptyException])
- // Test delete one correct static member
- val removeOptions = new
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new
MemberToRemove(testInstanceId1)))
- removeOptions.reason("test remove")
- removeMembersResult =
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
-
- assertNull(removeMembersResult.all().get())
- val validMemberFuture = removeMembersResult.memberResult(new
MemberToRemove(testInstanceId1))
- assertNull(validMemberFuture.get())
+ // Stop the consumer threads and close consumers
+ // dynamic member will be removed, leaving two static members in the
group
+ consumerThreadRunning.set(false)
+ assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to
stop consumer threads in time")
val describeTestGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava,
new
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(1, describeTestGroupResult.describedGroups().size())
testGroupDescription =
describeTestGroupResult.describedGroups().get(testGroupId).get()
-
assertEquals(testGroupId, testGroupDescription.groupId)
assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(consumerSet.size - 1,
testGroupDescription.members().size())
- // Delete all active members remaining (a static member + a dynamic
member)
+ // Test delete one correct static member
+ val removeOptions = new
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new
MemberToRemove(testInstanceId1)))
+ removeOptions.reason("test remove")
+ removeMembersResult =
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
+
+ assertNull(removeMembersResult.all().get())
+ val validMemberFuture = removeMembersResult.memberResult(new
MemberToRemove(testInstanceId1))
Review Comment:
Done.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2413,26 +2433,30 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
classOf[GroupNotEmptyException])
- // Test delete one correct static member
- val removeOptions = new
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new
MemberToRemove(testInstanceId1)))
- removeOptions.reason("test remove")
- removeMembersResult =
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
-
- assertNull(removeMembersResult.all().get())
- val validMemberFuture = removeMembersResult.memberResult(new
MemberToRemove(testInstanceId1))
- assertNull(validMemberFuture.get())
+ // Stop the consumer threads and close consumers
+ // dynamic member will be removed, leaving two static members in the
group
+ consumerThreadRunning.set(false)
+ assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to
stop consumer threads in time")
val describeTestGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava,
new
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(1, describeTestGroupResult.describedGroups().size())
testGroupDescription =
describeTestGroupResult.describedGroups().get(testGroupId).get()
-
assertEquals(testGroupId, testGroupDescription.groupId)
assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(consumerSet.size - 1,
testGroupDescription.members().size())
- // Delete all active members remaining (a static member + a dynamic
member)
+ // Test delete one correct static member
+ val removeOptions = new
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new
MemberToRemove(testInstanceId1)))
+ removeOptions.reason("test remove")
+ removeMembersResult =
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
+
+ assertNull(removeMembersResult.all().get())
+ val validMemberFuture = removeMembersResult.memberResult(new
MemberToRemove(testInstanceId1))
+ assertNull(validMemberFuture.get())
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org