Thanks Sameer, yes this looks like a bug. Can you file a JIRA?

On Mon, 4 Sep 2017 at 12:23 Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> Hi,
>
> I am using InMemoryStore along with GlobalKTable. I realized that I was
> losing data whenever I restarted my streams application while it was
> consuming from the Kafka topic, since it would always start from the last
> saved checkpoint. This works fine with RocksDB, it being a persistent
> store, but for an in-memory store it should consume from the beginning.
>
> Debugging it further, I looked at the code for GlobalStateManagerImpl (this
> one works for GlobalKTable) and compared it with
> ProcessorStateManagerImpl (this one works for KTable).
>
> In ProcessorStateManagerImpl.checkpoint, there is a check that the state
> store is persistent before the checkpoints are written; the same check is
> missing from the GlobalStateManagerImpl.checkpoint method. Do you think the
> same check needs to be added to GlobalStateManagerImpl?
>
> public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
>     log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets);
>     checkpointedOffsets.putAll(changelogReader.restoredOffsets());
>     for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
>         final String storeName = entry.getKey();
>         // only checkpoint the offset to the offsets file if
>         // it is persistent AND changelog enabled
>         *if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {*
>             final String changelogTopic = storeToChangelogTopic.get(storeName);
>             final TopicPartition topicPartition =
>                 new TopicPartition(changelogTopic, getPartition(storeName));
>             if (ackedOffsets.containsKey(topicPartition)) {
>                 // store the last offset + 1 (the log position after restoration)
>                 checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
>             } else if (restoredOffsets.containsKey(topicPartition)) {
>                 checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
>             }
>         }
>     }
>     // write the checkpoint file before closing, to indicate clean shutdown
>     try {
>         if (checkpoint == null) {
>             checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
>         }
>         checkpoint.write(checkpointedOffsets);
>     } catch (final IOException e) {
>         log.warn("Failed to write checkpoint file to {}:",
>             new File(baseDir, CHECKPOINT_FILE_NAME), e);
>     }
> }
>
> Regards,
> -Sameer.
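
For reference, the guard discussed above could be sketched roughly as follows for the global-store case. This is only an illustration and not the actual GlobalStateManagerImpl code: the class GlobalCheckpointSketch, the nested SketchStore interface, the method filterCheckpointable, and the use of plain strings as partition keys are all hypothetical stand-ins. The only idea carried over from the quoted method is the persistent() check, which skips in-memory stores so that they are restored from the beginning after a restart.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka code: models only the persistent() guard.
final class GlobalCheckpointSketch {

    // Stand-in for a state store; only the persistent() flag matters here.
    interface SketchStore {
        boolean persistent();
    }

    // Returns the offsets that would be written to the checkpoint file.
    // storeToPartition maps a store name to a simplified "topic-partition" key.
    static Map<String, Long> filterCheckpointable(
            final Map<String, SketchStore> stores,
            final Map<String, String> storeToPartition,
            final Map<String, Long> offsets) {
        final Map<String, Long> result = new LinkedHashMap<>();
        for (final Map.Entry<String, SketchStore> entry : stores.entrySet()) {
            final String partition = storeToPartition.get(entry.getKey());
            // Same idea as the quoted check: only persistent stores are checkpointed.
            if (entry.getValue().persistent()
                    && partition != null
                    && offsets.containsKey(partition)) {
                result.put(partition, offsets.get(partition));
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, SketchStore> stores = new LinkedHashMap<>();
        stores.put("rocksdb-store", () -> true);    // persistent
        stores.put("in-memory-store", () -> false); // not persistent

        final Map<String, String> storeToPartition = new LinkedHashMap<>();
        storeToPartition.put("rocksdb-store", "topic-a-0");
        storeToPartition.put("in-memory-store", "topic-b-0");

        final Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("topic-a-0", 42L);
        offsets.put("topic-b-0", 7L);

        // Only the persistent store's offset survives the filter.
        System.out.println(filterCheckpointable(stores, storeToPartition, offsets));
    }
}
```

With a guard like this, the offset for the in-memory store never reaches the checkpoint file, so on restart there is no saved position for it to resume from.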