This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 0.11.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push: new a8e48b3 KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219) a8e48b3 is described below commit a8e48b3f95e032ecce4036e00165c34f331b5f31 Author: Matthias J. Sax <mj...@apache.org> AuthorDate: Thu Jun 14 04:44:09 2018 -0700 KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219) --- .../internals/GlobalStateManagerImpl.java | 24 +++++++++++++++++-- .../integration/GlobalKTableIntegrationTest.java | 27 +++++++++++++++++++++- .../internals/GlobalStateManagerImplTest.java | 19 +++++++++++++-- .../org/apache/kafka/test/NoOpReadOnlyStore.java | 11 +++++++-- 4 files changed, 74 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 1e2e5ff..73999c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -59,6 +59,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager { private final OffsetCheckpoint checkpoint; private final Set<String> globalStoreNames = new HashSet<>(); private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>(); + private final Set<String> globalNonPersistentStoresTopics = new HashSet<>(); public GlobalStateManagerImpl(final ProcessorTopology topology, final Consumer<byte[], byte[]> consumer, @@ -68,6 +69,14 @@ public class GlobalStateManagerImpl implements GlobalStateManager { this.stateDirectory = stateDirectory; this.baseDir = stateDirectory.globalStateDir(); this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); + + // Find non persistent store's topics + final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic(); + for (final StateStore store : topology.globalStateStores()) { + if (!store.persistent()) { + globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); + } + } } @Override @@ -230,9 +239,20 @@ public class GlobalStateManagerImpl implements GlobalStateManager { @Override public void checkpoint(final Map<TopicPartition, Long> offsets) { checkpointableOffsets.putAll(offsets); - if (!checkpointableOffsets.isEmpty()) { + + final Map<TopicPartition, Long> filteredOffsets = new HashMap<>(); + + // Skip non persistent store + for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) { + final String topic = topicPartitionOffset.getKey().topic(); + if (!globalNonPersistentStoresTopics.contains(topic)) { + filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); + } + } + + if (!filteredOffsets.isEmpty()) { try { - checkpoint.write(checkpointableOffsets); + checkpoint.write(filteredOffsets); } catch (IOException e) { log.warn("failed to write offsets checkpoint for global stores", e); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 9a849f5..b3df990 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; @@ -51,6 +52,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertThat; + @Category({IntegrationTest.class}) public class GlobalKTableIntegrationTest { private static final int NUM_BROKERS = 1; @@ -217,7 +221,28 @@ public class GlobalKTableIntegrationTest { } }, 30000L, "waiting for final values"); } - + + @Test + public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception { + builder = new KStreamBuilder(); + globalTable = builder.globalTable( + Serdes.Long(), + Serdes.String(), + globalOne, + new InMemoryKeyValueStoreSupplier<>(globalStore, Serdes.Long(), Serdes.String(), false, null)); + + produceInitialGlobalTableValues(); + + startStreams(); + ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore()); + assertThat(store.approximateNumEntries(), equalTo(4L)); + kafkaStreams.close(); + + startStreams(); + store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore()); + assertThat(store.approximateNumEntries(), equalTo(4L)); + } + private void createTopics() throws InterruptedException { inputStream = "input-stream-" + testNo; inputTable = "input-table-" + testNo; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 98ef8f6a..b4b2cc5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -63,12 +63,14 @@ public class GlobalStateManagerImplTest { private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); private final TopicPartition t1 = new TopicPartition("t1", 1); private final TopicPartition t2 = new TopicPartition("t2", 1); + private final TopicPartition t3 = new TopicPartition("t3", 1); private GlobalStateManagerImpl stateManager; private NoOpProcessorContext context; private StateDirectory stateDirectory; private String stateDirPath; private NoOpReadOnlyStore<Object, Object> store1; private NoOpReadOnlyStore store2; + private NoOpReadOnlyStore store3; private MockConsumer<byte[], byte[]> consumer; private File checkpointFile; private ProcessorTopology topology; @@ -78,18 +80,21 @@ public class GlobalStateManagerImplTest { final Map<String, String> storeToTopic = new HashMap<>(); storeToTopic.put("t1-store", "t1"); storeToTopic.put("t2-store", "t2"); + storeToTopic.put("t3-store", "t3"); final Map<StateStore, ProcessorNode> storeToProcessorNode = new HashMap<>(); store1 = new NoOpReadOnlyStore<>("t1-store"); storeToProcessorNode.put(store1, new MockProcessorNode(-1)); store2 = new NoOpReadOnlyStore("t2-store"); storeToProcessorNode.put(store2, new MockProcessorNode(-1)); + store3 = new NoOpReadOnlyStore("t3-store", false); + storeToProcessorNode.put(store2, new MockProcessorNode(-1)); topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(), Collections.<String, SourceNode>emptyMap(), Collections.<String, SinkNode>emptyMap(), Collections.<StateStore>emptyList(), storeToTopic, - Arrays.<StateStore>asList(store1, store2)); + Arrays.<StateStore>asList(store1, store2, store3)); context = new NoOpProcessorContext(); stateDirPath = TestUtils.tempDirectory().getPath(); @@ -153,7 +158,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldReturnInitializedStoreNames() throws Exception { final Set<String> storeNames = stateManager.initialize(context); - assertEquals(Utils.mkSet(store1.name(), store2.name()), storeNames); + assertEquals(Utils.mkSet(store1.name(), store2.name(), store3.name()), storeNames); } @Test @@ -439,6 +444,16 @@ public class GlobalStateManagerImplTest { assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap)); } + @Test + public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException { + stateManager.initialize(context); + initializeConsumer(10, 1, t3); + stateManager.register(store3, false, stateRestoreCallback); + stateManager.close(Collections.<TopicPartition, Long>emptyMap()); + + assertThat(readOffsetsCheckpoint(), equalTo(Collections.<TopicPartition, Long>emptyMap())); + } + private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException { final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME)); diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index 0ada2e4..892a4dc 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -25,17 +25,24 @@ public class NoOpReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V>, StateStore { private final String name; + private final boolean persistent; private boolean open = true; public boolean initialized; public boolean flushed; public NoOpReadOnlyStore() { - this(""); + this("", true); } public NoOpReadOnlyStore(final String name) { + this(name, true); + } + + public NoOpReadOnlyStore(final String name, + final boolean persistent) { this.name = name; + this.persistent = persistent; } @Override @@ -80,7 +87,7 @@ public class NoOpReadOnlyStore<K, V> @Override public boolean persistent() { - return false; + return persistent; } @Override -- To stop receiving notification emails like this one, please contact mj...@apache.org.