Re: Potential Bug | GlobalStateManager checkpoint

2017-09-04 Thread Damian Guy
Thanks Sameer, yes this looks like a bug. Can you file a JIRA?

On Mon, 4 Sep 2017 at 12:23 Sameer Kumar  wrote:



Potential Bug | GlobalStateManager checkpoint

2017-09-04 Thread Sameer Kumar
Hi,

I am using an InMemoryStore with a GlobalKTable. I realized that I was losing
data whenever I restarted my Streams application while it was consuming from
a Kafka topic, because on restart it always resumed from the last saved
checkpoint. That works fine with RocksDB, since it is a persistent store. An
in-memory store, however, starts out empty after a restart, so it should
consume the topic from the beginning.
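A toy model makes the failure mode concrete. This is not Kafka Streams code: `RestartSketch`, `restore` and the three-record topic are hypothetical, the list index stands in for the offset, and the sketch assumes an in-memory table is rebuilt after a restart by replaying the source topic from the checkpointed offset.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestartSketch {

    // Rebuilds a fresh (empty) in-memory table by replaying the topic from fromOffset.
    static Map<String, String> restore(List<Map.Entry<String, String>> topic, int fromOffset) {
        Map<String, String> store = new HashMap<>();
        for (int offset = fromOffset; offset < topic.size(); offset++) {
            Map.Entry<String, String> record = topic.get(offset);
            store.put(record.getKey(), record.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        // Hypothetical topic contents; the list index plays the role of the offset.
        List<Map.Entry<String, String>> topic = List.of(
                Map.entry("a", "1"),
                Map.entry("b", "2"),
                Map.entry("c", "3"));

        // Offsets 0 and 1 were processed before shutdown, so the checkpoint is 1 + 1 = 2.
        int checkpointedOffset = 2;

        // In-memory store: empty after the restart, yet consumption resumes at the checkpoint.
        Map<String, String> resumed = restore(topic, checkpointedOffset);
        // Replaying from the beginning rebuilds the whole table.
        Map<String, String> replayed = restore(topic, 0);

        System.out.println(resumed);         // prints {c=3}
        System.out.println(replayed.size()); // prints 3
    }
}
```

Keys "a" and "b" are gone after the resumed restore, which is the data loss described above; a persistent store would still hold them on disk, so resuming at the checkpoint is safe there.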

Debugging further, I looked at the code of GlobalStateManagerImpl (which is
used for GlobalKTable) and compared it with ProcessorStateManagerImpl (which
is used for KTable).

ProcessorStateManagerImpl.checkpoint checks that a state store is persistent
before writing its checkpoint, but the same check is missing from
GlobalStateManagerImpl.checkpoint. Do you think the check needs to be added to
GlobalStateManagerImpl as well? For reference, here is
ProcessorStateManagerImpl.checkpoint; the check in question is the
entry.getValue().persistent() condition:

    public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
        log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets);
        checkpointedOffsets.putAll(changelogReader.restoredOffsets());
        for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
            final String storeName = entry.getKey();
            // only checkpoint the offset to the offsets file if
            // it is persistent AND changelog enabled
            if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {
                final String changelogTopic = storeToChangelogTopic.get(storeName);
                final TopicPartition topicPartition =
                        new TopicPartition(changelogTopic, getPartition(storeName));
                if (ackedOffsets.containsKey(topicPartition)) {
                    // store the last offset + 1 (the log position after restoration)
                    checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
                } else if (restoredOffsets.containsKey(topicPartition)) {
                    checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
                }
            }
        }
        // write the checkpoint file before closing, to indicate clean shutdown
        try {
            if (checkpoint == null) {
                checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
            }
            checkpoint.write(checkpointedOffsets);
        } catch (final IOException e) {
            log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e);
        }
    }
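The check being asked about can be modeled in isolation. This is a minimal sketch, not a patch to GlobalStateManagerImpl: `CheckpointGuardSketch`, `StoreInfo` and `buildCheckpoint` are hypothetical, topic-partitions are represented as plain strings such as "global-topic-0", and the sketch assumes that a partition with no checkpoint entry is replayed from offset 0 on restart. Like the persistent() condition in the method above, it writes "last acked offset + 1" only for persistent stores.

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointGuardSketch {

    // Hypothetical description of a registered store: its source partition and
    // whether its contents survive a restart (e.g. RocksDB) or not (in-memory).
    static final class StoreInfo {
        final String topicPartition;
        final boolean persistent;

        StoreInfo(String topicPartition, boolean persistent) {
            this.topicPartition = topicPartition;
            this.persistent = persistent;
        }
    }

    // Returns the offsets to write to the checkpoint file.
    static Map<String, Long> buildCheckpoint(Map<String, StoreInfo> stores,
                                             Map<String, Long> ackedOffsets) {
        Map<String, Long> checkpoint = new HashMap<>();
        for (StoreInfo info : stores.values()) {
            // Skip non-persistent stores: they are empty after a restart, so a
            // checkpoint would make restoration skip records they still need.
            if (!info.persistent) {
                continue;
            }
            Long acked = ackedOffsets.get(info.topicPartition);
            if (acked != null) {
                // Last processed offset + 1 is the position to resume from.
                checkpoint.put(info.topicPartition, acked + 1);
            }
        }
        return checkpoint;
    }

    public static void main(String[] args) {
        Map<String, StoreInfo> stores = Map.of(
                "persistent-store", new StoreInfo("global-topic-0", true),
                "in-memory-store", new StoreInfo("other-topic-0", false));
        Map<String, Long> acked = Map.of("global-topic-0", 41L, "other-topic-0", 99L);

        Map<String, Long> checkpoint = buildCheckpoint(stores, acked);
        System.out.println(checkpoint); // prints {global-topic-0=42}

        // Assumed restart rule for this sketch: no checkpoint entry means replay from 0.
        System.out.println(checkpoint.getOrDefault("other-topic-0", 0L)); // prints 0
    }
}
```

With the guard in place, the in-memory store simply has no checkpoint entry, so under the stated restart rule it is rebuilt from the start of its topic instead of resuming mid-stream.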

Regards,
-Sameer.