Preethi-Sundaravelu commented on code in PR #21826:
URL: https://github.com/apache/kafka/pull/21826#discussion_r3359738976


##########
tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java:
##########
@@ -431,4 +437,125 @@ public synchronized Map<TopicPartition, 
OffsetAndTimestamp> offsetsForTimes(fina
             return topicPartitionToOffsetAndTimestamp;
         }
     }
+    @Test
+    public void shouldFailIfApplicationIdDoesNotExistAsConsumerGroup() throws 
Exception {
+        final String groupId = "my-app";
+
+        final Admin adminClient = mock(Admin.class);
+        final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+        
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(
+                List.of(
+                        new GroupListing("my-app-v1", 
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE)),
+                        new GroupListing("my-app-v2", 
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE))
+                )
+        ));
+        
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+        assertThrows(IllegalArgumentException.class,
+                () -> streamsResetter.validateApplicationIdExists(groupId, 
adminClient));
+    }
+
+    @Test
+    public void shouldSucceedIfApplicationIdExistsAsConsumerGroup() throws 
Exception {
+        final String groupId = "my-app-v1";
+
+        final Admin adminClient = mock(Admin.class);
+        final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+        
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(
+                List.of(
+                        new GroupListing("my-app-v1", 
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE)),
+                        new GroupListing("my-app-v2", 
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE))
+                )
+        ));
+        
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+        // should not throw
+        streamsResetter.validateApplicationIdExists(groupId, adminClient);
+    }
+
+    @Test
+    public void shouldFailIfNoConsumerGroupsExistAtAll() throws Exception {
+        final String groupId = "my-app";
+
+        final Admin adminClient = mock(Admin.class);
+        final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+        
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(List.of()));
+        
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+        assertThrows(IllegalArgumentException.class,
+                () -> streamsResetter.validateApplicationIdExists(groupId, 
adminClient));
+    }
+
+    @Test
+    public void shouldFailIfApplicationIdIsAPrefixOfExistingGroup() throws 
Exception {
+        // "my-app" is a prefix of "my-app-v1" but NOT an exact match
+        final String groupId = "my-app";
+
+        final Admin adminClient = mock(Admin.class);
+        final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+        
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(
+                List.of(
+                        new GroupListing("my-app-v1", 
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE))
+                )
+        ));
+        
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+        assertThrows(IllegalArgumentException.class,
+                () -> streamsResetter.validateApplicationIdExists(groupId, 
adminClient));
+    }
+
+    @Test
+    public void shouldFailWithTypoInApplicationId() throws Exception {
+        // user types "my-ap" instead of "my-app-v1"
+        final String groupId = "my-ap";
+
+        final Admin adminClient = mock(Admin.class);
+        final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+        
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(
+                List.of(
+                        new GroupListing("my-app-v1", 
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE)),
+                        new GroupListing("my-app-v2", 
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE))
+                )
+        ));
+        
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+        final IllegalArgumentException ex = 
assertThrows(IllegalArgumentException.class,
+                () -> streamsResetter.validateApplicationIdExists(groupId, 
adminClient));
+
+        // error message should mention the bad application-id and --force hint
+        assertTrue(ex.getMessage().contains("my-ap"));
+        assertTrue(ex.getMessage().contains("--force"));
+    }
+
+    @Test
+    public void shouldMatchExactGroupIdNotSubstring() throws Exception {
+        // "foo" should NOT match "foobar" or "foo-v1"
+        final String groupId = "foo";
+
+        final Admin adminClient = mock(Admin.class);
+        final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+        
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(
+                List.of(
+                        new GroupListing("foobar", 
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE)),
+                        new GroupListing("foo-v1", 
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE))
+                )
+        ));
+        
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+        assertThrows(IllegalArgumentException.class,
+                () -> streamsResetter.validateApplicationIdExists(groupId, 
adminClient));
+    }
+
+    @Test
+    public void shouldSkipValidationWhenForceOptionProvided() throws Exception 
{

Review Comment:
   Agreed. I have added integration tests that invoke 
`StreamsResetter.execute()` and verify both the failure and success paths 
through `maybeDeleteInternalTopics`, so the wiring is now covered end-to-end. 
With that in place, I will remove this test.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to