This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9d54421 Revert "KAFKA-7657: Fixing thread state change to instance state change (#6018)" (#6090) 9d54421 is described below commit 9d544212e69269f155bb3a51f94a9b13cf1fa565 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Fri Jan 4 14:41:15 2019 -0800 Revert "KAFKA-7657: Fixing thread state change to instance state change (#6018)" (#6090) This reverts commit d6698308194625e7921b9c3ace27a918f42f26f1. --- .../org/apache/kafka/streams/KafkaStreams.java | 178 +++++++++++---------- .../streams/processor/internals/StreamThread.java | 71 ++++---- .../org/apache/kafka/streams/KafkaStreamsTest.java | 75 ++------- .../StreamTableJoinIntegrationTest.java | 2 +- .../integration/utils/IntegrationTestUtils.java | 10 +- .../processor/internals/StreamThreadTest.java | 50 +++--- 6 files changed, 170 insertions(+), 216 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 4d0c1c7..bbd0105 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -159,24 +159,24 @@ public class KafkaStreams implements AutoCloseable { * | +-----+--------+ * | | * | v - * | +----+--+------+ - * | | Re- | - * +<----- | Balancing (1)| -------->+ - * | +-----+-+------+ | - * | | ^ | - * | v | | - * | +--------------+ v - * | | Running (2) | -------->+ - * | +------+-------+ | - * | | | - * | v | - * | +------+-------+ +----+-------+ - * +-----> | Pending |<--- | Error (5) |---+ - * | Shutdown (3) | +------------+ | - * +------+-------+ ^ | - * | | | - * v +-----------+ - * +------+-------+ + * | +--------------+ + * +<----- | Running (2) | -------->+ + * | +----+--+------+ | + * | | ^ | + * | v | | + * | +----+--+------+ | + * | | Re- | v + * | | Balancing (1)| -------->+ + * | +-----+--------+ | + * | | | + * | v v + * | +-----+--------+ +----+-------+ + * +-----> | Pending |<--- | Error (5) | + * | Shutdown (3) | +------------+ + * +-----+--------+ + * | + * v + * +-----+--------+ * | Not | * | Running (4) | * +--------------+ @@ -191,7 +191,7 @@ public class KafkaStreams implements AutoCloseable { * the instance will be in the ERROR state. The user will need to close it. */ public enum State { - CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3, 5); + CREATED(2, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3); private final Set<Integer> validTransitions = new HashSet<>(); @@ -238,11 +238,9 @@ public class KafkaStreams implements AutoCloseable { * @param newState New state */ private boolean setState(final State newState) { - final State oldState; + final State oldState = state; synchronized (stateLock) { - oldState = state; - if (state == State.PENDING_SHUTDOWN && newState != State.NOT_RUNNING) { // when the state is already in PENDING_SHUTDOWN, all other transitions than NOT_RUNNING (due to thread dying) will be // refused but we do not throw exception here, to allow appropriate error handling @@ -251,13 +249,6 @@ public class KafkaStreams implements AutoCloseable { // when the state is already in NOT_RUNNING, its transition to PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls) // will be refused but we do not throw exception here, to allow idempotent close calls return false; - } else if (state == State.REBALANCING && newState == State.REBALANCING) { - // when the state is already in REBALANCING, it should not transit to REBALANCING - return false; -// } else if (state == State.RUNNING && newState == State.RUNNING) { - // when the state is already in RUNNING, it should not transit to RUNNING - // this can happen during starting up -// return false; } else if (!state.isValidTransition(newState)) { throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + oldState + " to " + newState); } else { @@ -269,7 +260,24 @@ public class KafkaStreams implements AutoCloseable { // we need to call the user customized state listener outside the state lock to avoid potential deadlocks if (stateListener != null) { - stateListener.onChange(newState, oldState); + stateListener.onChange(state, oldState); + } + + return true; + } + + private boolean setRunningFromCreated() { + synchronized (stateLock) { + if (state != State.CREATED) { + throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + state + " to " + State.RUNNING); + } + state = State.RUNNING; + stateLock.notifyAll(); + } + + // we need to call the user customized state listener outside the state lock to avoid potential deadlocks + if (stateListener != null) { + stateListener.onChange(State.RUNNING, State.CREATED); } return true; @@ -316,12 +324,11 @@ public class KafkaStreams implements AutoCloseable { * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setStateListener(final KafkaStreams.StateListener listener) { - synchronized (stateLock) { - if (state == State.CREATED) { - stateListener = listener; - } else { - throw new IllegalStateException("Can only set StateListener in CREATED state. Current state is: " + state); - } + if (state == State.CREATED) { + stateListener = listener; + } else { + throw new IllegalStateException("Can only set StateListener in CREATED state. " + + "Current state is: " + state); } } @@ -333,19 +340,17 @@ public class KafkaStreams implements AutoCloseable { * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { - synchronized (stateLock) { - if (state == State.CREATED) { - for (final StreamThread thread : threads) { - thread.setUncaughtExceptionHandler(eh); - } + if (state == State.CREATED) { + for (final StreamThread thread : threads) { + thread.setUncaughtExceptionHandler(eh); + } - if (globalStreamThread != null) { - globalStreamThread.setUncaughtExceptionHandler(eh); - } - } else { - throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + - "Current state is: " + state); + if (globalStreamThread != null) { + globalStreamThread.setUncaughtExceptionHandler(eh); } + } else { + throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + + "Current state is: " + state); } } @@ -357,13 +362,11 @@ public class KafkaStreams implements AutoCloseable { * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */ public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) { - synchronized (stateLock) { - if (state == State.CREATED) { - this.globalStateRestoreListener = globalStateRestoreListener; - } else { - throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED state. " + + if (state == State.CREATED) { + this.globalStateRestoreListener = globalStateRestoreListener; + } else { + throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED state. " + "Current state is: " + state); - } } } @@ -393,21 +396,18 @@ public class KafkaStreams implements AutoCloseable { final class StreamStateListener implements StreamThread.StateListener { private final Map<Long, StreamThread.State> threadState; private GlobalStreamThread.State globalThreadState; - // this lock should always be held before the state lock - private final Object threadStatesLock; StreamStateListener(final Map<Long, StreamThread.State> threadState, final GlobalStreamThread.State globalThreadState) { this.threadState = threadState; this.globalThreadState = globalThreadState; - this.threadStatesLock = new Object(); } /** * If all threads are dead set to ERROR */ private void maybeSetError() { - // check if we have at least one thread running + // check if we have enough threads running for (final StreamThread.State state : threadState.values()) { if (state != StreamThread.State.DEAD) { return; @@ -415,7 +415,7 @@ public class KafkaStreams implements AutoCloseable { } if (setState(State.ERROR)) { - log.error("All stream threads have died. The instance will be in error state and should be closed."); + log.warn("All stream threads have died. The instance will be in error state and should be closed."); } } @@ -423,13 +423,12 @@ public class KafkaStreams implements AutoCloseable { * If all threads are up, including the global thread, set to RUNNING */ private void maybeSetRunning() { - // state can be transferred to RUNNING if all threads are either RUNNING or DEAD + // one thread is running, check others, including global thread for (final StreamThread.State state : threadState.values()) { - if (state != StreamThread.State.RUNNING && state != StreamThread.State.DEAD) { + if (state != StreamThread.State.RUNNING) { return; } } - // the global state thread is relevant only if it is started. There are cases // when we don't have a global state thread at all, e.g., when we don't have global KTables if (globalThreadState != null && globalThreadState != GlobalStreamThread.State.RUNNING) { @@ -444,29 +443,26 @@ public class KafkaStreams implements AutoCloseable { public synchronized void onChange(final Thread thread, final ThreadStateTransitionValidator abstractNewState, final ThreadStateTransitionValidator abstractOldState) { - synchronized (threadStatesLock) { - // StreamThreads first - if (thread instanceof StreamThread) { - final StreamThread.State newState = (StreamThread.State) abstractNewState; - threadState.put(thread.getId(), newState); - - if (newState == StreamThread.State.PARTITIONS_REVOKED) { - setState(State.REBALANCING); - } else if (newState == StreamThread.State.RUNNING) { - maybeSetRunning(); - } else if (newState == StreamThread.State.DEAD) { - maybeSetError(); - } - } else if (thread instanceof GlobalStreamThread) { - // global stream thread has different invariants - final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState; - globalThreadState = newState; - - // special case when global thread is dead - if (newState == GlobalStreamThread.State.DEAD) { - setState(State.ERROR); - log.error("Global thread has died. The instance will be in error state and should be closed."); - } + // StreamThreads first + if (thread instanceof StreamThread) { + final StreamThread.State newState = (StreamThread.State) abstractNewState; + threadState.put(thread.getId(), newState); + + if (newState == StreamThread.State.PARTITIONS_REVOKED && state != State.REBALANCING) { + setState(State.REBALANCING); + } else if (newState == StreamThread.State.RUNNING && state != State.RUNNING) { + maybeSetRunning(); + } else if (newState == StreamThread.State.DEAD && state != State.ERROR) { + maybeSetError(); + } + } else if (thread instanceof GlobalStreamThread) { + // global stream thread has different invariants + final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState; + globalThreadState = newState; + + // special case when global thread is dead + if (newState == GlobalStreamThread.State.DEAD && state != State.ERROR && setState(State.ERROR)) { + log.warn("Global thread has died. The instance will be in error state and should be closed."); } } } @@ -777,9 +773,11 @@ public class KafkaStreams implements AutoCloseable { * if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG exactly-once} is enabled for pre 0.11.0.x brokers */ public synchronized void start() throws IllegalStateException, StreamsException { - if (setState(State.REBALANCING)) { - log.debug("Starting Streams client"); + log.debug("Starting Streams client"); + // first set state to RUNNING before kicking off the threads, + // making sure the state will always transit to RUNNING before REBALANCING + if (setRunningFromCreated()) { if (globalStreamThread != null) { globalStreamThread.start(); } @@ -790,13 +788,17 @@ public class KafkaStreams implements AutoCloseable { final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); stateDirCleaner.scheduleAtFixedRate(() -> { - // we do not use lock here since we only read on the value and act on it if (state == State.RUNNING) { stateDirectory.cleanRemovedTasks(cleanupDelay); } }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS); + + log.info("Started Streams client"); } else { - throw new IllegalStateException("The client is either already started or already stopped, cannot re-start"); + // if transition failed but no exception is thrown; currently it is not possible + // since we do not allow calling start multiple times whether or not it is already shutdown. + // TODO: In the future if we lift this restriction this code path could then be triggered and be updated + log.error("Already stopped, cannot re-start"); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a4aab98..6c3e7cd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -83,36 +83,30 @@ public class StreamThread extends Thread { * | | * | v * | +-----+-------+ - * +<--- | Starting (1)| - * | +-----+-------+ - * | | - * | | - * | v - * | +-----+-------+ - * +<--- | Partitions |------+ - * | | Revoked (2) | <----+ + * +<--- | Running (1) | <----+ * | +-----+-------+ | * | | | * | v | * | +-----+-------+ | - * | | Partitions | | - * +<--- | Assigned (3)| ---->+ + * +<--- | Partitions | | + * | | Revoked (2) | <----+ * | +-----+-------+ | * | | | * | v | * | +-----+-------+ | - * | | Running (4) | ---->+ + * | | Partitions | | + * | | Assigned (3)| ---->+ * | +-----+-------+ * | | * | v * | +-----+-------+ * +---> | Pending | - * | Shutdown (5)| + * | Shutdown (4)| * +-----+-------+ * | * v * +-----+-------+ - * | Dead (6) | + * | Dead (5) | * +-------------+ * </pre> * @@ -127,13 +121,12 @@ public class StreamThread extends Thread { * <li> * State PARTITIONS_REVOKED may want transit to itself indefinitely, in the corner case when * the coordinator repeatedly fails in-between revoking partitions and assigning new partitions. - * Also during streams instance start up PARTITIONS_REVOKED may want to transit to itself as well. * In this case we will forbid the transition but will not treat as an error. * </li> * </ul> */ public enum State implements ThreadStateTransitionValidator { - CREATED(1, 5), STARTING(2, 5), PARTITIONS_REVOKED(2, 3, 5), PARTITIONS_ASSIGNED(2, 4, 5), RUNNING(2, 5), PENDING_SHUTDOWN(6), DEAD; + CREATED(1, 4), RUNNING(2, 4), PARTITIONS_REVOKED(3, 4), PARTITIONS_ASSIGNED(1, 2, 4), PENDING_SHUTDOWN(5), DEAD; private final Set<Integer> validTransitions = new HashSet<>(); @@ -142,7 +135,7 @@ public class StreamThread extends Thread { } public boolean isRunning() { - return equals(RUNNING) || equals(STARTING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED); + return equals(RUNNING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED); } @Override @@ -203,10 +196,10 @@ public class StreamThread extends Thread { // when the state is already in NOT_RUNNING, all its transitions // will be refused but we do not throw exception here return null; -/// } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) { + } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) { // when the state is already in PARTITIONS_REVOKED, its transition to itself will be // refused but we do not throw exception here -// return null; + return null; } else if (!state.isValidTransition(newState)) { log.error("Unexpected state transition from {} to {}", oldState, newState); throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState); @@ -745,7 +738,7 @@ public class StreamThread extends Thread { @Override public void run() { log.info("Starting"); - if (setState(State.STARTING) == null) { + if (setState(State.RUNNING) == null) { log.info("StreamThread already shutdown. Not running"); return; } @@ -823,7 +816,7 @@ public class StreamThread extends Thread { // try to fetch some records with normal poll time // in order to wait long enough to get the join response records = pollRequests(pollTime); - } else if (state == State.RUNNING || state == State.STARTING) { + } else if (state == State.RUNNING) { // try to fetch some records with normal poll time // in order to get long polling records = pollRequests(pollTime); @@ -1256,6 +1249,23 @@ public class StreamThread extends Thread { return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent); } + // the following are for testing only + void setNow(final long now) { + this.now = now; + } + + TaskManager taskManager() { + return taskManager; + } + + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() { + return standbyRecords; + } + + int currentNumIterations() { + return numIterations; + } + public Map<MetricName, Metric> producerMetrics() { final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>(); if (producer != null) { @@ -1290,25 +1300,4 @@ public class StreamThread extends Thread { result.putAll(adminClientMetrics); return result; } - - // the following are for testing only - void setNow(final long now) { - this.now = now; - } - - TaskManager taskManager() { - return taskManager; - } - - Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() { - return standbyRecords; - } - - int currentNumIterations() { - return numIterations; - } - - public StreamThread.StateListener stateListener() { - return stateListener; - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 89f3730..d2a4ace 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -139,77 +139,36 @@ public class KafkaStreamsTest { } @Test - public void testStateCloseAfterCreate() { - globalStreams.close(); - - Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); - } - - @Test - public void testStateOneThreadDeadButRebalanceFinish() throws InterruptedException { + public void testStateChanges() throws InterruptedException { final StateListenerStub stateListener = new StateListenerStub(); globalStreams.setStateListener(stateListener); - Assert.assertEquals(0, stateListener.numChanges); Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state()); + Assert.assertEquals(0, stateListener.numChanges); globalStreams.start(); - TestUtils.waitForCondition( - () -> stateListener.numChanges == 2, + () -> globalStreams.state() == KafkaStreams.State.RUNNING, 10 * 1000, "Streams never started."); - Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state()); - - for (final StreamThread thread: globalStreams.threads) { - thread.stateListener().onChange( - thread, - StreamThread.State.PARTITIONS_REVOKED, - StreamThread.State.RUNNING); - } - - Assert.assertEquals(3, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); - - for (final StreamThread thread: globalStreams.threads) { - thread.stateListener().onChange( - thread, - StreamThread.State.PARTITIONS_ASSIGNED, - StreamThread.State.PARTITIONS_REVOKED); - } - Assert.assertEquals(3, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); - - globalStreams.threads[NUM_THREADS - 1].stateListener().onChange( - globalStreams.threads[NUM_THREADS - 1], - StreamThread.State.PENDING_SHUTDOWN, - StreamThread.State.PARTITIONS_ASSIGNED); + globalStreams.close(); - globalStreams.threads[NUM_THREADS - 1].stateListener().onChange( - globalStreams.threads[NUM_THREADS - 1], - StreamThread.State.DEAD, - StreamThread.State.PENDING_SHUTDOWN); + Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); + } - Assert.assertEquals(3, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + @Test + public void testStateCloseAfterCreate() { + final KafkaStreams streams = new KafkaStreams(builder.build(), props); - for (final StreamThread thread: globalStreams.threads) { - if (thread != globalStreams.threads[NUM_THREADS - 1]) { - thread.stateListener().onChange( - thread, - StreamThread.State.RUNNING, - StreamThread.State.PARTITIONS_ASSIGNED); - } + try { + final StateListenerStub stateListener = new StateListenerStub(); + streams.setStateListener(stateListener); + } finally { + streams.close(); } - Assert.assertEquals(4, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state()); - - globalStreams.close(); - - Assert.assertEquals(6, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); + Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); } @Test @@ -540,8 +499,8 @@ public class KafkaStreamsTest { assertNotNull(threadMetadata); assertEquals(2, threadMetadata.size()); for (final ThreadMetadata metadata : threadMetadata) { - assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED", - asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState())); + assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED", + asList("RUNNING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState())); assertEquals(0, metadata.standbyTasks().size()); assertEquals(0, metadata.activeTasks().size()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java index ca78d02..7503dd6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java @@ -82,7 +82,7 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN"); streams.close(); - assertEquals(listener.createdToRevokedSeen(), true); + assertEquals(listener.runningToRevokedSeen(), true); assertEquals(listener.revokedToPendingShutdownSeen(), true); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 1097285..8a2122d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -75,14 +75,14 @@ public class IntegrationTestUtils { * Records state transition for StreamThread */ public static class StateListenerStub implements StreamThread.StateListener { - boolean startingToRevokedSeen = false; + boolean runningToRevokedSeen = false; boolean revokedToPendingShutdownSeen = false; @Override public void onChange(final Thread thread, final ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator oldState) { - if (oldState == StreamThread.State.STARTING && newState == StreamThread.State.PARTITIONS_REVOKED) { - startingToRevokedSeen = true; + if (oldState == StreamThread.State.RUNNING && newState == StreamThread.State.PARTITIONS_REVOKED) { + runningToRevokedSeen = true; } else if (oldState == StreamThread.State.PARTITIONS_REVOKED && newState == StreamThread.State.PENDING_SHUTDOWN) { revokedToPendingShutdownSeen = true; } @@ -92,8 +92,8 @@ public class IntegrationTestUtils { return revokedToPendingShutdownSeen; } - public boolean createdToRevokedSeen() { - return startingToRevokedSeen; + public boolean runningToRevokedSeen() { + return runningToRevokedSeen; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index dd311fb..5df8fbc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -163,12 +163,12 @@ public class StreamThreadTest { assertEquals(thread.state(), StreamThread.State.CREATED); final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; + thread.setState(StreamThread.State.RUNNING); final List<TopicPartition> revokedPartitions; final List<TopicPartition> assignedPartitions; // revoke nothing - thread.setState(StreamThread.State.STARTING); revokedPartitions = Collections.emptyList(); rebalanceListener.onPartitionsRevoked(revokedPartitions); @@ -202,7 +202,7 @@ public class StreamThreadTest { TestUtils.waitForCondition(new TestCondition() { @Override public boolean conditionMet() { - return thread.state() == StreamThread.State.STARTING; + return thread.state() == StreamThread.State.RUNNING; } }, 10 * 1000, "Thread never started."); @@ -342,7 +342,7 @@ public class StreamThreadTest { properties)); final StreamThread thread = createStreamThread(clientId, config, false); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); thread.setState(StreamThread.State.PARTITIONS_REVOKED); final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1); @@ -505,8 +505,8 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); - thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.setState(StreamThread.State.RUNNING); + thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); final List<TopicPartition> assignedPartitions = new ArrayList<>(); @@ -542,7 +542,7 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -577,7 +577,7 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -612,6 +612,8 @@ public class StreamThreadTest { public void shouldShutdownTaskManagerOnClose() { final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.<TaskId, StreamTask>emptyMap()); + EasyMock.expect(taskManager.standbyTasks()).andReturn(Collections.<TaskId, StandbyTask>emptyMap()); taskManager.shutdown(true); EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); @@ -635,7 +637,7 @@ public class StreamThreadTest { new StreamThread.StateListener() { @Override public void onChange(final Thread t, final ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator oldState) { - if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) { + if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.RUNNING) { thread.shutdown(); } } @@ -709,8 +711,8 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); - thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList()); + thread.setState(StreamThread.State.RUNNING); + thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); @@ -734,7 +736,7 @@ public class StreamThreadTest { consumer.updatePartitions(topic1, singletonList(new PartitionInfo(topic1, 1, null, null, null))); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -802,7 +804,7 @@ public class StreamThreadTest { internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -838,7 +840,7 @@ public class StreamThreadTest { internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -894,7 +896,8 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); + thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -943,7 +946,8 @@ public class StreamThreadTest { restoreConsumer.updateEndOffsets(offsets); restoreConsumer.updateBeginningOffsets(offsets); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); + thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); @@ -1006,7 +1010,8 @@ public class StreamThreadTest { restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes())); } - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); + thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); @@ -1069,7 +1074,8 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); + thread.rebalanceListener.onPartitionsRevoked(null); final List<TopicPartition> assignedPartitions = new ArrayList<>(); @@ -1115,9 +1121,6 @@ public class StreamThreadTest { ThreadMetadata metadata = thread.threadMetadata(); assertEquals(StreamThread.State.CREATED.name(), metadata.threadState()); - thread.setState(StreamThread.State.STARTING); - thread.setState(StreamThread.State.PARTITIONS_REVOKED); - thread.setState(StreamThread.State.PARTITIONS_ASSIGNED); thread.setState(StreamThread.State.RUNNING); metadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState()); @@ -1152,9 +1155,10 @@ public class StreamThreadTest { clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); + thread.setState(StreamThread.State.RUNNING); + final List<TopicPartition> assignedPartitions = new ArrayList<>(); - thread.setState(StreamThread.State.STARTING); thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED); @@ -1283,7 +1287,7 @@ public class StreamThreadTest { config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); thread.setState(StreamThread.State.PARTITIONS_REVOKED); final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1); @@ -1327,7 +1331,7 @@ public class StreamThreadTest { config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName()); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false); - thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.RUNNING); thread.setState(StreamThread.State.PARTITIONS_REVOKED); final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);