abbccdda commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r466127053



##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final Admin adminClient, final 
boolean dryRun) {
-        System.out.println("Deleting all internal/auto-created topics for 
application " + options.valueOf(applicationIdOption));
+    private int maybeDeleteInternalTopics(final Admin adminClient, final 
boolean dryRun) {
+        if (!options.valuesOf(internalTopicsOption).isEmpty()) {
+            return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun);
+        } else {
+            return maybeDeleteInferredInternalTopics(adminClient, dryRun);
+        }
+    }
+
+    private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, 
final boolean dryRun) {
+        final List<String> internalTopics = 
options.valuesOf(internalTopicsOption);
+        int topicNotFound = EXIT_CODE_SUCCESS;
+
+        final List<String> topicsToDelete = new ArrayList<>();
+        final List<String> notFoundInternalTopics = new ArrayList<>();
+
+        System.out.println("Deleting specified internal/auto-created topics " 
+ internalTopics);
+        for (final String topic : internalTopics) {
+            if (allTopics.contains(topic) && isInferredInternalTopic(topic)) {
+                topicsToDelete.add(topic);
+            } else {
+                notFoundInternalTopics.add(topic);
+            }
+        }
+
+        if (!notFoundInternalTopics.isEmpty()) {
+            System.out.println("Following topics were not detected as 
internal, skipping them");
+            for (final String topic : notFoundInternalTopics) {
+                System.out.println("Topic: " + topic);
+            }
+            topicNotFound = EXIT_CODE_ERROR;
+        }
+
+        System.out.println("Following internal topics will be deleted for 
application " + options.valueOf(applicationIdOption));
+        for (final String topic : topicsToDelete) {
+            System.out.println("Topic: " + topic);
+        }
+
+        if (!dryRun) {
+            doDelete(topicsToDelete, adminClient);
+        }
+        System.out.println("Done.");
+        return topicNotFound;
+    }
+
+    private int maybeDeleteInferredInternalTopics(final Admin adminClient, 
final boolean dryRun) {
         final List<String> topicsToDelete = new ArrayList<>();
         for (final String listing : allTopics) {
-            if (isInternalTopic(listing)) {
-                if (!dryRun) {
-                    topicsToDelete.add(listing);
-                } else {
-                    System.out.println("Topic: " + listing);
-                }
+            if (isInferredInternalTopic(listing)) {
+                topicsToDelete.add(listing);
             }
         }
+
+        System.out.println("Following inferred internal/auto-created topics 
will be deleted for application " + options.valueOf(applicationIdOption));
+        for (final String topic : topicsToDelete) {
+            System.out.println("Topic: " + topic);
+        }
+
         if (!dryRun) {
             doDelete(topicsToDelete, adminClient);
         }
         System.out.println("Done.");
+        return EXIT_CODE_SUCCESS;

Review comment:
       We could refactor out a helper function here.

##########
File path: docs/streams/developer-guide/app-reset-tool.html
##########
@@ -77,6 +77,7 @@
         <div class="section" id="step-1-run-the-application-reset-tool">
             <h2>Step 1: Run the application reset tool<a class="headerlink" 
href="#step-1-run-the-application-reset-tool" title="Permalink to this 
headline"></a></h2>
             <p>Invoke the application reset tool from the command line</p>
+            <p>Warning! This tool makes irreversible changes to your 
application. It is strongly recommended that you run this once with --dry-run 
to preview your changes before making them.</p>

Review comment:
       for html changes, it is recommended to include a screenshot of the built 
website. Please refer to 
https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -262,6 +263,52 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
         Assert.assertEquals(1, exitCode);
     }
 
+    void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() throws 
Exception {
+        appID = testId + "-not-reset-without-intermediate-topic";
+        final String[] parameters = new String[]{
+            "--application-id", appID,
+            "--bootstrap-servers", cluster.bootstrapServers(),
+            "--internal-topics", NON_EXISTING_TOPIC,
+            "--execute"
+        };
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, 
cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
+    }
+
+    void testResetWhenInternalTopicsAreSpecified() throws Exception {
+        final boolean useRepartitioned = true;
+
+        appID = testId + "-with-internal-topics-option";
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        // RUN
+        streams = new 
KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, 
OUTPUT_TOPIC_2), streamsConfig);
+        streams.start();
+        
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+        streams.close();
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * 
STREAMS_CONSUMER_TIMEOUT);
+
+        // RESET
+        streams.cleanUp();
+
+        final List<String> internalTopics = 
cluster.getAllTopicsInCluster().stream()
+                .filter(topic -> topic.startsWith(appID + "-"))
+                .collect(Collectors.toList());
+        final boolean cleanResult = tryCleanGlobal(!useRepartitioned,

Review comment:
       Could just use `false`

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -167,7 +171,7 @@ public int run(final String[] args,
             final HashMap<Object, Object> consumerConfig = new 
HashMap<>(config);
             consumerConfig.putAll(properties);
             exitCode = 
maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
-            maybeDeleteInternalTopics(adminClient, dryRun);
+            exitCode |= maybeDeleteInternalTopics(adminClient, dryRun);

Review comment:
       should we do check both exit codes and decide whether to return 0 or 1?

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final Admin adminClient, final 
boolean dryRun) {
-        System.out.println("Deleting all internal/auto-created topics for 
application " + options.valueOf(applicationIdOption));
+    private int maybeDeleteInternalTopics(final Admin adminClient, final 
boolean dryRun) {
+        if (!options.valuesOf(internalTopicsOption).isEmpty()) {
+            return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun);
+        } else {
+            return maybeDeleteInferredInternalTopics(adminClient, dryRun);
+        }
+    }
+
+    private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, 
final boolean dryRun) {
+        final List<String> internalTopics = 
options.valuesOf(internalTopicsOption);
+        int topicNotFound = EXIT_CODE_SUCCESS;
+
+        final List<String> topicsToDelete = new ArrayList<>();
+        final List<String> notFoundInternalTopics = new ArrayList<>();
+
+        System.out.println("Deleting specified internal/auto-created topics " 
+ internalTopics);
+        for (final String topic : internalTopics) {
+            if (allTopics.contains(topic) && isInferredInternalTopic(topic)) {
+                topicsToDelete.add(topic);
+            } else {
+                notFoundInternalTopics.add(topic);
+            }
+        }
+
+        if (!notFoundInternalTopics.isEmpty()) {

Review comment:
       Should we throw exception here to stop the run or just skip the not 
found topics?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -262,6 +263,52 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
         Assert.assertEquals(1, exitCode);
     }
 
+    void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() throws 
Exception {
+        appID = testId + "-not-reset-without-intermediate-topic";
+        final String[] parameters = new String[]{
+            "--application-id", appID,
+            "--bootstrap-servers", cluster.bootstrapServers(),
+            "--internal-topics", NON_EXISTING_TOPIC,
+            "--execute"
+        };
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, 
cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
+    }
+
+    void testResetWhenInternalTopicsAreSpecified() throws Exception {
+        final boolean useRepartitioned = true;
+
+        appID = testId + "-with-internal-topics-option";
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        // RUN
+        streams = new 
KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, 
OUTPUT_TOPIC_2), streamsConfig);
+        streams.start();
+        
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+        streams.close();
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * 
STREAMS_CONSUMER_TIMEOUT);
+
+        // RESET
+        streams.cleanUp();
+
+        final List<String> internalTopics = 
cluster.getAllTopicsInCluster().stream()
+                .filter(topic -> topic.startsWith(appID + "-"))
+                .collect(Collectors.toList());
+        final boolean cleanResult = tryCleanGlobal(!useRepartitioned,
+                "--internal-topics",
+                String.join(",", internalTopics.subList(1, 
internalTopics.size())) + "," + OUTPUT_TOPIC);
+        Assert.assertEquals(false, cleanResult); // Reset will give error code 
since output topic is not a valid internal topic

Review comment:
       could use `assertFalse`

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final Admin adminClient, final 
boolean dryRun) {
-        System.out.println("Deleting all internal/auto-created topics for 
application " + options.valueOf(applicationIdOption));
+    private int maybeDeleteInternalTopics(final Admin adminClient, final 
boolean dryRun) {
+        if (!options.valuesOf(internalTopicsOption).isEmpty()) {
+            return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun);
+        } else {
+            return maybeDeleteInferredInternalTopics(adminClient, dryRun);
+        }
+    }
+
+    private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, 
final boolean dryRun) {
+        final List<String> internalTopics = 
options.valuesOf(internalTopicsOption);
+        int topicNotFound = EXIT_CODE_SUCCESS;
+
+        final List<String> topicsToDelete = new ArrayList<>();
+        final List<String> notFoundInternalTopics = new ArrayList<>();
+
+        System.out.println("Deleting specified internal/auto-created topics " 
+ internalTopics);
+        for (final String topic : internalTopics) {
+            if (allTopics.contains(topic) && isInferredInternalTopic(topic)) {
+                topicsToDelete.add(topic);
+            } else {
+                notFoundInternalTopics.add(topic);
+            }
+        }
+
+        if (!notFoundInternalTopics.isEmpty()) {
+            System.out.println("Following topics were not detected as 
internal, skipping them");
+            for (final String topic : notFoundInternalTopics) {
+                System.out.println("Topic: " + topic);
+            }
+            topicNotFound = EXIT_CODE_ERROR;
+        }
+
+        System.out.println("Following internal topics will be deleted for 
application " + options.valueOf(applicationIdOption));
+        for (final String topic : topicsToDelete) {
+            System.out.println("Topic: " + topic);
+        }
+
+        if (!dryRun) {
+            doDelete(topicsToDelete, adminClient);
+        }
+        System.out.println("Done.");
+        return topicNotFound;
+    }
+
+    private int maybeDeleteInferredInternalTopics(final Admin adminClient, 
final boolean dryRun) {
         final List<String> topicsToDelete = new ArrayList<>();
         for (final String listing : allTopics) {
-            if (isInternalTopic(listing)) {
-                if (!dryRun) {
-                    topicsToDelete.add(listing);
-                } else {
-                    System.out.println("Topic: " + listing);
-                }
+            if (isInferredInternalTopic(listing)) {
+                topicsToDelete.add(listing);
             }
         }
+
+        System.out.println("Following inferred internal/auto-created topics 
will be deleted for application " + options.valueOf(applicationIdOption));
+        for (final String topic : topicsToDelete) {

Review comment:
       Could we print all topics in one line, instead of multiple?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to