ableegoldman commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r620645865



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##########
@@ -151,6 +151,22 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() {
         Assert.assertEquals(1, exitCode);
     }
 
+    @Test
+    public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {

Review comment:
       Just wondering, why put this test here instead of in 
`AbstractResetIntegrationTest`?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##########
@@ -151,6 +151,22 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() {
         Assert.assertEquals(1, exitCode);
     }
 
+    @Test
+    public void shouldNotAllowToResetWhenSpecifiedInternalTopicAbsent() {

Review comment:
       Also, can we add a test like this but for the case where the topic does 
exist but just isn't a subset of inferred internal topics?

##########
File path: docs/streams/developer-guide/app-reset-tool.html
##########
@@ -78,6 +78,9 @@
             <h2>Step 1: Run the application reset tool<a class="headerlink" 
href="#step-1-run-the-application-reset-tool" title="Permalink to this 
headline"></a></h2>
             <p>Invoke the application reset tool from the command line</p>
             <div class="highlight-bash"><div 
class="highlight"><pre><span></span><code>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset</code></pre></div>
+            <p>Warning! This tool makes irreversible changes to your 
application. It is strongly recommended that you run this once with <code 
class="docutils literal"><span class="pre">--dry-run</span></code> to preview 
your changes before making them.</p>
+            <div class="highlight-bash"><div 
class="highlight"><pre><span></span>&lt;path-to-kafka&gt;/bin/kafka-streams-application-reset

Review comment:
       You don't need to repeat this line, it's just printing an example of 
running the app reset tool for the line above (`Invoke the application reset 
tool from the command line`)

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -205,6 +206,34 @@ private void add10InputElements() {
         }
     }
 
+    @Test
+    public void testResetWhenInternalTopicsAreSpecified() throws Exception {
+        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, 
OUTPUT_TOPIC_2), streamsConfig);
+        streams.start();
+        
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
+
+        streams.close();
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * 
STREAMS_CONSUMER_TIMEOUT);
+
+        // RESET
+        streams.cleanUp();
+
+        final List<String> internalTopics = 
cluster.getAllTopicsInCluster().stream()
+                .filter(topic -> topic.startsWith(appID + "-"))

Review comment:
       nit: we should also filter for internal topics specifically, like what's 
done in `StreamsResetter#matchesInternalTopicFormat`. Actually you can probably 
just invoke that method directly for the filter here (it can be made static if 
necessary)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to