abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r428745517



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3612,6 +3611,26 @@ private boolean dependsOnSpecificNode(ConfigResource 
resource) {
                 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
     }
 
+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members;
+        try {
+            members = 
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {

Review comment:
       I think we should catch `Exception` here:
   
https://stackoverflow.com/questions/2274102/difference-between-using-throwable-and-exception-in-a-try-catch
   

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(Strin
         KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new 
KafkaFutureImpl<>();
 
         ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, 
RemoveMembersFromConsumerGroupOptions> context =
-            new ConsumerGroupOperationContext<>(groupId, options, deadline, 
future);
+                new ConsumerGroupOperationContext<>(groupId, options, 
deadline, future);

Review comment:
       Let's get back the original indentation.

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -186,9 +190,15 @@ private void validateNoActiveConsumers(final String 
groupId,
         final List<MemberDescription> members =
             new 
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
         if (!members.isEmpty()) {
-            throw new IllegalStateException("Consumer group '" + groupId + "' 
is still active "
-                    + "and has following members: " + members + ". "
-                    + "Make sure to stop all running application instances 
before running the reset tool.");
+            if (options.has(forceOption)) {
+                System.out.println("Force deleting all active members in the 
group: " + groupId);
+                adminClient.removeMembersFromConsumerGroup(groupId, new 
RemoveMembersFromConsumerGroupOptions()).all();

Review comment:
       Should we check the member removal result here before proceeding? If 
that call failed, the whole operation should fail with error message containing 
the result IMHO.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -507,7 +544,7 @@ private Topology 
setupTopologyWithoutIntermediateUserTopic() {
         return builder.build();
     }
 
-    private void cleanGlobal(final boolean withIntermediateTopics,
+    private int tryCleanGlobal(final boolean withIntermediateTopics,

Review comment:
       We could add meta comment for the return value here, and instead of 
returning an exit code, I feel a boolean is suffice to indicate whether the 
clean operation was successful or not.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -2411,6 +2411,50 @@ public void testRemoveMembersFromGroup() throws 
Exception {
             assertNull(noErrorResult.all().get());
             assertNull(noErrorResult.memberResult(memberOne).get());
             assertNull(noErrorResult.memberResult(memberTwo).get());
+
+            // Return with success for "removeAll" scenario

Review comment:
       This test looks good, but it seems that we didn't test the case where 
some members get deleted successfully while some are not?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(Strin
         KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new 
KafkaFutureImpl<>();
 
         ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, 
RemoveMembersFromConsumerGroupOptions> context =
-            new ConsumerGroupOperationContext<>(groupId, options, deadline, 
future);
+                new ConsumerGroupOperationContext<>(groupId, options, 
deadline, future);
 
+        List<MemberIdentity> members;
+        if (options.removeAll()) {
+            members = getMembersFromGroup(groupId);
+        } else {
+            members = options.members().stream().map(
+                    
MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+        }
         Call findCoordinatorCall = getFindCoordinatorCall(context,
-            () -> getRemoveMembersFromGroupCall(context));
+            () -> getRemoveMembersFromGroupCall(context, members));
         runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
         return new RemoveMembersFromConsumerGroupResult(future, 
options.members());
     }
 
-    private Call 
getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, 
Errors>, RemoveMembersFromConsumerGroupOptions> context) {
+    private Call 
getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, 
Errors>,
+            RemoveMembersFromConsumerGroupOptions> context, 
List<MemberIdentity> allMembers) {
         return new Call("leaveGroup",
                         context.deadline(),
                         new ConstantNodeIdProvider(context.node().get().id())) 
{
             @Override
             LeaveGroupRequest.Builder createRequest(int timeoutMs) {
-                return new LeaveGroupRequest.Builder(context.groupId(),
-                                                     
context.options().members().stream().map(
-                                                         
MemberToRemove::toMemberIdentity).collect(Collectors.toList()));
+                    return new LeaveGroupRequest.Builder(context.groupId(),

Review comment:
       nit: we could merge L3666-3667

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3612,6 +3611,26 @@ private boolean dependsOnSpecificNode(ConfigResource 
resource) {
                 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
     }
 
+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members;
+        try {
+            members = 
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {
+            throw new KafkaException("Encounter exception when trying to get 
members from group: " + groupId, ex);
+        }
+
+        List<MemberIdentity> memberToRemove = new ArrayList<>();
+        for (final MemberDescription member : members) {
+            if (member.groupInstanceId().isPresent()) {
+                memberToRemove.add(new 
MemberIdentity().setGroupInstanceId(member.groupInstanceId().get())
+                );

Review comment:
       This indentation is a bit weird, let's just merge L3625-3626

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##########
@@ -37,7 +37,15 @@ public 
RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members)
         this.members = new HashSet<>(members);
     }
 
+    public RemoveMembersFromConsumerGroupOptions() {
+        this.members = new HashSet<>();

Review comment:
       Collections.emptySet() makes more sense since it is immutable.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3660,7 +3686,7 @@ void handleResponse(AbstractResponse abstractResponse) {
                     // We set member.id to empty here explicitly, so that the 
lookup will succeed as user doesn't
                     // know the exact member.id.
                     memberErrors.put(new MemberIdentity()
-                                         
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+                                         
.setMemberId(memberResponse.memberId())

Review comment:
       I could see this doesn't hold true for a plain static member removal. 
Let's discuss why skipping the individual member check in 
`RemoveMembersFromConsumerGroupResult` makes sense over there.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(Strin
         KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new 
KafkaFutureImpl<>();
 
         ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, 
RemoveMembersFromConsumerGroupOptions> context =
-            new ConsumerGroupOperationContext<>(groupId, options, deadline, 
future);
+                new ConsumerGroupOperationContext<>(groupId, options, 
deadline, future);
 
+        List<MemberIdentity> members;
+        if (options.removeAll()) {
+            members = getMembersFromGroup(groupId);
+        } else {
+            members = options.members().stream().map(
+                    
MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+        }
         Call findCoordinatorCall = getFindCoordinatorCall(context,
-            () -> getRemoveMembersFromGroupCall(context));
+            () -> getRemoveMembersFromGroupCall(context, members));
         runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
         return new RemoveMembersFromConsumerGroupResult(future, 
options.members());
     }
 
-    private Call 
getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, 
Errors>, RemoveMembersFromConsumerGroupOptions> context) {
+    private Call 
getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, 
Errors>,
+            RemoveMembersFromConsumerGroupOptions> context, 
List<MemberIdentity> allMembers) {

Review comment:
       nit: we could name it `members` now

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##########
@@ -46,26 +46,42 @@
      * If not, the first member error shall be returned.
      */
     public KafkaFuture<Void> all() {
-        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
-        this.future.whenComplete((memberErrors, throwable) -> {
-            if (throwable != null) {
-                result.completeExceptionally(throwable);
-            } else {
-                for (MemberToRemove memberToRemove : memberInfos) {
-                    if (maybeCompleteExceptionally(memberErrors, 
memberToRemove.toMemberIdentity(), result)) {
-                        return;
+        if (removeAll()) {

Review comment:
       In `removeAll()` mode, why could we skip the individual member removal 
results? I guess although we don't need to verify against the original member 
list (because they don't exist for `removeAll`), going throw the sub error list 
is still valuable to make sure there is no unexpected failure.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##########
@@ -46,26 +46,42 @@
      * If not, the first member error shall be returned.
      */
     public KafkaFuture<Void> all() {
-        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
-        this.future.whenComplete((memberErrors, throwable) -> {
-            if (throwable != null) {
-                result.completeExceptionally(throwable);
-            } else {
-                for (MemberToRemove memberToRemove : memberInfos) {
-                    if (maybeCompleteExceptionally(memberErrors, 
memberToRemove.toMemberIdentity(), result)) {
-                        return;
+        if (removeAll()) {
+            final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+            this.future.whenComplete((memberErrors, throwable) -> {
+                if (throwable != null) {
+                    result.completeExceptionally(throwable);
+                } else {
+                    System.out.println("Remove all active members succeeded, 
removed " + memberErrors.size() + " members: " + memberErrors.keySet());

Review comment:
       Remove print statement.

##########
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##########
@@ -1147,6 +1174,16 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
           assertEquals(testGroupId, testGroupDescription.groupId)
           assertFalse(testGroupDescription.isSimpleConsumerGroup)
+          assertEquals(consumerSet.size -1, 
testGroupDescription.members().size())

Review comment:
       size - 1

##########
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##########
@@ -1017,47 +1017,70 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       assertTrue(0 == list1.errors().get().size())
       assertTrue(0 == list1.valid().get().size())
       val testTopicName = "test_topic"
+      val testTopicName1 = testTopicName + "1"
+      val testTopicName2 = testTopicName + "2"
       val testNumPartitions = 2
-      client.createTopics(Collections.singleton(
-        new NewTopic(testTopicName, testNumPartitions, 1.toShort))).all().get()
-      waitForTopics(client, List(testTopicName), List())
+
+      client.createTopics(util.Arrays.asList(new NewTopic(testTopicName, 
testNumPartitions, 1.toShort),
+        new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
+        new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName, testTopicName1, 
testTopicName2), List())
 
       val producer = createProducer()
       try {
         producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
       } finally {
         Utils.closeQuietly(producer, "producer")
       }
+
+      val EMPTY_GROUP_INSTANCE_ID = ""
       val testGroupId = "test_group_id"
       val testClientId = "test_client_id"
       val testInstanceId = "test_instance_id"
+      val testInstanceId1 = testInstanceId + "1"

Review comment:
       I prefer `testInstanceIdOne = "test_instance_id_1"` and 
`testInstanceIdTwo = "test_instance_id_2"`

##########
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##########
@@ -1017,47 +1017,71 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       assertTrue(0 == list1.errors().get().size())
       assertTrue(0 == list1.valid().get().size())
       val testTopicName = "test_topic"
+      val testTopicName1 = testTopicName + "1"
+      val testTopicName2 = testTopicName + "2"
       val testNumPartitions = 2
-      client.createTopics(Collections.singleton(
-        new NewTopic(testTopicName, testNumPartitions, 1.toShort))).all().get()
-      waitForTopics(client, List(testTopicName), List())
+
+      client.createTopics(util.Arrays.asList(new NewTopic(testTopicName, 
testNumPartitions, 1.toShort),
+        new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
+        new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName, testTopicName1, 
testTopicName2), List())
 
       val producer = createProducer()
       try {
         producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
       } finally {
         Utils.closeQuietly(producer, "producer")
       }
+
+      val EMPTY_GROUP_INSTANCE_ID = ""

Review comment:
       Fair enough

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -547,6 +584,13 @@ private void cleanGlobal(final boolean 
withIntermediateTopics,
         cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
CLEANUP_CONSUMER_TIMEOUT);
 
         final int exitCode = new StreamsResetter().run(parameters, 
cleanUpConfig);
+        return exitCode;

Review comment:
       Like said earlier, I think we could just return
   `return new StreamsResetter().run(parameters, cleanUpConfig) == 0`

##########
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##########
@@ -1138,7 +1165,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           val validMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(testInstanceId))
           assertNull(validMemberFuture.get())
 
-          // The group should contain no member now.
+          // The group's active members number should decrease by 1

Review comment:
       We could remove this comment for now

##########
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##########
@@ -1155,12 +1192,15 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
           assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
           assertNull(deleteResult.deletedGroups().get(testGroupId).get())
-        } finally {
-          consumerThread.interrupt()
-          consumerThread.join()
-        }
       } finally {
-        Utils.closeQuietly(consumer, "consumer")
+        consumerThreads.foreach {
+          case consumerThread =>
+            consumerThread.interrupt()
+            consumerThread.join()
+        }
+      }
+      }finally {

Review comment:
       nit: format
   I'm pretty surprised this wasn't caught in my previous template. Let me 
check how to cover this in style test as well.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -261,6 +261,43 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
         Assert.assertEquals(1, exitCode);
     }
 
+    public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() 
throws Exception {
+        appID = testId + "-with-force-option";
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
STREAMS_CONSUMER_TIMEOUT * 100);

Review comment:
       What does `"" + ` mean?

##########
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##########
@@ -1075,13 +1098,17 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
           assertEquals(testGroupId, testGroupDescription.groupId())
           assertFalse(testGroupDescription.isSimpleConsumerGroup)
-          assertEquals(1, testGroupDescription.members().size())
+          assertEquals(groupInstanceSet.size, 
testGroupDescription.members().size())
           val member = testGroupDescription.members().iterator().next()
           assertEquals(testClientId, member.clientId())
-          val topicPartitions = member.assignment().topicPartitions()
-          assertEquals(testNumPartitions, topicPartitions.size())
-          assertEquals(testNumPartitions, topicPartitions.asScala.
-            count(tp => tp.topic().equals(testTopicName)))
+          val members = testGroupDescription.members()
+          assertEquals(testClientId, members.asScala.head.clientId())

Review comment:
       Does this check duplicate L1103? Also I think it makes sense to check 
all the members' clientId as they should all equal to `testClientId`

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -507,7 +544,7 @@ private Topology 
setupTopologyWithoutIntermediateUserTopic() {
         return builder.build();
     }
 
-    private void cleanGlobal(final boolean withIntermediateTopics,
+    private int tryCleanGlobal(final boolean withIntermediateTopics,
                              final String resetScenario,

Review comment:
       nit: parameters are not aligned.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to