[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1260139834

## streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java: ##

@@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
     }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");

     // Kill instance, delete state to force restoration.
-    assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60)));
+    assertThat("Streams instance did not close within timeout", streams.get().close(Duration.ofSeconds(60)));
     IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
         .map(Path::toFile)
         .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
+} finally {
+    streams.get().close();

Review Comment:
Not sure why we need `AtomicReference`? Can you elaborate? Why does this not work?
```
KafkaStreams streams;
try {
    streams = new KafkaStreams(...);
    ...
```

Btw: `streams.get()` could return `null` if we fail before calling `set(...)`. We need a `null`-check here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
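The `null`-check remark above can be illustrated with a minimal, self-contained sketch. It does not use the real `KafkaStreams` class: `FakeClient` and `NullSafeCloseSketch` are hypothetical stand-ins invented for this example, and the failure before `set(...)` is simulated with a flag.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a closeable client; not the real KafkaStreams API.
final class FakeClient implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

final class NullSafeCloseSketch {
    // Returns whatever the reference holds after cleanup:
    // null if setup failed before set(...), otherwise the (closed) client.
    static FakeClient runAndCleanUp(final boolean failBeforeSet) {
        final AtomicReference<FakeClient> ref = new AtomicReference<>();
        try {
            if (failBeforeSet) {
                throw new IllegalStateException("simulated failure before set(...)");
            }
            ref.set(new FakeClient());
        } catch (final IllegalStateException e) {
            // Swallowed only to keep the sketch runnable; a real test would let it propagate.
        } finally {
            final FakeClient client = ref.get();
            if (client != null) { // without this guard, the failure path throws a NullPointerException
                client.close();
            }
        }
        return ref.get();
    }

    public static void main(final String[] args) {
        System.out.println("failed before set: " + runAndCleanUp(true));
        System.out.println("closed after set: " + runAndCleanUp(false).closed);
    }
}
```

Without the guard, the `finally` block would itself throw and mask the original setup failure, which is usually the more useful error to see in a test report.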
[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1253710146

## streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java: ##

@@ -87,7 +88,7 @@ public static void closeCluster() {
 }

-private final Time time = CLUSTER.time;
+private final Time time = new MockTime(1);

Review Comment:
Are we not worried that broker time and our time might diverge? Can we make the broker mock time advance instead? Having two mock times seems like an undesirable pattern.
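The divergence concern can be shown with a toy model. `AutoTickClock` and `ClockDriftSketch` below are hypothetical classes written for this example, loosely modeled on a mock clock that advances by a fixed amount on every read. They are not Kafka's `MockTime`, and the sketch says nothing about how the embedded cluster actually consumes `CLUSTER.time`.

```java
// Hypothetical auto-ticking clock; a stand-in for illustration, not Kafka's MockTime.
final class AutoTickClock {
    private long nowMs;
    private final long autoTickMs;

    AutoTickClock(final long startMs, final long autoTickMs) {
        this.nowMs = startMs;
        this.autoTickMs = autoTickMs;
    }

    // Every read advances the clock by autoTickMs before returning.
    long milliseconds() {
        nowMs += autoTickMs;
        return nowMs;
    }

    // Explicitly advances the clock, as a test might do to skip ahead.
    void sleep(final long ms) {
        nowMs += ms;
    }

    // Reads the current value without ticking (used only to measure drift).
    long peek() {
        return nowMs;
    }
}

final class ClockDriftSketch {
    // The "test" side skips ahead and reads three times; the "broker" side reads once.
    static long driftAfterAdvance(final AutoTickClock testClock, final AutoTickClock brokerClock) {
        testClock.sleep(60_000L);
        testClock.milliseconds();
        testClock.milliseconds();
        testClock.milliseconds();
        brokerClock.milliseconds();
        return testClock.peek() - brokerClock.peek();
    }

    public static void main(final String[] args) {
        final long separate = driftAfterAdvance(new AutoTickClock(0L, 1L), new AutoTickClock(0L, 1L));
        final AutoTickClock shared = new AutoTickClock(0L, 1L);
        final long same = driftAfterAdvance(shared, shared);
        System.out.println("drift with two clocks: " + separate + " ms");
        System.out.println("drift with one shared clock: " + same + " ms");
    }
}
```

With two independent instances, the drift grows with every explicit advance and with every uneven read count. With a single shared instance it is zero by construction, which is the motivation for advancing the broker's mock time rather than introducing a second one.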
[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1253708580

## streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java: ##

@@ -277,6 +277,7 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
 t1.toStream().to(outputTopicName);

 final KafkaStreams streams = new KafkaStreams(builder.build(), props);

Review Comment:
We need to `close()` a `KafkaStreams` client even if we never `.start()` it, and thus should use `try-with-resources` or split this up and do:
```
final KafkaStreams streams;
try {
    streams = new KafkaStreams(...);
} finally {
    ...
}
```

## streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java: ##

@@ -313,9 +313,14 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
 Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
     .map(Path::toFile)
     .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
+} finally {
+    streams.close();
+    streams.cleanUp();
+}

-// wait till the lag goes down to 0
-final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);
+// wait till the lag goes down to 0
+final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);

Review Comment:
As above.
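A minimal sketch of the `try-with-resources` option mentioned in the first comment, showing that the resource is closed even when `start()` is never reached. `FakeStreams` and `CloseWithoutStartSketch` are hypothetical stand-ins for this example only; the sketch assumes, as the comment implies, that the real client can be used as an `AutoCloseable` resource.

```java
// Hypothetical stand-in for a client with start()/close(); not the real KafkaStreams class.
final class FakeStreams implements AutoCloseable {
    boolean started = false;
    boolean closed = false;

    void start() {
        started = true;
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class CloseWithoutStartSketch {
    // Constructs the client, fails before start(), and returns the instance for inspection.
    static FakeStreams constructThenFail() {
        FakeStreams observed = null;
        try (FakeStreams streams = new FakeStreams()) {
            observed = streams;
            // Simulated failure between construction and start().
            throw new IllegalStateException("simulated failure before start()");
        } catch (final IllegalStateException e) {
            // Swallowed only so the sketch can return; close() has already run at this point.
        }
        return observed;
    }

    public static void main(final String[] args) {
        final FakeStreams s = constructThenFail();
        System.out.println("started=" + s.started + ", closed=" + s.closed);
    }
}
```

The same guarantee holds for the explicit `try`/`finally` form, as long as the constructor call sits outside or at the very start of the guarded region and the `finally` block tolerates an uninitialized client.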