This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d669830 KAFKA-7657: Fixing thread state change to instance state change (#6018) d669830 is described below commit d6698308194625e7921b9c3ace27a918f42f26f1 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Fri Jan 4 14:39:47 2019 -0800 KAFKA-7657: Fixing thread state change to instance state change (#6018) While looking into KAFKA-7657, I found there are a few loopholes in this logic: 1. We kept a map of thread-name to thread-state and a global-thread state at the KafkaStreams instance-level, in addition to the instance state itself. stateLock is used when accessing the instance state, however when we are in the thread state change callback, we are accessing both the thread-states as well as the instance state at the same time in the callers of setState without a lock, which is vulnerable to concurrent multi-stream threads. The fix is a) introduce a threadStatesLoc [...] 2. When transiting to state.RUNNING, we check if all threads are either in RUNNING or DEAD state, this is because some threads maybe dead at the rebalance period but we should still proceed to RUNNING if the rest of threads are still transiting to RUNNING. Added unit test for 2) above. Also simplified another test as a nit change. Reviewers: John Roesler <vvcep...@users.noreply.github.com>, Matthias J. Sax <mj...@apache.org> --- .../org/apache/kafka/streams/KafkaStreams.java | 178 ++++++++++----------- .../streams/processor/internals/StreamThread.java | 71 ++++---- .../org/apache/kafka/streams/KafkaStreamsTest.java | 75 +++++++-- .../StreamTableJoinIntegrationTest.java | 2 +- .../integration/utils/IntegrationTestUtils.java | 10 +- .../processor/internals/StreamThreadTest.java | 50 +++--- 6 files changed, 216 insertions(+), 170 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index bbd0105..4d0c1c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -159,24 +159,24 @@ public class KafkaStreams implements AutoCloseable { * | +-----+--------+ * | | * | v - * | +--------------+ - * +<----- | Running (2) | -------->+ - * | +----+--+------+ | - * | | ^ | - * | v | | - * | +----+--+------+ | - * | | Re- | v - * | | Balancing (1)| -------->+ - * | +-----+--------+ | - * | | | - * | v v - * | +-----+--------+ +----+-------+ - * +-----> | Pending |<--- | Error (5) | - * | Shutdown (3) | +------------+ - * +-----+--------+ - * | - * v - * +-----+--------+ + * | +----+--+------+ + * | | Re- | + * +<----- | Balancing (1)| -------->+ + * | +-----+-+------+ | + * | | ^ | + * | v | | + * | +--------------+ v + * | | Running (2) | -------->+ + * | +------+-------+ | + * | | | + * | v | + * | +------+-------+ +----+-------+ + * +-----> | Pending |<--- | Error (5) |---+ + * | Shutdown (3) | +------------+ | + * +------+-------+ ^ | + * | | | + * v +-----------+ + * +------+-------+ * | Not | * | Running (4) | * +--------------+ @@ -191,7 +191,7 @@ public class KafkaStreams implements AutoCloseable { * the instance will be in the ERROR state. The user will need to close it. */ public enum State { - CREATED(2, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3); + CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3, 5); private final Set<Integer> validTransitions = new HashSet<>(); @@ -238,9 +238,11 @@ public class KafkaStreams implements AutoCloseable { * @param newState New state */ private boolean setState(final State newState) { - final State oldState = state; + final State oldState; synchronized (stateLock) { + oldState = state; + if (state == State.PENDING_SHUTDOWN && newState != State.NOT_RUNNING) { // when the state is already in PENDING_SHUTDOWN, all other transitions than NOT_RUNNING (due to thread dying) will be // refused but we do not throw exception here, to allow appropriate error handling @@ -249,6 +251,13 @@ public class KafkaStreams implements AutoCloseable { // when the state is already in NOT_RUNNING, its transition to PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls) // will be refused but we do not throw exception here, to allow idempotent close calls return false; + } else if (state == State.REBALANCING && newState == State.REBALANCING) { + // when the state is already in REBALANCING, it should not transit to REBALANCING + return false; +// } else if (state == State.RUNNING && newState == State.RUNNING) { + // when the state is already in RUNNING, it should not transit to RUNNING + // this can happen during starting up +// return false; } else if (!state.isValidTransition(newState)) { throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + oldState + " to " + newState); } else { @@ -260,24 +269,7 @@ public class KafkaStreams implements AutoCloseable { // we need to call the user customized state listener outside the state lock to avoid potential deadlocks if (stateListener != null) { - stateListener.onChange(state, oldState); - } - - return true; - } - - private boolean setRunningFromCreated() { - synchronized (stateLock) { - if (state != State.CREATED) { - throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + state + " to " + State.RUNNING); - } - state = State.RUNNING; - stateLock.notifyAll(); - } - - // we need to call the user customized state listener outside the state lock to avoid potential deadlocks - if (stateListener != null) { - stateListener.onChange(State.RUNNING, State.CREATED); + stateListener.onChange(newState, oldState); } return true; @@ -324,11 +316,12 @@ public class KafkaStreams implements AutoCloseable { * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setStateListener(final KafkaStreams.StateListener listener) { - if (state == State.CREATED) { - stateListener = listener; - } else { - throw new IllegalStateException("Can only set StateListener in CREATED state. " + - "Current state is: " + state); + synchronized (stateLock) { + if (state == State.CREATED) { + stateListener = listener; + } else { + throw new IllegalStateException("Can only set StateListener in CREATED state. Current state is: " + state); + } } } @@ -340,17 +333,19 @@ public class KafkaStreams implements AutoCloseable { * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { - if (state == State.CREATED) { - for (final StreamThread thread : threads) { - thread.setUncaughtExceptionHandler(eh); - } + synchronized (stateLock) { + if (state == State.CREATED) { + for (final StreamThread thread : threads) { + thread.setUncaughtExceptionHandler(eh); + } - if (globalStreamThread != null) { - globalStreamThread.setUncaughtExceptionHandler(eh); - } - } else { - throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + + if (globalStreamThread != null) { + globalStreamThread.setUncaughtExceptionHandler(eh); + } + } else { + throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + "Current state is: " + state); + } } } @@ -362,11 +357,13 @@ public class KafkaStreams implements AutoCloseable { * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) { - if (state == State.CREATED) { - this.globalStateRestoreListener = globalStateRestoreListener; - } else { - throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED state. " + + synchronized (stateLock) { + if (state == State.CREATED) { + this.globalStateRestoreListener = globalStateRestoreListener; + } else { + throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED state. " + "Current state is: " + state); + } } } @@ -396,18 +393,21 @@ public class KafkaStreams implements AutoCloseable { final class StreamStateListener implements StreamThread.StateListener { private final Map<Long, StreamThread.State> threadState; private GlobalStreamThread.State globalThreadState; + // this lock should always be held before the state lock + private final Object threadStatesLock; StreamStateListener(final Map<Long, StreamThread.State> threadState, final GlobalStreamThread.State globalThreadState) { this.threadState = threadState; this.globalThreadState = globalThreadState; + this.threadStatesLock = new Object(); } /** * If all threads are dead set to ERROR */ private void maybeSetError() { - // check if we have enough threads running + // check if we have at least one thread running for (final StreamThread.State state : threadState.values()) { if (state != StreamThread.State.DEAD) { return; @@ -415,7 +415,7 @@ public class KafkaStreams implements AutoCloseable { } if (setState(State.ERROR)) { - log.warn("All stream threads have died. The instance will be in error state and should be closed."); + log.error("All stream threads have died. The instance will be in error state and should be closed."); } } @@ -423,12 +423,13 @@ public class KafkaStreams implements AutoCloseable { * If all threads are up, including the global thread, set to RUNNING */ private void maybeSetRunning() { - // one thread is running, check others, including global thread + // state can be transferred to RUNNING if all threads are either RUNNING or DEAD for (final StreamThread.State state : threadState.values()) { - if (state != StreamThread.State.RUNNING) { + if (state != StreamThread.State.RUNNING && state != StreamThread.State.DEAD) { return; } } + // the global state thread is relevant only if it is started. There are cases // when we don't have a global state thread at all, e.g., when we don't have global KTables if (globalThreadState != null && globalThreadState != GlobalStreamThread.State.RUNNING) { @@ -443,26 +444,29 @@ public class KafkaStreams implements AutoCloseable { public synchronized void onChange(final Thread thread, final ThreadStateTransitionValidator abstractNewState, final ThreadStateTransitionValidator abstractOldState) { - // StreamThreads first - if (thread instanceof StreamThread) { - final StreamThread.State newState = (StreamThread.State) abstractNewState; - threadState.put(thread.getId(), newState); - - if (newState == StreamThread.State.PARTITIONS_REVOKED && state != State.REBALANCING) { - setState(State.REBALANCING); - } else if (newState == StreamThread.State.RUNNING && state != State.RUNNING) { - maybeSetRunning(); - } else if (newState == StreamThread.State.DEAD && state != State.ERROR) { - maybeSetError(); - } - } else if (thread instanceof GlobalStreamThread) { - // global stream thread has different invariants - final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState; - globalThreadState = newState; - - // special case when global thread is dead - if (newState == GlobalStreamThread.State.DEAD && state != State.ERROR && setState(State.ERROR)) { - log.warn("Global thread has died. The instance will be in error state and should be closed."); + synchronized (threadStatesLock) { + // StreamThreads first + if (thread instanceof StreamThread) { + final StreamThread.State newState = (StreamThread.State) abstractNewState; + threadState.put(thread.getId(), newState); + + if (newState == StreamThread.State.PARTITIONS_REVOKED) { + setState(State.REBALANCING); + } else if (newState == StreamThread.State.RUNNING) { + maybeSetRunning(); + } else if (newState == StreamThread.State.DEAD) { + maybeSetError(); + } + } else if (thread instanceof GlobalStreamThread) { + // global stream thread has different invariants + final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState; + globalThreadState = newState; + + // special case when global thread is dead + if (newState == GlobalStreamThread.State.DEAD) { + setState(State.ERROR); + log.error("Global thread has died. The instance will be in error state and should be closed."); + } } } } @@ -773,11 +777,9 @@ public class KafkaStreams implements AutoCloseable { * if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG exactly-once} is enabled for pre 0.11.0.x brokers */ public synchronized void start() throws IllegalStateException, StreamsException { - log.debug("Starting Streams client"); + if (setState(State.REBALANCING)) { + log.debug("Starting Streams client"); - // first set state to RUNNING before kicking off the threads, - // making sure the state will always transit to RUNNING before REBALANCING - if (setRunningFromCreated()) { if (globalStreamThread != null) { globalStreamThread.start(); } @@ -788,17 +790,13 @@ public class KafkaStreams implements AutoCloseable { final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); stateDirCleaner.scheduleAtFixedRate(() -> { + // we do not use lock here since we only read on the value and act on it if (state == State.RUNNING) { stateDirectory.cleanRemovedTasks(cleanupDelay); } }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS); - - log.info("Started Streams client"); } else { - // if transition failed but no exception is thrown; currently it is not possible - // since we do not allow calling start multiple times whether or not it is already shutdown. - // TODO: In the future if we lift this restriction this code path could then be triggered and be updated - log.error("Already stopped, cannot re-start"); + throw new IllegalStateException("The client is either already started or already stopped, cannot re-start"); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 6c3e7cd..a4aab98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -83,30 +83,36 @@ public class StreamThread extends Thread { * | | * | v * | +-----+-------+ - * +<--- | Running (1) | <----+ + * +<--- | Starting (1)| + * | +-----+-------+ + * | | + * | | + * | v + * | +-----+-------+ + * +<--- | Partitions |------+ + * | | Revoked (2) | <----+ * | +-----+-------+ | * | | | * | v | * | +-----+-------+ | - * +<--- | Partitions | | - * | | Revoked (2) | <----+ + * | | Partitions | | + * +<--- | Assigned (3)| ---->+ * | +-----+-------+ | * | | | * | v | * | +-----+-------+ | - * | | Partitions | | - * | | Assigned (3)| ---->+ + * | | Running (4) | ---->+ * | +-----+-------+ * | | * | v * | +-----+-------+ * +---> | Pending | - * | Shutdown (4)| + * | Shutdown (5)| * +-----+-------+ * | * v * +-----+-------+ - * | Dead (5) | + * | Dead (6) | * +-------------+ * </pre> * @@ -121,12 +127,13 @@ public class StreamThread extends Thread { * <li> * State PARTITIONS_REVOKED may want transit to itself indefinitely, in the corner case when * the coordinator repeatedly fails in-between revoking partitions and assigning new partitions. + * Also during streams instance start up PARTITIONS_REVOKED may want to transit to itself as well. * In this case we will forbid the transition but will not treat as an error. * </li> * </ul> */ public enum State implements ThreadStateTransitionValidator { - CREATED(1, 4), RUNNING(2, 4), PARTITIONS_REVOKED(3, 4), PARTITIONS_ASSIGNED(1, 2, 4), PENDING_SHUTDOWN(5), DEAD; + CREATED(1, 5), STARTING(2, 5), PARTITIONS_REVOKED(2, 3, 5), PARTITIONS_ASSIGNED(2, 4, 5), RUNNING(2, 5), PENDING_SHUTDOWN(6), DEAD; private final Set<Integer> validTransitions = new HashSet<>(); @@ -135,7 +142,7 @@ public class StreamThread extends Thread { } public boolean isRunning() { - return equals(RUNNING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED); + return equals(RUNNING) || equals(STARTING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED); } @Override @@ -196,10 +203,10 @@ public class StreamThread extends Thread { // when the state is already in NOT_RUNNING, all its transitions // will be refused but we do not throw exception here return null; - } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) { +/// } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) { // when the state is already in PARTITIONS_REVOKED, its transition to itself will be // refused but we do not throw exception here - return null; +// return null; } else if (!state.isValidTransition(newState)) { log.error("Unexpected state transition from {} to {}", oldState, newState); throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState); @@ -738,7 +745,7 @@ public class StreamThread extends Thread { @Override public void run() { log.info("Starting"); - if (setState(State.RUNNING) == null) { + if (setState(State.STARTING) == null) { log.info("StreamThread already shutdown. Not running"); return; } @@ -816,7 +823,7 @@ public class StreamThread extends Thread { // try to fetch some records with normal poll time // in order to wait long enough to get the join response records = pollRequests(pollTime); - } else if (state == State.RUNNING) { + } else if (state == State.RUNNING || state == State.STARTING) { // try to fetch some records with normal poll time // in order to get long polling records = pollRequests(pollTime); @@ -1249,23 +1256,6 @@ public class StreamThread extends Thread { return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent); } - // the following are for testing only - void setNow(final long now) { - this.now = now; - } - - TaskManager taskManager() { - return taskManager; - } - - Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() { - return standbyRecords; - } - - int currentNumIterations() { - return numIterations; - } - public Map<MetricName, Metric> producerMetrics() { final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>(); if (producer != null) { @@ -1300,4 +1290,25 @@ public class StreamThread extends Thread { result.putAll(adminClientMetrics); return result; } + + // the following are for testing only + void setNow(final long now) { + this.now = now; + } + + TaskManager taskManager() { + return taskManager; + } + + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() { + return standbyRecords; + } + + int currentNumIterations() { + return numIterations; + } + + public StreamThread.StateListener stateListener() { + return stateListener; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index d2a4ace..89f3730 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -139,36 +139,77 @@ public class KafkaStreamsTest { } @Test - public void testStateChanges() throws InterruptedException { + public void testStateCloseAfterCreate() { + globalStreams.close(); + + Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); + } + + @Test + public void testStateOneThreadDeadButRebalanceFinish() throws InterruptedException { final StateListenerStub stateListener = new StateListenerStub(); globalStreams.setStateListener(stateListener); - Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state()); Assert.assertEquals(0, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state()); globalStreams.start(); + TestUtils.waitForCondition( - () -> globalStreams.state() == KafkaStreams.State.RUNNING, + () -> stateListener.numChanges == 2, 10 * 1000, "Streams never started."); + Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state()); - globalStreams.close(); + for (final StreamThread thread: globalStreams.threads) { + thread.stateListener().onChange( + thread, + StreamThread.State.PARTITIONS_REVOKED, + StreamThread.State.RUNNING); + } - Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); - } + Assert.assertEquals(3, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); - @Test - public void testStateCloseAfterCreate() { - final KafkaStreams streams = new KafkaStreams(builder.build(), props); + for (final StreamThread thread: globalStreams.threads) { + thread.stateListener().onChange( + thread, + StreamThread.State.PARTITIONS_ASSIGNED, + StreamThread.State.PARTITIONS_REVOKED); + } - try { - final StateListenerStub stateListener = new StateListenerStub(); - streams.setStateListener(stateListener); - } finally { - streams.close(); + Assert.assertEquals(3, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + + globalStreams.threads[NUM_THREADS - 1].stateListener().onChange( + globalStreams.threads[NUM_THREADS - 1], + StreamThread.State.PENDING_SHUTDOWN, + StreamThread.State.PARTITIONS_ASSIGNED); + + globalStreams.threads[NUM_THREADS - 1].stateListener().onChange( + globalStreams.threads[NUM_THREADS - 1], + StreamThread.State.DEAD, + StreamThread.State.PENDING_SHUTDOWN); + + Assert.assertEquals(3, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + + for (final StreamThread thread: globalStreams.threads) { + if (thread != globalStreams.threads[NUM_THREADS - 1]) { + thread.stateListener().onChange( + thread, + StreamThread.State.RUNNING, + StreamThread.State.PARTITIONS_ASSIGNED); + } } - Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); + Assert.assertEquals(4, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state()); + + globalStreams.close(); + + Assert.assertEquals(6, stateListener.numChanges); + Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); } @Test @@ -499,8 +540,8 @@ public class KafkaStreamsTest { assertNotNull(threadMetadata); assertEquals(2, threadMetadata.size()); for (final ThreadMetadata metadata : threadMetadata) { - assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED", - asList("RUNNING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState())); + assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED", + asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState())); assertEquals(0, metadata.standbyTasks().size()); assertEquals(0, metadata.activeTasks().size()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java index 7503dd6..ca78d02 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java @@ -82,7 +82,7 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN"); streams.close(); - assertEquals(listener.runningToRevokedSeen(), true); + assertEquals(listener.createdToRevokedSeen(), true); assertEquals(listener.revokedToPendingShutdownSeen(), true); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 8a2122d..1097285 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -75,14 +75,14 @@ public class IntegrationTestUtils { * Records state transition for StreamThread */ public static class StateListenerStub implements StreamThread.StateListener { - boolean runningToRevokedSeen = false; + boolean startingToRevokedSeen = false; boolean revokedToPendingShutdownSeen = false; @Override public void onChange(final Thread thread, final ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator oldState) { - if (oldState == StreamThread.State.RUNNING && newState == StreamThread.State.PARTITIONS_REVOKED) { - runningToRevokedSeen = true; + if (oldState == StreamThread.State.STARTING && newState == StreamThread.State.PARTITIONS_REVOKED) { + startingToRevokedSeen = true; } else if (oldState == StreamThread.State.PARTITIONS_REVOKED && newState == StreamThread.State.PENDING_SHUTDOWN) { revokedToPendingShutdownSeen = true; } @@ -92,8 +92,8 @@ public class IntegrationTestUtils { return revokedToPendingShutdownSeen; } - public boolean runningToRevokedSeen() { - return runningToRevokedSeen; + public boolean createdToRevokedSeen() { + return startingToRevokedSeen; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 5df8fbc..dd311fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -163,12 +163,12 @@ public class StreamThreadTest { assertEquals(thread.state(), StreamThread.State.CREATED); final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; - thread.setState(StreamThread.State.RUNNING); final List<TopicPartition> revokedPartitions; final List<TopicPartition> assignedPartitions; // revoke nothing + thread.setState(StreamThread.State.STARTING); revokedPartitions = Collections.emptyList(); rebalanceListener.onPartitionsRevoked(revokedPartitions); @@ -202,7 +202,7 @@ public class StreamThreadTest { TestUtils.waitForCondition(new TestCondition() { @Override public boolean conditionMet() { - return thread.state() == StreamThread.State.RUNNING; + return thread.state() == StreamThread.State.STARTING; } }, 10 * 1000, "Thread never started."); @@ -342,7 +342,7 @@ public class StreamThreadTest { properties)); final StreamThread thread = createStreamThread(clientId, config, false); - thread.setState(StreamThread.State.RUNNING); + thread.setState(StreamThread.State.STARTING); thread.setState(StreamThread.State.PARTITIONS_REVOKED); final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1); @@ -505,8 +505,8 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); - thread.setState(StreamThread.State.RUNNING); - thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); + thread.setState(StreamThread.State.STARTING); + thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); final List<TopicPartition> assignedPartitions = new ArrayList<>(); @@ -542,7 +542,7 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); - thread.setState(StreamThread.State.RUNNING); + thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -577,7 +577,7 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); - thread.setState(StreamThread.State.RUNNING); + thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -612,8 +612,6 @@ public class StreamThreadTest { public void shouldShutdownTaskManagerOnClose() { final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.<TaskId, StreamTask>emptyMap()); - EasyMock.expect(taskManager.standbyTasks()).andReturn(Collections.<TaskId, StandbyTask>emptyMap()); taskManager.shutdown(true); EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); @@ -637,7 +635,7 @@ public class StreamThreadTest { new StreamThread.StateListener() { @Override public void onChange(final Thread t, final ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator oldState) { - if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.RUNNING) { + if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) { thread.shutdown(); } } @@ -711,8 +709,8 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); - thread.setState(StreamThread.State.RUNNING); - thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); + thread.setState(StreamThread.State.STARTING); + thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); @@ -736,7 +734,7 @@ public class StreamThreadTest { consumer.updatePartitions(topic1, singletonList(new PartitionInfo(topic1, 1, null, null, null))); - thread.setState(StreamThread.State.RUNNING); + thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -804,7 +802,7 @@ public class StreamThreadTest { internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); - thread.setState(StreamThread.State.RUNNING); + thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -840,7 +838,7 @@ public class StreamThreadTest { internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); - thread.setState(StreamThread.State.RUNNING); + thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -896,8 +894,7 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); - thread.setState(StreamThread.State.RUNNING); - + thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -946,8 +943,7 @@ public class StreamThreadTest { restoreConsumer.updateEndOffsets(offsets); restoreConsumer.updateBeginningOffsets(offsets); - thread.setState(StreamThread.State.RUNNING); - + thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); @@ -1010,8 +1006,7 @@ public class StreamThreadTest { restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes())); } - thread.setState(StreamThread.State.RUNNING); - + thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); @@ -1074,8 +1069,7 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); - thread.setState(StreamThread.State.RUNNING); - + thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(null); final List<TopicPartition> assignedPartitions = new ArrayList<>(); @@ -1121,6 +1115,9 @@ public class StreamThreadTest { ThreadMetadata metadata = thread.threadMetadata(); assertEquals(StreamThread.State.CREATED.name(), metadata.threadState()); + thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.PARTITIONS_REVOKED); + thread.setState(StreamThread.State.PARTITIONS_ASSIGNED); thread.setState(StreamThread.State.RUNNING); metadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState()); @@ -1155,10 +1152,9 @@ public class StreamThreadTest { clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.setState(StreamThread.State.RUNNING); - final List<TopicPartition> assignedPartitions = new ArrayList<>(); + thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED); @@ -1287,7 +1283,7 @@ public class StreamThreadTest { config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false); - thread.setState(StreamThread.State.RUNNING); + thread.setState(StreamThread.State.STARTING); thread.setState(StreamThread.State.PARTITIONS_REVOKED); final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1); @@ -1331,7 +1327,7 @@ public class StreamThreadTest { config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName()); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false); - thread.setState(StreamThread.State.RUNNING); + thread.setState(StreamThread.State.STARTING); thread.setState(StreamThread.State.PARTITIONS_REVOKED); final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);