This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 925c628173 KAFKA-10199: Commit the restoration progress within StateUpdater (#12279) 925c628173 is described below commit 925c6281733662cd40fffaab54a6483b00f80ee6 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Thu Jun 23 10:46:14 2022 -0700 KAFKA-10199: Commit the restoration progress within StateUpdater (#12279) During restoration, we should always commit (i.e., write the checkpoint file) regardless of EOS or ALOS: if there is a failure, we would simply over-restore the state upon recovery, so no EOS violation can occur. Also, when we complete restoration or remove a task, we should enforce a checkpoint as well; in failure cases, though, we should not write a new one. Reviewers: Bruno Cadonna <cado...@apache.org> --- .../streams/processor/internals/AbstractTask.java | 4 +- .../processor/internals/DefaultStateUpdater.java | 36 +++++++- .../streams/processor/internals/StandbyTask.java | 2 +- .../streams/processor/internals/StreamTask.java | 8 +- .../kafka/streams/processor/internals/Task.java | 6 ++ .../internals/DefaultStateUpdaterTest.java | 101 +++++++++++++++++++-- .../processor/internals/StandbyTaskTest.java | 41 +++++++++ .../processor/internals/StreamTaskTest.java | 16 ++++ 8 files changed, 197 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 4e652a6dfc..c64fadfe5c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -88,7 +88,8 @@ public abstract class AbstractTask implements Task { * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed * or flushing state store get IO errors; such error should cause the thread to die */ - 
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { + @Override + public void maybeCheckpoint(final boolean enforceCheckpoint) { final Map<TopicPartition, Long> offsetSnapshot = stateMgr.changelogOffsets(); if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) { // the state's current offset would be used to checkpoint @@ -98,7 +99,6 @@ public abstract class AbstractTask implements Task { } } - @Override public TaskId id() { return id; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 54cb7bc427..cc580a3b38 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.TaskId; @@ -85,7 +86,7 @@ public class DefaultStateUpdater implements StateUpdater { } public boolean onlyStandbyTasksLeft() { - return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive()); + return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive); } @Override @@ -111,6 +112,7 @@ public class DefaultStateUpdater implements StateUpdater { private void runOnce() throws InterruptedException { performActionsOnTasks(); restoreTasks(); + maybeCheckpointUpdatingTasks(time.milliseconds()); waitIfAllChangelogsCompletelyRead(); } @@ -252,6 +254,8 @@ public class 
DefaultStateUpdater implements StateUpdater { private void removeTask(final TaskId taskId) { final Task task = updatingTasks.remove(taskId); if (task != null) { + task.maybeCheckpoint(true); + final Collection<TopicPartition> changelogPartitions = task.changelogPartitions(); changelogReader.unregister(changelogPartitions); removedTasks.add(task); @@ -271,9 +275,10 @@ public class DefaultStateUpdater implements StateUpdater { final Collection<TopicPartition> taskChangelogPartitions = task.changelogPartitions(); if (restoredChangelogs.containsAll(taskChangelogPartitions)) { task.completeRestoration(offsetResetter); - log.debug("Stateful active task " + task.id() + " completed restoration"); + task.maybeCheckpoint(true); addTaskToRestoredTasks(task); updatingTasks.remove(task.id()); + log.debug("Stateful active task " + task.id() + " completed restoration"); if (onlyStandbyTasksLeft()) { changelogReader.transitToUpdateStandby(); } @@ -290,6 +295,23 @@ public class DefaultStateUpdater implements StateUpdater { restoredActiveTasksLock.unlock(); } } + + private void maybeCheckpointUpdatingTasks(final long now) { + final long elapsedMsSinceLastCommit = now - lastCommitMs; + if (elapsedMsSinceLastCommit > commitIntervalMs) { + if (log.isDebugEnabled()) { + log.debug("Committing all restoring tasks since {}ms has elapsed (commit interval is {}ms)", + elapsedMsSinceLastCommit, commitIntervalMs); + } + + for (final Task task : updatingTasks.values()) { + // do not enforce checkpointing during restoration if its position has not advanced much + task.maybeCheckpoint(false); + } + + lastCommitMs = now; + } + } } private final Time time; @@ -305,14 +327,22 @@ public class DefaultStateUpdater implements StateUpdater { private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>(); private CountDownLatch shutdownGate; + private final long commitIntervalMs; + private long lastCommitMs; + private StateUpdaterThread stateUpdaterThread = null; - public 
DefaultStateUpdater(final ChangelogReader changelogReader, + public DefaultStateUpdater(final StreamsConfig config, + final ChangelogReader changelogReader, final Consumer<Set<TopicPartition>> offsetResetter, final Time time) { this.changelogReader = changelogReader; this.offsetResetter = offsetResetter; this.time = time; + + this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); + // initialize the last commit as of now to prevent first commit happens immediately + this.lastCommitMs = time.milliseconds(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index ea946b2341..670c0c4beb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -194,7 +194,7 @@ public class StandbyTask extends AbstractTask implements Task { case RUNNING: case SUSPENDED: - maybeWriteCheckpoint(enforceCheckpoint); + maybeCheckpoint(enforceCheckpoint); log.debug("Finalized commit for {} task", state()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index ea593a2973..8514c6ae2e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -483,14 +483,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, case RESTORING: case SUSPENDED: - maybeWriteCheckpoint(enforceCheckpoint); + maybeCheckpoint(enforceCheckpoint); log.debug("Finalized commit for {} task with enforce checkpoint {}", state(), enforceCheckpoint); break; case RUNNING: if (enforceCheckpoint || !eosEnabled) { - 
maybeWriteCheckpoint(enforceCheckpoint); + maybeCheckpoint(enforceCheckpoint); } log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", state(), eosEnabled, enforceCheckpoint); @@ -582,14 +582,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, * or flushing state store get IO errors; such error should cause the thread to die */ @Override - protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { + public void maybeCheckpoint(final boolean enforceCheckpoint) { // commitNeeded indicates we may have processed some records since last commit // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not if (commitNeeded || enforceCheckpoint) { stateMgr.updateChangelogOffsets(checkpointableOffsets()); } - super.maybeWriteCheckpoint(enforceCheckpoint); + super.maybeCheckpoint(enforceCheckpoint); } private void validateClean() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index fc3e6cb1a8..a17b19997b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -141,6 +141,12 @@ public interface Task { */ void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics); + /** + * @param enforceCheckpoint if true the task would always execute the checkpoint; + * otherwise it may skip if the state has not advanced much + */ + void maybeCheckpoint(final boolean enforceCheckpoint); + void markChangelogAsCorrupted(final Collection<TopicPartition> partitions); /** diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java 
index fa50380f7f..8f0fc935a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.processor.TaskId; @@ -35,12 +37,17 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.common.utils.Utils.sleep; +import static org.apache.kafka.streams.StreamsConfig.producerPrefix; import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.easymock.EasyMock.anyBoolean; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -58,6 +65,7 @@ import static org.mockito.Mockito.when; class DefaultStateUpdaterTest { + private final static int COMMIT_INTERVAL = 100; private final static long CALL_TIMEOUT = 1000; private final static long VERIFICATION_TIMEOUT = 15000; private final static TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0); @@ -69,15 +77,27 @@ class DefaultStateUpdaterTest { private final static TaskId TASK_1_0 = new TaskId(1, 0); private final static TaskId 
TASK_1_1 = new TaskId(1, 1); + private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { }; - private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(changelogReader, offsetResetter, Time.SYSTEM); + + private DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); @AfterEach public void tearDown() { stateUpdater.shutdown(Duration.ofMinutes(1)); } + private Properties configProps(final int commitInterval) { + return mkObjectProperties(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2), + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval), + mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), commitInterval) + )); + } + @Test public void shouldShutdownStateUpdater() { final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0); @@ -152,6 +172,7 @@ class DefaultStateUpdaterTest { } verifyRestoredActiveTasks(tasks); + verifyNeverCheckpointTasks(tasks); verifyUpdatingTasks(); verifyExceptionsAndFailedTasks(); verifyRemovedTasks(); @@ -173,6 +194,7 @@ class DefaultStateUpdaterTest { stateUpdater.add(task); verifyRestoredActiveTasks(task); + verifyCheckpointTasks(true, task); verifyUpdatingTasks(); verifyExceptionsAndFailedTasks(); verifyRemovedTasks(); @@ -203,6 +225,7 @@ class DefaultStateUpdaterTest { stateUpdater.add(task3); verifyRestoredActiveTasks(task3, task1, task2); + verifyCheckpointTasks(true, task3, task1, task2); verifyUpdatingTasks(); verifyExceptionsAndFailedTasks(); verifyRemovedTasks(); @@ -286,6 +309,7 @@ class DefaultStateUpdaterTest { stateUpdater.add(task4); 
verifyRestoredActiveTasks(task2, task1); + verifyCheckpointTasks(true, task2, task1); verifyUpdatingStandbyTasks(task4, task3); verifyExceptionsAndFailedTasks(); verifyRemovedTasks(); @@ -313,6 +337,7 @@ class DefaultStateUpdaterTest { stateUpdater.add(task2); verifyRestoredActiveTasks(task1); + verifyCheckpointTasks(true, task1); verify(task1).completeRestoration(offsetResetter); verifyUpdatingStandbyTasks(task2); final InOrder orderVerifier = inOrder(changelogReader); @@ -346,31 +371,32 @@ class DefaultStateUpdaterTest { .thenReturn(false); stateUpdater.add(task); - stateUpdater.remove(TASK_0_0); + stateUpdater.remove(task.id()); verifyRemovedTasks(task); + verifyCheckpointTasks(true, task); verifyRestoredActiveTasks(); verifyUpdatingTasks(); verifyExceptionsAndFailedTasks(); - verify(changelogReader).unregister(Collections.singletonList(TOPIC_PARTITION_A_0)); + verify(changelogReader).unregister(task.changelogPartitions()); } @Test public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception { final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - shouldNotRemoveTaskFromRestoredActiveTasks(task); + shouldNotRemoveTaskFromRestoredActiveTasks(task, Collections.singleton(TOPIC_PARTITION_A_0)); } @Test public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception { final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0); - shouldNotRemoveTaskFromRestoredActiveTasks(task); + shouldNotRemoveTaskFromRestoredActiveTasks(task, Collections.emptySet()); } - private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception { + private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task, final Set<TopicPartition> completedChangelogs) throws Exception { final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); 
when(changelogReader.completedChangelogs()) - .thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); + .thenReturn(completedChangelogs); when(changelogReader.allChangelogsCompleted()) .thenReturn(false); stateUpdater.add(task); @@ -603,6 +629,67 @@ class DefaultStateUpdaterTest { verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4); } + @Test + public void shouldAutoCheckpointTasksOnInterval() throws Exception { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + sleep(COMMIT_INTERVAL); + + verifyExceptionsAndFailedTasks(); + verifyCheckpointTasks(false, task1, task2, task3, task4); + } + + @Test + public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() { + stateUpdater.shutdown(Duration.ofMinutes(1)); + final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE)); + stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM); + + try { + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = 
createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + when(changelogReader.completedChangelogs()) + .thenReturn(Collections.emptySet()); + when(changelogReader.allChangelogsCompleted()) + .thenReturn(false); + + stateUpdater.add(task1); + stateUpdater.add(task2); + stateUpdater.add(task3); + stateUpdater.add(task4); + + verifyNeverCheckpointTasks(task1, task2, task3, task4); + } finally { + stateUpdater.shutdown(Duration.ofMinutes(1)); + } + } + + private void verifyCheckpointTasks(final boolean enforceCheckpoint, final Task... tasks) { + for (final Task task : tasks) { + verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint); + } + } + + private void verifyNeverCheckpointTasks(final Task... tasks) { + for (final Task task : tasks) { + verify(task, never()).maybeCheckpoint(anyBoolean()); + } + } + private void verifyRestoredActiveTasks(final StreamTask... 
tasks) throws Exception { if (tasks.length == 0) { assertTrue(stateUpdater.getRestoredActiveTasks().isEmpty()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index b6bb3b5e7a..43812020bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -206,6 +206,47 @@ public class StandbyTaskTest { assertThrows(IllegalStateException.class, task::prepareCommit); } + + @Test + public void shouldAlwaysCheckpointStateIfEnforced() { + stateManager.flush(); + EasyMock.expectLastCall().once(); + stateManager.checkpoint(); + EasyMock.expectLastCall().once(); + EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes(); + EasyMock.replay(stateManager); + + task = createStandbyTask(); + + task.initializeIfNeeded(); + task.maybeCheckpoint(true); + + EasyMock.verify(stateManager); + } + + @Test + public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); + stateManager.flush(); + EasyMock.expectLastCall(); + stateManager.checkpoint(); + EasyMock.expectLastCall().once(); + EasyMock.expect(stateManager.changelogOffsets()) + .andReturn(Collections.singletonMap(partition, 50L)) + .andReturn(Collections.singletonMap(partition, 11000L)) + .andReturn(Collections.singletonMap(partition, 11000L)); + EasyMock.replay(stateManager); + + task = createStandbyTask(); + task.initializeIfNeeded(); + + task.maybeCheckpoint(false); // this should not checkpoint + task.maybeCheckpoint(false); // this should checkpoint + task.maybeCheckpoint(false); // this should not checkpoint + + EasyMock.verify(stateManager); + } + @Test public void shouldFlushAndCheckpointStateManagerOnCommit() { 
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 9974fa92b6..2fb87a5ae9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1945,6 +1945,22 @@ public class StreamTaskTest { EasyMock.verify(stateManager); } + @Test + public void shouldCheckpointState() { + stateManager.flush(); + EasyMock.expectLastCall().once(); + stateManager.checkpoint(); + EasyMock.expectLastCall().once(); + EasyMock.replay(stateManager); + + task = createOptimizedStatefulTask(createConfig("100"), consumer); + + task.initializeIfNeeded(); + task.maybeCheckpoint(true); + + EasyMock.verify(stateManager); + } + @Test public void shouldCheckpointOffsetsOnPostCommit() { final long offset = 543L;