[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-26 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r640217904



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
 () -> streams.localThreadsMetadata().stream().allMatch(t -> 
t.threadState().equals("DEAD")),
 "Streams never stopped"
 );
-} finally {
 streams.close();

Review comment:
   @mjsax Thanks for the detail description. I knew the things you describe.
   But in this case, after `streams.close()` , we still need to check streams 
state whether is `NOT_RUNNING`.
   If we remove `streams.close()`, the streams state will still be `RUNNING`, 
this state will lead to failed of the next checking of `NOT_RUNNING`.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630705257



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception 
{
 () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
 "Thread never stopped."
 );
-} finally {
 streams.close();

Review comment:
   @mjsax The line 532(original) wait for the streams state equal to 
**ERROR** after streams closed. 
   Although this test case will pass if we remove `close()`, but it seems we 
might be better to retain `close()`?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630701286



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
 () -> streams.localThreadsMetadata().stream().allMatch(t -> 
t.threadState().equals("DEAD")),
 "Streams never stopped"
 );
-} finally {
 streams.close();

Review comment:
   @mjsax We should not remove this line, otherwise the line 498(original) 
will throw a timeout exception.
   The streams state will be always **RUNNING** if we skip `close()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630603482



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception 
{
 () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
 "Thread never stopped."
 );
-} finally {
 streams.close();

Review comment:
   Ok, will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630603389



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
 () -> streams.localThreadsMetadata().stream().allMatch(t -> 
t.threadState().equals("DEAD")),
 "Streams never stopped"
 );
-} finally {
 streams.close();

Review comment:
   Thanks @mjsax for review. Will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630602092



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -553,141 +552,152 @@ public void 
testInitializesAndDestroysMetricsReporters() {
 
 @Test
 public void testCloseIsIdempotent() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.close();
-final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+streams.close();
+final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-streams.close();
-Assert.assertEquals("subsequent close() calls should do nothing",
-closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+streams.close();
+Assert.assertEquals("subsequent close() calls should do nothing",
+closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+}
 }
 
 @Test
 public void shouldAddThreadWhenRunning() throws InterruptedException {
 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.start();
-final int oldSize = streams.threads.size();
-TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L, "wait until running");
-assertThat(streams.addStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 2)));
-assertThat(streams.threads.size(), equalTo(oldSize + 1));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+streams.start();
+final int oldSize = streams.threads.size();
+TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L, "wait until running");
+assertThat(streams.addStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 2)));
+assertThat(streams.threads.size(), equalTo(oldSize + 1));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenCreated() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenClosed() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-streams.close();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+streams.close();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenError() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-streams.start();
-globalStreamThread.shutdown();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+streams.start();
+globalStreamThread.shutdown();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotReturnDeadThreads() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.start();
-streamThreadOne.shutdown();
-final Set threads = streams.localThreadsMetadata();
+final Set threads;
+try (final KafkaStreams stre