showuon commented on a change in pull request #9888:
URL: https://github.com/apache/kafka/pull/9888#discussion_r557960104



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -90,24 +91,64 @@ public void setup() {
         );
     }
 
+    private void startStreamsAndWaitForRunning(final KafkaStreams 
kafkaStreams) throws InterruptedException {
+        kafkaStreams.start();
+        waitForStateTransition(KafkaStreams.State.RUNNING);
+    }
+
     @After
     public void teardown() throws IOException {
+        stateTransitionHistory.clear();
         purgeLocalStreamsState(properties);
     }
 
+    private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) 
{
+        // we store each new state in state transition so that we won't miss 
any state change
+        kafkaStreams.setStateListener(
+            (newState, oldState) -> stateTransitionHistory.add(newState)
+        );
+    }
+
+    private void waitForStateTransition(final KafkaStreams.State expected) 
throws InterruptedException {
+        waitForCondition(
+            () -> !stateTransitionHistory.isEmpty() && 
stateTransitionHistory.contains(expected),
+            DEFAULT_DURATION.toMillis(),
+            () -> String.format("Client did not change to the %s state in 
time. Observed new state transitions: %s",
+                expected, stateTransitionHistory)
+        );
+    }

Review comment:
       We can't just check the current state to become `RUNNING` because after 
we add/remove threads, the state won't change immediately. That is, if we check 
if the state is `RUNNING` after adding/removing threads, the check will pass, 
but the rebalance is not happening, yet, which will cause the test fail. So I 
still use `stateTransitionHistory` to check the state, and also, I checked the 
last state of the history to see if it is RUNNING. That should be better. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to