[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513124#comment-16513124 ]
ASF GitHub Bot commented on KAFKA-6711: --------------------------------------- mjsax closed pull request #4782: KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-mem… URL: https://github.com/apache/kafka/pull/4782 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 56e6bed0850..17ae70ce0d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -41,6 +41,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -242,7 +243,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); final Long checkpoint = checkpointableOffsets.get(topicPartition); - if (checkpoint != null) { + if (checkpoint != null && checkpoint > StateRestorer.NO_CHECKPOINT) { globalConsumer.seek(topicPartition, checkpoint); } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); @@ -334,10 +335,33 @@ public void close(final Map<TopicPartition, Long> offsets) throws IOException { @Override public void checkpoint(final Map<TopicPartition, Long> offsets) { + + // Find non persistent store's topics + final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic(); + final Set<String> 
globalNonPersistentStoresTopics = new HashSet<>(); + for (final StateStore store : topology.globalStateStores()) { + if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) { + globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); + } + } + checkpointableOffsets.putAll(offsets); - if (!checkpointableOffsets.isEmpty()) { + + final Map<TopicPartition, Long> filteredOffsets = new HashMap<>(); + + // Skip non persistent store + for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) { + final String topic = topicPartitionOffset.getKey().topic(); + if (globalNonPersistentStoresTopics.contains(topic)) { + filteredOffsets.put(topicPartitionOffset.getKey(), (long) StateRestorer.NO_CHECKPOINT); + } else { + filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); + } + } + + if (!filteredOffsets.isEmpty()) { try { - checkpoint.write(checkpointableOffsets); + checkpoint.write(filteredOffsets); } catch (IOException e) { log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index df8d2010d24..39b7bb4747f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -488,6 +488,16 @@ public void shouldCheckpointRestoredOffsetsToFile() throws IOException { assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap)); } + @Test + public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException { + stateManager.initialize(); + initializeConsumer(10, 1, t3); + stateManager.register(store3, stateRestoreCallback); + 
stateManager.close(Collections.<TopicPartition, Long>emptyMap()); + + assertThat(readOffsetsCheckpoint(), equalTo(Collections.singletonMap(t3, (long) StateRestorer.NO_CHECKPOINT))); + } + private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException { final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME)); diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index ae46b8dadaa..08945d5047a 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -95,7 +95,7 @@ public void close() { @Override public boolean persistent() { - return false; + return rocksdbStore; } @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -------------------------------------------------------------------------------------- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.0.1 > Reporter: Cemalettin Koç > Assignee: Cemalettin Koç > Priority: Major > Labels: newbie > Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1 > > > We are using an InMemoryStore along with a GlobalKTable, and I noticed that > after each restart I lose all my data. When I debugged it, the > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contained an > offset for my GlobalKTable topic.
I had checked the GlobalStateManagerImpl > implementation and noticed that it is not guarded against cases similar to mine. > I am fairly new to Kafka, so there may be another way to fix > the issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)