This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new db98c6f  KAFKA-5998: fix checkpointableOffsets handling (#7030)
db98c6f is described below

commit db98c6f8cdeb27e7dd858cd37499c66458064e5a
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Fri Jul 12 08:42:11 2019 -0500

    KAFKA-5998: fix checkpointableOffsets handling (#7030)
    
    fix checkpoint file warning by filtering checkpointable offsets per task
    clean up state manager hierarchy to prevent similar bugs
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, Bill Bejeck 
<bbej...@gmail.com>
---
 .../internals/GlobalStateManagerImpl.java          |  68 ++++---
 .../processor/internals/ProcessorStateManager.java | 203 ++++++++++++++-------
 ...ractStateManager.java => StateManagerUtil.java} |  41 ++---
 .../streams/processor/internals/StreamTask.java    |  11 +-
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../streams/state/internals/OffsetCheckpoint.java  |   5 +
 .../internals/GlobalStateManagerImplTest.java      |   8 +-
 .../processor/internals/MockChangelogReader.java   |   7 +-
 .../internals/ProcessorStateManagerTest.java       | 134 +++++++++++++-
 .../processor/internals/StandbyTaskTest.java       |   4 +-
 .../processor/internals/StreamTaskTest.java        |   6 +-
 .../processor/internals/StreamThreadTest.java      |   2 +-
 .../processor/internals/TaskManagerTest.java       |   6 +-
 13 files changed, 356 insertions(+), 141 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f224e1e..6a3959f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.FixedOrderMap;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -32,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.slf4j.Logger;
 
@@ -48,22 +50,30 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
+
 /**
  * This class is responsible for the initialization, restoration, closing, 
flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per 
Application Instance.
  */
-public class GlobalStateManagerImpl extends AbstractStateManager implements 
GlobalStateManager {
+public class GlobalStateManagerImpl implements GlobalStateManager {
     private final Logger log;
+    private final boolean eosEnabled;
     private final ProcessorTopology topology;
     private final Consumer<byte[], byte[]> globalConsumer;
+    private final File baseDir;
     private final StateDirectory stateDirectory;
     private final Set<String> globalStoreNames = new HashSet<>();
+    private final FixedOrderMap<String, Optional<StateStore>> globalStores = 
new FixedOrderMap<>();
     private final StateRestoreListener stateRestoreListener;
-    private InternalProcessorContext processorContext;
+    private InternalProcessorContext globalProcessorContext;
     private final int retries;
     private final long retryBackoffMs;
     private final Duration pollTime;
     private final Set<String> globalNonPersistentStoresTopics = new 
HashSet<>();
+    private final OffsetCheckpoint checkpointFile;
+    private final Map<TopicPartition, Long> checkpointFileCache;
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
@@ -71,7 +81,10 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener 
stateRestoreListener,
                                   final StreamsConfig config) {
-        super(stateDirectory.globalStateDir(), 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
+        eosEnabled = 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
+        baseDir = stateDirectory.globalStateDir();
+        checkpointFile = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
+        checkpointFileCache = new HashMap<>();
 
         // Find non persistent store's topics
         final Map<String, String> storeToChangelogTopic = 
topology.storeToChangelogTopic();
@@ -81,19 +94,19 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
             }
         }
 
-        this.log = logContext.logger(GlobalStateManagerImpl.class);
+        log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
         this.globalConsumer = globalConsumer;
         this.stateDirectory = stateDirectory;
         this.stateRestoreListener = stateRestoreListener;
-        this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
-        this.retryBackoffMs = 
config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
-        this.pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
+        retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
+        retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
+        pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
     }
 
     @Override
-    public void setGlobalProcessorContext(final InternalProcessorContext 
processorContext) {
-        this.processorContext = processorContext;
+    public void setGlobalProcessorContext(final InternalProcessorContext 
globalProcessorContext) {
+        this.globalProcessorContext = globalProcessorContext;
     }
 
     @Override
@@ -103,11 +116,11 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
                 throw new LockException(String.format("Failed to lock the 
global state directory: %s", baseDir));
             }
         } catch (final IOException e) {
-            throw new LockException(String.format("Failed to lock the global 
state directory: %s", baseDir));
+            throw new LockException(String.format("Failed to lock the global 
state directory: %s", baseDir), e);
         }
 
         try {
-            this.checkpointableOffsets.putAll(checkpoint.read());
+            checkpointFileCache.putAll(checkpointFile.read());
         } catch (final IOException e) {
             try {
                 stateDirectory.unlockGlobalState();
@@ -120,7 +133,7 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
         final List<StateStore> stateStores = topology.globalStateStores();
         for (final StateStore stateStore : stateStores) {
             globalStoreNames.add(stateStore.name());
-            stateStore.init(processorContext, stateStore);
+            stateStore.init(globalProcessorContext, stateStore);
         }
         return Collections.unmodifiableSet(globalStoreNames);
     }
@@ -128,12 +141,17 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
     @Override
     public void reinitializeStateStoresForPartitions(final 
Collection<TopicPartition> partitions,
                                                      final 
InternalProcessorContext processorContext) {
-        super.reinitializeStateStoresForPartitions(
+        StateManagerUtil.reinitializeStateStoresForPartitions(
             log,
+            eosEnabled,
+            baseDir,
             globalStores,
             topology.storeToChangelogTopic(),
             partitions,
-            processorContext);
+            processorContext,
+            checkpointFile,
+            checkpointFileCache
+        );
 
         globalConsumer.assign(partitions);
         globalConsumer.seekToBeginning(partitions);
@@ -261,7 +279,7 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
-            final Long checkpoint = checkpointableOffsets.get(topicPartition);
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
             } else {
@@ -293,14 +311,14 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
                     log.warn("Restoring GlobalStore {} failed due to: {}. 
Deleting global store to recreate from scratch.",
                         storeName,
                         recoverableException.toString());
-                    
reinitializeStateStoresForPartitions(recoverableException.partitions(), 
processorContext);
+                    
reinitializeStateStoresForPartitions(recoverableException.partitions(), 
globalProcessorContext);
 
                     stateRestoreListener.onRestoreStart(topicPartition, 
storeName, offset, highWatermark);
                     restoreCount = 0L;
                 }
             }
             stateRestoreListener.onRestoreEnd(topicPartition, storeName, 
restoreCount);
-            checkpointableOffsets.put(topicPartition, offset);
+            checkpointFileCache.put(topicPartition, offset);
         }
     }
 
@@ -313,7 +331,7 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
                 try {
                     log.trace("Flushing global store={}", store.name());
                     store.flush();
-                } catch (final Exception e) {
+                } catch (final RuntimeException e) {
                     throw new ProcessorStateException(
                         String.format("Failed to flush global state store %s", 
store.name()),
                         e
@@ -338,12 +356,12 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
                     log.debug("Closing global storage engine {}", 
entry.getKey());
                     try {
                         entry.getValue().get().close();
-                    } catch (final Exception e) {
+                    } catch (final RuntimeException e) {
                         log.error("Failed to close global state store {}", 
entry.getKey(), e);
                         closeFailed.append("Failed to close global state 
store:")
                                    .append(entry.getKey())
                                    .append(". Reason: ")
-                                   .append(e.toString())
+                                   .append(e)
                                    .append("\n");
                     }
                     globalStores.put(entry.getKey(), Optional.empty());
@@ -361,12 +379,12 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
 
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
-        checkpointableOffsets.putAll(offsets);
+        checkpointFileCache.putAll(offsets);
 
         final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
 
         // Skip non persistent store
-        for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : 
checkpointableOffsets.entrySet()) {
+        for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : 
checkpointFileCache.entrySet()) {
             final String topic = topicPartitionOffset.getKey().topic();
             if (!globalNonPersistentStoresTopics.contains(topic)) {
                 filteredOffsets.put(topicPartitionOffset.getKey(), 
topicPartitionOffset.getValue());
@@ -374,15 +392,15 @@ public class GlobalStateManagerImpl extends 
AbstractStateManager implements Glob
         }
 
         try {
-            checkpoint.write(filteredOffsets);
+            checkpointFile.write(filteredOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write offset checkpoint file to {} for global 
stores: {}", checkpoint, e);
+            log.warn("Failed to write offset checkpoint file to {} for global 
stores: {}", checkpointFile, e);
         }
     }
 
     @Override
     public Map<TopicPartition, Long> checkpointed() {
-        return Collections.unmodifiableMap(checkpointableOffsets);
+        return Collections.unmodifiableMap(checkpointFileCache);
     }
 
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index f060d29..7e16abc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -33,15 +33,20 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.unmodifiableList;
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
 import static 
org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
 
 
-public class ProcessorStateManager extends AbstractStateManager {
+public class ProcessorStateManager implements StateManager {
     private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
 
     private final Logger log;
@@ -57,16 +62,23 @@ public class ProcessorStateManager extends 
AbstractStateManager {
 
     // must be maintained in topological order
     private final FixedOrderMap<String, Optional<StateStore>> registeredStores 
= new FixedOrderMap<>();
+    private final FixedOrderMap<String, Optional<StateStore>> globalStores = 
new FixedOrderMap<>();
 
     private final List<TopicPartition> changelogPartitions = new ArrayList<>();
 
     // TODO: this map does not work with customized grouper where multiple 
partitions
-    // of the same topic can be assigned to the same topic.
+    // of the same topic can be assigned to the same task.
     private final Map<String, TopicPartition> partitionForTopic;
 
+    private final boolean eosEnabled;
+    private final File baseDir;
+    private OffsetCheckpoint checkpointFile;
+    private final Map<TopicPartition, Long> checkpointFileCache = new 
HashMap<>();
+    private final Map<TopicPartition, Long> initialLoadedCheckpoints;
+
     /**
      * @throws ProcessorStateException if the task directory does not exist 
and could not be created
-     * @throws IOException if any severe error happens while creating or 
locking the state directory
+     * @throws IOException             if any severe error happens while 
creating or locking the state directory
      */
     public ProcessorStateManager(final TaskId taskId,
                                  final Collection<TopicPartition> sources,
@@ -76,9 +88,9 @@ public class ProcessorStateManager extends 
AbstractStateManager {
                                  final ChangelogReader changelogReader,
                                  final boolean eosEnabled,
                                  final LogContext logContext) throws 
IOException {
-        super(stateDirectory.directoryForTask(taskId), eosEnabled);
+        this.eosEnabled = eosEnabled;
 
-        this.log = logContext.logger(ProcessorStateManager.class);
+        log = logContext.logger(ProcessorStateManager.class);
         this.taskId = taskId;
         this.changelogReader = changelogReader;
         logPrefix = String.format("task [%s] ", taskId);
@@ -94,15 +106,17 @@ public class ProcessorStateManager extends 
AbstractStateManager {
         recordConverters = isStandby ? new HashMap<>() : null;
         this.storeToChangelogTopic = new HashMap<>(storeToChangelogTopic);
 
-        // load the checkpoint information
-        checkpointableOffsets.putAll(checkpoint.read());
+        baseDir = stateDirectory.directoryForTask(taskId);
+        checkpointFile = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
+        initialLoadedCheckpoints = checkpointFile.read();
 
-        log.trace("Checkpointable offsets read from checkpoint: {}", 
checkpointableOffsets);
+        log.trace("Checkpointable offsets read from checkpoint: {}", 
initialLoadedCheckpoints);
 
         if (eosEnabled) {
-            // delete the checkpoint file after finish loading its stored 
offsets
-            checkpoint.delete();
-            checkpoint = null;
+            // with EOS enabled, there should never be a checkpoint file 
_during_ processing.
+            // delete the checkpoint file after loading its stored offsets.
+            checkpointFile.delete();
+            checkpointFile = null;
         }
 
         log.debug("Created state store manager for task {} with the acquired 
state dir lock", taskId);
@@ -126,7 +140,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
         log.debug("Registering state store {} to its state manager", 
storeName);
 
         if (CHECKPOINT_FILE_NAME.equals(storeName)) {
-            throw new IllegalArgumentException(String.format("%sIllegal store 
name: %s", logPrefix, CHECKPOINT_FILE_NAME));
+            throw new IllegalArgumentException(String.format("%sIllegal store 
name: %s", logPrefix, storeName));
         }
 
         if (registeredStores.containsKey(storeName) && 
registeredStores.get(storeName).isPresent()) {
@@ -135,10 +149,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
 
         // check that the underlying change log topic exist or not
         final String topic = storeToChangelogTopic.get(storeName);
-        if (topic == null) {
-            registeredStores.put(storeName, Optional.of(store));
-        } else {
-
+        if (topic != null) {
             final TopicPartition storePartition = new TopicPartition(topic, 
getPartition(topic));
 
             final RecordConverter recordConverter = converterForStore(store);
@@ -149,12 +160,16 @@ public class ProcessorStateManager extends 
AbstractStateManager {
                 restoreCallbacks.put(topic, stateRestoreCallback);
                 recordConverters.put(topic, recordConverter);
             } else {
-                log.trace("Restoring state store {} from changelog topic {} at 
checkpoint {}", storeName, topic, checkpointableOffsets.get(storePartition));
+                final Long restoreCheckpoint = store.persistent() ? 
initialLoadedCheckpoints.get(storePartition) : null;
+                if (restoreCheckpoint != null) {
+                    checkpointFileCache.put(storePartition, restoreCheckpoint);
+                }
+                log.trace("Restoring state store {} from changelog topic {} at 
checkpoint {}", storeName, topic, restoreCheckpoint);
 
                 final StateRestorer restorer = new StateRestorer(
                     storePartition,
                     new CompositeRestoreListener(stateRestoreCallback),
-                    checkpointableOffsets.get(storePartition),
+                    restoreCheckpoint,
                     offsetLimit(storePartition),
                     store.persistent(),
                     storeName,
@@ -164,24 +179,38 @@ public class ProcessorStateManager extends 
AbstractStateManager {
                 changelogReader.register(restorer);
             }
             changelogPartitions.add(storePartition);
-
-            registeredStores.put(storeName, Optional.of(store));
         }
+
+        registeredStores.put(storeName, Optional.of(store));
     }
 
     @Override
     public void reinitializeStateStoresForPartitions(final 
Collection<TopicPartition> partitions,
                                                      final 
InternalProcessorContext processorContext) {
-        reinitializeStateStoresForPartitions(
-            log,
-            registeredStores,
-            storeToChangelogTopic,
-            partitions,
-            processorContext);
+        StateManagerUtil.reinitializeStateStoresForPartitions(log,
+                                                              eosEnabled,
+                                                              baseDir,
+                                                              registeredStores,
+                                                              
storeToChangelogTopic,
+                                                              partitions,
+                                                              processorContext,
+                                                              checkpointFile,
+                                                              
checkpointFileCache
+        );
+    }
+
+    void clearCheckpoints() throws IOException {
+        if (checkpointFile != null) {
+            checkpointFile.delete();
+            checkpointFile = null;
+
+            checkpointFileCache.clear();
+        }
     }
 
     @Override
     public Map<TopicPartition, Long> checkpointed() {
+        updateCheckpointFileCache(emptyMap());
         final Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
 
         for (final Map.Entry<String, StateRestoreCallback> entry : 
restoreCallbacks.entrySet()) {
@@ -189,7 +218,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
             final int partition = getPartition(topicName);
             final TopicPartition storePartition = new 
TopicPartition(topicName, partition);
 
-            partitionsAndOffsets.put(storePartition, 
checkpointableOffsets.getOrDefault(storePartition, -1L));
+            partitionsAndOffsets.put(storePartition, 
checkpointFileCache.getOrDefault(storePartition, -1L));
         }
         return partitionsAndOffsets;
     }
@@ -209,7 +238,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
 
             try {
                 restoreCallback.restoreBatch(convertedRecords);
-            } catch (final Exception e) {
+            } catch (final RuntimeException e) {
                 throw new ProcessorStateException(String.format("%sException 
caught while trying to restore state from %s", logPrefix, storePartition), e);
             }
         }
@@ -246,7 +275,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
                     log.trace("Flushing store {}", store.name());
                     try {
                         store.flush();
-                    } catch (final Exception e) {
+                    } catch (final RuntimeException e) {
                         if (firstException == null) {
                             firstException = new 
ProcessorStateException(String.format("%sFailed to flush state store %s", 
logPrefix, store.name()), e);
                         }
@@ -266,6 +295,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
     /**
      * {@link StateStore#close() Close} all stores (even in case of failure).
      * Log all exception and re-throw the first exception that did occur at 
the end.
+     *
      * @throws ProcessorStateException if any error happens when closing the 
state stores
      */
     @Override
@@ -282,7 +312,7 @@ public class ProcessorStateManager extends 
AbstractStateManager {
                     try {
                         store.close();
                         registeredStores.put(store.name(), Optional.empty());
-                    } catch (final Exception e) {
+                    } catch (final RuntimeException e) {
                         if (firstException == null) {
                             firstException = new 
ProcessorStateException(String.format("%sFailed to close state store %s", 
logPrefix, store.name()), e);
                         }
@@ -294,11 +324,10 @@ public class ProcessorStateManager extends 
AbstractStateManager {
             }
         }
 
-        if (!clean && eosEnabled && checkpoint != null) {
+        if (!clean && eosEnabled) {
             // delete the checkpoint file if this is an unclean close
             try {
-                checkpoint.delete();
-                checkpoint = null;
+                clearCheckpoints();
             } catch (final IOException e) {
                 throw new ProcessorStateException(String.format("%sError while 
deleting the checkpoint file", logPrefix), e);
             }
@@ -309,44 +338,56 @@ public class ProcessorStateManager extends 
AbstractStateManager {
         }
     }
 
-    // write the checkpoint
     @Override
-    public void checkpoint(final Map<TopicPartition, Long> 
checkpointableOffsets) {
-        this.checkpointableOffsets.putAll(changelogReader.restoredOffsets());
-        log.trace("Checkpointable offsets updated with restored offsets: {}", 
this.checkpointableOffsets);
-        for (final Map.Entry<String, Optional<StateStore>> entry : 
registeredStores.entrySet()) {
-            if (entry.getValue().isPresent()) {
-                final StateStore store = entry.getValue().get();
-                final String storeName = store.name();
-                // only checkpoint the offset to the offsets file if
-                // it is persistent AND changelog enabled
-                if (store.persistent() && 
storeToChangelogTopic.containsKey(storeName)) {
-                    final String changelogTopic = 
storeToChangelogTopic.get(storeName);
-                    final TopicPartition topicPartition = new 
TopicPartition(changelogTopic, getPartition(storeName));
-                    if (checkpointableOffsets.containsKey(topicPartition)) {
-                        // store the last offset + 1 (the log position after 
restoration)
-                        this.checkpointableOffsets.put(topicPartition, 
checkpointableOffsets.get(topicPartition) + 1);
-                    } else if 
(standbyRestoredOffsets.containsKey(topicPartition)) {
-                        this.checkpointableOffsets.put(topicPartition, 
standbyRestoredOffsets.get(topicPartition));
-                    }
-                }
-            } else {
-                throw new IllegalStateException("Expected " + entry.getKey() + 
" to have been initialized");
-            }
-        }
-
-        log.trace("Checkpointable offsets updated with active acked offsets: 
{}", this.checkpointableOffsets);
+    public void checkpoint(final Map<TopicPartition, Long> 
checkpointableOffsetsFromProcessing) {
+        ensureStoresRegistered();
 
         // write the checkpoint file before closing
-        if (checkpoint == null) {
-            checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
+        if (checkpointFile == null) {
+            checkpointFile = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
         }
 
-        log.trace("Writing checkpoint: {}", this.checkpointableOffsets);
+        updateCheckpointFileCache(checkpointableOffsetsFromProcessing);
+
+        log.trace("Checkpointable offsets updated with active acked offsets: 
{}", checkpointFileCache);
+
+        log.trace("Writing checkpoint: {}", checkpointFileCache);
         try {
-            checkpoint.write(this.checkpointableOffsets);
+            checkpointFile.write(checkpointFileCache);
         } catch (final IOException e) {
-            log.warn("Failed to write offset checkpoint file to [{}]", 
checkpoint, e);
+            log.warn("Failed to write offset checkpoint file to [{}]", 
checkpointFile, e);
+        }
+    }
+
+    private void updateCheckpointFileCache(final Map<TopicPartition, Long> 
checkpointableOffsetsFromProcessing) {
+        final Set<TopicPartition> validCheckpointableTopics = 
validCheckpointableTopics();
+        final Map<TopicPartition, Long> restoredOffsets = 
validCheckpointableOffsets(
+            changelogReader.restoredOffsets(),
+            validCheckpointableTopics
+        );
+        log.trace("Checkpointable offsets updated with restored offsets: {}", 
checkpointFileCache);
+        for (final TopicPartition topicPartition : validCheckpointableTopics) {
+            if 
(checkpointableOffsetsFromProcessing.containsKey(topicPartition)) {
+                // if we have just recently processed some offsets,
+                // store the last offset + 1 (the log position after 
restoration)
+                checkpointFileCache.put(topicPartition, 
checkpointableOffsetsFromProcessing.get(topicPartition) + 1);
+            } else if (standbyRestoredOffsets.containsKey(topicPartition)) {
+                // or if we restored some offset as a standby task, use it
+                checkpointFileCache.put(topicPartition, 
standbyRestoredOffsets.get(topicPartition));
+            } else if (restoredOffsets.containsKey(topicPartition)) {
+                // or if we restored some offset as an active task, use it
+                checkpointFileCache.put(topicPartition, 
restoredOffsets.get(topicPartition));
+            } else if (checkpointFileCache.containsKey(topicPartition)) {
+                // or if we have a prior value we've cached (and written to 
the checkpoint file), then keep it
+            } else {
+                // As a last resort, fall back to the offset we loaded from 
the checkpoint file at startup, but
+                // only if the offset is actually valid for our current state 
stores.
+                final Long loadedOffset =
+                    validCheckpointableOffsets(initialLoadedCheckpoints, 
validCheckpointableTopics).get(topicPartition);
+                if (loadedOffset != null) {
+                    checkpointFileCache.put(topicPartition, loadedOffset);
+                }
+            }
         }
     }
 
@@ -380,4 +421,38 @@ public class ProcessorStateManager extends 
AbstractStateManager {
             }
         }
     }
+
+    private Set<TopicPartition> validCheckpointableTopics() {
+        // it's only valid to record checkpoints for registered stores that 
are both persistent and change-logged
+
+        final Set<TopicPartition> result = new 
HashSet<>(storeToChangelogTopic.size());
+        for (final Map.Entry<String, String> storeToChangelog : 
storeToChangelogTopic.entrySet()) {
+            final String storeName = storeToChangelog.getKey();
+            if (registeredStores.containsKey(storeName)
+                && registeredStores.get(storeName).isPresent()
+                && registeredStores.get(storeName).get().persistent()) {
+
+                final String changelogTopic = storeToChangelog.getValue();
+                result.add(new TopicPartition(changelogTopic, 
getPartition(changelogTopic)));
+            }
+        }
+        return result;
+    }
+
+    private static Map<TopicPartition, Long> validCheckpointableOffsets(
+        final Map<TopicPartition, Long> checkpointableOffsets,
+        final Set<TopicPartition> validCheckpointableTopics) {
+
+        final Map<TopicPartition, Long> result = new 
HashMap<>(checkpointableOffsets.size());
+
+        for (final Map.Entry<TopicPartition, Long> topicToCheckpointableOffset 
: checkpointableOffsets.entrySet()) {
+            final TopicPartition topic = topicToCheckpointableOffset.getKey();
+            if (validCheckpointableTopics.contains(topic)) {
+                final Long checkpointableOffset = 
topicToCheckpointableOffset.getValue();
+                result.put(topic, checkpointableOffset);
+            }
+        }
+
+        return result;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
similarity index 78%
rename from 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
rename to 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 2190c43..4568865 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -38,45 +38,40 @@ import static 
org.apache.kafka.streams.state.internals.RecordConverters.identity
 import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
 import static 
org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped;
 
-abstract class AbstractStateManager implements StateManager {
+final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
-    final File baseDir;
-    final boolean eosEnabled;
-    OffsetCheckpoint checkpoint;
-
-    final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
-    final FixedOrderMap<String, Optional<StateStore>> globalStores = new 
FixedOrderMap<>();
-
-    AbstractStateManager(final File baseDir,
-                         final boolean eosEnabled) {
-        this.baseDir = baseDir;
-        this.eosEnabled = eosEnabled;
-        this.checkpoint = new OffsetCheckpoint(new File(baseDir, 
CHECKPOINT_FILE_NAME));
-    }
+    private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : 
identity();
     }
 
-    public void reinitializeStateStoresForPartitions(final Logger log,
-                                                     final 
FixedOrderMap<String, Optional<StateStore>> stateStores,
-                                                     final Map<String, String> 
storeToChangelogTopic,
-                                                     final 
Collection<TopicPartition> partitions,
-                                                     final 
InternalProcessorContext processorContext) {
+    public static void reinitializeStateStoresForPartitions(final Logger log,
+                                                            final boolean 
eosEnabled,
+                                                            final File baseDir,
+                                                            final 
FixedOrderMap<String, Optional<StateStore>> stateStores,
+                                                            final Map<String, 
String> storeToChangelogTopic,
+                                                            final 
Collection<TopicPartition> partitions,
+                                                            final 
InternalProcessorContext processorContext,
+                                                            final 
OffsetCheckpoint checkpointFile,
+                                                            final 
Map<TopicPartition, Long> checkpointFileCache) {
         final Map<String, String> changelogTopicToStore = 
inverseOneToOneMap(storeToChangelogTopic);
         final Set<String> storesToBeReinitialized = new HashSet<>();
 
         for (final TopicPartition topicPartition : partitions) {
-            checkpointableOffsets.remove(topicPartition);
+            checkpointFileCache.remove(topicPartition);
             
storesToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
         }
 
         if (!eosEnabled) {
             try {
-                checkpoint.write(checkpointableOffsets);
+                checkpointFile.write(checkpointFileCache);
             } catch (final IOException fatalException) {
-                log.error("Failed to write offset checkpoint file to {} while 
re-initializing {}: {}", checkpoint, stateStores, fatalException);
+                log.error("Failed to write offset checkpoint file to {} while 
re-initializing {}: {}",
+                          checkpointFile,
+                          stateStores,
+                          fatalException);
                 throw new StreamsException("Failed to reinitialize global 
store.", fatalException);
             }
         }
@@ -122,7 +117,7 @@ abstract class AbstractStateManager implements StateManager 
{
         }
     }
 
-    private Map<String, String> inverseOneToOneMap(final Map<String, String> 
origin) {
+    private static Map<String, String> inverseOneToOneMap(final Map<String, 
String> origin) {
         final Map<String, String> reversedMap = new HashMap<>();
         for (final Map.Entry<String, String> entry : origin.entrySet()) {
             reversedMap.put(entry.getValue(), entry.getKey());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 80653c5..539d116 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -304,13 +304,10 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
             initializeTransactions();
             recordCollector.init(producer);
 
-            if (stateMgr.checkpoint != null) {
-                try {
-                    stateMgr.checkpoint.delete();
-                    stateMgr.checkpoint = null;
-                } catch (final IOException e) {
-                    throw new ProcessorStateException(String.format("%sError 
while deleting the checkpoint file", logPrefix), e);
-                }
+            try {
+                stateMgr.clearCheckpoints();
+            } catch (final IOException e) {
+                throw new ProcessorStateException(format("%sError while 
deleting the checkpoint file", logPrefix), e);
             }
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 86a8e49..c136fdb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -208,7 +208,7 @@ public class TaskManager {
                 try {
                     final TaskId id = TaskId.parse(dir.getName());
                     // if the checkpoint file exists, the state is valid.
-                    if (new File(dir, 
ProcessorStateManager.CHECKPOINT_FILE_NAME).exists()) {
+                    if (new File(dir, 
StateManagerUtil.CHECKPOINT_FILE_NAME).exists()) {
                         tasks.add(id);
                     }
                 } catch (final TaskIdFormatException e) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 405831a..93a0561 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -50,6 +52,7 @@ import java.util.regex.Pattern;
  *   separated by spaces.
  */
 public class OffsetCheckpoint {
+    private static final Logger LOG = 
LoggerFactory.getLogger(OffsetCheckpoint.class);
 
     private static final Pattern WHITESPACE_MINIMUM_ONCE = 
Pattern.compile("\\s+");
 
@@ -75,6 +78,7 @@ public class OffsetCheckpoint {
         synchronized (lock) {
             // write to temp file and then swap with the existing file
             final File temp = new File(file.getAbsolutePath() + ".tmp");
+            LOG.trace("Writing tmp checkpoint file {}", 
temp.getAbsolutePath());
 
             final FileOutputStream fileOutputStream = new 
FileOutputStream(temp);
             try (final BufferedWriter writer = new BufferedWriter(
@@ -90,6 +94,7 @@ public class OffsetCheckpoint {
                 fileOutputStream.getFD().sync();
             }
 
+            LOG.trace("Swapping tmp checkpoint file {} {}", temp.toPath(), 
file.toPath());
             Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
         }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 90cff85..6e4f0d5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -137,7 +137,7 @@ public class GlobalStateManagerImplTest {
             streamsConfig);
         processorContext = new 
InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
         stateManager.setGlobalProcessorContext(processorContext);
-        checkpointFile = new File(stateManager.baseDir(), 
ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        checkpointFile = new File(stateManager.baseDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME);
     }
 
     @After
@@ -336,7 +336,7 @@ public class GlobalStateManagerImplTest {
         initializeConsumer(5, 5, t1);
 
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new 
File(stateManager.baseDir(),
-                                                                               
 ProcessorStateManager.CHECKPOINT_FILE_NAME));
+                                                                               
 StateManagerUtil.CHECKPOINT_FILE_NAME));
         offsetCheckpoint.write(Collections.singletonMap(t1, 5L));
 
         stateManager.initialize();
@@ -560,7 +560,7 @@ public class GlobalStateManagerImplTest {
 
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws 
IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new 
File(stateManager.baseDir(),
-                                                                               
 ProcessorStateManager.CHECKPOINT_FILE_NAME));
+                                                                               
 StateManagerUtil.CHECKPOINT_FILE_NAME));
         return offsetCheckpoint.read();
     }
 
@@ -717,7 +717,7 @@ public class GlobalStateManagerImplTest {
     }
 
     private void writeCorruptCheckpoint() throws IOException {
-        final File checkpointFile = new File(stateManager.baseDir(), 
ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        final File checkpointFile = new File(stateManager.baseDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME);
         try (final OutputStream stream = 
Files.newOutputStream(checkpointFile.toPath())) {
             stream.write("0\n1\nfoo".getBytes());
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
index 6c3be61..1330967 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 public class MockChangelogReader implements ChangelogReader {
     private final Set<TopicPartition> registered = new HashSet<>();
+    private Map<TopicPartition, Long> restoredOffsets = Collections.emptyMap();
 
     @Override
     public void register(final StateRestorer restorer) {
@@ -39,7 +40,11 @@ public class MockChangelogReader implements ChangelogReader {
 
     @Override
     public Map<TopicPartition, Long> restoredOffsets() {
-        return Collections.emptyMap();
+        return restoredOffsets;
+    }
+
+    void setRestoredOffsets(final Map<TopicPartition, Long> restoredOffsets) {
+        this.restoredOffsets = restoredOffsets;
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 0a6ada3..c7e8e8e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -107,7 +107,7 @@ public class ProcessorStateManagerTest {
                 put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getPath());
             }
         }), new MockTime(), true);
-        checkpointFile = new File(stateDirectory.directoryForTask(taskId), 
ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        checkpointFile = new File(stateDirectory.directoryForTask(taskId), 
StateManagerUtil.CHECKPOINT_FILE_NAME);
         checkpoint = new OffsetCheckpoint(checkpointFile);
     }
 
@@ -240,7 +240,7 @@ public class ProcessorStateManagerTest {
     @Test
     public void testChangeLogOffsets() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
-        final long lastCheckpointedOffset = 10L;
+        final long storeTopic1LoadedCheckpoint = 10L;
         final String storeName1 = "store1";
         final String storeName2 = "store2";
         final String storeName3 = "store3";
@@ -254,8 +254,10 @@ public class ProcessorStateManagerTest {
         storeToChangelogTopic.put(storeName2, storeTopicName2);
         storeToChangelogTopic.put(storeName3, storeTopicName3);
 
-        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(stateDirectory.directoryForTask(taskId), 
ProcessorStateManager.CHECKPOINT_FILE_NAME));
-        checkpoint.write(singletonMap(new TopicPartition(storeTopicName1, 0), 
lastCheckpointedOffset));
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(
+            new File(stateDirectory.directoryForTask(taskId), 
StateManagerUtil.CHECKPOINT_FILE_NAME)
+        );
+        checkpoint.write(singletonMap(new TopicPartition(storeTopicName1, 0), 
storeTopic1LoadedCheckpoint));
 
         final TopicPartition partition1 = new TopicPartition(storeTopicName1, 
0);
         final TopicPartition partition2 = new TopicPartition(storeTopicName2, 
0);
@@ -289,7 +291,7 @@ public class ProcessorStateManagerTest {
             assertTrue(changeLogOffsets.containsKey(partition1));
             assertTrue(changeLogOffsets.containsKey(partition2));
             assertTrue(changeLogOffsets.containsKey(partition3));
-            assertEquals(lastCheckpointedOffset, (long) 
changeLogOffsets.get(partition1));
+            assertEquals(storeTopic1LoadedCheckpoint, (long) 
changeLogOffsets.get(partition1));
             assertEquals(-1L, (long) changeLogOffsets.get(partition2));
             assertEquals(-1L, (long) changeLogOffsets.get(partition3));
 
@@ -365,8 +367,7 @@ public class ProcessorStateManagerTest {
 
         // the checkpoint file should contain an offset from the persistent 
store only.
         final Map<TopicPartition, Long> checkpointedOffsets = 
checkpoint.read();
-        assertEquals(1, checkpointedOffsets.size());
-        assertEquals(new Long(124), checkpointedOffsets.get(new 
TopicPartition(persistentStoreTopicName, 1)));
+        assertThat(checkpointedOffsets, is(singletonMap(new 
TopicPartition(persistentStoreTopicName, 1), 124L)));
     }
 
     @Test
@@ -442,6 +443,123 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
+    public void shouldIgnoreIrrelevantLoadedCheckpoints() throws IOException {
+        final Map<TopicPartition, Long> offsets = mkMap(
+            mkEntry(persistentStorePartition, 99L),
+            mkEntry(new TopicPartition("ignoreme", 1234), 12L)
+        );
+        checkpoint.write(offsets);
+
+        final MockKeyValueStore persistentStore = new 
MockKeyValueStore(persistentStoreName, true);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            false,
+            stateDirectory,
+            singletonMap(persistentStoreName, 
persistentStorePartition.topic()),
+            changelogReader,
+            false,
+            logContext);
+        stateMgr.register(persistentStore, 
persistentStore.stateRestoreCallback);
+
+        
changelogReader.setRestoredOffsets(singletonMap(persistentStorePartition, 
110L));
+
+        stateMgr.checkpoint(emptyMap());
+        stateMgr.close(true);
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(singletonMap(persistentStorePartition, 
110L)));
+    }
+
+    @Test
+    public void shouldOverrideLoadedCheckpointsWithRestoredCheckpoints() 
throws IOException {
+        final Map<TopicPartition, Long> offsets = 
singletonMap(persistentStorePartition, 99L);
+        checkpoint.write(offsets);
+
+        final MockKeyValueStore persistentStore = new 
MockKeyValueStore(persistentStoreName, true);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            false,
+            stateDirectory,
+            singletonMap(persistentStoreName, 
persistentStorePartition.topic()),
+            changelogReader,
+            false,
+            logContext);
+        stateMgr.register(persistentStore, 
persistentStore.stateRestoreCallback);
+
+        
changelogReader.setRestoredOffsets(singletonMap(persistentStorePartition, 
110L));
+
+        stateMgr.checkpoint(emptyMap());
+        stateMgr.close(true);
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(singletonMap(persistentStorePartition, 
110L)));
+    }
+
+    @Test
+    public void shouldIgnoreIrrelevantRestoredCheckpoints() throws IOException 
{
+        final Map<TopicPartition, Long> offsets = 
singletonMap(persistentStorePartition, 99L);
+        checkpoint.write(offsets);
+
+        final MockKeyValueStore persistentStore = new 
MockKeyValueStore(persistentStoreName, true);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            false,
+            stateDirectory,
+            singletonMap(persistentStoreName, 
persistentStorePartition.topic()),
+            changelogReader,
+            false,
+            logContext);
+        stateMgr.register(persistentStore, 
persistentStore.stateRestoreCallback);
+
+        // should ignore irrelevant topic partitions
+        changelogReader.setRestoredOffsets(mkMap(
+            mkEntry(persistentStorePartition, 110L),
+            mkEntry(new TopicPartition("sillytopic", 5000), 1234L)
+        ));
+
+        stateMgr.checkpoint(emptyMap());
+        stateMgr.close(true);
+        final Map<TopicPartition, Long> read = checkpoint.read();
+        assertThat(read, equalTo(singletonMap(persistentStorePartition, 
110L)));
+    }
+
+    @Test
+    public void shouldOverrideRestoredOffsetsWithProcessedOffsets() throws 
IOException {
+        final Map<TopicPartition, Long> offsets = 
singletonMap(persistentStorePartition, 99L);
+        checkpoint.write(offsets);
+
+        final MockKeyValueStore persistentStore = new 
MockKeyValueStore(persistentStoreName, true);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            false,
+            stateDirectory,
+            singletonMap(persistentStoreName, 
persistentStorePartition.topic()),
+            changelogReader,
+            false,
+            logContext);
+        stateMgr.register(persistentStore, 
persistentStore.stateRestoreCallback);
+
+        // should ignore irrelevant topic partitions
+        changelogReader.setRestoredOffsets(mkMap(
+            mkEntry(persistentStorePartition, 110L),
+            mkEntry(new TopicPartition("sillytopic", 5000), 1234L)
+        ));
+
+        // should ignore irrelevant topic partitions
+        stateMgr.checkpoint(mkMap(
+            mkEntry(persistentStorePartition, 220L),
+            mkEntry(new TopicPartition("ignoreme", 42), 9000L)
+        ));
+        stateMgr.close(true);
+        final Map<TopicPartition, Long> read = checkpoint.read();
+
+        // the checkpoint gets incremented to be the log position _after_ the 
committed offset
+        assertThat(read, equalTo(singletonMap(persistentStorePartition, 
221L)));
+    }
+
+    @Test
     public void shouldWriteCheckpointForPersistentLogEnabledStore() throws 
IOException {
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
@@ -540,7 +658,7 @@ public class ProcessorStateManagerTest {
             logContext);
 
         try {
-            stateManager.register(new 
MockKeyValueStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), null);
+            stateManager.register(new 
MockKeyValueStore(StateManagerUtil.CHECKPOINT_FILE_NAME, true), null);
             fail("should have thrown illegal argument exception when store 
name same as checkpoint file");
         } catch (final IllegalArgumentException e) {
             //pass
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 6e7655a..9e6cdf4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -372,7 +372,7 @@ public class StandbyTaskTest {
         task.close(true, false);
 
         final File taskDir = stateDirectory.directoryForTask(taskId);
-        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(taskDir, StateManagerUtil.CHECKPOINT_FILE_NAME));
         final Map<TopicPartition, Long> offsets = checkpoint.read();
 
         assertEquals(1, offsets.size());
@@ -559,7 +559,7 @@ public class StandbyTaskTest {
         task.commit();
 
         final Map<TopicPartition, Long> checkpoint = new OffsetCheckpoint(
-            new File(stateDirectory.directoryForTask(taskId), 
ProcessorStateManager.CHECKPOINT_FILE_NAME)
+            new File(stateDirectory.directoryForTask(taskId), 
StateManagerUtil.CHECKPOINT_FILE_NAME)
         ).read();
         assertThat(checkpoint, 
equalTo(Collections.singletonMap(globalTopicPartition, 51L)));
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index ccd94de..e4b6bd1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -864,7 +864,7 @@ public class StreamTaskTest {
         task.initializeTopology();
         task.commit();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(
-            new File(stateDirectory.directoryForTask(taskId00), 
ProcessorStateManager.CHECKPOINT_FILE_NAME)
+            new File(stateDirectory.directoryForTask(taskId00), 
StateManagerUtil.CHECKPOINT_FILE_NAME)
         );
 
         assertThat(checkpoint.read(), 
equalTo(Collections.singletonMap(changelogPartition, offset)));
@@ -878,7 +878,7 @@ public class StreamTaskTest {
         task.commit();
         final File checkpointFile = new File(
             stateDirectory.directoryForTask(taskId00),
-            ProcessorStateManager.CHECKPOINT_FILE_NAME
+            StateManagerUtil.CHECKPOINT_FILE_NAME
         );
 
         assertFalse(checkpointFile.exists());
@@ -1481,6 +1481,8 @@ public class StreamTaskTest {
     }
 
     private StreamTask createStatefulTask(final StreamsConfig config, final 
boolean logged) {
+        final StateStore stateStore = new MockKeyValueStore(storeName, logged);
+
         final ProcessorTopology topology = ProcessorTopologyFactories.with(
             asList(source1, source2),
             mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index f3d151c..f0a8fa2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -88,7 +88,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static 
org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME;
+import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static 
org.apache.kafka.streams.processor.internals.StreamThread.getSharedAdminClientId;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index fcf275b..7d7d4e6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -218,9 +218,9 @@ public class TaskManagerTest {
                                                 testFolder.newFolder("1_1"),
                                                 
testFolder.newFolder("dummy")).toArray(new File[0]);
 
-        assertTrue((new File(taskFolders[0], 
ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile());
-        assertTrue((new File(taskFolders[1], 
ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile());
-        assertTrue((new File(taskFolders[3], 
ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile());
+        assertTrue((new File(taskFolders[0], 
StateManagerUtil.CHECKPOINT_FILE_NAME)).createNewFile());
+        assertTrue((new File(taskFolders[1], 
StateManagerUtil.CHECKPOINT_FILE_NAME)).createNewFile());
+        assertTrue((new File(taskFolders[3], 
StateManagerUtil.CHECKPOINT_FILE_NAME)).createNewFile());
 
         
expect(activeTaskCreator.stateDirectory()).andReturn(stateDirectory).once();
         
expect(stateDirectory.listTaskDirectories()).andReturn(taskFolders).once();

Reply via email to