This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 925c628173 KAFKA-10199: Commit the restoration progress within StateUpdater (#12279)
925c628173 is described below

commit 925c6281733662cd40fffaab54a6483b00f80ee6
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Thu Jun 23 10:46:14 2022 -0700

    KAFKA-10199: Commit the restoration progress within StateUpdater (#12279)
    
    During restoration, we should always commit, i.e. write the checkpoint file, regardless of EOS or ALOS: if there is a failure, we would simply over-restore those records upon recovery, so no EOS violation occurs.
    
    Also, when we complete restoration or remove a task, we should enforce a checkpoint; for failure cases, however, we should not write a new one.
    
    Reviewers: Bruno Cadonna <cado...@apache.org>
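
For reference, below is a minimal sketch (illustrative only, not the committed code) of the checkpoint policy this patch introduces: best-effort checkpoints on the commit interval while tasks are still restoring, and an enforced checkpoint when a task completes restoration or is removed. The Task interface shape and the CheckpointPolicySketch class are simplified stand-ins for the real DefaultStateUpdater internals.

    import java.util.Collection;

    interface Task {
        // enforce == true: always write the checkpoint file;
        // enforce == false: may be skipped if offsets have not advanced much
        void maybeCheckpoint(boolean enforce);
    }

    final class CheckpointPolicySketch {
        private final long commitIntervalMs;
        private long lastCommitMs;

        CheckpointPolicySketch(final long commitIntervalMs, final long nowMs) {
            this.commitIntervalMs = commitIntervalMs;
            // start the clock now so that the first commit does not happen immediately
            this.lastCommitMs = nowMs;
        }

        // called once per restore-loop iteration
        void maybeCheckpointUpdatingTasks(final long nowMs, final Collection<Task> updatingTasks) {
            if (nowMs - lastCommitMs > commitIntervalMs) {
                for (final Task task : updatingTasks) {
                    task.maybeCheckpoint(false); // best effort; over-restoring after a failure is safe
                }
                lastCommitMs = nowMs;
            }
        }

        // when a task completes restoration or is removed, force the checkpoint
        void onRestoreCompletedOrRemoved(final Task task) {
            task.maybeCheckpoint(true);
        }
    }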
---
 .../streams/processor/internals/AbstractTask.java  |   4 +-
 .../processor/internals/DefaultStateUpdater.java   |  36 +++++++-
 .../streams/processor/internals/StandbyTask.java   |   2 +-
 .../streams/processor/internals/StreamTask.java    |   8 +-
 .../kafka/streams/processor/internals/Task.java    |   6 ++
 .../internals/DefaultStateUpdaterTest.java         | 101 +++++++++++++++++++--
 .../processor/internals/StandbyTaskTest.java       |  41 +++++++++
 .../processor/internals/StreamTaskTest.java        |  16 ++++
 8 files changed, 197 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 4e652a6dfc..c64fadfe5c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -88,7 +88,8 @@ public abstract class AbstractTask implements Task {
      * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
      *                          or flushing state store get IO errors; such error should cause the thread to die
      */
-    protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
+    @Override
+    public void maybeCheckpoint(final boolean enforceCheckpoint) {
         final Map<TopicPartition, Long> offsetSnapshot = stateMgr.changelogOffsets();
         if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) {
             // the state's current offset would be used to checkpoint
@@ -98,7 +99,6 @@ public abstract class AbstractTask implements Task {
         }
     }
 
-
     @Override
     public TaskId id() {
         return id;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 54cb7bc427..cc580a3b38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -85,7 +86,7 @@ public class DefaultStateUpdater implements StateUpdater {
         }
 
         public boolean onlyStandbyTasksLeft() {
-            return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive());
+            return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive);
         }
 
         @Override
@@ -111,6 +112,7 @@ public class DefaultStateUpdater implements StateUpdater {
         private void runOnce() throws InterruptedException {
             performActionsOnTasks();
             restoreTasks();
+            maybeCheckpointUpdatingTasks(time.milliseconds());
             waitIfAllChangelogsCompletelyRead();
         }
 
@@ -252,6 +254,8 @@ public class DefaultStateUpdater implements StateUpdater {
         private void removeTask(final TaskId taskId) {
             final Task task = updatingTasks.remove(taskId);
             if (task != null) {
+                task.maybeCheckpoint(true);
+
                 final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
                 changelogReader.unregister(changelogPartitions);
                 removedTasks.add(task);
@@ -271,9 +275,10 @@ public class DefaultStateUpdater implements StateUpdater {
             final Collection<TopicPartition> taskChangelogPartitions = task.changelogPartitions();
             if (restoredChangelogs.containsAll(taskChangelogPartitions)) {
                 task.completeRestoration(offsetResetter);
-                log.debug("Stateful active task " + task.id() + " completed 
restoration");
+                task.maybeCheckpoint(true);
                 addTaskToRestoredTasks(task);
                 updatingTasks.remove(task.id());
+                log.debug("Stateful active task " + task.id() + " completed 
restoration");
                 if (onlyStandbyTasksLeft()) {
                     changelogReader.transitToUpdateStandby();
                 }
@@ -290,6 +295,23 @@ public class DefaultStateUpdater implements StateUpdater {
                 restoredActiveTasksLock.unlock();
             }
         }
+
+        private void maybeCheckpointUpdatingTasks(final long now) {
+            final long elapsedMsSinceLastCommit = now - lastCommitMs;
+            if (elapsedMsSinceLastCommit > commitIntervalMs) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Committing all restoring tasks since {}ms has 
elapsed (commit interval is {}ms)",
+                        elapsedMsSinceLastCommit, commitIntervalMs);
+                }
+
+                for (final Task task : updatingTasks.values()) {
+                    // do not enforce checkpointing during restoration if its position has not advanced much
+                    task.maybeCheckpoint(false);
+                }
+
+                lastCommitMs = now;
+            }
+        }
     }
 
     private final Time time;
@@ -305,14 +327,22 @@ public class DefaultStateUpdater implements StateUpdater {
     private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
     private CountDownLatch shutdownGate;
 
+    private final long commitIntervalMs;
+    private long lastCommitMs;
+
     private StateUpdaterThread stateUpdaterThread = null;
 
-    public DefaultStateUpdater(final ChangelogReader changelogReader,
+    public DefaultStateUpdater(final StreamsConfig config,
+                               final ChangelogReader changelogReader,
                                final Consumer<Set<TopicPartition>> offsetResetter,
                                final Time time) {
         this.changelogReader = changelogReader;
         this.offsetResetter = offsetResetter;
         this.time = time;
+
+        this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
+        // initialize the last commit to now so that the first commit does not happen immediately
+        this.lastCommitMs = time.milliseconds();
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index ea946b2341..670c0c4beb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -194,7 +194,7 @@ public class StandbyTask extends AbstractTask implements Task {
 
             case RUNNING:
             case SUSPENDED:
-                maybeWriteCheckpoint(enforceCheckpoint);
+                maybeCheckpoint(enforceCheckpoint);
 
                 log.debug("Finalized commit for {} task", state());
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ea593a2973..8514c6ae2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -483,14 +483,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
             case RESTORING:
             case SUSPENDED:
-                maybeWriteCheckpoint(enforceCheckpoint);
+                maybeCheckpoint(enforceCheckpoint);
                 log.debug("Finalized commit for {} task with enforce 
checkpoint {}", state(), enforceCheckpoint);
 
                 break;
 
             case RUNNING:
                 if (enforceCheckpoint || !eosEnabled) {
-                    maybeWriteCheckpoint(enforceCheckpoint);
+                    maybeCheckpoint(enforceCheckpoint);
                 }
                 log.debug("Finalized commit for {} task with eos {} enforce 
checkpoint {}", state(), eosEnabled, enforceCheckpoint);
 
@@ -582,14 +582,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      *                          or flushing state store get IO errors; such error should cause the thread to die
      */
     @Override
-    protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
+    public void maybeCheckpoint(final boolean enforceCheckpoint) {
         // commitNeeded indicates we may have processed some records since last commit
         // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
         if (commitNeeded || enforceCheckpoint) {
             stateMgr.updateChangelogOffsets(checkpointableOffsets());
         }
 
-        super.maybeWriteCheckpoint(enforceCheckpoint);
+        super.maybeCheckpoint(enforceCheckpoint);
     }
 
     private void validateClean() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index fc3e6cb1a8..a17b19997b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -141,6 +141,12 @@ public interface Task {
      */
     void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics);
 
+    /**
+     * @param enforceCheckpoint if true the task would always execute the checkpoint;
+     *                          otherwise it may skip if the state has not advanced much
+     */
+    void maybeCheckpoint(final boolean enforceCheckpoint);
+
     void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index fa50380f7f..8f0fc935a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -35,12 +37,17 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.common.utils.Utils.sleep;
+import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.easymock.EasyMock.anyBoolean;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -58,6 +65,7 @@ import static org.mockito.Mockito.when;
 
 class DefaultStateUpdaterTest {
 
+    private final static int COMMIT_INTERVAL = 100;
     private final static long CALL_TIMEOUT = 1000;
     private final static long VERIFICATION_TIMEOUT = 15000;
     private final static TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0);
@@ -69,15 +77,27 @@ class DefaultStateUpdaterTest {
     private final static TaskId TASK_1_0 = new TaskId(1, 0);
     private final static TaskId TASK_1_1 = new TaskId(1, 1);
 
+    private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
     private final ChangelogReader changelogReader = mock(ChangelogReader.class);
     private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };
-    private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(changelogReader, offsetResetter, Time.SYSTEM);
+
+    private DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
 
     @AfterEach
     public void tearDown() {
         stateUpdater.shutdown(Duration.ofMinutes(1));
     }
 
+    private Properties configProps(final int commitInterval) {
+        return mkObjectProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+                mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval),
+                mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), commitInterval)
+        ));
+    }
+
     @Test
     public void shouldShutdownStateUpdater() {
         final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
@@ -152,6 +172,7 @@ class DefaultStateUpdaterTest {
         }
 
         verifyRestoredActiveTasks(tasks);
+        verifyNeverCheckpointTasks(tasks);
         verifyUpdatingTasks();
         verifyExceptionsAndFailedTasks();
         verifyRemovedTasks();
@@ -173,6 +194,7 @@ class DefaultStateUpdaterTest {
         stateUpdater.add(task);
 
         verifyRestoredActiveTasks(task);
+        verifyCheckpointTasks(true, task);
         verifyUpdatingTasks();
         verifyExceptionsAndFailedTasks();
         verifyRemovedTasks();
@@ -203,6 +225,7 @@ class DefaultStateUpdaterTest {
         stateUpdater.add(task3);
 
         verifyRestoredActiveTasks(task3, task1, task2);
+        verifyCheckpointTasks(true, task3, task1, task2);
         verifyUpdatingTasks();
         verifyExceptionsAndFailedTasks();
         verifyRemovedTasks();
@@ -286,6 +309,7 @@ class DefaultStateUpdaterTest {
         stateUpdater.add(task4);
 
         verifyRestoredActiveTasks(task2, task1);
+        verifyCheckpointTasks(true, task2, task1);
         verifyUpdatingStandbyTasks(task4, task3);
         verifyExceptionsAndFailedTasks();
         verifyRemovedTasks();
@@ -313,6 +337,7 @@ class DefaultStateUpdaterTest {
         stateUpdater.add(task2);
 
         verifyRestoredActiveTasks(task1);
+        verifyCheckpointTasks(true, task1);
         verify(task1).completeRestoration(offsetResetter);
         verifyUpdatingStandbyTasks(task2);
         final InOrder orderVerifier = inOrder(changelogReader);
@@ -346,31 +371,32 @@ class DefaultStateUpdaterTest {
             .thenReturn(false);
         stateUpdater.add(task);
 
-        stateUpdater.remove(TASK_0_0);
+        stateUpdater.remove(task.id());
 
         verifyRemovedTasks(task);
+        verifyCheckpointTasks(true, task);
         verifyRestoredActiveTasks();
         verifyUpdatingTasks();
         verifyExceptionsAndFailedTasks();
-        verify(changelogReader).unregister(Collections.singletonList(TOPIC_PARTITION_A_0));
+        verify(changelogReader).unregister(task.changelogPartitions());
     }
 
     @Test
     public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
         final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
-        shouldNotRemoveTaskFromRestoredActiveTasks(task);
+        shouldNotRemoveTaskFromRestoredActiveTasks(task, Collections.singleton(TOPIC_PARTITION_A_0));
     }
 
     @Test
     public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
         final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
-        shouldNotRemoveTaskFromRestoredActiveTasks(task);
+        shouldNotRemoveTaskFromRestoredActiveTasks(task, Collections.emptySet());
     }
 
-    private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
+    private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task, final Set<TopicPartition> completedChangelogs) throws Exception {
         final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
         when(changelogReader.completedChangelogs())
-            .thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+            .thenReturn(completedChangelogs);
         when(changelogReader.allChangelogsCompleted())
             .thenReturn(false);
         stateUpdater.add(task);
@@ -603,6 +629,67 @@ class DefaultStateUpdaterTest {
         verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4);
     }
 
+    @Test
+    public void shouldAutoCheckpointTasksOnInterval() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
+        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
+        when(changelogReader.completedChangelogs())
+                .thenReturn(Collections.emptySet());
+        when(changelogReader.allChangelogsCompleted())
+                .thenReturn(false);
+
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+        stateUpdater.add(task3);
+        stateUpdater.add(task4);
+
+        sleep(COMMIT_INTERVAL);
+
+        verifyExceptionsAndFailedTasks();
+        verifyCheckpointTasks(false, task1, task2, task3, task4);
+    }
+
+    @Test
+    public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
+        stateUpdater.shutdown(Duration.ofMinutes(1));
+        final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE));
+        stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+
+        try {
+            final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+            final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
+            final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+            final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
+            when(changelogReader.completedChangelogs())
+                    .thenReturn(Collections.emptySet());
+            when(changelogReader.allChangelogsCompleted())
+                    .thenReturn(false);
+
+            stateUpdater.add(task1);
+            stateUpdater.add(task2);
+            stateUpdater.add(task3);
+            stateUpdater.add(task4);
+
+            verifyNeverCheckpointTasks(task1, task2, task3, task4);
+        } finally {
+            stateUpdater.shutdown(Duration.ofMinutes(1));
+        }
+    }
+
+    private void verifyCheckpointTasks(final boolean enforceCheckpoint, final Task... tasks) {
+        for (final Task task : tasks) {
+            verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint);
+        }
+    }
+
+    private void verifyNeverCheckpointTasks(final Task... tasks) {
+        for (final Task task : tasks) {
+            verify(task, never()).maybeCheckpoint(anyBoolean());
+        }
+    }
+
     private void verifyRestoredActiveTasks(final StreamTask... tasks) throws Exception {
         if (tasks.length == 0) {
             assertTrue(stateUpdater.getRestoredActiveTasks().isEmpty());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b6bb3b5e7a..43812020bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -206,6 +206,47 @@ public class StandbyTaskTest {
         assertThrows(IllegalStateException.class, task::prepareCommit);
     }
 
+
+    @Test
+    public void shouldAlwaysCheckpointStateIfEnforced() {
+        stateManager.flush();
+        EasyMock.expectLastCall().once();
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
+        EasyMock.replay(stateManager);
+
+        task = createStandbyTask();
+
+        task.initializeIfNeeded();
+        task.maybeCheckpoint(true);
+
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
+        stateManager.flush();
+        EasyMock.expectLastCall();
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once();
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(Collections.singletonMap(partition, 50L))
+                .andReturn(Collections.singletonMap(partition, 11000L))
+                .andReturn(Collections.singletonMap(partition, 11000L));
+        EasyMock.replay(stateManager);
+
+        task = createStandbyTask();
+        task.initializeIfNeeded();
+
+        task.maybeCheckpoint(false);  // this should not checkpoint
+        task.maybeCheckpoint(false);  // this should checkpoint
+        task.maybeCheckpoint(false);  // this should not checkpoint
+
+        EasyMock.verify(stateManager);
+    }
+
     @Test
     public void shouldFlushAndCheckpointStateManagerOnCommit() {
         EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 9974fa92b6..2fb87a5ae9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1945,6 +1945,22 @@ public class StreamTaskTest {
         EasyMock.verify(stateManager);
     }
 
+    @Test
+    public void shouldCheckpointState() {
+        stateManager.flush();
+        EasyMock.expectLastCall().once();
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once();
+        EasyMock.replay(stateManager);
+
+        task = createOptimizedStatefulTask(createConfig("100"), consumer);
+
+        task.initializeIfNeeded();
+        task.maybeCheckpoint(true);
+
+        EasyMock.verify(stateManager);
+    }
+
     @Test
     public void shouldCheckpointOffsetsOnPostCommit() {
         final long offset = 543L;
