mjsax commented on a change in pull request #8799:
URL: https://github.com/apache/kafka/pull/8799#discussion_r435597397



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -142,83 +142,89 @@ public void tearDown() throws IOException {
 
     @Test
     public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+        try {
+            final Serde<String> stringSerde = Serdes.String();
 
-        final Serde<String> stringSerde = Serdes.String();
-
-        final List<String> expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
-        // we compare lists of subscribed topics and hence requiring the order 
as well; this is guaranteed
-        // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add 
TEST-TOPIC-2 so the list is always
-        // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
-        final List<String> expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+            final List<String> expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
+            // we compare lists of subscribed topics and hence requiring the 
order as well; this is guaranteed
+            // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only 
add TEST-TOPIC-2 so the list is always
+            // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
+            final List<String> expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-        CLUSTER.createTopic("TEST-TOPIC-1");
+            CLUSTER.createTopic("TEST-TOPIC-1");
 
-        final StreamsBuilder builder = new StreamsBuilder();
+            final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, String> pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+            final KStream<String, String> pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-        pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
-        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
-        streams = new KafkaStreams(builder.build(), streamsConfiguration, new 
DefaultKafkaClientSupplier() {
-            @Override
-            public Consumer<byte[], byte[]> getConsumer(final Map<String, 
Object> config) {
-                return new KafkaConsumer<byte[], byte[]>(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
-                    @Override
-                    public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
-                        super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
-                    }
-                };
+            pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
+            final List<String> assignedTopics = new CopyOnWriteArrayList<>();
+            streams = new KafkaStreams(builder.build(), streamsConfiguration, 
new DefaultKafkaClientSupplier() {
+                @Override
+                public Consumer<byte[], byte[]> getConsumer(final Map<String, 
Object> config) {
+                    return new KafkaConsumer<byte[], byte[]>(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+                        @Override
+                        public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
+                            super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
+                        }
+                    };
 
-            }
-        });
+                }
+            });
 
-        streams.start();
-        TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+            streams.start();
+            TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
-        CLUSTER.createTopic("TEST-TOPIC-2");
+            CLUSTER.createTopic("TEST-TOPIC-2");
 
-        TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
+            TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
-        streams.close();
-        CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+            streams.close();

Review comment:
       Yes, that is my reasoning.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to