cadonna commented on a change in pull request #9888:
URL: https://github.com/apache/kafka/pull/9888#discussion_r558149927



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -90,24 +91,61 @@ public void setup() {
         );
     }
 
+    private void startStreamsAndWaitForRunning(final KafkaStreams 
kafkaStreams) throws InterruptedException {
+        kafkaStreams.start();
+        waitForRunning();
+    }
+
     @After
     public void teardown() throws IOException {
+        stateTransitionHistory.clear();
         purgeLocalStreamsState(properties);
     }
 
+    private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) 
{
+        // we store each new state in state transition so that we won't miss 
any state change

Review comment:
       Could you please remove this comment? I do not think it is needed. The 
code is clear enough.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -90,24 +91,61 @@ public void setup() {
         );
     }
 
+    private void startStreamsAndWaitForRunning(final KafkaStreams 
kafkaStreams) throws InterruptedException {
+        kafkaStreams.start();
+        waitForRunning();
+    }
+
     @After
     public void teardown() throws IOException {
+        stateTransitionHistory.clear();
         purgeLocalStreamsState(properties);
     }
 
+    private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) 
{
+        // we store each new state in state transition so that we won't miss 
any state change
+        kafkaStreams.setStateListener(
+            (newState, oldState) -> stateTransitionHistory.add(newState)
+        );
+    }
+
+    private void waitForRunning() throws InterruptedException {
+        waitForCondition(
+            () -> !stateTransitionHistory.isEmpty() &&
+                stateTransitionHistory.get(stateTransitionHistory.size() - 
1).equals(KafkaStreams.State.RUNNING),
+            DEFAULT_DURATION.toMillis(),
+            () -> String.format("Client did not transit to state %s in %d 
seconds",
+                KafkaStreams.State.RUNNING, DEFAULT_DURATION.toMillis() / 1000)
+        );
+    }
+
+    // verify if state change from "before" state into "after" state
+    private boolean hasStateTransition(final KafkaStreams.State before, final 
KafkaStreams.State after) {
+        final int historySize = stateTransitionHistory.size();
+        // should have at least 2 states in history
+        if (historySize >= 2 && stateTransitionHistory.get(historySize - 
2).equals(before) &&
+            stateTransitionHistory.get(historySize - 1).equals(after)) {
+            return true;
+        }

Review comment:
       nit: a blank line here would visually separate the condition from the 
`if` block
    ```suggestion
           if (historySize >= 2 && stateTransitionHistory.get(historySize - 2).equals(before) &&
               stateTransitionHistory.get(historySize - 1).equals(after)) {

               return true;
           }
    ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -90,24 +91,61 @@ public void setup() {
         );
     }
 
+    private void startStreamsAndWaitForRunning(final KafkaStreams 
kafkaStreams) throws InterruptedException {
+        kafkaStreams.start();
+        waitForRunning();
+    }
+
     @After
     public void teardown() throws IOException {
+        stateTransitionHistory.clear();
         purgeLocalStreamsState(properties);
     }
 
+    private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) 
{
+        // we store each new state in state transition so that we won't miss 
any state change
+        kafkaStreams.setStateListener(
+            (newState, oldState) -> stateTransitionHistory.add(newState)
+        );
+    }
+
+    private void waitForRunning() throws InterruptedException {
+        waitForCondition(
+            () -> !stateTransitionHistory.isEmpty() &&
+                stateTransitionHistory.get(stateTransitionHistory.size() - 
1).equals(KafkaStreams.State.RUNNING),
+            DEFAULT_DURATION.toMillis(),
+            () -> String.format("Client did not transit to state %s in %d 
seconds",
+                KafkaStreams.State.RUNNING, DEFAULT_DURATION.toMillis() / 1000)
+        );
+    }
+
+    // verify if state change from "before" state into "after" state

Review comment:
       This comment seems incomplete, but I would remove it anyway. Sorry that 
I am a bit picky about inline comments, but inline comments tend to lie after a 
while, when the code they describe changes but the comments do not. I would 
rather focus on giving meaningful names to variables and methods. For example, 
I would rename this method to `lastStateTransitionFromRebalancingToRunning()`, 
remove the arguments, and hard-code the states.
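
       A minimal sketch of what that rename could look like. The class name 
`StateHistorySketch`, the `record()` helper, and the nested `State` enum are 
assumptions made only so the snippet compiles without Kafka on the classpath; 
in the test, the list would be filled by the state listener and the states 
would be `KafkaStreams.State`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class StateHistorySketch {
    // Stand-in for KafkaStreams.State (assumption, not the real enum).
    public enum State { CREATED, REBALANCING, RUNNING }

    // Thread-safe list, because a state listener may append from another thread.
    private final List<State> stateTransitionHistory = new CopyOnWriteArrayList<>();

    // Mirrors the listener in the diff: append every new state.
    public void record(final State newState) {
        stateTransitionHistory.add(newState);
    }

    // No arguments: the method name carries the meaning and the states are hard-coded.
    public boolean lastStateTransitionFromRebalancingToRunning() {
        final int historySize = stateTransitionHistory.size();
        return historySize >= 2
            && stateTransitionHistory.get(historySize - 2) == State.REBALANCING
            && stateTransitionHistory.get(historySize - 1) == State.RUNNING;
    }
}
```

       Returning the boolean expression directly also makes the "at least 2 
states" comment and the `if`-block unnecessary.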

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -119,37 +157,47 @@ public void shouldAddStreamThread() throws Exception {
                     .sorted().toArray(),
                 equalTo(new String[] {"1", "2", "3"})
             );
-            waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.REBALANCING, DEFAULT_DURATION);
-            waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.RUNNING, DEFAULT_DURATION);
+
+            waitForRunning();
+            assertThat(hasStateTransition(KafkaStreams.State.REBALANCING, 
KafkaStreams.State.RUNNING), is(true));

Review comment:
       What do you think of combining these two checks into one method called 
`waitForTransitionFromRebalancingToRunning()`? They are always used together.
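
       One possible shape for the combined helper, as a sketch only. The class 
`WaitForTransitionSketch`, its `record()` method, the configurable timeout, the 
nested `State` enum, and the local `waitForCondition` polling loop are all 
stand-ins I made up so the snippet is self-contained; the test would keep using 
its existing `waitForCondition` utility, `DEFAULT_DURATION`, and 
`KafkaStreams.State`.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;

public class WaitForTransitionSketch {
    // Stand-in for KafkaStreams.State (assumption, not the real enum).
    public enum State { CREATED, REBALANCING, RUNNING }

    private final List<State> stateTransitionHistory = new CopyOnWriteArrayList<>();
    private final Duration timeout;

    public WaitForTransitionSketch(final Duration timeout) {
        this.timeout = timeout;
    }

    // Mirrors the listener in the diff: append every new state.
    public void record(final State newState) {
        stateTransitionHistory.add(newState);
    }

    // Hypothetical, simplified polling loop standing in for the test utility.
    private static void waitForCondition(final BooleanSupplier condition,
                                         final long maxWaitMs,
                                         final String failureMessage) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError(failureMessage);
            }
            Thread.sleep(10L);
        }
    }

    private boolean lastStateTransitionFromRebalancingToRunning() {
        final int historySize = stateTransitionHistory.size();
        return historySize >= 2
            && stateTransitionHistory.get(historySize - 2) == State.REBALANCING
            && stateTransitionHistory.get(historySize - 1) == State.RUNNING;
    }

    // Waits until the two most recent states are REBALANCING followed by RUNNING.
    public void waitForTransitionFromRebalancingToRunning() throws InterruptedException {
        waitForCondition(
            this::lastStateTransitionFromRebalancingToRunning,
            timeout.toMillis(),
            String.format("Client did not transit from %s to %s in %d ms",
                State.REBALANCING, State.RUNNING, timeout.toMillis())
        );
    }
}
```

       One difference from the two separate calls: if the client reaches 
RUNNING from a state other than REBALANCING, this version waits for the timeout 
instead of failing the assertion right away.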

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -90,24 +91,61 @@ public void setup() {
         );
     }
 
+    private void startStreamsAndWaitForRunning(final KafkaStreams 
kafkaStreams) throws InterruptedException {
+        kafkaStreams.start();
+        waitForRunning();
+    }
+
     @After
     public void teardown() throws IOException {
+        stateTransitionHistory.clear();
         purgeLocalStreamsState(properties);
     }
 
+    private void addStreamStateChangeListener(final KafkaStreams kafkaStreams) 
{
+        // we store each new state in state transition so that we won't miss 
any state change
+        kafkaStreams.setStateListener(
+            (newState, oldState) -> stateTransitionHistory.add(newState)
+        );
+    }
+
+    private void waitForRunning() throws InterruptedException {
+        waitForCondition(
+            () -> !stateTransitionHistory.isEmpty() &&
+                stateTransitionHistory.get(stateTransitionHistory.size() - 
1).equals(KafkaStreams.State.RUNNING),
+            DEFAULT_DURATION.toMillis(),
+            () -> String.format("Client did not transit to state %s in %d 
seconds",
+                KafkaStreams.State.RUNNING, DEFAULT_DURATION.toMillis() / 1000)
+        );
+    }
+
+    // verify if state change from "before" state into "after" state
+    private boolean hasStateTransition(final KafkaStreams.State before, final 
KafkaStreams.State after) {
+        final int historySize = stateTransitionHistory.size();
+        // should have at least 2 states in history

Review comment:
       I think this comment is also not needed. Could you remove it? 





