[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r640217904 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception { () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")), "Streams never stopped" ); -} finally { streams.close(); Review comment: @mjsax Thanks for the detail description. I knew the things you describe. But in this case, after `streams.close()` , we still need to check streams state whether is `NOT_RUNNING`. If we remove `streams.close()`, the streams state will still be `RUNNING`, this state will lead to failed of the next checking of `NOT_RUNNING`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630705257 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception { () -> streams.state() == KafkaStreams.State.PENDING_ERROR, "Thread never stopped." ); -} finally { streams.close(); Review comment: @mjsax The line 532(original) wait for the streams state equal to **ERROR** after streams closed. Although this test case will pass if we remove `close()`, but it seems we might be better to retain `close()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630701286 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception { () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")), "Streams never stopped" ); -} finally { streams.close(); Review comment: @mjsax We should not remove this line, otherwise the line 498(original) will throw a timeout exception. The streams state will be always **RUNNING** if we skip `close()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630603482 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception { () -> streams.state() == KafkaStreams.State.PENDING_ERROR, "Thread never stopped." ); -} finally { streams.close(); Review comment: Ok, will do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630603389 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception { () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")), "Streams never stopped" ); -} finally { streams.close(); Review comment: Thanks @mjsax for review. Will do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630602092 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -553,141 +552,152 @@ public void testInitializesAndDestroysMetricsReporters() { @Test public void testCloseIsIdempotent() { -final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); -streams.close(); -final int closeCount = MockMetricsReporter.CLOSE_COUNT.get(); +try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { +streams.close(); +final int closeCount = MockMetricsReporter.CLOSE_COUNT.get(); -streams.close(); -Assert.assertEquals("subsequent close() calls should do nothing", -closeCount, MockMetricsReporter.CLOSE_COUNT.get()); +streams.close(); +Assert.assertEquals("subsequent close() calls should do nothing", +closeCount, MockMetricsReporter.CLOSE_COUNT.get()); +} } @Test public void shouldAddThreadWhenRunning() throws InterruptedException { props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); -final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); -streams.start(); -final int oldSize = streams.threads.size(); -TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running"); -assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2))); -assertThat(streams.threads.size(), equalTo(oldSize + 1)); +try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { +streams.start(); +final int oldSize = streams.threads.size(); +TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running"); +assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2))); +assertThat(streams.threads.size(), equalTo(oldSize + 1)); +} } @Test public void shouldNotAddThreadWhenCreated() { -final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); -final int oldSize = streams.threads.size(); -assertThat(streams.addStreamThread(), equalTo(Optional.empty())); -assertThat(streams.threads.size(), equalTo(oldSize)); +try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { +final int oldSize = streams.threads.size(); +assertThat(streams.addStreamThread(), equalTo(Optional.empty())); +assertThat(streams.threads.size(), equalTo(oldSize)); +} } @Test public void shouldNotAddThreadWhenClosed() { -final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); -final int oldSize = streams.threads.size(); -streams.close(); -assertThat(streams.addStreamThread(), equalTo(Optional.empty())); -assertThat(streams.threads.size(), equalTo(oldSize)); +try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { +final int oldSize = streams.threads.size(); +streams.close(); +assertThat(streams.addStreamThread(), equalTo(Optional.empty())); +assertThat(streams.threads.size(), equalTo(oldSize)); +} } @Test public void shouldNotAddThreadWhenError() { -final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); -final int oldSize = streams.threads.size(); -streams.start(); -globalStreamThread.shutdown(); -assertThat(streams.addStreamThread(), equalTo(Optional.empty())); -assertThat(streams.threads.size(), equalTo(oldSize)); +try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { +final int oldSize = streams.threads.size(); +streams.start(); +globalStreamThread.shutdown(); +assertThat(streams.addStreamThread(), equalTo(Optional.empty())); +assertThat(streams.threads.size(), equalTo(oldSize)); +} } @Test public void shouldNotReturnDeadThreads() { -final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); -streams.start(); -streamThreadOne.shutdown(); -final Set threads = streams.localThreadsMetadata(); +final Set threads; +try (final KafkaStreams stre