chia7712 commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630238771



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -553,141 +552,152 @@ public void 
testInitializesAndDestroysMetricsReporters() {
 
     @Test
     public void testCloseIsIdempotent() {
-        final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.close();
-        final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.close();
+            final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-        streams.close();
-        Assert.assertEquals("subsequent close() calls should do nothing",
-            closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+            streams.close();
+            Assert.assertEquals("subsequent close() calls should do nothing",
+                closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+        }
     }
 
     @Test
     public void shouldAddThreadWhenRunning() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        final int oldSize = streams.threads.size();
-        TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L, "wait until running");
-        assertThat(streams.addStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 2)));
-        assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final int oldSize = streams.threads.size();
+            TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L, "wait until running");
+            assertThat(streams.addStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 2)));
+            assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenCreated() {
-        final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenClosed() {
-        final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.close();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.close();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenError() {
-        final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.start();
-        globalStreamThread.shutdown();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.start();
+            globalStreamThread.shutdown();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotReturnDeadThreads() {
-        final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        streamThreadOne.shutdown();
-        final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
+        final Set<ThreadMetadata> threads;
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            streamThreadOne.shutdown();
+            threads = streams.localThreadsMetadata();
+        }
         assertThat(threads.size(), equalTo(1));
         assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
     }
 
     @Test
     public void shouldRemoveThread() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
-        final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        final int oldSize = streams.threads.size();
-        TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L,
-            "Kafka Streams client did not reach state RUNNING");
-        assertThat(streams.removeStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 1)));
-        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final int oldSize = streams.threads.size();
+            TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L,
+                "Kafka Streams client did not reach state RUNNING");
+            assertThat(streams.removeStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 1)));
+            assertThat(streams.threads.size(), equalTo(oldSize - 1));
+        }
     }
 
     @Test
     public void shouldNotRemoveThreadWhenNotRunning() {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(1));
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            assertThat(streams.removeStreamThread(), 
equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(1));
+        }
     }
 
     @Test
     public void testCannotStartOnceClosed() {
-        final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        streams.close();
-        try {
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
-            fail("Should have throw IllegalStateException");
-        } catch (final IllegalStateException expected) {
-            // this is ok
-        } finally {
             streams.close();
+            try {
+                streams.start();
+                fail("Should have throw IllegalStateException");

Review comment:
       How about replacing this try/`fail`/catch block with `assertThrows`, e.g. `assertThrows(IllegalStateException.class, streams::start)`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to