guozhangwang commented on code in PR #13248:
URL: https://github.com/apache/kafka/pull/13248#discussion_r1106408656


##########
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java:
##########
@@ -257,6 +258,23 @@ private Thread adjustCountHelperThread(final KafkaStreams 
kafkaStreams, final in
         });
     }
 
+    @Test
+    public void testRebalanceHappensBeforeStreamThreadGetDown() throws 
Exception {
+        final Properties prop = new Properties();
+        prop.putAll(properties);
+        // make rebalance happen quickly
+        prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);

Review Comment:
   Is there a better way to test this scenario than relying on the real time 
(and hence is time dependendent)? More specifically, I'm looking for a test 
case which would 100% fail without the fix, and would 100% pass with the fix. 
While this test seems would be pass some times even without the fix, is that 
right?



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -658,6 +658,8 @@ public synchronized void onChange(final Thread thread,
                         setState(State.REBALANCING);
                     } else if (newState == StreamThread.State.RUNNING) {
                         maybeSetRunning();
+                    } else if (state != State.RUNNING && newState == 
StreamThread.State.DEAD) {

Review Comment:
   I wonder if it's cleaner to change the logic of `maybeSetRunning` directly, 
e.g. to exclude threads which are `PENDING_SHUTDOWN` as long as there are still 
other threads running?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to