kirktrue commented on code in PR #21074:
URL: https://github.com/apache/kafka/pull/21074#discussion_r2587167108


##########
tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java:
##########
@@ -293,6 +310,77 @@ public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {
         assertEquals(beginningAndEndOffset, position);
     }
 
+    @Test
+    public void shouldRetryToRemoveMembersOnUnknownMemberIdExceptionAndForce() throws Exception {
+        final String groupId = "groupId";
+
+        final Admin adminClient = mock(Admin.class);
+        final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class);
+
+        when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+                .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription))));
+        when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any()))
+                .thenReturn(new RemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of()))
+                .thenReturn(new RemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()), Set.of()));
+        when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+        streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true);
+
+        verify(adminClient, times(2)).removeMembersFromConsumerGroup(eq(groupId), any());
+    }
+
+    @Test
+    public void shouldFailIfThereAreMembersAndNotForce() throws Exception {
+        final String groupId = "groupId";
+
+        final Admin adminClient = mock(Admin.class);
+        final ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class);
+
+        when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+                .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, KafkaFutureImpl.completedFuture(consumerGroupDescription))));
+        when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+        assertThrows(IllegalStateException.class, () -> streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, false));
+
+        verify(adminClient, never()).removeMembersFromConsumerGroup(eq(groupId), any());

Review Comment:
   This is because the `force` parameter is set to `false`, right?



##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -177,30 +179,42 @@ public int execute(final String[] args, final Properties config) {
         }
     }
 
-    private void maybeDeleteActiveConsumers(final String groupId,
-                                            final Admin adminClient,
-                                            final StreamsResetterOptions options)
+    // visible for testing
+    void maybeDeleteActiveConsumers(final String groupId,
+                                    final Admin adminClient,
+                                    final boolean force)
         throws ExecutionException, InterruptedException {
-        final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(
-            Set.of(groupId),
-            new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
-        try {
-            final List<MemberDescription> members =
-                new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
-            if (!members.isEmpty()) {
-                if (options.hasForce()) {
-                    System.out.println("Force deleting all active members in the group: " + groupId);
-                    adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get();
-                } else {
-                    throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
-                        + "and has following members: " + members + ". "
-                        + "Make sure to stop all running application instances before running the reset tool."
-                        + " You can use option '--force' to remove active members from the group.");
+        int retries = 0;
+        while (true) {
+            final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(
+                    Set.of(groupId),
+                    new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
+            try {
+                final List<MemberDescription> members =
+                        new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
+                if (!members.isEmpty()) {
+                    if (force) {
+                        System.out.println("Force deleting all active members in the group: " + groupId);
+                        adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get();

Review Comment:
   Unrelated to this PR, but out of curiosity: is there any reason not to pass 
the members retrieved on lines 193/194 into the 
`RemoveMembersFromConsumerGroupOptions` constructor on line 198? That seems 
like it would save an extra lookup. For a tool, though, I guess it's not 
critical that it be as efficient as possible 🤔 



##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -177,30 +179,42 @@ public int execute(final String[] args, final Properties config) {
         }
     }
 
-    private void maybeDeleteActiveConsumers(final String groupId,
-                                            final Admin adminClient,
-                                            final StreamsResetterOptions options)
+    // visible for testing
+    void maybeDeleteActiveConsumers(final String groupId,
+                                    final Admin adminClient,
+                                    final boolean force)
         throws ExecutionException, InterruptedException {
-        final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(
-            Set.of(groupId),
-            new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
-        try {
-            final List<MemberDescription> members =
-                new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
-            if (!members.isEmpty()) {
-                if (options.hasForce()) {
-                    System.out.println("Force deleting all active members in the group: " + groupId);
-                    adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get();
-                } else {
-                    throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
-                        + "and has following members: " + members + ". "
-                        + "Make sure to stop all running application instances before running the reset tool."
-                        + " You can use option '--force' to remove active members from the group.");
+        int retries = 0;
+        while (true) {
+            final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(
+                    Set.of(groupId),
+                    new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
+            try {
+                final List<MemberDescription> members =
+                        new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
+                if (!members.isEmpty()) {
+                    if (force) {
+                        System.out.println("Force deleting all active members in the group: " + groupId);
+                        adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get();
+                    } else {
+                        throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
+                                + "and has following members: " + members + ". "
+                                + "Make sure to stop all running application instances before running the reset tool."
+                                + " You can use option '--force' to remove active members from the group.");
+                    }
+                }
+                return;
+            } catch (ExecutionException ee) {
+                // If the group ID is not found, this is not an error case
+                if (ee.getCause() instanceof GroupIdNotFoundException) {
+                    return;
+                }
+                // if a member is unknown, it may mean that it left the group itself. Retrying to confirm.
+                if (ee.getCause() instanceof KafkaException ke && ke.getCause() instanceof UnknownMemberIdException) {
+                    if (retries++ < 3) {

Review Comment:
   Consider extracting the retry limit (the literal `3`) into a named constant such as `NUM_RETRIES`.
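
   For illustration only, here is a minimal, dependency-free sketch of what that could look like. It is not the PR's code: `RetrySketch`, `callWithRetries`, and `TransientException` (a stand-in for `UnknownMemberIdException`) are hypothetical names, and the behavior after the retry budget is exhausted (rethrowing) is an assumption, since the hunk above is cut off at the `if`. The value mirrors the literal `3` in the diff, so the action runs at most `NUM_RETRIES + 1` times.

```java
import java.util.concurrent.Callable;

class RetrySketch {
    // Hypothetical constant name from the review suggestion; value mirrors the literal 3 in the diff.
    static final int NUM_RETRIES = 3;

    // Stand-in for UnknownMemberIdException so the sketch needs no Kafka dependency.
    static class TransientException extends RuntimeException {
        TransientException(final String message) {
            super(message);
        }
    }

    // Runs the action once, then retries up to NUM_RETRIES more times on TransientException.
    static <T> T callWithRetries(final Callable<T> action) throws Exception {
        int retries = 0;
        while (true) {
            try {
                return action.call();
            } catch (final TransientException e) {
                if (retries++ < NUM_RETRIES) {
                    continue; // budget not exhausted yet: try again
                }
                throw e; // assumption: give up by rethrowing once the budget is used
            }
        }
    }

    public static void main(final String[] args) throws Exception {
        final int[] attempts = {0};
        final String result = callWithRetries(() -> {
            // Fail the first two attempts, succeed on the third.
            if (attempts[0]++ < 2) {
                throw new TransientException("member left the group");
            }
            return "removed";
        });
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

   With a named constant, the test's `times(2)` expectation and the production retry budget can both be read against one definition instead of a bare number.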



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
