guozhangwang commented on code in PR #13248:
URL: https://github.com/apache/kafka/pull/13248#discussion_r1106408656
##########
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java:
##########
@@ -257,6 +258,23 @@ private Thread adjustCountHelperThread(final KafkaStreams kafkaStreams, final in
         });
     }
 
+    @Test
+    public void testRebalanceHappensBeforeStreamThreadGetDown() throws Exception {
+        final Properties prop = new Properties();
+        prop.putAll(properties);
+        // make rebalance happen quickly
+        prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);

Review Comment:
   Is there a better way to test this scenario than relying on real time (which makes the test time-dependent)? More specifically, I'm looking for a test case that would fail 100% of the time without the fix and pass 100% of the time with it. As written, this test seems like it would sometimes pass even without the fix, is that right?

##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -658,6 +658,8 @@ public synchronized void onChange(final Thread thread,
                 setState(State.REBALANCING);
             } else if (newState == StreamThread.State.RUNNING) {
                 maybeSetRunning();
+            } else if (state != State.RUNNING && newState == StreamThread.State.DEAD) {

Review Comment:
   I wonder if it would be cleaner to change the logic of `maybeSetRunning` directly, e.g. to exclude threads that are `PENDING_SHUTDOWN` as long as there are still other threads running?
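
To make the `maybeSetRunning` suggestion a bit more concrete, here is a rough, self-contained sketch of the check I have in mind. It is not the actual `KafkaStreams` code: the `ThreadState` enum is a simplified stand-in for `StreamThread.State`, and the class and method names (`MaybeSetRunningSketch`, `canSetRunning`) are hypothetical. The assumption is that threads which are `PENDING_SHUTDOWN` or `DEAD` should not block the client from transiting to `RUNNING`, provided at least one other thread is still running.

```java
import java.util.Arrays;
import java.util.Collection;

public class MaybeSetRunningSketch {

    // Simplified stand-in for StreamThread.State (assumption, not the real enum).
    public enum ThreadState {
        CREATED, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED,
        RUNNING, PENDING_SHUTDOWN, DEAD
    }

    // Hypothetical helper: returns true if the client could move to RUNNING.
    // Threads that are shutting down or already dead are ignored, but only
    // as long as at least one thread is actually RUNNING.
    public static boolean canSetRunning(final Collection<ThreadState> threadStates) {
        boolean anyRunning = false;
        for (final ThreadState state : threadStates) {
            if (state == ThreadState.RUNNING) {
                anyRunning = true;
            } else if (state != ThreadState.PENDING_SHUTDOWN && state != ThreadState.DEAD) {
                // Some thread is still starting up or rebalancing.
                return false;
            }
        }
        return anyRunning;
    }

    public static void main(final String[] args) {
        // One thread running, one being removed: the client may be RUNNING.
        System.out.println(canSetRunning(
            Arrays.asList(ThreadState.RUNNING, ThreadState.PENDING_SHUTDOWN)));
        // One thread still rebalancing: the client must stay in REBALANCING.
        System.out.println(canSetRunning(
            Arrays.asList(ThreadState.RUNNING, ThreadState.PARTITIONS_REVOKED)));
        // No thread left running: do not report RUNNING.
        System.out.println(canSetRunning(
            Arrays.asList(ThreadState.PENDING_SHUTDOWN, ThreadState.DEAD)));
    }
}
```

With a check along these lines, the extra `DEAD` branch in `onChange` might not be needed, since a thread going down would no longer hold back the state transition. Whether that holds for all the real transition paths would need to be verified against the actual code.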