[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-07-11 Thread via GitHub


mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1260139834


##
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##
@@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
 }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
 
 // Kill instance, delete state to force restoration.
-assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60)));
+assertThat("Streams instance did not close within timeout", streams.get().close(Duration.ofSeconds(60)));
 IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
 .map(Path::toFile)
 .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
+} finally {
+streams.get().close();

Review Comment:
   Not sure why we need `AtomicReference`? Can you elaborate?
   
   Why does this not work?
   ```
   KafkaStreams streams;
   try {
     streams = new KafkaStreams(...);
 ...
   ```
   
   Btw: `streams.get()` could return `null` in case we fail before calling 
`set(...)`. Need a `null`-check here.
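
A minimal sketch of the plain-local alternative with the `null` guard. `FakeStreams` is a hypothetical stand-in for `KafkaStreams` so the example runs without Kafka on the classpath, and the constructor failure is simulated. One possible reason a plain local would not suffice is that Java only lets lambdas capture effectively final locals; whether that applies here depends on the rest of the test body.

```java
public class NullSafeCloseSketch {

    // Hypothetical stand-in for KafkaStreams; only models construction and close().
    static final class FakeStreams implements AutoCloseable {
        boolean closed = false;

        FakeStreams(final boolean failInConstructor) {
            if (failInConstructor) {
                throw new IllegalStateException("simulated constructor failure");
            }
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Returns the client if it was created, or null if construction failed.
    static FakeStreams runTest(final boolean failInConstructor) {
        // Initialized to null so it is definitely assigned when the finally block reads it.
        FakeStreams streams = null;
        try {
            streams = new FakeStreams(failInConstructor);
            // ... test body would go here ...
        } catch (final IllegalStateException e) {
            // Swallowed only so the sketch can exercise both paths.
        } finally {
            // Guard: the constructor may have thrown before the assignment happened.
            if (streams != null) {
                streams.close();
            }
        }
        return streams;
    }

    public static void main(final String[] args) {
        System.out.println("closed after normal run: " + runTest(false).closed);
        System.out.println("null after failed construction: " + (runTest(true) == null));
    }
}
```

Without the guard, the failed-construction path would throw a `NullPointerException` from the `finally` block and mask the original failure.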



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-07-05 Thread via GitHub


mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1253710146


##
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##
@@ -87,7 +88,7 @@ public static void closeCluster() {
 }
 
 
-private final Time time = CLUSTER.time;
+private final Time time = new MockTime(1);

Review Comment:
   Are we not worried that broker time and our time might diverge? Can we make 
the broker's mock time advance instead? Having two mock times seems like an 
undesirable pattern.
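
To illustrate the divergence concern, here is a small sketch. `ManualClock` is a hypothetical clock written for this example, not Kafka's `MockTime`, and the 60-second advance is an arbitrary value.

```java
public class SharedClockSketch {

    // Hypothetical manual clock for illustration; this is not Kafka's MockTime.
    static final class ManualClock {
        private long nowMs;

        ManualClock(final long startMs) {
            this.nowMs = startMs;
        }

        long milliseconds() {
            return nowMs;
        }

        // Advances this clock only; other clock instances are unaffected.
        void sleep(final long ms) {
            nowMs += ms;
        }
    }

    // Advances the test clock and returns how far it is ahead of the broker clock afterwards.
    static long skewAfterSleep(final ManualClock brokerClock,
                               final ManualClock testClock,
                               final long sleepMs) {
        testClock.sleep(sleepMs);
        return testClock.milliseconds() - brokerClock.milliseconds();
    }

    public static void main(final String[] args) {
        // Two independent clocks: advancing one leaves the other behind.
        final long separate = skewAfterSleep(new ManualClock(0L), new ManualClock(0L), 60_000L);

        // One shared clock: both sides observe the same advance.
        final ManualClock shared = new ManualClock(0L);
        final long same = skewAfterSleep(shared, shared, 60_000L);

        System.out.println("separate clocks skew = " + separate + " ms");
        System.out.println("shared clock skew = " + same + " ms");
    }
}
```

With separate instances the skew grows by every advance applied to only one side, which is the pattern the comment questions.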






[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-07-05 Thread via GitHub


mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1253708580


##
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##
@@ -277,6 +277,7 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
 t1.toStream().to(outputTopicName);
 final KafkaStreams streams = new KafkaStreams(builder.build(), props);

Review Comment:
   We need to `close()` a `KafkaStreams` client even if we never `.start()` it, 
and thus should use `try-with-resources` or split this up and do:
   ```
   final KafkaStreams streams;
   try {
     streams = new KafkaStreams(...);
   } finally {
     ...
   }
   ```



##
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##
@@ -313,9 +313,14 @@ public void shouldFetchLagsDuringRestoration() throws 
Exception {
 Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
 .map(Path::toFile)
 .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
+} finally {
+streams.close();
+streams.cleanUp();
+}
 
-// wait till the lag goes down to 0
-final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);
+// wait till the lag goes down to 0
+final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);

Review Comment:
   as above.


