Repository: kafka Updated Branches: refs/heads/trunk 667cd60dc -> 9cbb9f093
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java new file mode 100644 index 0000000..a0500b6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; + +import static org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS; + +public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier { + private final String name; + private final long retentionPeriod; + private final int segments; + private final long windowSize; + private final boolean retainDuplicates; + + public RocksDbWindowBytesStoreSupplier(final String name, + final long retentionPeriod, + final int segments, + final long windowSize, + final boolean retainDuplicates) { + if (segments < MIN_SEGMENTS) { + throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS); + } + this.name = name; + this.retentionPeriod = retentionPeriod; + this.segments = segments; + this.windowSize = windowSize; + this.retainDuplicates = retainDuplicates; + } + + @Override + public String name() { + return name; + } + + @Override + public WindowStore<Bytes, byte[]> get() { + final RocksDBSegmentedBytesStore segmentedBytesStore = new RocksDBSegmentedBytesStore( + name, + retentionPeriod, + segments, + new WindowKeySchema() + ); + return RocksDBWindowStore.bytesStore(segmentedBytesStore, + retainDuplicates, + windowSize); + + } + + @Override + public String metricsScope() { + return "rocksdb-window"; + } + + @Override + public int segments() { + return segments; + } + + @Override + public long windowSize() { + return windowSize; + } + + @Override + public boolean retainDuplicates() { + return retainDuplicates; + } + + @Override + public long retentionPeriod() { + return retentionPeriod; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java new file mode 100644 index 0000000..61919c3 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; + + +public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, SessionStore<K, V>> { + + private final SessionBytesStoreSupplier storeSupplier; + + public SessionStoreBuilder(final SessionBytesStoreSupplier storeSupplier, + final Serde<K> keySerde, + final Serde<V> valueSerde, + final Time time) { + super(storeSupplier.name(), keySerde, valueSerde, time); + this.storeSupplier = storeSupplier; + } + + @Override + public SessionStore<K, V> build() { + return new MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())), + storeSupplier.metricsScope(), + keySerde, + valueSerde, + time); + } + + private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, byte[]> inner) { + if (!enableCaching) { + return inner; + } + return new CachingSessionStore<>(inner, + keySerde, + valueSerde, + storeSupplier.segmentIntervalMs()); + } + + private SessionStore<Bytes, byte[]> maybeWrapLogging(final SessionStore<Bytes, byte[]> inner) { + if (!enableLogging) { + return inner; + } + return new ChangeLoggingSessionBytesStore(inner); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java new file mode 100644 index 0000000..97b4883 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; + + +public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> { + + private final WindowBytesStoreSupplier storeSupplier; + + public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier, + final Serde<K> keySerde, + final Serde<V> valueSerde, + final Time time) { + super(storeSupplier.name(), keySerde, valueSerde, time); + this.storeSupplier = storeSupplier; + } + + @Override + public WindowStore<K, V> build() { + return new MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())), + storeSupplier.metricsScope(), + time, + keySerde, + valueSerde); + } + + private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) { + if (!enableCaching) { + return inner; + } + return new CachingWindowStore<>(inner, + keySerde, + valueSerde, + storeSupplier.windowSize(), + storeSupplier.segments()); + } + + private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) { + if (!enableLogging) { + return inner; + } + return new ChangeLoggingWindowBytesStore(inner, storeSupplier.retainDuplicates()); + } + + public long retentionPeriod() { + return storeSupplier.retentionPeriod(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 12947d8..dbdc854 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -23,10 +23,12 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.ProcessorTopologyTestDriver; +import org.easymock.EasyMock; import org.junit.Test; import java.util.Arrays; @@ -42,6 +44,8 @@ import static org.junit.Assert.fail; public class TopologyTest { + private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class); + private final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class); private final Topology topology = new Topology(); private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription(); @@ -203,38 +207,52 @@ public class TopologyTest { @Test(expected = TopologyException.class) public void shouldNotAllowToAddStateStoreToNonExistingProcessor() { - topology.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor"); + mockStoreBuilder(); + EasyMock.replay(storeBuilder); + topology.addStateStore(storeBuilder, "no-such-processsor"); } @Test public void shouldNotAllowToAddStateStoreToSource() { + mockStoreBuilder(); + EasyMock.replay(storeBuilder); topology.addSource("source-1", "topic-1"); try { - topology.addStateStore(new MockStateStoreSupplier("store", false), "source-1"); + topology.addStateStore(storeBuilder, "source-1"); fail("Should have thrown TopologyException for adding store to source node"); } catch (final TopologyException expected) { } } @Test public void shouldNotAllowToAddStateStoreToSink() { + mockStoreBuilder(); + EasyMock.replay(storeBuilder); topology.addSink("sink-1", "topic-1"); try { - topology.addStateStore(new MockStateStoreSupplier("store", false), "sink-1"); + topology.addStateStore(storeBuilder, "sink-1"); fail("Should have thrown TopologyException for adding store to sink node"); } catch (final TopologyException expected) { } } + private void mockStoreBuilder() { + EasyMock.expect(storeBuilder.name()).andReturn("store").anyTimes(); + EasyMock.expect(storeBuilder.logConfig()).andReturn(Collections.emptyMap()); + EasyMock.expect(storeBuilder.loggingEnabled()).andReturn(false); + } + @Test public void shouldNotAllowToAddStoreWithSameName() { - topology.addStateStore(new MockStateStoreSupplier("store", false)); + mockStoreBuilder(); + EasyMock.replay(storeBuilder); + topology.addStateStore(storeBuilder); try { - topology.addStateStore(new MockStateStoreSupplier("store", false)); + topology.addStateStore(storeBuilder); fail("Should have thrown TopologyException for duplicate store name"); } catch (final TopologyException expected) { } } @Test(expected = TopologyBuilderException.class) - public void shouldThroughOnUnassignedStateStoreAccess() throws Exception { + public void shouldThrowOnUnassignedStateStoreAccess() throws Exception { final String sourceNodeName = "source"; final String goodNodeName = "goodGuy"; final String badNodeName = "badGuy"; @@ -243,12 +261,14 @@ public class TopologyTest { config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); final StreamsConfig streamsConfig = new StreamsConfig(config); - + mockStoreBuilder(); + EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStoreSupplier.MockStateStore("store", false)); + EasyMock.replay(storeBuilder); topology .addSource(sourceNodeName, "topic") .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName) .addStateStore( - Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), + storeBuilder, goodNodeName) .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); @@ -292,8 +312,10 @@ public class TopologyTest { @Test(expected = TopologyException.class) public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { + EasyMock.expect(globalStoreBuilder.name()).andReturn("anyName").anyTimes(); + EasyMock.replay(globalStoreBuilder); topology.addGlobalStore( - new MockStateStoreSupplier("anyName", false, false), + globalStoreBuilder, "sameName", null, null, @@ -611,7 +633,10 @@ public class TopologyTest { topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames); if (newStores) { for (final String store : storeNames) { - topology.addStateStore(new MockStateStoreSupplier(store, false), processorName); + final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class); + EasyMock.expect(storeBuilder.name()).andReturn(store).anyTimes(); + EasyMock.replay(storeBuilder); + topology.addStateStore(storeBuilder, processorName); } } else { topology.connectProcessorAndStateStores(processorName, storeNames); @@ -651,8 +676,11 @@ public class TopologyTest { final String sourceName, final String globalTopicName, final String processorName) { + final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class); + EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes(); + EasyMock.replay(globalStoreBuilder); topology.addGlobalStore( - new MockStateStoreSupplier(globalStoreName, false, false), + globalStoreBuilder, sourceName, null, null, http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 9bd8756..91edac5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -470,7 +470,7 @@ public class InternalTopologyBuilderTest { @Test(expected = NullPointerException.class) public void shouldNotAddNullStateStoreSupplier() throws Exception { - builder.addStateStore(null); + builder.addStateStore((StateStoreSupplier) null); } private Set<String> nodeNames(final Collection<ProcessorNode> nodes) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java index 66adbf5..700b243 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java @@ -18,11 +18,20 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache; +import org.apache.kafka.streams.state.internals.RocksDBSessionStore; +import org.apache.kafka.streams.state.internals.RocksDBStore; +import org.apache.kafka.streams.state.internals.RocksDBWindowStore; import org.junit.Test; import java.util.Collections; import java.util.Map; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -30,6 +39,7 @@ import static org.junit.Assert.fail; public class StoresTest { + @SuppressWarnings("deprecation") @Test public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() throws Exception { final StateStoreSupplier supplier = Stores.create("store") @@ -44,6 +54,7 @@ public class StoresTest { assertEquals("1000", config.get("retention.ms")); } + @SuppressWarnings("deprecation") @Test public void shouldCreateInMemoryStoreSupplierNotLogged() throws Exception { final StateStoreSupplier supplier = Stores.create("store") @@ -56,6 +67,7 @@ public class StoresTest { assertFalse(supplier.loggingEnabled()); } + @SuppressWarnings("deprecation") @Test public void shouldCreatePersistenStoreSupplierWithLoggedConfig() throws Exception { final StateStoreSupplier supplier = Stores.create("store") @@ -70,6 +82,7 @@ public class StoresTest { assertEquals("1000", config.get("retention.ms")); } + @SuppressWarnings("deprecation") @Test public void shouldCreatePersistenStoreSupplierNotLogged() throws Exception { final StateStoreSupplier supplier = Stores.create("store") @@ -95,4 +108,53 @@ public class StoresTest { // ok } } + + @Test + public void shouldCreateInMemoryKeyValueStore() { + assertThat(Stores.inMemoryKeyValueStore("memory").get(), instanceOf(InMemoryKeyValueStore.class)); + } + + @Test + public void shouldCreateMemoryNavigableCache() { + assertThat(Stores.lruMap("map", 10).get(), instanceOf(MemoryNavigableLRUCache.class)); + } + + @Test + public void shouldCreateRocksDbStore() { + assertThat(Stores.persistentKeyValueStore("store").get(), instanceOf(RocksDBStore.class)); + } + + @Test + public void shouldCreateRocksDbWindowStore() { + assertThat(Stores.persistentWindowStore("store", 1, 3, 1, false).get(), instanceOf(RocksDBWindowStore.class)); + } + + @Test + public void shouldCreateRocksDbSessionStore() { + assertThat(Stores.persistentSessionStore("store", 1).get(), instanceOf(RocksDBSessionStore.class)); + } + + @Test + public void shouldBuildWindowStore() { + final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3, 2, 3, true), + Serdes.String(), + Serdes.String()).build(); + assertThat(store, not(nullValue())); + } + + @Test + public void shouldBuildKeyValueStore() { + final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("name"), + Serdes.String(), + Serdes.String()).build(); + assertThat(store, not(nullValue())); + } + + @Test + public void shouldBuildSessionStore() { + final SessionStore<String, String> store = Stores.sessionStoreBuilder(Stores.persistentSessionStore("name", 10), + Serdes.String(), + Serdes.String()).build(); + assertThat(store, not(nullValue())); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java new file mode 100644 index 0000000..2d378d8 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.easymock.MockType; +import org.hamcrest.CoreMatchers; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Collections; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +@RunWith(EasyMockRunner.class) +public class KeyValueStoreBuilderTest { + + @Mock(type = MockType.NICE) + private KeyValueBytesStoreSupplier supplier; + @Mock(type = MockType.NICE) + private KeyValueStore<Bytes, byte[]> inner; + private KeyValueStoreBuilder<String, String> builder; + + @Before + public void setUp() throws Exception { + EasyMock.expect(supplier.get()).andReturn(inner); + EasyMock.expect(supplier.name()).andReturn("name"); + EasyMock.replay(supplier); + builder = new KeyValueStoreBuilder<>(supplier, + Serdes.String(), + Serdes.String(), + new MockTime() + ); + + } + + @Test + public void shouldHaveMeteredStoreAsOuterStore() { + final KeyValueStore<String, String> store = builder.build(); + assertThat(store, instanceOf(MeteredKeyValueBytesStore.class)); + } + + @Test + public void shouldHaveChangeLoggingStoreByDefault() { + final KeyValueStore<String, String> store = builder.build(); + assertThat(store, instanceOf(MeteredKeyValueBytesStore.class)); + final StateStore next = ((WrappedStateStore) store).wrappedStore(); + assertThat(next, instanceOf(ChangeLoggingKeyValueBytesStore.class)); + } + + @Test + public void shouldNotHaveChangeLoggingStoreWhenDisabled() { + final KeyValueStore<String, String> store = builder.withLoggingDisabled().build(); + final StateStore next = ((WrappedStateStore) store).wrappedStore(); + assertThat(next, CoreMatchers.<StateStore>equalTo(inner)); + } + + @Test + public void shouldHaveCachingStoreWhenEnabled() { + final KeyValueStore<String, String> store = builder.withCachingEnabled().build(); + final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + assertThat(store, instanceOf(MeteredKeyValueBytesStore.class)); + assertThat(wrapped, instanceOf(CachingKeyValueStore.class)); + } + + @Test + public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() { + final KeyValueStore<String, String> store = builder + .withLoggingEnabled(Collections.<String, String>emptyMap()) + .build(); + final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + assertThat(store, instanceOf(MeteredKeyValueBytesStore.class)); + assertThat(wrapped, instanceOf(ChangeLoggingKeyValueBytesStore.class)); + assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner)); + } + + @Test + public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { + final KeyValueStore<String, String> store = builder + .withLoggingEnabled(Collections.<String, String>emptyMap()) + .withCachingEnabled() + .build(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); + final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore(); + assertThat(store, instanceOf(MeteredKeyValueBytesStore.class)); + assertThat(caching, instanceOf(CachingKeyValueStore.class)); + assertThat(changeLogging, instanceOf(ChangeLoggingKeyValueBytesStore.class)); + assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner)); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfInnerIsNull() { + new KeyValueStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfKeySerdeIsNull() { + new KeyValueStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfValueSerdeIsNull() { + new KeyValueStoreBuilder<>(supplier, Serdes.String(), null, new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfTimeIsNull() { + new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfMetricsScopeIsNull() { + new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java new file mode 100644 index 0000000..621a1c2 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.easymock.MockType; +import org.hamcrest.CoreMatchers; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Collections; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +@RunWith(EasyMockRunner.class) +public class SessionStoreBuilderTest { + + @Mock(type = MockType.NICE) + private SessionBytesStoreSupplier supplier; + @Mock(type = MockType.NICE) + private SessionStore<Bytes, byte[]> inner; + private SessionStoreBuilder<String, String> builder; + + @Before + public void setUp() throws Exception { + + EasyMock.expect(supplier.get()).andReturn(inner); + EasyMock.expect(supplier.name()).andReturn("name"); + EasyMock.replay(supplier); + + builder = new SessionStoreBuilder<>(supplier, + Serdes.String(), + Serdes.String(), + new MockTime() + ); + } + + @Test + public void shouldHaveMeteredStoreAsOuterStore() { + final SessionStore<String, String> store = builder.build(); + assertThat(store, instanceOf(MeteredSessionStore.class)); + } + + @Test + public void shouldHaveChangeLoggingStoreByDefault() { + final SessionStore<String, String> store = builder.build(); + final StateStore next = ((WrappedStateStore) store).wrappedStore(); + assertThat(next, instanceOf(ChangeLoggingSessionBytesStore.class)); + } + + @Test + public void shouldNotHaveChangeLoggingStoreWhenDisabled() { + final SessionStore<String, String> store = builder.withLoggingDisabled().build(); + final StateStore next = ((WrappedStateStore) store).wrappedStore(); + assertThat(next, CoreMatchers.<StateStore>equalTo(inner)); + } + + @Test + public void shouldHaveCachingStoreWhenEnabled() { + final SessionStore<String, String> store = builder.withCachingEnabled().build(); + final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + assertThat(store, instanceOf(MeteredSessionStore.class)); + assertThat(wrapped, instanceOf(CachingSessionStore.class)); + } + + @Test + public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() { + final SessionStore<String, String> store = builder + .withLoggingEnabled(Collections.<String, String>emptyMap()) + .build(); + final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + assertThat(store, instanceOf(MeteredSessionStore.class)); + assertThat(wrapped, instanceOf(ChangeLoggingSessionBytesStore.class)); + assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner)); + } + + @Test + public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { + final SessionStore<String, String> store = builder + .withLoggingEnabled(Collections.<String, String>emptyMap()) + .withCachingEnabled() + .build(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); + final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore(); + assertThat(store, instanceOf(MeteredSessionStore.class)); + assertThat(caching, instanceOf(CachingSessionStore.class)); + assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class)); + assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner)); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfInnerIsNull() { + new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfKeySerdeIsNull() { + new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfValueSerdeIsNull() { + new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfTimeIsNull() { + new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfMetricsScopeIsNull() { + new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java new file mode 100644 index 0000000..25b8178 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.easymock.MockType; +import org.hamcrest.CoreMatchers; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Collections; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +@RunWith(EasyMockRunner.class) +public class WindowStoreBuilderTest { + + @Mock(type = MockType.NICE) + private WindowBytesStoreSupplier supplier; + @Mock(type = MockType.NICE) + private WindowStore<Bytes, byte[]> inner; + private WindowStoreBuilder<String, String> builder; + + @Before + public void setUp() throws Exception { + EasyMock.expect(supplier.get()).andReturn(inner); + EasyMock.expect(supplier.name()).andReturn("name"); + EasyMock.replay(supplier); + + builder = new WindowStoreBuilder<>(supplier, + Serdes.String(), + Serdes.String(), + new MockTime()); + + } + + @Test + public void shouldHaveMeteredStoreAsOuterStore() { + final WindowStore<String, String> store = builder.build(); + assertThat(store, instanceOf(MeteredWindowStore.class)); + } + + @Test + public void shouldHaveChangeLoggingStoreByDefault() { + final WindowStore<String, String> store = builder.build(); + final StateStore next = ((WrappedStateStore) store).wrappedStore(); + assertThat(next, instanceOf(ChangeLoggingWindowBytesStore.class)); + } + + @Test + public void shouldNotHaveChangeLoggingStoreWhenDisabled() { + final WindowStore<String, String> store = builder.withLoggingDisabled().build(); + final StateStore next = ((WrappedStateStore) store).wrappedStore(); + assertThat(next, CoreMatchers.<StateStore>equalTo(inner)); + } + + @Test + public void shouldHaveCachingStoreWhenEnabled() { + final WindowStore<String, String> store = builder.withCachingEnabled().build(); + final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + assertThat(store, instanceOf(MeteredWindowStore.class)); + assertThat(wrapped, instanceOf(CachingWindowStore.class)); + } + + @Test + public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() { + final WindowStore<String, String> store = builder + .withLoggingEnabled(Collections.<String, String>emptyMap()) + .build(); + final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + assertThat(store, instanceOf(MeteredWindowStore.class)); + assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class)); + assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner)); + } + + @Test + public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { + final WindowStore<String, String> store = builder + .withLoggingEnabled(Collections.<String, String>emptyMap()) + .withCachingEnabled() + .build(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); + final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore(); + assertThat(store, instanceOf(MeteredWindowStore.class)); + assertThat(caching, instanceOf(CachingWindowStore.class)); + assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class)); + assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner)); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfInnerIsNull() { + new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfKeySerdeIsNull() { + new WindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfValueSerdeIsNull() { + new WindowStoreBuilder<>(supplier, Serdes.String(), null, new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfTimeIsNull() { + new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null); + } + +} \ No newline at end of file