aliehsaeedii commented on code in PR #21826:
URL: https://github.com/apache/kafka/pull/21826#discussion_r3335290943
##########
tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java:
##########
@@ -431,4 +437,125 @@ public synchronized Map<TopicPartition,
OffsetAndTimestamp> offsetsForTimes(fina
return topicPartitionToOffsetAndTimestamp;
}
}
+ @Test
+ public void shouldFailIfApplicationIdDoesNotExistAsConsumerGroup() throws
Exception {
+ final String groupId = "my-app";
+
+ final Admin adminClient = mock(Admin.class);
+ final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(
+ List.of(
+ new GroupListing("my-app-v1",
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE)),
+ new GroupListing("my-app-v2",
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE))
+ )
+ ));
+
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> streamsResetter.validateApplicationIdExists(groupId,
adminClient));
+ }
+
+ @Test
+ public void shouldSucceedIfApplicationIdExistsAsConsumerGroup() throws
Exception {
+ final String groupId = "my-app-v1";
+
+ final Admin adminClient = mock(Admin.class);
+ final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(
+ List.of(
+ new GroupListing("my-app-v1",
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE)),
+ new GroupListing("my-app-v2",
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE))
+ )
+ ));
+
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+ // should not throw
+ streamsResetter.validateApplicationIdExists(groupId, adminClient);
+ }
+
+ @Test
+ public void shouldFailIfNoConsumerGroupsExistAtAll() throws Exception {
+ final String groupId = "my-app";
+
+ final Admin adminClient = mock(Admin.class);
+ final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(List.of()));
+
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> streamsResetter.validateApplicationIdExists(groupId,
adminClient));
+ }
+
+ @Test
+ public void shouldFailIfApplicationIdIsAPrefixOfExistingGroup() throws
Exception {
+ // "my-app" is a prefix of "my-app-v1" but NOT an exact match
+ final String groupId = "my-app";
+
+ final Admin adminClient = mock(Admin.class);
+ final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(
+ List.of(
+ new GroupListing("my-app-v1",
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE))
+ )
+ ));
+
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> streamsResetter.validateApplicationIdExists(groupId,
adminClient));
+ }
+
+ @Test
+ public void shouldFailWithTypoInApplicationId() throws Exception {
+ // user types "my-ap" instead of "my-app-v1"
+ final String groupId = "my-ap";
+
+ final Admin adminClient = mock(Admin.class);
+ final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(
+ List.of(
+ new GroupListing("my-app-v1",
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE)),
+ new GroupListing("my-app-v2",
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE))
+ )
+ ));
+
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+ final IllegalArgumentException ex =
assertThrows(IllegalArgumentException.class,
+ () -> streamsResetter.validateApplicationIdExists(groupId,
adminClient));
+
+ // error message should mention the bad application-id and --force hint
+ assertTrue(ex.getMessage().contains("my-ap"));
+ assertTrue(ex.getMessage().contains("--force"));
+ }
+
+ @Test
+ public void shouldMatchExactGroupIdNotSubstring() throws Exception {
+ // "foo" should NOT match "foobar" or "foo-v1"
+ final String groupId = "foo";
+
+ final Admin adminClient = mock(Admin.class);
+ final ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
+
+
when(listGroupsResult.all()).thenReturn(KafkaFutureImpl.completedFuture(
+ List.of(
+ new GroupListing("foobar",
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE)),
+ new GroupListing("foo-v1",
Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.STABLE))
+ )
+ ));
+
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> streamsResetter.validateApplicationIdExists(groupId,
adminClient));
+ }
+
+ @Test
+ public void shouldSkipValidationWhenForceOptionProvided() throws Exception
{
+ final Admin adminClient = mock(Admin.class);
+
+ verify(adminClient, never()).listGroups(any(ListGroupsOptions.class));
+ }
Review Comment:
I see no integration with `maybeDeleteInternalTopics`. All 7 tests call
`validateApplicationIdExists()` directly. Nothing verifies the wiring.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]