Repository: kafka
Updated Branches:
  refs/heads/trunk 667cd60dc -> 9cbb9f093


http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
new file mode 100644
index 0000000..a0500b6
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import static 
org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS;
+
+public class RocksDbWindowBytesStoreSupplier implements 
WindowBytesStoreSupplier {
+    private final String name; // returned by name(); also handed to the segmented store built in get()
+    private final long retentionPeriod; // handed to RocksDBSegmentedBytesStore in get()
+    private final int segments; // the constructor rejects values below MIN_SEGMENTS
+    private final long windowSize; // handed to RocksDBWindowStore.bytesStore in get()
+    private final boolean retainDuplicates; // handed to RocksDBWindowStore.bytesStore in get()
+
+    public RocksDbWindowBytesStoreSupplier(final String name,
+                                           final long retentionPeriod,
+                                           final int segments,
+                                           final long windowSize,
+                                           final boolean retainDuplicates) { // only 'segments' is validated; the other arguments are stored as given
+        if (segments < MIN_SEGMENTS) { // MIN_SEGMENTS is statically imported from RocksDBWindowStoreSupplier
+            throw new IllegalArgumentException("numSegments must be >= " + 
MIN_SEGMENTS);
+        }
+        this.name = name;
+        this.retentionPeriod = retentionPeriod;
+        this.segments = segments;
+        this.windowSize = windowSize;
+        this.retainDuplicates = retainDuplicates;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public WindowStore<Bytes, byte[]> get() { // creates a new RocksDBSegmentedBytesStore on every call and passes it to RocksDBWindowStore.bytesStore
+        final RocksDBSegmentedBytesStore segmentedBytesStore = new 
RocksDBSegmentedBytesStore(
+                name,
+                retentionPeriod,
+                segments,
+                new WindowKeySchema()
+        );
+        return RocksDBWindowStore.bytesStore(segmentedBytesStore,
+                                             retainDuplicates,
+                                             windowSize);
+
+    }
+
+    @Override
+    public String metricsScope() { // constant scope string; WindowStoreBuilder.build() passes storeSupplier.metricsScope() to MeteredWindowStore
+        return "rocksdb-window";
+    }
+
+    @Override
+    public int segments() {
+        return segments;
+    }
+
+    @Override
+    public long windowSize() {
+        return windowSize;
+    }
+
+    @Override
+    public boolean retainDuplicates() {
+        return retainDuplicates;
+    }
+
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
new file mode 100644
index 0000000..61919c3
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+
+
+public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, 
SessionStore<K, V>> {
+
+    private final SessionBytesStoreSupplier storeSupplier; // source of the inner bytes store, the metrics scope and the segment interval
+
+    public SessionStoreBuilder(final SessionBytesStoreSupplier storeSupplier,
+                               final Serde<K> keySerde,
+                               final Serde<V> valueSerde,
+                               final Time time) {
+        super(storeSupplier.name(), keySerde, valueSerde, time); // dereferences storeSupplier, so a null supplier fails here with a NullPointerException
+        this.storeSupplier = storeSupplier;
+    }
+
+    @Override
+    public SessionStore<K, V> build() { // layering, innermost first: supplier store, optional change-logging, optional caching, metered
+        return new 
MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+                                         storeSupplier.metricsScope(),
+                                         keySerde,
+                                         valueSerde,
+                                         time);
+    }
+
+    private SessionStore<Bytes, byte[]> maybeWrapCaching(final 
SessionStore<Bytes, byte[]> inner) {
+        if (!enableCaching) { // enableCaching is not declared in this class; presumably inherited from AbstractStoreBuilder
+            return inner; // caching disabled: hand back the store untouched
+        }
+        return new CachingSessionStore<>(inner,
+                                         keySerde,
+                                         valueSerde,
+                                         storeSupplier.segmentIntervalMs());
+    }
+
+    private SessionStore<Bytes, byte[]> maybeWrapLogging(final 
SessionStore<Bytes, byte[]> inner) {
+        if (!enableLogging) { // enableLogging is not declared in this class; presumably inherited from AbstractStoreBuilder
+            return inner; // logging disabled: hand back the store untouched
+        }
+        return new ChangeLoggingSessionBytesStore(inner);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
new file mode 100644
index 0000000..97b4883
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+
+public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, 
WindowStore<K, V>> {
+
+    private final WindowBytesStoreSupplier storeSupplier; // source of the inner bytes store and of the window/segment/retention settings used below
+
+    public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,
+                              final Serde<K> keySerde,
+                              final Serde<V> valueSerde,
+                              final Time time) {
+        super(storeSupplier.name(), keySerde, valueSerde, time); // dereferences storeSupplier, so a null supplier fails here with a NullPointerException
+        this.storeSupplier = storeSupplier;
+    }
+
+    @Override
+    public WindowStore<K, V> build() { // layering, innermost first: supplier store, optional change-logging, optional caching, metered
+        return new 
MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+                                        storeSupplier.metricsScope(),
+                                        time, // note: time is passed before the serdes here, whereas SessionStoreBuilder passes it last to MeteredSessionStore
+                                        keySerde,
+                                        valueSerde);
+    }
+
+    private WindowStore<Bytes, byte[]> maybeWrapCaching(final 
WindowStore<Bytes, byte[]> inner) {
+        if (!enableCaching) { // enableCaching is not declared in this class; presumably inherited from AbstractStoreBuilder
+            return inner; // caching disabled: hand back the store untouched
+        }
+        return new CachingWindowStore<>(inner,
+                                        keySerde,
+                                        valueSerde,
+                                        storeSupplier.windowSize(),
+                                        storeSupplier.segments());
+    }
+
+    private WindowStore<Bytes, byte[]> maybeWrapLogging(final 
WindowStore<Bytes, byte[]> inner) {
+        if (!enableLogging) { // enableLogging is not declared in this class; presumably inherited from AbstractStoreBuilder
+            return inner; // logging disabled: hand back the store untouched
+        }
+        return new ChangeLoggingWindowBytesStore(inner, 
storeSupplier.retainDuplicates());
+    }
+
+    public long retentionPeriod() { // delegates to the supplier; declared without @Override, unlike build()
+        return storeSupplier.retentionPeriod();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 12947d8..dbdc854 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -23,10 +23,12 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.easymock.EasyMock;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -42,6 +44,8 @@ import static org.junit.Assert.fail;
 
 public class TopologyTest {
 
+    private final StoreBuilder storeBuilder = 
EasyMock.createNiceMock(StoreBuilder.class);
+    private final KeyValueStoreBuilder globalStoreBuilder = 
EasyMock.createNiceMock(KeyValueStoreBuilder.class);
     private final Topology topology = new Topology();
     private final InternalTopologyBuilder.TopologyDescription 
expectedDescription = new InternalTopologyBuilder.TopologyDescription();
 
@@ -203,38 +207,52 @@ public class TopologyTest {
 
     @Test(expected = TopologyException.class)
     public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
-        topology.addStateStore(new MockStateStoreSupplier("store", false), 
"no-such-processsor");
+        mockStoreBuilder();
+        EasyMock.replay(storeBuilder);
+        topology.addStateStore(storeBuilder, "no-such-processsor");
     }
 
     @Test
     public void shouldNotAllowToAddStateStoreToSource() {
+        mockStoreBuilder();
+        EasyMock.replay(storeBuilder);
         topology.addSource("source-1", "topic-1");
         try {
-            topology.addStateStore(new MockStateStoreSupplier("store", false), 
"source-1");
+            topology.addStateStore(storeBuilder, "source-1");
             fail("Should have thrown TopologyException for adding store to 
source node");
         } catch (final TopologyException expected) { }
     }
 
     @Test
     public void shouldNotAllowToAddStateStoreToSink() {
+        mockStoreBuilder();
+        EasyMock.replay(storeBuilder);
         topology.addSink("sink-1", "topic-1");
         try {
-            topology.addStateStore(new MockStateStoreSupplier("store", false), 
"sink-1");
+            topology.addStateStore(storeBuilder, "sink-1");
             fail("Should have thrown TopologyException for adding store to 
sink node");
         } catch (final TopologyException expected) { }
     }
 
+    private void mockStoreBuilder() { // records expectations on the shared storeBuilder mock; each caller invokes EasyMock.replay(storeBuilder) itself afterwards
+        EasyMock.expect(storeBuilder.name()).andReturn("store").anyTimes(); // name() is the only expectation recorded with anyTimes()
+        
EasyMock.expect(storeBuilder.logConfig()).andReturn(Collections.emptyMap());
+        EasyMock.expect(storeBuilder.loggingEnabled()).andReturn(false);
+    }
+
     @Test
     public void shouldNotAllowToAddStoreWithSameName() {
-        topology.addStateStore(new MockStateStoreSupplier("store", false));
+        mockStoreBuilder();
+        EasyMock.replay(storeBuilder);
+        topology.addStateStore(storeBuilder);
         try {
-            topology.addStateStore(new MockStateStoreSupplier("store", false));
+            topology.addStateStore(storeBuilder);
             fail("Should have thrown TopologyException for duplicate store 
name");
         } catch (final TopologyException expected) { }
     }
 
     @Test(expected = TopologyBuilderException.class)
-    public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
+    public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
         final String badNodeName = "badGuy";
@@ -243,12 +261,14 @@ public class TopologyTest {
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         final StreamsConfig streamsConfig = new StreamsConfig(config);
-
+        mockStoreBuilder();
+        EasyMock.expect(storeBuilder.build()).andReturn(new 
MockStateStoreSupplier.MockStateStore("store", false));
+        EasyMock.replay(storeBuilder);
         topology
             .addSource(sourceNodeName, "topic")
             .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), 
sourceNodeName)
             .addStateStore(
-                
Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+                storeBuilder,
                 goodNodeName)
             .addProcessor(badNodeName, new LocalMockProcessorSupplier(), 
sourceNodeName);
 
@@ -292,8 +312,10 @@ public class TopologyTest {
 
     @Test(expected = TopologyException.class)
     public void 
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
+        
EasyMock.expect(globalStoreBuilder.name()).andReturn("anyName").anyTimes();
+        EasyMock.replay(globalStoreBuilder);
         topology.addGlobalStore(
-            new MockStateStoreSupplier("anyName", false, false),
+            globalStoreBuilder,
             "sameName",
             null,
             null,
@@ -611,7 +633,10 @@ public class TopologyTest {
         topology.addProcessor(processorName, new MockProcessorSupplier(), 
parentNames);
         if (newStores) {
             for (final String store : storeNames) {
-                topology.addStateStore(new MockStateStoreSupplier(store, 
false), processorName);
+                final StoreBuilder storeBuilder = 
EasyMock.createNiceMock(StoreBuilder.class);
+                
EasyMock.expect(storeBuilder.name()).andReturn(store).anyTimes();
+                EasyMock.replay(storeBuilder);
+                topology.addStateStore(storeBuilder, processorName);
             }
         } else {
             topology.connectProcessorAndStateStores(processorName, storeNames);
@@ -651,8 +676,11 @@ public class TopologyTest {
                                                                 final String 
sourceName,
                                                                 final String 
globalTopicName,
                                                                 final String 
processorName) {
+        final KeyValueStoreBuilder globalStoreBuilder = 
EasyMock.createNiceMock(KeyValueStoreBuilder.class);
+        
EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
+        EasyMock.replay(globalStoreBuilder);
         topology.addGlobalStore(
-            new MockStateStoreSupplier(globalStoreName, false, false),
+            globalStoreBuilder,
             sourceName,
             null,
             null,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 9bd8756..91edac5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -470,7 +470,7 @@ public class InternalTopologyBuilderTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAddNullStateStoreSupplier() throws Exception {
-        builder.addStateStore(null);
+        builder.addStateStore((StateStoreSupplier) null);
     }
 
     private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 66adbf5..700b243 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -18,11 +18,20 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
+import org.apache.kafka.streams.state.internals.RocksDBSessionStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -30,6 +39,7 @@ import static org.junit.Assert.fail;
 
 public class StoresTest {
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() throws 
Exception {
         final StateStoreSupplier supplier = Stores.create("store")
@@ -44,6 +54,7 @@ public class StoresTest {
         assertEquals("1000", config.get("retention.ms"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCreateInMemoryStoreSupplierNotLogged() throws Exception {
         final StateStoreSupplier supplier = Stores.create("store")
@@ -56,6 +67,7 @@ public class StoresTest {
         assertFalse(supplier.loggingEnabled());
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCreatePersistenStoreSupplierWithLoggedConfig() throws 
Exception {
         final StateStoreSupplier supplier = Stores.create("store")
@@ -70,6 +82,7 @@ public class StoresTest {
         assertEquals("1000", config.get("retention.ms"));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCreatePersistenStoreSupplierNotLogged() throws Exception 
{
         final StateStoreSupplier supplier = Stores.create("store")
@@ -95,4 +108,53 @@ public class StoresTest {
          // ok
         }
     }
+
+    @Test
+    public void shouldCreateInMemoryKeyValueStore() { // the supplier from Stores.inMemoryKeyValueStore must produce an InMemoryKeyValueStore
+        assertThat(Stores.inMemoryKeyValueStore("memory").get(), 
instanceOf(InMemoryKeyValueStore.class));
+    }
+
+    @Test
+    public void shouldCreateMemoryNavigableCache() { // the supplier from Stores.lruMap must produce a MemoryNavigableLRUCache
+        assertThat(Stores.lruMap("map", 10).get(), 
instanceOf(MemoryNavigableLRUCache.class));
+    }
+
+    @Test
+    public void shouldCreateRocksDbStore() { // the supplier from Stores.persistentKeyValueStore must produce a RocksDBStore
+        assertThat(Stores.persistentKeyValueStore("store").get(), 
instanceOf(RocksDBStore.class));
+    }
+
+    @Test
+    public void shouldCreateRocksDbWindowStore() { // the supplier from Stores.persistentWindowStore must produce a RocksDBWindowStore
+        assertThat(Stores.persistentWindowStore("store", 1, 3, 1, 
false).get(), instanceOf(RocksDBWindowStore.class));
+    }
+
+    @Test
+    public void shouldCreateRocksDbSessionStore() { // the supplier from Stores.persistentSessionStore must produce a RocksDBSessionStore
+        assertThat(Stores.persistentSessionStore("store", 1).get(), 
instanceOf(RocksDBSessionStore.class));
+    }
+
+    @Test
+    public void shouldBuildWindowStore() { // smoke test: only asserts that build() returns a non-null store
+        final WindowStore<String, String> store = 
Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3, 2, 3, true),
+                                                                      
Serdes.String(),
+                                                                      
Serdes.String()).build();
+        assertThat(store, not(nullValue()));
+    }
+
+    @Test
+    public void shouldBuildKeyValueStore() { // smoke test: only asserts that build() returns a non-null store
+        final KeyValueStore<String, String> store = 
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("name"),
+                                                                          
Serdes.String(),
+                                                                          
Serdes.String()).build();
+        assertThat(store, not(nullValue()));
+    }
+
+    @Test
+    public void shouldBuildSessionStore() { // smoke test: only asserts that build() returns a non-null store
+        final SessionStore<String, String> store = 
Stores.sessionStoreBuilder(Stores.persistentSessionStore("name", 10),
+                                                                       
Serdes.String(),
+                                                                       
Serdes.String()).build();
+        assertThat(store, not(nullValue()));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
new file mode 100644
index 0000000..2d378d8
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class KeyValueStoreBuilderTest {
+
+    @Mock(type = MockType.NICE)
+    private KeyValueBytesStoreSupplier supplier; // stubbed and replayed in setUp()
+    @Mock(type = MockType.NICE)
+    private KeyValueStore<Bytes, byte[]> inner; // the store that supplier.get() is stubbed to return
+    private KeyValueStoreBuilder<String, String> builder; // builder under test, created in setUp()
+
+    @Before
+    public void setUp() throws Exception { // stubs get() and name() (neither with anyTimes()), replays the supplier, then creates the builder under test
+        EasyMock.expect(supplier.get()).andReturn(inner);
+        EasyMock.expect(supplier.name()).andReturn("name");
+        EasyMock.replay(supplier);
+        builder = new KeyValueStoreBuilder<>(supplier,
+                                             Serdes.String(),
+                                             Serdes.String(),
+                                             new MockTime()
+        );
+
+    }
+
+    @Test
+    public void shouldHaveMeteredStoreAsOuterStore() { // the store returned by build() must be the metered wrapper
+        final KeyValueStore<String, String> store = builder.build();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreByDefault() { // no with* option is called, yet the layer under the metered store must be change-logging
+        final KeyValueStore<String, String> store = builder.build();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+    }
+
+    @Test
+    public void shouldNotHaveChangeLoggingStoreWhenDisabled() { // with logging disabled the outer store must wrap 'inner' directly
+        final KeyValueStore<String, String> store = 
builder.withLoggingDisabled().build();
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingStoreWhenEnabled() { // the caching layer must sit directly under the metered store
+        final KeyValueStore<String, String> store = 
builder.withCachingEnabled().build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(wrapped, instanceOf(CachingKeyValueStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() { // expected layering: metered -> change-logging -> inner
+        final KeyValueStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(wrapped, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+        assertThat(((WrappedStateStore) wrapped).wrappedStore(), 
CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { // expected layering: metered -> caching -> change-logging -> inner
+        final KeyValueStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .withCachingEnabled()
+                .build();
+        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrappedStore();
+        final WrappedStateStore changeLogging = (WrappedStateStore) 
caching.wrappedStore();
+        assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+        assertThat(caching, instanceOf(CachingKeyValueStore.class));
+        assertThat(changeLogging, 
instanceOf(ChangeLoggingKeyValueBytesStore.class));
+        assertThat(changeLogging.wrappedStore(), 
CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfInnerIsNull() { // passes null as the supplier argument
+        new KeyValueStoreBuilder<>(null, Serdes.String(), Serdes.String(), new 
MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfKeySerdeIsNull() { // passes null as the key serde
+        new KeyValueStoreBuilder<>(supplier, null, Serdes.String(), new 
MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfValueSerdeIsNull() { // passes null as the value serde
+        new KeyValueStoreBuilder<>(supplier, Serdes.String(), null, new 
MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfTimeIsNull() { // passes null as the time argument
+        new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), 
null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfMetricsScopeIsNull() { // NOTE(review): every argument is non-null and metricsScope() is never stubbed here - confirm which call actually raises the NPE
+        new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), 
new MockTime());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
new file mode 100644
index 0000000..621a1c2
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class SessionStoreBuilderTest {
+
+    @Mock(type = MockType.NICE)
+    private SessionBytesStoreSupplier supplier;
+    @Mock(type = MockType.NICE)
+    private SessionStore<Bytes, byte[]> inner;
+    private SessionStoreBuilder<String, String> builder;
+
+    @Before
+    public void setUp() throws Exception {
+
+        EasyMock.expect(supplier.get()).andReturn(inner);
+        EasyMock.expect(supplier.name()).andReturn("name");
+        EasyMock.replay(supplier);
+
+        builder = new SessionStoreBuilder<>(supplier,
+                                            Serdes.String(),
+                                            Serdes.String(),
+                                            new MockTime()
+        );
+    }
+
+    @Test
+    public void shouldHaveMeteredStoreAsOuterStore() {
+        final SessionStore<String, String> store = builder.build();
+        assertThat(store, instanceOf(MeteredSessionStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreByDefault() {
+        final SessionStore<String, String> store = builder.build();
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, instanceOf(ChangeLoggingSessionBytesStore.class));
+    }
+
+    @Test
+    public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+        final SessionStore<String, String> store = builder.withLoggingDisabled().build();
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingStoreWhenEnabled() {
+        final SessionStore<String, String> store = builder.withCachingEnabled().build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredSessionStore.class));
+        assertThat(wrapped, instanceOf(CachingSessionStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+        final SessionStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredSessionStore.class));
+        assertThat(wrapped, instanceOf(ChangeLoggingSessionBytesStore.class));
+        assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+        final SessionStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .withCachingEnabled()
+                .build();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+        assertThat(store, instanceOf(MeteredSessionStore.class));
+        assertThat(caching, instanceOf(CachingSessionStore.class));
+        assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class));
+        assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfInnerIsNull() {
+        new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfKeySerdeIsNull() {
+        new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfValueSerdeIsNull() {
+        new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfTimeIsNull() {
+        new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfMetricsScopeIsNull() {
+        new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
new file mode 100644
index 0000000..25b8178
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class WindowStoreBuilderTest {
+
+    @Mock(type = MockType.NICE)
+    private WindowBytesStoreSupplier supplier;
+    @Mock(type = MockType.NICE)
+    private WindowStore<Bytes, byte[]> inner;
+    private WindowStoreBuilder<String, String> builder;
+
+    @Before
+    public void setUp() throws Exception {
+        EasyMock.expect(supplier.get()).andReturn(inner);
+        EasyMock.expect(supplier.name()).andReturn("name");
+        EasyMock.replay(supplier);
+
+        builder = new WindowStoreBuilder<>(supplier,
+                                           Serdes.String(),
+                                           Serdes.String(),
+                                           new MockTime());
+
+    }
+
+    @Test
+    public void shouldHaveMeteredStoreAsOuterStore() {
+        final WindowStore<String, String> store = builder.build();
+        assertThat(store, instanceOf(MeteredWindowStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreByDefault() {
+        final WindowStore<String, String> store = builder.build();
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, instanceOf(ChangeLoggingWindowBytesStore.class));
+    }
+
+    @Test
+    public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+        final WindowStore<String, String> store = builder.withLoggingDisabled().build();
+        final StateStore next = ((WrappedStateStore) store).wrappedStore();
+        assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingStoreWhenEnabled() {
+        final WindowStore<String, String> store = builder.withCachingEnabled().build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredWindowStore.class));
+        assertThat(wrapped, instanceOf(CachingWindowStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+        final WindowStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+        assertThat(store, instanceOf(MeteredWindowStore.class));
+        assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class));
+        assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+        final WindowStore<String, String> store = builder
+                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .withCachingEnabled()
+                .build();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+        assertThat(store, instanceOf(MeteredWindowStore.class));
+        assertThat(caching, instanceOf(CachingWindowStore.class));
+        assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class));
+        assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfInnerIsNull() {
+        new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfKeySerdeIsNull() {
+        new WindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfValueSerdeIsNull() {
+        new WindowStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfTimeIsNull() {
+        new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+    }
+
+}
\ No newline at end of file


Reply via email to