abbccdda commented on a change in pull request #8923: URL: https://github.com/apache/kafka/pull/8923#discussion_r466127053
########## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ########## @@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) { return options.valuesOf(intermediateTopicsOption).contains(topic); } - private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) { - System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption)); + private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) { + if (!options.valuesOf(internalTopicsOption).isEmpty()) { + return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun); + } else { + return maybeDeleteInferredInternalTopics(adminClient, dryRun); + } + } + + private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, final boolean dryRun) { + final List<String> internalTopics = options.valuesOf(internalTopicsOption); + int topicNotFound = EXIT_CODE_SUCCESS; + + final List<String> topicsToDelete = new ArrayList<>(); + final List<String> notFoundInternalTopics = new ArrayList<>(); + + System.out.println("Deleting specified internal/auto-created topics " + internalTopics); + for (final String topic : internalTopics) { + if (allTopics.contains(topic) && isInferredInternalTopic(topic)) { + topicsToDelete.add(topic); + } else { + notFoundInternalTopics.add(topic); + } + } + + if (!notFoundInternalTopics.isEmpty()) { + System.out.println("Following topics were not detected as internal, skipping them"); + for (final String topic : notFoundInternalTopics) { + System.out.println("Topic: " + topic); + } + topicNotFound = EXIT_CODE_ERROR; + } + + System.out.println("Following internal topics will be deleted for application " + options.valueOf(applicationIdOption)); + for (final String topic : topicsToDelete) { + System.out.println("Topic: " + topic); + } + + if (!dryRun) { + doDelete(topicsToDelete, adminClient); + } + System.out.println("Done."); + return topicNotFound; + } + + 
private int maybeDeleteInferredInternalTopics(final Admin adminClient, final boolean dryRun) { final List<String> topicsToDelete = new ArrayList<>(); for (final String listing : allTopics) { - if (isInternalTopic(listing)) { - if (!dryRun) { - topicsToDelete.add(listing); - } else { - System.out.println("Topic: " + listing); - } + if (isInferredInternalTopic(listing)) { + topicsToDelete.add(listing); } } + + System.out.println("Following inferred internal/auto-created topics will be deleted for application " + options.valueOf(applicationIdOption)); + for (final String topic : topicsToDelete) { + System.out.println("Topic: " + topic); + } + if (!dryRun) { doDelete(topicsToDelete, adminClient); } System.out.println("Done."); + return EXIT_CODE_SUCCESS; Review comment: We could refactor out a helper function here. ########## File path: docs/streams/developer-guide/app-reset-tool.html ########## @@ -77,6 +77,7 @@ <div class="section" id="step-1-run-the-application-reset-tool"> <h2>Step 1: Run the application reset tool<a class="headerlink" href="#step-1-run-the-application-reset-tool" title="Permalink to this headline"></a></h2> <p>Invoke the application reset tool from the command line</p> + <p>Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with --dry-run to preview your changes before making them.</p> Review comment: for html changes, it is recommended to include a screenshot of the built website. 
Please refer to https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ########## @@ -262,6 +263,52 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception Assert.assertEquals(1, exitCode); } + void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() throws Exception { + appID = testId + "-not-reset-without-intermediate-topic"; + final String[] parameters = new String[]{ + "--application-id", appID, + "--bootstrap-servers", cluster.bootstrapServers(), + "--internal-topics", NON_EXISTING_TOPIC, + "--execute" + }; + final Properties cleanUpConfig = new Properties(); + cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); + + final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); + Assert.assertEquals(1, exitCode); + } + + void testResetWhenInternalTopicsAreSpecified() throws Exception { + final boolean useRepartitioned = true; + + appID = testId + "-with-internal-topics-option"; + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + + // RUN + streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2), streamsConfig); + streams.start(); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); + + streams.close(); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); + + // RESET + streams.cleanUp(); + + final List<String> internalTopics = cluster.getAllTopicsInCluster().stream() + .filter(topic -> topic.startsWith(appID + "-")) + .collect(Collectors.toList()); + final boolean cleanResult = tryCleanGlobal(!useRepartitioned, Review comment: Could just use `false` ########## File path: 
core/src/main/scala/kafka/tools/StreamsResetter.java ########## @@ -167,7 +171,7 @@ public int run(final String[] args, final HashMap<Object, Object> consumerConfig = new HashMap<>(config); consumerConfig.putAll(properties); exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun); - maybeDeleteInternalTopics(adminClient, dryRun); + exitCode |= maybeDeleteInternalTopics(adminClient, dryRun); Review comment: Should we check both exit codes and decide whether to return 0 or 1? ########## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ########## @@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) { return options.valuesOf(intermediateTopicsOption).contains(topic); } - private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) { - System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption)); + private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) { + if (!options.valuesOf(internalTopicsOption).isEmpty()) { + return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun); + } else { + return maybeDeleteInferredInternalTopics(adminClient, dryRun); + } + } + + private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, final boolean dryRun) { + final List<String> internalTopics = options.valuesOf(internalTopicsOption); + int topicNotFound = EXIT_CODE_SUCCESS; + + final List<String> topicsToDelete = new ArrayList<>(); + final List<String> notFoundInternalTopics = new ArrayList<>(); + + System.out.println("Deleting specified internal/auto-created topics " + internalTopics); + for (final String topic : internalTopics) { + if (allTopics.contains(topic) && isInferredInternalTopic(topic)) { + topicsToDelete.add(topic); + } else { + notFoundInternalTopics.add(topic); + } + } + + if (!notFoundInternalTopics.isEmpty()) { Review comment: Should we throw an exception here to stop the
run or just skip the not found topics? ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ########## @@ -262,6 +263,52 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception Assert.assertEquals(1, exitCode); } + void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() throws Exception { + appID = testId + "-not-reset-without-intermediate-topic"; + final String[] parameters = new String[]{ + "--application-id", appID, + "--bootstrap-servers", cluster.bootstrapServers(), + "--internal-topics", NON_EXISTING_TOPIC, + "--execute" + }; + final Properties cleanUpConfig = new Properties(); + cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); + + final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); + Assert.assertEquals(1, exitCode); + } + + void testResetWhenInternalTopicsAreSpecified() throws Exception { + final boolean useRepartitioned = true; + + appID = testId + "-with-internal-topics-option"; + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + + // RUN + streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2), streamsConfig); + streams.start(); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); + + streams.close(); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); + + // RESET + streams.cleanUp(); + + final List<String> internalTopics = cluster.getAllTopicsInCluster().stream() + .filter(topic -> topic.startsWith(appID + "-")) + .collect(Collectors.toList()); + final boolean cleanResult = tryCleanGlobal(!useRepartitioned, + "--internal-topics", + String.join(",", internalTopics.subList(1, internalTopics.size())) + "," + OUTPUT_TOPIC); + Assert.assertEquals(false, cleanResult); // Reset will give 
error code since output topic is not a valid internal topic Review comment: could use `assertFalse` ########## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ########## @@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) { return options.valuesOf(intermediateTopicsOption).contains(topic); } - private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) { - System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption)); + private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) { + if (!options.valuesOf(internalTopicsOption).isEmpty()) { + return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun); + } else { + return maybeDeleteInferredInternalTopics(adminClient, dryRun); + } + } + + private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, final boolean dryRun) { + final List<String> internalTopics = options.valuesOf(internalTopicsOption); + int topicNotFound = EXIT_CODE_SUCCESS; + + final List<String> topicsToDelete = new ArrayList<>(); + final List<String> notFoundInternalTopics = new ArrayList<>(); + + System.out.println("Deleting specified internal/auto-created topics " + internalTopics); + for (final String topic : internalTopics) { + if (allTopics.contains(topic) && isInferredInternalTopic(topic)) { + topicsToDelete.add(topic); + } else { + notFoundInternalTopics.add(topic); + } + } + + if (!notFoundInternalTopics.isEmpty()) { + System.out.println("Following topics were not detected as internal, skipping them"); + for (final String topic : notFoundInternalTopics) { + System.out.println("Topic: " + topic); + } + topicNotFound = EXIT_CODE_ERROR; + } + + System.out.println("Following internal topics will be deleted for application " + options.valueOf(applicationIdOption)); + for (final String topic : topicsToDelete) { + System.out.println("Topic: " + topic); + } + + if (!dryRun) { + 
doDelete(topicsToDelete, adminClient); + } + System.out.println("Done."); + return topicNotFound; + } + + private int maybeDeleteInferredInternalTopics(final Admin adminClient, final boolean dryRun) { final List<String> topicsToDelete = new ArrayList<>(); for (final String listing : allTopics) { - if (isInternalTopic(listing)) { - if (!dryRun) { - topicsToDelete.add(listing); - } else { - System.out.println("Topic: " + listing); - } + if (isInferredInternalTopic(listing)) { + topicsToDelete.add(listing); } } + + System.out.println("Following inferred internal/auto-created topics will be deleted for application " + options.valueOf(applicationIdOption)); + for (final String topic : topicsToDelete) { Review comment: Could we print all topics in one line, instead of multiple? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org