dajac commented on code in PR #18513:
URL: https://github.com/apache/kafka/pull/18513#discussion_r1923692708


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2027,26 +2033,30 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
             classOf[GroupNotEmptyException])
 
-          // Test delete one correct static member
-          val removeOptions = new 
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new 
MemberToRemove(testInstanceId1)))
-          removeOptions.reason("test remove")
-          removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
-
-          assertNull(removeMembersResult.all().get())
-          val validMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(testInstanceId1))
-          assertNull(validMemberFuture.get())
+          // Stop the consumer threads and close consumers
+          // dynamic member will be removed, leaving two static members in the 
group
+          consumerThreadRunning.set(false)
+          assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
stop consumer threads in time")
 
           val describeTestGroupResult = 
client.describeConsumerGroups(Seq(testGroupId).asJava,
             new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
           assertEquals(1, describeTestGroupResult.describedGroups().size())
 
           testGroupDescription = 
describeTestGroupResult.describedGroups().get(testGroupId).get()
-
           assertEquals(testGroupId, testGroupDescription.groupId)
           assertFalse(testGroupDescription.isSimpleConsumerGroup)
           assertEquals(consumerSet.size - 1, 
testGroupDescription.members().size())
 
-          // Delete all active members remaining (a static member + a dynamic 
member)
+          // Test delete one correct static member
+          val removeOptions = new 
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new 
MemberToRemove(testInstanceId1)))
+          removeOptions.reason("test remove")
+          removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
+
+          assertNull(removeMembersResult.all().get())
+          val validMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(testInstanceId1))

Review Comment:
   nit: We could inline this too.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1864,17 +1864,20 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       val consumerSet = groupInstanceSet.map { groupInstanceId => 
createConsumer(configOverrides = createProperties(groupInstanceId))}
       val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
 
-      val latch = new CountDownLatch(consumerSet.size)
+      val startLatch = new CountDownLatch(consumerSet.size)

Review Comment:
   Given the code duplication between the two tests, I wonder whether we should 
encapsulate the logic to manage a group of consumers in a small utility class. 
The class could create the consumers, start them, stop them, etc. and expose 
the relevant methods. What do you think?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2413,26 +2433,30 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
             classOf[GroupNotEmptyException])
 
-          // Test delete one correct static member
-          val removeOptions = new 
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new 
MemberToRemove(testInstanceId1)))
-          removeOptions.reason("test remove")
-          removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
-
-          assertNull(removeMembersResult.all().get())
-          val validMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(testInstanceId1))
-          assertNull(validMemberFuture.get())
+          // Stop the consumer threads and close consumers
+          // dynamic member will be removed, leaving two static members in the 
group
+          consumerThreadRunning.set(false)
+          assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
stop consumer threads in time")
 
           val describeTestGroupResult = 
client.describeConsumerGroups(Seq(testGroupId).asJava,
             new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
           assertEquals(1, describeTestGroupResult.describedGroups().size())
 
           testGroupDescription = 
describeTestGroupResult.describedGroups().get(testGroupId).get()
-
           assertEquals(testGroupId, testGroupDescription.groupId)
           assertFalse(testGroupDescription.isSimpleConsumerGroup)
           assertEquals(consumerSet.size - 1, 
testGroupDescription.members().size())
 
-          // Delete all active members remaining (a static member + a dynamic 
member)
+          // Test delete one correct static member
+          val removeOptions = new 
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new 
MemberToRemove(testInstanceId1)))
+          removeOptions.reason("test remove")
+          removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
+
+          assertNull(removeMembersResult.all().get())
+          val validMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(testInstanceId1))
+          assertNull(validMemberFuture.get())

Review Comment:
   ditto about inlining.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2027,26 +2033,30 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
             classOf[GroupNotEmptyException])
 
-          // Test delete one correct static member
-          val removeOptions = new 
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new 
MemberToRemove(testInstanceId1)))
-          removeOptions.reason("test remove")
-          removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
-
-          assertNull(removeMembersResult.all().get())
-          val validMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(testInstanceId1))
-          assertNull(validMemberFuture.get())
+          // Stop the consumer threads and close consumers
+          // dynamic member will be removed, leaving two static members in the 
group
+          consumerThreadRunning.set(false)
+          assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to 
stop consumer threads in time")
 
           val describeTestGroupResult = 
client.describeConsumerGroups(Seq(testGroupId).asJava,
             new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
           assertEquals(1, describeTestGroupResult.describedGroups().size())
 
           testGroupDescription = 
describeTestGroupResult.describedGroups().get(testGroupId).get()
-
           assertEquals(testGroupId, testGroupDescription.groupId)
           assertFalse(testGroupDescription.isSimpleConsumerGroup)
           assertEquals(consumerSet.size - 1, 
testGroupDescription.members().size())
 
-          // Delete all active members remaining (a static member + a dynamic 
member)
+          // Test delete one correct static member
+          val removeOptions = new 
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new 
MemberToRemove(testInstanceId1)))
+          removeOptions.reason("test remove")

Review Comment:
   nit: Could we inline those and avoid `removeOptions`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to