This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9d54421  Revert "KAFKA-7657: Fixing thread state change to instance state change (#6018)" (#6090)
9d54421 is described below

commit 9d544212e69269f155bb3a51f94a9b13cf1fa565
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Fri Jan 4 14:41:15 2019 -0800

    Revert "KAFKA-7657: Fixing thread state change to instance state change (#6018)" (#6090)
    
    This reverts commit d6698308194625e7921b9c3ace27a918f42f26f1.
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 178 +++++++++++----------
 .../streams/processor/internals/StreamThread.java  |  71 ++++----
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  75 ++-------
 .../StreamTableJoinIntegrationTest.java            |   2 +-
 .../integration/utils/IntegrationTestUtils.java    |  10 +-
 .../processor/internals/StreamThreadTest.java      |  50 +++---
 6 files changed, 170 insertions(+), 216 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 4d0c1c7..bbd0105 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -159,24 +159,24 @@ public class KafkaStreams implements AutoCloseable {
      *         |       +-----+--------+
      *         |             |
      *         |             v
-     *         |       +----+--+------+
-     *         |       | Re-          |
-     *         +<----- | Balancing (1)| -------->+
-     *         |       +-----+-+------+          |
-     *         |             | ^                 |
-     *         |             v |                 |
-     *         |       +--------------+          v
-     *         |       | Running (2)  | -------->+
-     *         |       +------+-------+          |
-     *         |              |                  |
-     *         |              v                  |
-     *         |       +------+-------+     +----+-------+
-     *         +-----> | Pending      |<--- | Error (5)  |---+
-     *                 | Shutdown (3) |     +------------+   |
-     *                 +------+-------+          ^           |
-     *                        |                  |           |
-     *                        v                  +-----------+
-     *                 +------+-------+
+     *         |       +--------------+
+     *         +<----- | Running (2)  | -------->+
+     *         |       +----+--+------+          |
+     *         |            |  ^                 |
+     *         |            v  |                 |
+     *         |       +----+--+------+          |
+     *         |       | Re-          |          v
+     *         |       | Balancing (1)| -------->+
+     *         |       +-----+--------+          |
+     *         |             |                   |
+     *         |             v                   v
+     *         |       +-----+--------+     +----+-------+
+     *         +-----> | Pending      |<--- | Error (5)  |
+     *                 | Shutdown (3) |     +------------+
+     *                 +-----+--------+
+     *                       |
+     *                       v
+     *                 +-----+--------+
      *                 | Not          |
      *                 | Running (4)  |
      *                 +--------------+
@@ -191,7 +191,7 @@ public class KafkaStreams implements AutoCloseable {
      *   the instance will be in the ERROR state. The user will need to close 
it.
      */
     public enum State {
-        CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), 
PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3, 5);
+        CREATED(2, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), 
PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3);
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
@@ -238,11 +238,9 @@ public class KafkaStreams implements AutoCloseable {
      * @param newState New state
      */
     private boolean setState(final State newState) {
-        final State oldState;
+        final State oldState = state;
 
         synchronized (stateLock) {
-            oldState = state;
-
             if (state == State.PENDING_SHUTDOWN && newState != 
State.NOT_RUNNING) {
                 // when the state is already in PENDING_SHUTDOWN, all other 
transitions than NOT_RUNNING (due to thread dying) will be
                 // refused but we do not throw exception here, to allow 
appropriate error handling
@@ -251,13 +249,6 @@ public class KafkaStreams implements AutoCloseable {
                 // when the state is already in NOT_RUNNING, its transition to 
PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls)
                 // will be refused but we do not throw exception here, to 
allow idempotent close calls
                 return false;
-            } else if (state == State.REBALANCING && newState == 
State.REBALANCING) {
-                // when the state is already in REBALANCING, it should not 
transit to REBALANCING
-                return false;
-//            } else if (state == State.RUNNING && newState == State.RUNNING) {
-                // when the state is already in RUNNING, it should not transit 
to RUNNING
-                // this can happen during starting up
-//                return false;
             } else if (!state.isValidTransition(newState)) {
                 throw new IllegalStateException("Stream-client " + clientId + 
": Unexpected state transition from " + oldState + " to " + newState);
             } else {
@@ -269,7 +260,24 @@ public class KafkaStreams implements AutoCloseable {
 
         // we need to call the user customized state listener outside the 
state lock to avoid potential deadlocks
         if (stateListener != null) {
-            stateListener.onChange(newState, oldState);
+            stateListener.onChange(state, oldState);
+        }
+
+        return true;
+    }
+
+    private boolean setRunningFromCreated() {
+        synchronized (stateLock) {
+            if (state != State.CREATED) {
+                throw new IllegalStateException("Stream-client " + clientId + 
": Unexpected state transition from " + state + " to " + State.RUNNING);
+            }
+            state = State.RUNNING;
+            stateLock.notifyAll();
+        }
+
+        // we need to call the user customized state listener outside the 
state lock to avoid potential deadlocks
+        if (stateListener != null) {
+            stateListener.onChange(State.RUNNING, State.CREATED);
         }
 
         return true;
@@ -316,12 +324,11 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
      */
     public void setStateListener(final KafkaStreams.StateListener listener) {
-        synchronized (stateLock) {
-            if (state == State.CREATED) {
-                stateListener = listener;
-            } else {
-                throw new IllegalStateException("Can only set StateListener in 
CREATED state. Current state is: " + state);
-            }
+        if (state == State.CREATED) {
+            stateListener = listener;
+        } else {
+            throw new IllegalStateException("Can only set StateListener in 
CREATED state. " +
+                    "Current state is: " + state);
         }
     }
 
@@ -333,19 +340,17 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
      */
     public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh) {
-        synchronized (stateLock) {
-            if (state == State.CREATED) {
-                for (final StreamThread thread : threads) {
-                    thread.setUncaughtExceptionHandler(eh);
-                }
+        if (state == State.CREATED) {
+            for (final StreamThread thread : threads) {
+                thread.setUncaughtExceptionHandler(eh);
+            }
 
-                if (globalStreamThread != null) {
-                    globalStreamThread.setUncaughtExceptionHandler(eh);
-                }
-            } else {
-                throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
-                    "Current state is: " + state);
+            if (globalStreamThread != null) {
+                globalStreamThread.setUncaughtExceptionHandler(eh);
             }
+        } else {
+            throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
         }
     }
 
@@ -357,13 +362,11 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
      */
     public void setGlobalStateRestoreListener(final StateRestoreListener 
globalStateRestoreListener) {
-        synchronized (stateLock) {
-            if (state == State.CREATED) {
-                this.globalStateRestoreListener = globalStateRestoreListener;
-            } else {
-                throw new IllegalStateException("Can only set 
GlobalStateRestoreListener in CREATED state. " +
+        if (state == State.CREATED) {
+            this.globalStateRestoreListener = globalStateRestoreListener;
+        } else {
+            throw new IllegalStateException("Can only set 
GlobalStateRestoreListener in CREATED state. " +
                     "Current state is: " + state);
-            }
         }
     }
 
@@ -393,21 +396,18 @@ public class KafkaStreams implements AutoCloseable {
     final class StreamStateListener implements StreamThread.StateListener {
         private final Map<Long, StreamThread.State> threadState;
         private GlobalStreamThread.State globalThreadState;
-        // this lock should always be held before the state lock
-        private final Object threadStatesLock;
 
         StreamStateListener(final Map<Long, StreamThread.State> threadState,
                             final GlobalStreamThread.State globalThreadState) {
             this.threadState = threadState;
             this.globalThreadState = globalThreadState;
-            this.threadStatesLock = new Object();
         }
 
         /**
          * If all threads are dead set to ERROR
          */
         private void maybeSetError() {
-            // check if we have at least one thread running
+            // check if we have enough threads running
             for (final StreamThread.State state : threadState.values()) {
                 if (state != StreamThread.State.DEAD) {
                     return;
@@ -415,7 +415,7 @@ public class KafkaStreams implements AutoCloseable {
             }
 
             if (setState(State.ERROR)) {
-                log.error("All stream threads have died. The instance will be 
in error state and should be closed.");
+                log.warn("All stream threads have died. The instance will be 
in error state and should be closed.");
             }
         }
 
@@ -423,13 +423,12 @@ public class KafkaStreams implements AutoCloseable {
          * If all threads are up, including the global thread, set to RUNNING
          */
         private void maybeSetRunning() {
-            // state can be transferred to RUNNING if all threads are either 
RUNNING or DEAD
+            // one thread is running, check others, including global thread
             for (final StreamThread.State state : threadState.values()) {
-                if (state != StreamThread.State.RUNNING && state != 
StreamThread.State.DEAD) {
+                if (state != StreamThread.State.RUNNING) {
                     return;
                 }
             }
-
             // the global state thread is relevant only if it is started. 
There are cases
             // when we don't have a global state thread at all, e.g., when we 
don't have global KTables
             if (globalThreadState != null && globalThreadState != 
GlobalStreamThread.State.RUNNING) {
@@ -444,29 +443,26 @@ public class KafkaStreams implements AutoCloseable {
         public synchronized void onChange(final Thread thread,
                                           final ThreadStateTransitionValidator 
abstractNewState,
                                           final ThreadStateTransitionValidator 
abstractOldState) {
-            synchronized (threadStatesLock) {
-                // StreamThreads first
-                if (thread instanceof StreamThread) {
-                    final StreamThread.State newState = (StreamThread.State) 
abstractNewState;
-                    threadState.put(thread.getId(), newState);
-
-                    if (newState == StreamThread.State.PARTITIONS_REVOKED) {
-                        setState(State.REBALANCING);
-                    } else if (newState == StreamThread.State.RUNNING) {
-                        maybeSetRunning();
-                    } else if (newState == StreamThread.State.DEAD) {
-                        maybeSetError();
-                    }
-                } else if (thread instanceof GlobalStreamThread) {
-                    // global stream thread has different invariants
-                    final GlobalStreamThread.State newState = 
(GlobalStreamThread.State) abstractNewState;
-                    globalThreadState = newState;
-
-                    // special case when global thread is dead
-                    if (newState == GlobalStreamThread.State.DEAD) {
-                        setState(State.ERROR);
-                        log.error("Global thread has died. The instance will 
be in error state and should be closed.");
-                    }
+            // StreamThreads first
+            if (thread instanceof StreamThread) {
+                final StreamThread.State newState = (StreamThread.State) 
abstractNewState;
+                threadState.put(thread.getId(), newState);
+
+                if (newState == StreamThread.State.PARTITIONS_REVOKED && state 
!= State.REBALANCING) {
+                    setState(State.REBALANCING);
+                } else if (newState == StreamThread.State.RUNNING && state != 
State.RUNNING) {
+                    maybeSetRunning();
+                } else if (newState == StreamThread.State.DEAD && state != 
State.ERROR) {
+                    maybeSetError();
+                }
+            } else if (thread instanceof GlobalStreamThread) {
+                // global stream thread has different invariants
+                final GlobalStreamThread.State newState = 
(GlobalStreamThread.State) abstractNewState;
+                globalThreadState = newState;
+
+                // special case when global thread is dead
+                if (newState == GlobalStreamThread.State.DEAD && state != 
State.ERROR && setState(State.ERROR)) {
+                    log.warn("Global thread has died. The instance will be in 
error state and should be closed.");
                 }
             }
         }
@@ -777,9 +773,11 @@ public class KafkaStreams implements AutoCloseable {
      *                          if {@link 
StreamsConfig#PROCESSING_GUARANTEE_CONFIG exactly-once} is enabled for pre 
0.11.0.x brokers
      */
     public synchronized void start() throws IllegalStateException, 
StreamsException {
-        if (setState(State.REBALANCING)) {
-            log.debug("Starting Streams client");
+        log.debug("Starting Streams client");
 
+        // first set state to RUNNING before kicking off the threads,
+        // making sure the state will always transit to RUNNING before 
REBALANCING
+        if (setRunningFromCreated()) {
             if (globalStreamThread != null) {
                 globalStreamThread.start();
             }
@@ -790,13 +788,17 @@ public class KafkaStreams implements AutoCloseable {
 
             final Long cleanupDelay = 
config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
             stateDirCleaner.scheduleAtFixedRate(() -> {
-                // we do not use lock here since we only read on the value and 
act on it
                 if (state == State.RUNNING) {
                     stateDirectory.cleanRemovedTasks(cleanupDelay);
                 }
             }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
+
+            log.info("Started Streams client");
         } else {
-            throw new IllegalStateException("The client is either already 
started or already stopped, cannot re-start");
+            // if transition failed but no exception is thrown; currently it 
is not possible
+            // since we do not allow calling start multiple times whether or 
not it is already shutdown.
+            // TODO: In the future if we lift this restriction this code path 
could then be triggered and be updated
+            log.error("Already stopped, cannot re-start");
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a4aab98..6c3e7cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -83,36 +83,30 @@ public class StreamThread extends Thread {
      *          |           |
      *          |           v
      *          |     +-----+-------+
-     *          +<--- | Starting (1)|
-     *          |     +-----+-------+
-     *          |           |
-     *          |           |
-     *          |           v
-     *          |     +-----+-------+
-     *          +<--- | Partitions  |------+
-     *          |     | Revoked (2) | <----+
+     *          +<--- | Running (1) | <----+
      *          |     +-----+-------+      |
      *          |           |              |
      *          |           v              |
      *          |     +-----+-------+      |
-     *          |     | Partitions  |      |
-     *          +<--- | Assigned (3)| ---->+
+     *          +<--- | Partitions  |      |
+     *          |     | Revoked (2) | <----+
      *          |     +-----+-------+      |
      *          |           |              |
      *          |           v              |
      *          |     +-----+-------+      |
-     *          |     | Running (4) | ---->+
+     *          |     | Partitions  |      |
+     *          |     | Assigned (3)| ---->+
      *          |     +-----+-------+
      *          |           |
      *          |           v
      *          |     +-----+-------+
      *          +---> | Pending     |
-     *                | Shutdown (5)|
+     *                | Shutdown (4)|
      *                +-----+-------+
      *                      |
      *                      v
      *                +-----+-------+
-     *                | Dead (6)    |
+     *                | Dead (5)    |
      *                +-------------+
      * </pre>
      *
@@ -127,13 +121,12 @@ public class StreamThread extends Thread {
      *     <li>
      *         State PARTITIONS_REVOKED may want transit to itself 
indefinitely, in the corner case when
      *         the coordinator repeatedly fails in-between revoking partitions 
and assigning new partitions.
-     *         Also during streams instance start up PARTITIONS_REVOKED may 
want to transit to itself as well.
      *         In this case we will forbid the transition but will not treat 
as an error.
      *     </li>
      * </ul>
      */
     public enum State implements ThreadStateTransitionValidator {
-        CREATED(1, 5), STARTING(2, 5), PARTITIONS_REVOKED(2, 3, 5), 
PARTITIONS_ASSIGNED(2, 4, 5), RUNNING(2, 5), PENDING_SHUTDOWN(6), DEAD;
+        CREATED(1, 4), RUNNING(2, 4), PARTITIONS_REVOKED(3, 4), 
PARTITIONS_ASSIGNED(1, 2, 4), PENDING_SHUTDOWN(5), DEAD;
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
@@ -142,7 +135,7 @@ public class StreamThread extends Thread {
         }
 
         public boolean isRunning() {
-            return equals(RUNNING) || equals(STARTING) || 
equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED);
+            return equals(RUNNING) || equals(PARTITIONS_REVOKED) || 
equals(PARTITIONS_ASSIGNED);
         }
 
         @Override
@@ -203,10 +196,10 @@ public class StreamThread extends Thread {
                 // when the state is already in NOT_RUNNING, all its 
transitions
                 // will be refused but we do not throw exception here
                 return null;
-///            } else if (state == State.PARTITIONS_REVOKED && newState == 
State.PARTITIONS_REVOKED) {
+            } else if (state == State.PARTITIONS_REVOKED && newState == 
State.PARTITIONS_REVOKED) {
                 // when the state is already in PARTITIONS_REVOKED, its 
transition to itself will be
                 // refused but we do not throw exception here
-//                return null;
+                return null;
             } else if (!state.isValidTransition(newState)) {
                 log.error("Unexpected state transition from {} to {}", 
oldState, newState);
                 throw new StreamsException(logPrefix + "Unexpected state 
transition from " + oldState + " to " + newState);
@@ -745,7 +738,7 @@ public class StreamThread extends Thread {
     @Override
     public void run() {
         log.info("Starting");
-        if (setState(State.STARTING) == null) {
+        if (setState(State.RUNNING) == null) {
             log.info("StreamThread already shutdown. Not running");
             return;
         }
@@ -823,7 +816,7 @@ public class StreamThread extends Thread {
             // try to fetch some records with normal poll time
             // in order to wait long enough to get the join response
             records = pollRequests(pollTime);
-        } else if (state == State.RUNNING || state == State.STARTING) {
+        } else if (state == State.RUNNING) {
             // try to fetch some records with normal poll time
             // in order to get long polling
             records = pollRequests(pollTime);
@@ -1256,6 +1249,23 @@ public class StreamThread extends Thread {
         return indent + "\tStreamsThread threadId: " + getName() + "\n" + 
taskManager.toString(indent);
     }
 
+    // the following are for testing only
+    void setNow(final long now) {
+        this.now = now;
+    }
+
+    TaskManager taskManager() {
+        return taskManager;
+    }
+
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() 
{
+        return standbyRecords;
+    }
+
+    int currentNumIterations() {
+        return numIterations;
+    }
+
     public Map<MetricName, Metric> producerMetrics() {
         final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
         if (producer != null) {
@@ -1290,25 +1300,4 @@ public class StreamThread extends Thread {
         result.putAll(adminClientMetrics);
         return result;
     }
-
-    // the following are for testing only
-    void setNow(final long now) {
-        this.now = now;
-    }
-
-    TaskManager taskManager() {
-        return taskManager;
-    }
-
-    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() 
{
-        return standbyRecords;
-    }
-
-    int currentNumIterations() {
-        return numIterations;
-    }
-
-    public StreamThread.StateListener stateListener() {
-        return stateListener;
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 89f3730..d2a4ace 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -139,77 +139,36 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testStateCloseAfterCreate() {
-        globalStreams.close();
-
-        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, 
globalStreams.state());
-    }
-
-    @Test
-    public void testStateOneThreadDeadButRebalanceFinish() throws 
InterruptedException {
+    public void testStateChanges() throws InterruptedException {
         final StateListenerStub stateListener = new StateListenerStub();
         globalStreams.setStateListener(stateListener);
 
-        Assert.assertEquals(0, stateListener.numChanges);
         Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());
+        Assert.assertEquals(0, stateListener.numChanges);
 
         globalStreams.start();
-
         TestUtils.waitForCondition(
-            () -> stateListener.numChanges == 2,
+            () -> globalStreams.state() == KafkaStreams.State.RUNNING,
             10 * 1000,
             "Streams never started.");
-        Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());
-
-        for (final StreamThread thread: globalStreams.threads) {
-            thread.stateListener().onChange(
-                thread,
-                StreamThread.State.PARTITIONS_REVOKED,
-                StreamThread.State.RUNNING);
-        }
-
-        Assert.assertEquals(3, stateListener.numChanges);
-        Assert.assertEquals(KafkaStreams.State.REBALANCING, 
globalStreams.state());
-
-        for (final StreamThread thread: globalStreams.threads) {
-            thread.stateListener().onChange(
-                thread,
-                StreamThread.State.PARTITIONS_ASSIGNED,
-                StreamThread.State.PARTITIONS_REVOKED);
-        }
 
-        Assert.assertEquals(3, stateListener.numChanges);
-        Assert.assertEquals(KafkaStreams.State.REBALANCING, 
globalStreams.state());
-
-        globalStreams.threads[NUM_THREADS - 1].stateListener().onChange(
-            globalStreams.threads[NUM_THREADS - 1],
-            StreamThread.State.PENDING_SHUTDOWN,
-            StreamThread.State.PARTITIONS_ASSIGNED);
+        globalStreams.close();
 
-        globalStreams.threads[NUM_THREADS - 1].stateListener().onChange(
-            globalStreams.threads[NUM_THREADS - 1],
-            StreamThread.State.DEAD,
-            StreamThread.State.PENDING_SHUTDOWN);
+        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, 
globalStreams.state());
+    }
 
-        Assert.assertEquals(3, stateListener.numChanges);
-        Assert.assertEquals(KafkaStreams.State.REBALANCING, 
globalStreams.state());
+    @Test
+    public void testStateCloseAfterCreate() {
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
-        for (final StreamThread thread: globalStreams.threads) {
-            if (thread != globalStreams.threads[NUM_THREADS - 1]) {
-                thread.stateListener().onChange(
-                    thread,
-                    StreamThread.State.RUNNING,
-                    StreamThread.State.PARTITIONS_ASSIGNED);
-            }
+        try {
+            final StateListenerStub stateListener = new StateListenerStub();
+            streams.setStateListener(stateListener);
+        } finally {
+            streams.close();
         }
 
-        Assert.assertEquals(4, stateListener.numChanges);
-        Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());
-
-        globalStreams.close();
-
-        Assert.assertEquals(6, stateListener.numChanges);
-        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, 
globalStreams.state());
+        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
     }
 
     @Test
@@ -540,8 +499,8 @@ public class KafkaStreamsTest {
         assertNotNull(threadMetadata);
         assertEquals(2, threadMetadata.size());
         for (final ThreadMetadata metadata : threadMetadata) {
-            assertTrue("#threadState() was: " + metadata.threadState() + "; 
expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or 
CREATED",
-                asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", 
"PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState()));
+            assertTrue("#threadState() was: " + metadata.threadState() + "; 
expected either RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
+                asList("RUNNING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", 
"CREATED").contains(metadata.threadState()));
             assertEquals(0, metadata.standbyTasks().size());
             assertEquals(0, metadata.activeTasks().size());
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index ca78d02..7503dd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -82,7 +82,7 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, 
"Did not seen thread state transited to PENDING_SHUTDOWN");
 
         streams.close();
-        assertEquals(listener.createdToRevokedSeen(), true);
+        assertEquals(listener.runningToRevokedSeen(), true);
         assertEquals(listener.revokedToPendingShutdownSeen(), true);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 1097285..8a2122d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -75,14 +75,14 @@ public class IntegrationTestUtils {
      * Records state transition for StreamThread
      */
     public static class StateListenerStub implements 
StreamThread.StateListener {
-        boolean startingToRevokedSeen = false;
+        boolean runningToRevokedSeen = false;
         boolean revokedToPendingShutdownSeen = false;
         @Override
         public void onChange(final Thread thread,
                              final ThreadStateTransitionValidator newState,
                              final ThreadStateTransitionValidator oldState) {
-            if (oldState == StreamThread.State.STARTING && newState == 
StreamThread.State.PARTITIONS_REVOKED) {
-                startingToRevokedSeen = true;
+            if (oldState == StreamThread.State.RUNNING && newState == 
StreamThread.State.PARTITIONS_REVOKED) {
+                runningToRevokedSeen = true;
             } else if (oldState == StreamThread.State.PARTITIONS_REVOKED && 
newState == StreamThread.State.PENDING_SHUTDOWN) {
                 revokedToPendingShutdownSeen = true;
             }
@@ -92,8 +92,8 @@ public class IntegrationTestUtils {
             return revokedToPendingShutdownSeen;
         }
 
-        public boolean createdToRevokedSeen() {
-            return startingToRevokedSeen;
+        public boolean runningToRevokedSeen() {
+            return runningToRevokedSeen;
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index dd311fb..5df8fbc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -163,12 +163,12 @@ public class StreamThreadTest {
         assertEquals(thread.state(), StreamThread.State.CREATED);
 
         final ConsumerRebalanceListener rebalanceListener = 
thread.rebalanceListener;
+        thread.setState(StreamThread.State.RUNNING);
 
         final List<TopicPartition> revokedPartitions;
         final List<TopicPartition> assignedPartitions;
 
         // revoke nothing
-        thread.setState(StreamThread.State.STARTING);
         revokedPartitions = Collections.emptyList();
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
 
@@ -202,7 +202,7 @@ public class StreamThreadTest {
         TestUtils.waitForCondition(new TestCondition() {
             @Override
             public boolean conditionMet() {
-                return thread.state() == StreamThread.State.STARTING;
+                return thread.state() == StreamThread.State.RUNNING;
             }
         }, 10 * 1000, "Thread never started.");
 
@@ -342,7 +342,7 @@ public class StreamThreadTest {
             properties));
         final StreamThread thread = createStreamThread(clientId, config, 
false);
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);
@@ -505,8 +505,8 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, 
false);
 
-        thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
+        thread.setState(StreamThread.State.RUNNING);
+        
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -542,7 +542,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(configProps(true)), true);
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
         
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -577,7 +577,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(configProps(true)), true);
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
         
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -612,6 +612,8 @@ public class StreamThreadTest {
     public void shouldShutdownTaskManagerOnClose() {
         final Consumer<byte[], byte[]> consumer = 
EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
+        
EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.<TaskId, 
StreamTask>emptyMap());
+        
EasyMock.expect(taskManager.standbyTasks()).andReturn(Collections.<TaskId, 
StandbyTask>emptyMap());
         taskManager.shutdown(true);
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
@@ -635,7 +637,7 @@ public class StreamThreadTest {
             new StreamThread.StateListener() {
                 @Override
                 public void onChange(final Thread t, final 
ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator 
oldState) {
-                    if (oldState == StreamThread.State.CREATED && newState == 
StreamThread.State.STARTING) {
+                    if (oldState == StreamThread.State.CREATED && newState == 
StreamThread.State.RUNNING) {
                         thread.shutdown();
                     }
                 }
@@ -709,8 +711,8 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, 
false);
 
-        thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
+        thread.setState(StreamThread.State.RUNNING);
+        
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
 
@@ -734,7 +736,7 @@ public class StreamThreadTest {
 
         consumer.updatePartitions(topic1, singletonList(new 
PartitionInfo(topic1, 1, null, null, null)));
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -802,7 +804,7 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "name", null, null, null, 
topic1);
         internalTopologyBuilder.addSink("out", "output", null, null, null, 
"name");
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -838,7 +840,7 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "name", null, null, null, 
topic1);
         internalTopologyBuilder.addSink("out", "output", null, null, null, 
"name");
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -894,7 +896,8 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, 
false);
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
+
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -943,7 +946,8 @@ public class StreamThreadTest {
         restoreConsumer.updateEndOffsets(offsets);
         restoreConsumer.updateBeginningOffsets(offsets);
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
+
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@@ -1006,7 +1010,8 @@ public class StreamThreadTest {
             restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1, 
i, ("K" + i).getBytes(), ("V" + i).getBytes()));
         }
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
+
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@@ -1069,7 +1074,8 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, 
false);
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
+
         thread.rebalanceListener.onPartitionsRevoked(null);
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
@@ -1115,9 +1121,6 @@ public class StreamThreadTest {
         ThreadMetadata metadata = thread.threadMetadata();
         assertEquals(StreamThread.State.CREATED.name(), 
metadata.threadState());
 
-        thread.setState(StreamThread.State.STARTING);
-        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
-        thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
         thread.setState(StreamThread.State.RUNNING);
         metadata = thread.threadMetadata();
         assertEquals(StreamThread.State.RUNNING.name(), 
metadata.threadState());
@@ -1152,9 +1155,10 @@ public class StreamThreadTest {
 
         
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
 
+        thread.setState(StreamThread.State.RUNNING);
+
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
-        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(assignedPartitions);
         assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), 
StreamThread.State.PARTITIONS_REVOKED);
 
@@ -1283,7 +1287,7 @@ public class StreamThreadTest {
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass().getName());
         final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(config), false);
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);
@@ -1327,7 +1331,7 @@ public class StreamThreadTest {
         
config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
LogAndSkipOnInvalidTimestamp.class.getName());
         final StreamThread thread = createStreamThread(clientId, new 
StreamsConfig(config), false);
 
-        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.RUNNING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);

Reply via email to