Thanks Sameer, yes this looks like a bug. Can you file a JIRA?
On Mon, 4 Sep 2017 at 12:23 Sameer Kumar wrote:
> Hi,
>
> I am using InMemoryStore along with a GlobalKTable. I noticed that I lose
> data when I restart my Streams application while it is consuming from the
> Kafka topic, because on restart it always resumes from the last saved
> checkpoint. That works fine with RocksDB, which is a persistent store, but
> an in-memory store should consume the topic from the beginning.
>
> Debugging further, I looked at the code for GlobalStateManagerImpl (used
> for GlobalKTable) and compared it with ProcessorStateManager (used for
> KTable).
>
> ProcessorStateManager.checkpoint checks that the state store is persistent
> before writing its checkpoint; the same check is missing from
> GlobalStateManagerImpl.checkpoint. Do you think the same check needs to be
> added to GlobalStateManagerImpl? For reference, this is the
> ProcessorStateManager version, with the check marked by asterisks:
>
> public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
>     log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets);
>     checkpointedOffsets.putAll(changelogReader.restoredOffsets());
>     for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
>         final String storeName = entry.getKey();
>         // only checkpoint the offset to the offsets file if
>         // it is persistent AND changelog enabled
>         *if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {*
>             final String changelogTopic = storeToChangelogTopic.get(storeName);
>             final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
>             if (ackedOffsets.containsKey(topicPartition)) {
>                 // store the last offset + 1 (the log position after restoration)
>                 checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
>             } else if (restoredOffsets.containsKey(topicPartition)) {
>                 checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
>             }
>         }
>     }
>     // write the checkpoint file before closing, to indicate clean shutdown
>     try {
>         if (checkpoint == null) {
>             checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
>         }
>         checkpoint.write(checkpointedOffsets);
>     } catch (final IOException e) {
>         log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e);
>     }
> }
>
> Regards,
> -Sameer.
>
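
To make the check being discussed concrete, here is a minimal, self-contained sketch of the idea: only offsets that belong to persistent stores are kept for checkpointing. It is not the Kafka Streams code or a proposed patch. The class GlobalCheckpointSketch, the Store interface, and the offsetsToCheckpoint method are hypothetical stand-ins, and offsets are keyed by store name rather than by TopicPartition to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

public class GlobalCheckpointSketch {

    // Hypothetical stand-in for a state store; only the persistent() flag matters here.
    interface Store {
        boolean persistent();
    }

    // Keep only the offsets of persistent stores. Offsets of non-persistent
    // (e.g. in-memory) stores are dropped, so such a store would be rebuilt
    // from the beginning of its topic after a restart.
    static Map<String, Long> offsetsToCheckpoint(final Map<String, Store> stores,
                                                 final Map<String, Long> offsetsByStore) {
        final Map<String, Long> filtered = new HashMap<>();
        for (final Map.Entry<String, Store> entry : stores.entrySet()) {
            final Long offset = offsetsByStore.get(entry.getKey());
            if (entry.getValue().persistent() && offset != null) {
                filtered.put(entry.getKey(), offset);
            }
        }
        return filtered;
    }

    public static void main(final String[] args) {
        final Map<String, Store> stores = new HashMap<>();
        stores.put("rocksdb-store", () -> true);    // persistent
        stores.put("in-memory-store", () -> false); // not persistent

        final Map<String, Long> offsets = new HashMap<>();
        offsets.put("rocksdb-store", 42L);
        offsets.put("in-memory-store", 17L);

        // prints {rocksdb-store=42}
        System.out.println(offsetsToCheckpoint(stores, offsets));
    }
}
```

With a filter like this in the global checkpoint path, no offset would be written for the in-memory store, which matches the behaviour Sameer expects.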