This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d669830  KAFKA-7657: Fixing thread state change to instance state 
change (#6018)
d669830 is described below

commit d6698308194625e7921b9c3ace27a918f42f26f1
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Fri Jan 4 14:39:47 2019 -0800

    KAFKA-7657: Fixing thread state change to instance state change (#6018)
    
    While looking into KAFKA-7657, I found there are a few loopholes in this 
logic:
    
    1. We kept a map of thread-name to thread-state and a global-thread state 
at the KafkaStreams instance-level, in addition to the instance state itself. 
stateLock is used when accessing the instance state, however when we are in the 
thread state change callback, we are accessing both the thread-states as well 
as the instance state at the same time in the callers of setState without a 
lock, which is vulnerable to concurrent multi-stream threads. The fix is a) 
introduce a threadStatesLoc [...]
    
    2. When transiting to state.RUNNING, we check if all threads are either in 
RUNNING or DEAD state, this is because some threads maybe dead at the rebalance 
period but we should still proceed to RUNNING if the rest of threads are still 
transiting to RUNNING.
    
    Added unit test for 2) above. Also simplified another test as a nit change.
    
    
    Reviewers: John Roesler <vvcep...@users.noreply.github.com>, Matthias J. 
Sax <mj...@apache.org>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 178 ++++++++++-----------
 .../streams/processor/internals/StreamThread.java  |  71 ++++----
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  75 +++++++--
 .../StreamTableJoinIntegrationTest.java            |   2 +-
 .../integration/utils/IntegrationTestUtils.java    |  10 +-
 .../processor/internals/StreamThreadTest.java      |  50 +++---
 6 files changed, 216 insertions(+), 170 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bbd0105..4d0c1c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -159,24 +159,24 @@ public class KafkaStreams implements AutoCloseable {
      *         |       +-----+--------+
      *         |             |
      *         |             v
-     *         |       +--------------+
-     *         +<----- | Running (2)  | -------->+
-     *         |       +----+--+------+          |
-     *         |            |  ^                 |
-     *         |            v  |                 |
-     *         |       +----+--+------+          |
-     *         |       | Re-          |          v
-     *         |       | Balancing (1)| -------->+
-     *         |       +-----+--------+          |
-     *         |             |                   |
-     *         |             v                   v
-     *         |       +-----+--------+     +----+-------+
-     *         +-----> | Pending      |<--- | Error (5)  |
-     *                 | Shutdown (3) |     +------------+
-     *                 +-----+--------+
-     *                       |
-     *                       v
-     *                 +-----+--------+
+     *         |       +----+--+------+
+     *         |       | Re-          |
+     *         +<----- | Balancing (1)| -------->+
+     *         |       +-----+-+------+          |
+     *         |             | ^                 |
+     *         |             v |                 |
+     *         |       +--------------+          v
+     *         |       | Running (2)  | -------->+
+     *         |       +------+-------+          |
+     *         |              |                  |
+     *         |              v                  |
+     *         |       +------+-------+     +----+-------+
+     *         +-----> | Pending      |<--- | Error (5)  |---+
+     *                 | Shutdown (3) |     +------------+   |
+     *                 +------+-------+          ^           |
+     *                        |                  |           |
+     *                        v                  +-----------+
+     *                 +------+-------+
      *                 | Not          |
      *                 | Running (4)  |
      *                 +--------------+
@@ -191,7 +191,7 @@ public class KafkaStreams implements AutoCloseable {
      *   the instance will be in the ERROR state. The user will need to close 
it.
      */
     public enum State {
-        CREATED(2, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), 
PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3);
+        CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), 
PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3, 5);
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
@@ -238,9 +238,11 @@ public class KafkaStreams implements AutoCloseable {
      * @param newState New state
      */
     private boolean setState(final State newState) {
-        final State oldState = state;
+        final State oldState;
 
         synchronized (stateLock) {
+            oldState = state;
+
             if (state == State.PENDING_SHUTDOWN && newState != 
State.NOT_RUNNING) {
                 // when the state is already in PENDING_SHUTDOWN, all other 
transitions than NOT_RUNNING (due to thread dying) will be
                 // refused but we do not throw exception here, to allow 
appropriate error handling
@@ -249,6 +251,13 @@ public class KafkaStreams implements AutoCloseable {
                 // when the state is already in NOT_RUNNING, its transition to 
PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls)
                 // will be refused but we do not throw exception here, to 
allow idempotent close calls
                 return false;
+            } else if (state == State.REBALANCING && newState == 
State.REBALANCING) {
+                // when the state is already in REBALANCING, it should not 
transit to REBALANCING
+                return false;
+//            } else if (state == State.RUNNING && newState == State.RUNNING) {
+                // when the state is already in RUNNING, it should not transit 
to RUNNING
+                // this can happen during starting up
+//                return false;
             } else if (!state.isValidTransition(newState)) {
                 throw new IllegalStateException("Stream-client " + clientId + 
": Unexpected state transition from " + oldState + " to " + newState);
             } else {
@@ -260,24 +269,7 @@ public class KafkaStreams implements AutoCloseable {
 
         // we need to call the user customized state listener outside the 
state lock to avoid potential deadlocks
         if (stateListener != null) {
-            stateListener.onChange(state, oldState);
-        }
-
-        return true;
-    }
-
-    private boolean setRunningFromCreated() {
-        synchronized (stateLock) {
-            if (state != State.CREATED) {
-                throw new IllegalStateException("Stream-client " + clientId + 
": Unexpected state transition from " + state + " to " + State.RUNNING);
-            }
-            state = State.RUNNING;
-            stateLock.notifyAll();
-        }
-
-        // we need to call the user customized state listener outside the 
state lock to avoid potential deadlocks
-        if (stateListener != null) {
-            stateListener.onChange(State.RUNNING, State.CREATED);
+            stateListener.onChange(newState, oldState);
         }
 
         return true;
@@ -324,11 +316,12 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
      */
     public void setStateListener(final KafkaStreams.StateListener listener) {
-        if (state == State.CREATED) {
-            stateListener = listener;
-        } else {
-            throw new IllegalStateException("Can only set StateListener in 
CREATED state. " +
-                    "Current state is: " + state);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                stateListener = listener;
+            } else {
+                throw new IllegalStateException("Can only set StateListener in 
CREATED state. Current state is: " + state);
+            }
         }
     }
 
@@ -340,17 +333,19 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
      */
     public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh) {
-        if (state == State.CREATED) {
-            for (final StreamThread thread : threads) {
-                thread.setUncaughtExceptionHandler(eh);
-            }
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                for (final StreamThread thread : threads) {
+                    thread.setUncaughtExceptionHandler(eh);
+                }
 
-            if (globalStreamThread != null) {
-                globalStreamThread.setUncaughtExceptionHandler(eh);
-            }
-        } else {
-            throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(eh);
+                }
+            } else {
+                throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
                     "Current state is: " + state);
+            }
         }
     }
 
@@ -362,11 +357,13 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
      */
     public void setGlobalStateRestoreListener(final StateRestoreListener 
globalStateRestoreListener) {
-        if (state == State.CREATED) {
-            this.globalStateRestoreListener = globalStateRestoreListener;
-        } else {
-            throw new IllegalStateException("Can only set 
GlobalStateRestoreListener in CREATED state. " +
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                this.globalStateRestoreListener = globalStateRestoreListener;
+            } else {
+                throw new IllegalStateException("Can only set 
GlobalStateRestoreListener in CREATED state. " +
                     "Current state is: " + state);
+            }
         }
     }
 
@@ -396,18 +393,21 @@ public class KafkaStreams implements AutoCloseable {
     final class StreamStateListener implements StreamThread.StateListener {
         private final Map<Long, StreamThread.State> threadState;
         private GlobalStreamThread.State globalThreadState;
+        // this lock should always be held before the state lock
+        private final Object threadStatesLock;
 
         StreamStateListener(final Map<Long, StreamThread.State> threadState,
                             final GlobalStreamThread.State globalThreadState) {
             this.threadState = threadState;
             this.globalThreadState = globalThreadState;
+            this.threadStatesLock = new Object();
         }
 
         /**
          * If all threads are dead set to ERROR
          */
         private void maybeSetError() {
-            // check if we have enough threads running
+            // check if we have at least one thread running
             for (final StreamThread.State state : threadState.values()) {
                 if (state != StreamThread.State.DEAD) {
                     return;
@@ -415,7 +415,7 @@ public class KafkaStreams implements AutoCloseable {
             }
 
             if (setState(State.ERROR)) {
-                log.warn("All stream threads have died. The instance will be 
in error state and should be closed.");
+                log.error("All stream threads have died. The instance will be 
in error state and should be closed.");
             }
         }
 
@@ -423,12 +423,13 @@ public class KafkaStreams implements AutoCloseable {
          * If all threads are up, including the global thread, set to RUNNING
          */
         private void maybeSetRunning() {
-            // one thread is running, check others, including global thread
+            // state can be transferred to RUNNING if all threads are either 
RUNNING or DEAD
             for (final StreamThread.State state : threadState.values()) {
-                if (state != StreamThread.State.RUNNING) {
+                if (state != StreamThread.State.RUNNING && state != 
StreamThread.State.DEAD) {
                     return;
                 }
             }
+
             // the global state thread is relevant only if it is started. 
There are cases
             // when we don't have a global state thread at all, e.g., when we 
don't have global KTables
             if (globalThreadState != null && globalThreadState != 
GlobalStreamThread.State.RUNNING) {
@@ -443,26 +444,29 @@ public class KafkaStreams implements AutoCloseable {
         public synchronized void onChange(final Thread thread,
                                           final ThreadStateTransitionValidator 
abstractNewState,
                                           final ThreadStateTransitionValidator 
abstractOldState) {
-            // StreamThreads first
-            if (thread instanceof StreamThread) {
-                final StreamThread.State newState = (StreamThread.State) 
abstractNewState;
-                threadState.put(thread.getId(), newState);
-
-                if (newState == StreamThread.State.PARTITIONS_REVOKED && state 
!= State.REBALANCING) {
-                    setState(State.REBALANCING);
-                } else if (newState == StreamThread.State.RUNNING && state != 
State.RUNNING) {
-                    maybeSetRunning();
-                } else if (newState == StreamThread.State.DEAD && state != 
State.ERROR) {
-                    maybeSetError();
-                }
-            } else if (thread instanceof GlobalStreamThread) {
-                // global stream thread has different invariants
-                final GlobalStreamThread.State newState = 
(GlobalStreamThread.State) abstractNewState;
-                globalThreadState = newState;
-
-                // special case when global thread is dead
-                if (newState == GlobalStreamThread.State.DEAD && state != 
State.ERROR && setState(State.ERROR)) {
-                    log.warn("Global thread has died. The instance will be in 
error state and should be closed.");
+            synchronized (threadStatesLock) {
+                // StreamThreads first
+                if (thread instanceof StreamThread) {
+                    final StreamThread.State newState = (StreamThread.State) 
abstractNewState;
+                    threadState.put(thread.getId(), newState);
+
+                    if (newState == StreamThread.State.PARTITIONS_REVOKED) {
+                        setState(State.REBALANCING);
+                    } else if (newState == StreamThread.State.RUNNING) {
+                        maybeSetRunning();
+                    } else if (newState == StreamThread.State.DEAD) {
+                        maybeSetError();
+                    }
+                } else if (thread instanceof GlobalStreamThread) {
+                    // global stream thread has different invariants
+                    final GlobalStreamThread.State newState = 
(GlobalStreamThread.State) abstractNewState;
+                    globalThreadState = newState;
+
+                    // special case when global thread is dead
+                    if (newState == GlobalStreamThread.State.DEAD) {
+                        setState(State.ERROR);
+                        log.error("Global thread has died. The instance will 
be in error state and should be closed.");
+                    }
                 }
             }
         }
@@ -773,11 +777,9 @@ public class KafkaStreams implements AutoCloseable {
      *                          if {@link 
StreamsConfig#PROCESSING_GUARANTEE_CONFIG exactly-once} is enabled for pre 
0.11.0.x brokers
      */
     public synchronized void start() throws IllegalStateException, 
StreamsException {
-        log.debug("Starting Streams client");
+        if (setState(State.REBALANCING)) {
+            log.debug("Starting Streams client");
 
-        // first set state to RUNNING before kicking off the threads,
-        // making sure the state will always transit to RUNNING before 
REBALANCING
-        if (setRunningFromCreated()) {
             if (globalStreamThread != null) {
                 globalStreamThread.start();
             }
@@ -788,17 +790,13 @@ public class KafkaStreams implements AutoCloseable {
 
             final Long cleanupDelay = 
config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
             stateDirCleaner.scheduleAtFixedRate(() -> {
+                // we do not use lock here since we only read on the value and 
act on it
                 if (state == State.RUNNING) {
                     stateDirectory.cleanRemovedTasks(cleanupDelay);
                 }
             }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
-
-            log.info("Started Streams client");
         } else {
-            // if transition failed but no exception is thrown; currently it 
is not possible
-            // since we do not allow calling start multiple times whether or 
not it is already shutdown.
-            // TODO: In the future if we lift this restriction this code path 
could then be triggered and be updated
-            log.error("Already stopped, cannot re-start");
+            throw new IllegalStateException("The client is either already 
started or already stopped, cannot re-start");
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 6c3e7cd..a4aab98 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -83,30 +83,36 @@ public class StreamThread extends Thread {
      *          |           |
      *          |           v
      *          |     +-----+-------+
-     *          +<--- | Running (1) | <----+
+     *          +<--- | Starting (1)|
+     *          |     +-----+-------+
+     *          |           |
+     *          |           |
+     *          |           v
+     *          |     +-----+-------+
+     *          +<--- | Partitions  |------+
+     *          |     | Revoked (2) | <----+
      *          |     +-----+-------+      |
      *          |           |              |
      *          |           v              |
      *          |     +-----+-------+      |
-     *          +<--- | Partitions  |      |
-     *          |     | Revoked (2) | <----+
+     *          |     | Partitions  |      |
+     *          +<--- | Assigned (3)| ---->+
      *          |     +-----+-------+      |
      *          |           |              |
      *          |           v              |
      *          |     +-----+-------+      |
-     *          |     | Partitions  |      |
-     *          |     | Assigned (3)| ---->+
+     *          |     | Running (4) | ---->+
      *          |     +-----+-------+
      *          |           |
      *          |           v
      *          |     +-----+-------+
      *          +---> | Pending     |
-     *                | Shutdown (4)|
+     *                | Shutdown (5)|
      *                +-----+-------+
      *                      |
      *                      v
      *                +-----+-------+
-     *                | Dead (5)    |
+     *                | Dead (6)    |
      *                +-------------+
      * </pre>
      *
@@ -121,12 +127,13 @@ public class StreamThread extends Thread {
      *     <li>
      *         State PARTITIONS_REVOKED may want transit to itself 
indefinitely, in the corner case when
      *         the coordinator repeatedly fails in-between revoking partitions 
and assigning new partitions.
+     *         Also during streams instance start up PARTITIONS_REVOKED may 
want to transit to itself as well.
      *         In this case we will forbid the transition but will not treat 
as an error.
      *     </li>
      * </ul>
      */
     public enum State implements ThreadStateTransitionValidator {
-        CREATED(1, 4), RUNNING(2, 4), PARTITIONS_REVOKED(3, 4), 
PARTITIONS_ASSIGNED(1, 2, 4), PENDING_SHUTDOWN(5), DEAD;
+        CREATED(1, 5), STARTING(2, 5), PARTITIONS_REVOKED(2, 3, 5), 
PARTITIONS_ASSIGNED(2, 4, 5), RUNNING(2, 5), PENDING_SHUTDOWN(6), DEAD;
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
@@ -135,7 +142,7 @@ public class StreamThread extends Thread {
         }
 
         public boolean isRunning() {
-            return equals(RUNNING) || equals(PARTITIONS_REVOKED) || 
equals(PARTITIONS_ASSIGNED);
+            return equals(RUNNING) || equals(STARTING) || 
equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED);
         }
 
         @Override
@@ -196,10 +203,10 @@ public class StreamThread extends Thread {
                 // when the state is already in NOT_RUNNING, all its 
transitions
                 // will be refused but we do not throw exception here
                 return null;
-            } else if (state == State.PARTITIONS_REVOKED && newState == 
State.PARTITIONS_REVOKED) {
+///            } else if (state == State.PARTITIONS_REVOKED && newState == 
State.PARTITIONS_REVOKED) {
                 // when the state is already in PARTITIONS_REVOKED, its 
transition to itself will be
                 // refused but we do not throw exception here
-                return null;
+//                return null;
             } else if (!state.isValidTransition(newState)) {
                 log.error("Unexpected state transition from {} to {}", 
oldState, newState);
                 throw new StreamsException(logPrefix + "Unexpected state 
transition from " + oldState + " to " + newState);
@@ -738,7 +745,7 @@ public class StreamThread extends Thread {
     @Override
     public void run() {
         log.info("Starting");
-        if (setState(State.RUNNING) == null) {
+        if (setState(State.STARTING) == null) {
             log.info("StreamThread already shutdown. Not running");
             return;
         }
@@ -816,7 +823,7 @@ public class StreamThread extends Thread {
             // try to fetch some records with normal poll time
             // in order to wait long enough to get the join response
             records = pollRequests(pollTime);
-        } else if (state == State.RUNNING) {
+        } else if (state == State.RUNNING || state == State.STARTING) {
             // try to fetch some records with normal poll time
             // in order to get long polling
             records = pollRequests(pollTime);
@@ -1249,23 +1256,6 @@ public class StreamThread extends Thread {
         return indent + "\tStreamsThread threadId: " + getName() + "\n" + 
taskManager.toString(indent);
     }
 
-    // the following are for testing only
-    void setNow(final long now) {
-        this.now = now;
-    }
-
-    TaskManager taskManager() {
-        return taskManager;
-    }
-
-    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() 
{
-        return standbyRecords;
-    }
-
-    int currentNumIterations() {
-        return numIterations;
-    }
-
     public Map<MetricName, Metric> producerMetrics() {
         final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
         if (producer != null) {
@@ -1300,4 +1290,25 @@ public class StreamThread extends Thread {
         result.putAll(adminClientMetrics);
         return result;
     }
+
+    // the following are for testing only
+    void setNow(final long now) {
+        this.now = now;
+    }
+
+    TaskManager taskManager() {
+        return taskManager;
+    }
+
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() 
{
+        return standbyRecords;
+    }
+
+    int currentNumIterations() {
+        return numIterations;
+    }
+
+    public StreamThread.StateListener stateListener() {
+        return stateListener;
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index d2a4ace..89f3730 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -139,36 +139,77 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testStateChanges() throws InterruptedException {
+    public void testStateCloseAfterCreate() {
+        globalStreams.close();
+
+        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, 
globalStreams.state());
+    }
+
+    @Test
+    public void testStateOneThreadDeadButRebalanceFinish() throws 
InterruptedException {
         final StateListenerStub stateListener = new StateListenerStub();
         globalStreams.setStateListener(stateListener);
 
-        Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());
         Assert.assertEquals(0, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());
 
         globalStreams.start();
+
         TestUtils.waitForCondition(
-            () -> globalStreams.state() == KafkaStreams.State.RUNNING,
+            () -> stateListener.numChanges == 2,
             10 * 1000,
             "Streams never started.");
+        Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());
 
-        globalStreams.close();
+        for (final StreamThread thread: globalStreams.threads) {
+            thread.stateListener().onChange(
+                thread,
+                StreamThread.State.PARTITIONS_REVOKED,
+                StreamThread.State.RUNNING);
+        }
 
-        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, 
globalStreams.state());
-    }
+        Assert.assertEquals(3, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.REBALANCING, 
globalStreams.state());
 
-    @Test
-    public void testStateCloseAfterCreate() {
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        for (final StreamThread thread: globalStreams.threads) {
+            thread.stateListener().onChange(
+                thread,
+                StreamThread.State.PARTITIONS_ASSIGNED,
+                StreamThread.State.PARTITIONS_REVOKED);
+        }
 
-        try {
-            final StateListenerStub stateListener = new StateListenerStub();
-            streams.setStateListener(stateListener);
-        } finally {
-            streams.close();
+        Assert.assertEquals(3, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.REBALANCING, 
globalStreams.state());
+
+        globalStreams.threads[NUM_THREADS - 1].stateListener().onChange(
+            globalStreams.threads[NUM_THREADS - 1],
+            StreamThread.State.PENDING_SHUTDOWN,
+            StreamThread.State.PARTITIONS_ASSIGNED);
+
+        globalStreams.threads[NUM_THREADS - 1].stateListener().onChange(
+            globalStreams.threads[NUM_THREADS - 1],
+            StreamThread.State.DEAD,
+            StreamThread.State.PENDING_SHUTDOWN);
+
+        Assert.assertEquals(3, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.REBALANCING, 
globalStreams.state());
+
+        for (final StreamThread thread: globalStreams.threads) {
+            if (thread != globalStreams.threads[NUM_THREADS - 1]) {
+                thread.stateListener().onChange(
+                    thread,
+                    StreamThread.State.RUNNING,
+                    StreamThread.State.PARTITIONS_ASSIGNED);
+            }
         }
 
-        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
+        Assert.assertEquals(4, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());
+
+        globalStreams.close();
+
+        Assert.assertEquals(6, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, 
globalStreams.state());
     }
 
     @Test
@@ -499,8 +540,8 @@ public class KafkaStreamsTest {
         assertNotNull(threadMetadata);
         assertEquals(2, threadMetadata.size());
         for (final ThreadMetadata metadata : threadMetadata) {
-            assertTrue("#threadState() was: " + metadata.threadState() + "; 
expected either RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
-                asList("RUNNING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", 
"CREATED").contains(metadata.threadState()));
+            assertTrue("#threadState() was: " + metadata.threadState() + "; 
expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or 
CREATED",
+                asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", 
"PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState()));
             assertEquals(0, metadata.standbyTasks().size());
             assertEquals(0, metadata.activeTasks().size());
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 7503dd6..ca78d02 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -82,7 +82,7 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, 
"Did not seen thread state transited to PENDING_SHUTDOWN");
 
         streams.close();
-        assertEquals(listener.runningToRevokedSeen(), true);
+        assertEquals(listener.createdToRevokedSeen(), true);
         assertEquals(listener.revokedToPendingShutdownSeen(), true);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 8a2122d..1097285 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -75,14 +75,14 @@ public class IntegrationTestUtils {
      * Records state transition for StreamThread
      */
     public static class StateListenerStub implements 
StreamThread.StateListener {
-        boolean runningToRevokedSeen = false;
+        boolean startingToRevokedSeen = false;
         boolean revokedToPendingShutdownSeen = false;
         @Override
         public void onChange(final Thread thread,
                              final ThreadStateTransitionValidator newState,
                              final ThreadStateTransitionValidator oldState) {
-            if (oldState == StreamThread.State.RUNNING && newState == 
StreamThread.State.PARTITIONS_REVOKED) {
-                runningToRevokedSeen = true;
+            if (oldState == StreamThread.State.STARTING && newState == 
StreamThread.State.PARTITIONS_REVOKED) {
+                startingToRevokedSeen = true;
             } else if (oldState == StreamThread.State.PARTITIONS_REVOKED && 
newState == StreamThread.State.PENDING_SHUTDOWN) {
                 revokedToPendingShutdownSeen = true;
             }
@@ -92,8 +92,8 @@ public class IntegrationTestUtils {
             return revokedToPendingShutdownSeen;
         }
 
-        public boolean runningToRevokedSeen() {
-            return runningToRevokedSeen;
+        public boolean createdToRevokedSeen() {
+            return startingToRevokedSeen;
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5df8fbc..dd311fb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -163,12 +163,12 @@ public class StreamThreadTest {
         assertEquals(thread.state(), StreamThread.State.CREATED);
 
         final ConsumerRebalanceListener rebalanceListener = 
thread.rebalanceListener;
-        thread.setState(StreamThread.State.RUNNING);
 
         final List<TopicPartition> revokedPartitions;
         final List<TopicPartition> assignedPartitions;
 
         // revoke nothing
+        thread.setState(StreamThread.State.STARTING);
         revokedPartitions = Collections.emptyList();
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
 
@@ -202,7 +202,7 @@ public class StreamThreadTest {
         TestUtils.waitForCondition(new TestCondition() {
             @Override
             public boolean conditionMet() {
-                return thread.state() == StreamThread.State.RUNNING;
+                return thread.state() == StreamThread.State.STARTING;
             }
         }, 10 * 1000, "Thread never started.");
 
@@ -342,7 +342,7 @@ public class StreamThreadTest {
             properties));
         final StreamThread thread = createStreamThread(clientId, config, 
false);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);
@@ -505,8 +505,8 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, 
false);
 
-        thread.setState(StreamThread.State.RUNNING);
-        
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.setState(StreamThread.State.STARTING);
+        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -542,7 +542,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(configProps(true)), true);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -577,7 +577,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(configProps(true)), true);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -612,8 +612,6 @@ public class StreamThreadTest {
     public void shouldShutdownTaskManagerOnClose() {
         final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
-        
EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.<TaskId, 
StreamTask>emptyMap());
-        
EasyMock.expect(taskManager.standbyTasks()).andReturn(Collections.<TaskId, 
StandbyTask>emptyMap());
         taskManager.shutdown(true);
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
@@ -637,7 +635,7 @@ public class StreamThreadTest {
             new StreamThread.StateListener() {
                 @Override
                 public void onChange(final Thread t, final 
ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator 
oldState) {
-                    if (oldState == StreamThread.State.CREATED && newState == 
StreamThread.State.RUNNING) {
+                    if (oldState == StreamThread.State.CREATED && newState == 
StreamThread.State.STARTING) {
                         thread.shutdown();
                     }
                 }
@@ -711,8 +709,8 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, 
false);
 
-        thread.setState(StreamThread.State.RUNNING);
-        
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.setState(StreamThread.State.STARTING);
+        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
 
@@ -736,7 +734,7 @@ public class StreamThreadTest {
 
         consumer.updatePartitions(topic1, singletonList(new 
PartitionInfo(topic1, 1, null, null, null)));
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -804,7 +802,7 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "name", null, null, null, 
topic1);
         internalTopologyBuilder.addSink("out", "output", null, null, null, 
"name");
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -840,7 +838,7 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "name", null, null, null, 
topic1);
         internalTopologyBuilder.addSink("out", "output", null, null, null, 
"name");
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -896,8 +894,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, 
false);
 
-        thread.setState(StreamThread.State.RUNNING);
-
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -946,8 +943,7 @@ public class StreamThreadTest {
         restoreConsumer.updateEndOffsets(offsets);
         restoreConsumer.updateBeginningOffsets(offsets);
 
-        thread.setState(StreamThread.State.RUNNING);
-
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@@ -1010,8 +1006,7 @@ public class StreamThreadTest {
             restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1, 
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
         }
 
-        thread.setState(StreamThread.State.RUNNING);
-
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@@ -1074,8 +1069,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, 
false);
 
-        thread.setState(StreamThread.State.RUNNING);
-
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
@@ -1121,6 +1115,9 @@ public class StreamThreadTest {
         ThreadMetadata metadata = thread.threadMetadata();
         assertEquals(StreamThread.State.CREATED.name(), 
metadata.threadState());
 
+        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+        thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
         thread.setState(StreamThread.State.RUNNING);
         metadata = thread.threadMetadata();
         assertEquals(StreamThread.State.RUNNING.name(), 
metadata.threadState());
@@ -1155,10 +1152,9 @@ public class StreamThreadTest {
 
         
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
 
-        thread.setState(StreamThread.State.RUNNING);
-
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(assignedPartitions);
         assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), 
StreamThread.State.PARTITIONS_REVOKED);
 
@@ -1287,7 +1283,7 @@ public class StreamThreadTest {
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass().getName());
         final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(config), false);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);
@@ -1331,7 +1327,7 @@ public class StreamThreadTest {
         
config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
LogAndSkipOnInvalidTimestamp.class.getName());
         final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(config), false);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);

Reply via email to