This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new 2c06320 KAFKA-10198: guard against recycling dirty state (#8924) 2c06320 is described below commit 2c06320288a992c19f90fc4af14fa1ea3dde7718 Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Wed Jun 24 18:57:38 2020 -0700 KAFKA-10198: guard against recycling dirty state (#8924) We just needed to apply the dirty-state (commitNeeded) check that StreamTask#closeClean already performs to closeAndRecycleState as well. I also renamed closeAndRecycleState to closeCleanAndRecycleState to drive this point home: it needs to be clean. This should be cherry-picked back to the 2.6 branch. Reviewers: Matthias J. Sax <matth...@confluent.io>, John Roesler <j...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../processor/internals/ActiveTaskCreator.java | 2 +- .../streams/processor/internals/StandbyTask.java | 2 +- .../processor/internals/StandbyTaskCreator.java | 2 +- .../streams/processor/internals/StreamTask.java | 21 +++++++----- .../kafka/streams/processor/internals/Task.java | 2 +- .../processor/internals/StandbyTaskTest.java | 6 ++-- .../processor/internals/StreamTaskTest.java | 39 ++++++++++++++++++---- .../processor/internals/TaskManagerTest.java | 2 +- 8 files changed, 54 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 0a1f47e..012ff20 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -184,7 +184,7 @@ class ActiveTaskCreator { final ProcessorStateManager stateManager = standbyTask.stateMgr; final LogContext logContext = getLogContext(standbyTask.id); - standbyTask.closeAndRecycleState(); + standbyTask.closeCleanAndRecycleState(); stateManager.transitionTaskType(TaskType.ACTIVE, logContext); return 
createActiveTask( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index ffd09f1..1aa68bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -189,7 +189,7 @@ public class StandbyTask extends AbstractTask implements Task { } @Override - public void closeAndRecycleState() { + public void closeCleanAndRecycleState() { streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); if (state() == State.SUSPENDED) { stateMgr.recycle(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index 443db8e..b5f2b74 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -112,7 +112,7 @@ class StandbyTaskCreator { final InternalProcessorContext context = streamTask.processorContext(); final ProcessorStateManager stateManager = streamTask.stateMgr; - streamTask.closeAndRecycleState(); + streamTask.closeCleanAndRecycleState(); stateManager.transitionTaskType(TaskType.STANDBY, getLogContext(streamTask.id())); return createStandbyTask( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6e8bf40..4b27436 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -463,6 +463,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator, @Override public void closeClean() { + validateClean(); streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); close(true); log.info("Closed clean"); @@ -482,7 +483,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } @Override - public void closeAndRecycleState() { + public void closeCleanAndRecycleState() { + validateClean(); streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); switch (state()) { case SUSPENDED: @@ -515,17 +517,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, stateMgr.checkpoint(checkpointableOffsets()); } - /** - * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock - */ - private void close(final boolean clean) { - if (clean && commitNeeded) { - // It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to - // closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty + private void validateClean() { + // It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to + // closeClean in handleAssignment. 
We should throw if we detect this to force the TaskManager to closeDirty + if (commitNeeded) { log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to" + " commit and should close as dirty instead"); throw new TaskMigratedException("Tried to close dirty task as clean"); } + } + + /** + * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock + */ + private void close(final boolean clean) { switch (state()) { case SUSPENDED: // first close state manager (which is idempotent) then close the record collector diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 0200870..103c231 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -153,7 +153,7 @@ public interface Task { /** * Attempt a clean close but do not close the underlying state */ - void closeAndRecycleState(); + void closeCleanAndRecycleState(); /** * Revive a closed task to a created one; should never throw an exception diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 82f33c4..f98d630 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -502,13 +502,13 @@ public class StandbyTaskTest { EasyMock.replay(stateManager); task = createStandbyTask(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED task.initializeIfNeeded(); - 
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING task.suspend(); - task.closeAndRecycleState(); // SUSPENDED + task.closeCleanAndRecycleState(); // SUSPENDED // Currently, there are no metrics registered for standby tasks. // This is a regression test so that, if we add some, we will be sure to deregister them. diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index c6ffe74..9607470 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; @@ -1752,7 +1753,7 @@ public class StreamTaskTest { } @Test - public void shouldUnregisterMetricsInCloseAndRecycle() { + public void shouldUnregisterMetricsInCloseCleanAndRecycleState() { EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes(); EasyMock.replay(stateManager, recordCollector); @@ -1761,7 +1762,7 @@ public class StreamTaskTest { task.suspend(); assertThat(getTaskMetrics(), not(empty())); - task.closeAndRecycleState(); + task.closeCleanAndRecycleState(); assertThat(getTaskMetrics(), empty()); } @@ -1799,22 +1800,48 @@ public 
class StreamTaskTest { } @Test + public void shouldThrowIfCleanClosingDirtyTask() { + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0))); + task.process(0L); + assertTrue(task.commitNeeded()); + + assertThrows(TaskMigratedException.class, () -> task.closeClean()); + } + + @Test + public void shouldThrowIfRecyclingDirtyTask() { + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0))); + task.process(0L); + assertTrue(task.commitNeeded()); + + assertThrows(TaskMigratedException.class, () -> task.closeCleanAndRecycleState()); + } + + @Test public void shouldOnlyRecycleSuspendedTasks() { stateManager.recycle(); recordCollector.closeClean(); EasyMock.replay(stateManager, recordCollector); task = createStatefulTask(createConfig(false, "100"), true); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED task.initializeIfNeeded(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RESTORING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RESTORING task.completeRestoration(); - assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING + assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING task.suspend(); - task.closeAndRecycleState(); // SUSPENDED + task.closeCleanAndRecycleState(); // SUSPENDED EasyMock.verify(stateManager, recordCollector); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index fcfbb1f..23166d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2723,7 +2723,7 @@ public class TaskManagerTest { } @Override - public void closeAndRecycleState() { + public void closeCleanAndRecycleState() { transitionTo(State.CLOSED); }