wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r562043444



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
       Alright, I adjusted the close response to align with this.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -978,7 +971,7 @@ private void waitForRunning(final List<KeyValue<KafkaStreams.State, KafkaStreams
         waitForCondition(
             () -> !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING),
             MAX_WAIT_TIME_MS,
-            () -> "Client did not startup on time. Observers transitions: " + observed
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
       We do wait for RUNNING here. I was thinking of making the message match the 
other methods below, but that doesn't make it any more useful, so I will revert it.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -993,6 +986,17 @@ private void waitForStateTransition(final List<KeyValue<KafkaStreams.State, Kafk
         );
     }
 
+    private void waitForStateTransitionContains(final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed,
+                                        final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> expected)
+            throws Exception {
+
+        waitForCondition(
+            () -> observed.containsAll(expected),
+            MAX_WAIT_TIME_MS,
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
       Sure, that is fine.
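
       For reference, here is a minimal standalone sketch of the containment-based 
wait in the hunk above. It is only an illustration: `TransitionWaitSketch`, 
`pollUntil`, and `containsAllExpected` are hypothetical names, `pollUntil` is a 
simplified stand-in for the test utility's wait (not its real implementation), 
and plain strings stand in for the `KeyValue<KafkaStreams.State, KafkaStreams.State>` 
pairs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

class TransitionWaitSketch {

    // Hypothetical stand-in for a test-utility wait: polls the condition
    // until it holds or the timeout elapses.
    static boolean pollUntil(final BooleanSupplier condition, final long maxWaitMs) {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(10L);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Containment check: true once every expected transition has been seen,
    // regardless of order and of any extra transitions in between.
    static <T> boolean containsAllExpected(final List<T> observed, final List<T> expected) {
        return observed.containsAll(expected);
    }

    public static void main(final String[] args) {
        // Strings are placeholders for (oldState, newState) pairs.
        final List<String> observed = Arrays.asList(
            "CREATED->REBALANCING", "REBALANCING->RUNNING", "RUNNING->REBALANCING");
        final List<String> expected = Arrays.asList(
            "REBALANCING->RUNNING", "CREATED->REBALANCING");

        final boolean ok = pollUntil(() -> containsAllExpected(observed, expected), 1000L);
        System.out.println(ok
            ? "all expected transitions observed"
            : "timed out; observed: " + observed);
    }
}
```

       Note that `containsAll` ignores ordering and duplicates, so this check is 
looser than comparing the full observed sequence against an expected one.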




