aliehsaeedii commented on code in PR #21826:
URL: https://github.com/apache/kafka/pull/21826#discussion_r3383831217
##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -520,7 +546,11 @@ private Map<TopicPartition, Long> checkOffsetRange(final
Map<TopicPartition, Lon
return validatedTopicPartitionsOffsets;
}
- private int maybeDeleteInternalTopics(final Admin adminClient, final StreamsResetterOptions options) {
+ private int maybeDeleteInternalTopics(final Admin adminClient, final StreamsResetterOptions options) throws ExecutionException, InterruptedException, TimeoutException {
+ if (!options.hasForce()) {
Review Comment:
Reusing `--force` couples two unrelated behaviors: force-removing active
group members (KAFKA-9146) and bypassing this app-id existence check. As
Devanshi noted on KAFKA-14922, that's potentially misleading — a user who only
wants to skip the existence check is also forced into member removal, and vice
versa. The ticket left the flag design ("reuse `--force`" vs. dedicated flags)
to be settled on the KIP; worth resolving there.
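For the KIP discussion, here is a minimal sketch of what the "dedicated flags" option could look like once the two behaviors are decoupled. Everything in it is hypothetical: the class `ResetFlagsSketch`, its field names, and the second flag are placeholders for illustration, not existing `StreamsResetter` options.

```java
// Hypothetical model of decoupled options; not part of StreamsResetter.
final class ResetFlagsSketch {
    // Models the existing --force semantics: remove active group members (KAFKA-9146).
    private final boolean forceRemoveMembers;
    // Models a hypothetical dedicated flag that only bypasses the app-id existence check.
    private final boolean skipAppIdCheck;

    ResetFlagsSketch(final boolean forceRemoveMembers, final boolean skipAppIdCheck) {
        this.forceRemoveMembers = forceRemoveMembers;
        this.skipAppIdCheck = skipAppIdCheck;
    }

    // The existence check runs unless the dedicated flag was given.
    boolean shouldValidateApplicationId() {
        return !skipAppIdCheck;
    }

    // Member removal depends only on the force flag.
    boolean shouldRemoveActiveMembers() {
        return forceRemoveMembers;
    }

    public static void main(final String[] args) {
        // A user who only wants to skip the check is not pushed into member removal.
        final ResetFlagsSketch onlySkipCheck = new ResetFlagsSketch(false, true);
        System.out.println(onlySkipCheck.shouldValidateApplicationId());
        System.out.println(onlySkipCheck.shouldRemoveActiveMembers());
    }
}
```

With separate flags, each of the four combinations is expressible, which is exactly what reusing `--force` rules out.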
##########
tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java:
##########
@@ -226,6 +226,55 @@ public void
shouldNotAllowToResetWhenSpecifiedInternalTopicDoesNotExist(final Te
assertEquals(1, exitCode);
}
+ @Test
+ public void shouldFailWithErrorWhenApplicationIdDoesNotExist(final TestInfo testInfo) {
+ final String nonExistentAppID = safeUniqueTestName(testInfo) + "-does-not-exist";
+
+ final String[] parameters = new String[] {
+ "--application-id", nonExistentAppID,
+ "--bootstrap-server", cluster.bootstrapServers()
+ };
+ final Properties cleanUpConfig = new Properties();
+ cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+ cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(CLEANUP_CONSUMER_TIMEOUT));
+
+ final String errorOutput = ToolsTestUtils.captureStandardErr(() -> {
+ final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig);
+ assertEquals(1, exitCode);
+ });
+
+ assertTrue(errorOutput.contains(nonExistentAppID));
+ assertTrue(errorOutput.contains("--force"));
+ assertTrue(errorOutput.contains("Refusing to delete internal topics"));
+ }
+
+ @Test
+ public void shouldSucceedWhenApplicationIdExistsAsConsumerGroup(final TestInfo testInfo) throws Exception {
Review Comment:
Neither the unit tests nor the integration tests cover the `--force` path, i.e. that
a *non-existent* app id with `--force` skips validation and proceeds. Since
`--force` is the only escape hatch here, a test pinning that behavior would be
valuable (this was my earlier "is `--force` ever invoked?" point).
##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -181,6 +184,29 @@ public int execute(final String[] args, final Properties
config) {
}
}
+ void validateApplicationIdExists(final String applicationId,
Review Comment:
This catches the case where *no* matching group exists, but not a prefix
collision: if both `foo` and `foo-v2` exist, resetting `foo` still passes the
check yet deletes `foo-v2-…-repartition` (it matches `startsWith("foo-")`).
Worth a code comment or a note in the KIP as a known limitation.
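To make the collision concrete, here is a minimal standalone sketch. It models only the `startsWith(applicationId + "-")` part of the match quoted above; the class name, helper name, and topic names are made up for illustration, and the real tool's matching may apply further conditions.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; a simplified stand-in for the resetter's prefix test.
final class PrefixCollisionDemo {

    // Returns every topic whose name starts with "<applicationId>-".
    static List<String> topicsMatchedByPrefix(final String applicationId, final List<String> topics) {
        final String prefix = applicationId + "-";
        return topics.stream()
            .filter(topic -> topic.startsWith(prefix))
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        // Illustrative topic names only.
        final List<String> topics = List.of(
            "foo-a-repartition",      // belongs to app "foo"
            "foo-v2-a-repartition",   // belongs to app "foo-v2", but also starts with "foo-"
            "foobar-changelog"        // does not start with "foo-"
        );
        // Resetting "foo" selects both of the first two topics.
        System.out.println(topicsMatchedByPrefix("foo", topics));
    }
}
```

The existence check passes because `foo` is a real group, so nothing stops the second topic from being selected.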
##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -520,7 +546,11 @@ private Map<TopicPartition, Long> checkOffsetRange(final
Map<TopicPartition, Lon
return validatedTopicPartitionsOffsets;
}
- private int maybeDeleteInternalTopics(final Admin adminClient, final StreamsResetterOptions options) {
+ private int maybeDeleteInternalTopics(final Admin adminClient, final StreamsResetterOptions options) throws ExecutionException, InterruptedException, TimeoutException {
+ if (!options.hasForce()) {
+ validateApplicationIdExists(options.applicationId(), adminClient);
Review Comment:
The ticket's agreed design was to "exit early" if the group doesn't exist,
but this runs after `maybeDeleteActiveConsumers` and the offset-reset/seek
step. Those are effectively no-ops for a non-existent group, so it's not a
correctness bug, but moving the check to the top of `execute()` would match the
"fail fast" intent (and the PR title) more faithfully.
##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -181,6 +184,29 @@ public int execute(final String[] args, final Properties
config) {
}
}
+ void validateApplicationIdExists(final String applicationId,
+ final Admin adminClient)
+ throws ExecutionException, InterruptedException, TimeoutException {
+
+ final Collection<GroupListing> groups = adminClient
+ .listGroups(new ListGroupsOptions())
Review Comment:
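`listGroups()` enumerates every group in the cluster (cost + cluster-wide
Describe ACL) just to check one id. A targeted
`describeConsumerGroups(Set.of(applicationId))` avoids the full scan, though
note that how a missing group is reported depends on the version: older
releases return a description in state `DEAD` rather than nothing, while, as
far as I recall, 4.0+ fails the future with `GroupIdNotFoundException`, so
you'd need to handle whichever applies. Either approach is fine; flagging the
trade-off.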