[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512359#comment-16512359
 ] 

ASF GitHub Bot commented on KAFKA-6711:
---------------------------------------

rajinisivaram closed pull request #5219: KAFKA-6711: GlobalStateManagerImpl 
should not write offsets of in-memory stores in checkpoint file
URL: https://github.com/apache/kafka/pull/5219
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub hides the original diff once such a request is merged:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 78c4a363f29..a4ec23d4c49 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -42,6 +42,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -62,6 +63,7 @@
     private final int retries;
     private final long retryBackoffMs;
     private final Duration pollTime;
+    private final Set<String> globalNonPersistentStoresTopics = new 
HashSet<>();
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
@@ -71,6 +73,14 @@ public GlobalStateManagerImpl(final LogContext logContext,
                                   final StreamsConfig config) {
         super(stateDirectory.globalStateDir(), 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
 
+        // Find non persistent store's topics
+        final Map<String, String> storeToChangelogTopic = 
topology.storeToChangelogTopic();
+        for (final StateStore store : topology.globalStateStores()) {
+            if (!store.persistent()) {
+                
globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+            }
+        }
+
         this.log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
         this.globalConsumer = globalConsumer;
@@ -337,13 +347,22 @@ public void close(final Map<TopicPartition, Long> 
offsets) throws IOException {
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
         checkpointableOffsets.putAll(offsets);
-        if (!checkpointableOffsets.isEmpty()) {
-            try {
-                checkpoint.write(checkpointableOffsets);
-            } catch (final IOException e) {
-                log.warn("Failed to write offset checkpoint file to {} for 
global stores: {}", checkpoint, e);
+
+        final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+        // Skip non persistent store
+        for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : 
checkpointableOffsets.entrySet()) {
+            final String topic = topicPartitionOffset.getKey().topic();
+            if (!globalNonPersistentStoresTopics.contains(topic)) {
+                filteredOffsets.put(topicPartitionOffset.getKey(), 
topicPartitionOffset.getValue());
             }
         }
+
+        try {
+            checkpoint.write(filteredOffsets);
+        } catch (final IOException e) {
+            log.warn("Failed to write offset checkpoint file to {} for global 
stores: {}", checkpoint, e);
+        }
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 900e65276ee..013e2b68110 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -22,13 +22,13 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
@@ -38,6 +38,7 @@
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -53,6 +54,9 @@
 import java.util.Map;
 import java.util.Properties;
 
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
@@ -220,7 +224,27 @@ public boolean conditionMet() {
             }
         }, 30000L, "waiting for final values");
     }
-    
+
+    @Test
+    public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
+        builder = new StreamsBuilder();
+        globalTable = builder.globalTable(
+            globalTableTopic,
+            Consumed.with(Serdes.Long(), Serdes.String()),
+            Materialized.as(Stores.inMemoryKeyValueStore(globalStore)));
+
+        produceInitialGlobalTableValues();
+
+        startStreams();
+        ReadOnlyKeyValueStore<Long, String> store = 
kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
+        assertThat(store.approximateNumEntries(), equalTo(4L));
+        kafkaStreams.close();
+
+        startStreams();
+        store = kafkaStreams.store(globalStore, 
QueryableStoreTypes.keyValueStore());
+        assertThat(store.approximateNumEntries(), equalTo(4L));
+    }
+
     private void createTopics() throws InterruptedException {
         streamTopic = "stream-" + testNo;
         globalTableTopic = "globalTable-" + testNo;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 2ca9c211c1c..e37f6a63243 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -488,6 +488,16 @@ public void shouldCheckpointRestoredOffsetsToFile() throws 
IOException {
         assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
     }
 
+    @Test
+    public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws 
IOException {
+        stateManager.initialize();
+        initializeConsumer(10, 1, t3);
+        stateManager.register(store3, stateRestoreCallback);
+        stateManager.close(Collections.emptyMap());
+
+        assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap()));
+    }
+
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws 
IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new 
File(stateManager.baseDir(),
                                                                                
 ProcessorStateManager.CHECKPOINT_FILE_NAME));
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java 
b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index ae46b8dadaa..08945d5047a 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -95,7 +95,7 @@ public void close() {
 
     @Override
     public boolean persistent() {
-        return false;
+        return rocksdbStore;
     }
 
     @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6711
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6711
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.1
>            Reporter: Cemalettin Koç
>            Assignee: Cemalettin Koç
>            Priority: Major
>              Labels: newbie
>
> We are using an InMemoryStore along with a GlobalKTable, and I noticed that 
> after each restart I lose all my data. When I debugged it, the 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contained an 
> offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl 
> implementation and noticed that it is not guarded against cases like mine. 
> I am fairly new to Kafka land, so there may well be another way to fix this 
> issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to