http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 1a5de5f..37aac0c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -26,9 +26,11 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -81,40 +83,56 @@ public class KStreamWindowAggregateTest {
             MockProcessorSupplier<Windowed<String>, String> proc2 = new 
MockProcessorSupplier<>();
             table2.toStream().process(proc2);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver = new KStreamTestDriver(builder, baseDir);
 
-            driver.setTime(0L);
+            setRecordContext(0, topic1);
             driver.process(topic1, "A", "1");
-            driver.setTime(1L);
+            driver.flushState();
+            setRecordContext(1, topic1);
             driver.process(topic1, "B", "2");
-            driver.setTime(2L);
+            driver.flushState();
+            setRecordContext(2, topic1);
             driver.process(topic1, "C", "3");
-            driver.setTime(3L);
+            driver.flushState();
+            setRecordContext(3, topic1);
             driver.process(topic1, "D", "4");
-            driver.setTime(4L);
+            driver.flushState();
+            setRecordContext(4, topic1);
             driver.process(topic1, "A", "1");
+            driver.flushState();
 
-            driver.setTime(5L);
+            setRecordContext(5, topic1);
             driver.process(topic1, "A", "1");
-            driver.setTime(6L);
+            driver.flushState();
+            setRecordContext(6, topic1);
             driver.process(topic1, "B", "2");
-            driver.setTime(7L);
+            driver.flushState();
+            setRecordContext(7, topic1);
             driver.process(topic1, "D", "4");
-            driver.setTime(8L);
+            driver.flushState();
+            setRecordContext(8, topic1);
             driver.process(topic1, "B", "2");
-            driver.setTime(9L);
+            driver.flushState();
+            setRecordContext(9, topic1);
             driver.process(topic1, "C", "3");
-
-            driver.setTime(10L);
+            driver.flushState();
+            setRecordContext(10, topic1);
             driver.process(topic1, "A", "1");
-            driver.setTime(11L);
+            driver.flushState();
+            setRecordContext(11, topic1);
             driver.process(topic1, "B", "2");
-            driver.setTime(12L);
+            driver.flushState();
+            setRecordContext(12, topic1);
+            driver.flushState();
             driver.process(topic1, "D", "4");
-            driver.setTime(13L);
+            driver.flushState();
+            setRecordContext(13, topic1);
             driver.process(topic1, "B", "2");
-            driver.setTime(14L);
+            driver.flushState();
+            setRecordContext(14, topic1);
             driver.process(topic1, "C", "3");
+            driver.flushState();
+
 
             assertEquals(Utils.mkList(
                     "[A@0]:0+1",
@@ -140,6 +158,10 @@ public class KStreamWindowAggregateTest {
         }
     }
 
+    private void setRecordContext(final long time, final String topic) {
+        ((MockProcessorContext) driver.context()).setRecordContext(new 
ProcessorRecordContext(time, 0, 0, topic));
+    }
+
     @Test
     public void testJoin() throws Exception {
         final File baseDir = Files.createTempDirectory("test").toFile();
@@ -180,18 +202,23 @@ public class KStreamWindowAggregateTest {
                 }
             }).toStream().process(proc3);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver = new KStreamTestDriver(builder, baseDir);
 
-            driver.setTime(0L);
+            setRecordContext(0, topic1);
             driver.process(topic1, "A", "1");
-            driver.setTime(1L);
+            driver.flushState();
+            setRecordContext(1, topic1);
             driver.process(topic1, "B", "2");
-            driver.setTime(2L);
+            driver.flushState();
+            setRecordContext(2, topic1);
             driver.process(topic1, "C", "3");
-            driver.setTime(3L);
+            driver.flushState();
+            setRecordContext(3, topic1);
             driver.process(topic1, "D", "4");
-            driver.setTime(4L);
+            driver.flushState();
+            setRecordContext(4, topic1);
             driver.process(topic1, "A", "1");
+            driver.flushState();
 
             proc1.checkAndClearProcessResult(
                     "[A@0]:0+1",
@@ -209,16 +236,21 @@ public class KStreamWindowAggregateTest {
                     "[A@0]:null"
             );
 
-            driver.setTime(5L);
+            setRecordContext(5, topic1);
             driver.process(topic1, "A", "1");
-            driver.setTime(6L);
+            driver.flushState();
+            setRecordContext(6, topic1);
             driver.process(topic1, "B", "2");
-            driver.setTime(7L);
+            driver.flushState();
+            setRecordContext(7, topic1);
             driver.process(topic1, "D", "4");
-            driver.setTime(8L);
+            driver.flushState();
+            setRecordContext(8, topic1);
             driver.process(topic1, "B", "2");
-            driver.setTime(9L);
+            driver.flushState();
+            setRecordContext(9, topic1);
             driver.process(topic1, "C", "3");
+            driver.flushState();
 
             proc1.checkAndClearProcessResult(
                     "[A@0]:0+1+1+1", "[A@5]:0+1",
@@ -236,16 +268,21 @@ public class KStreamWindowAggregateTest {
                     "[C@0]:null", "[C@5]:null"
             );
 
-            driver.setTime(0L);
+            setRecordContext(0, topic1);
             driver.process(topic2, "A", "a");
-            driver.setTime(1L);
+            driver.flushState();
+            setRecordContext(1, topic1);
             driver.process(topic2, "B", "b");
-            driver.setTime(2L);
+            driver.flushState();
+            setRecordContext(2, topic1);
             driver.process(topic2, "C", "c");
-            driver.setTime(3L);
+            driver.flushState();
+            setRecordContext(3, topic1);
             driver.process(topic2, "D", "d");
-            driver.setTime(4L);
+            driver.flushState();
+            setRecordContext(4, topic1);
             driver.process(topic2, "A", "a");
+            driver.flushState();
 
             proc1.checkAndClearProcessResult();
             proc2.checkAndClearProcessResult(
@@ -262,17 +299,21 @@ public class KStreamWindowAggregateTest {
                     "[D@0]:0+4+4%0+d",
                     "[A@0]:0+1+1+1%0+a+a");
 
-            driver.setTime(5L);
+            setRecordContext(5, topic1);
             driver.process(topic2, "A", "a");
-            driver.setTime(6L);
+            driver.flushState();
+            setRecordContext(6, topic1);
             driver.process(topic2, "B", "b");
-            driver.setTime(7L);
+            driver.flushState();
+            setRecordContext(7, topic1);
             driver.process(topic2, "D", "d");
-            driver.setTime(8L);
+            driver.flushState();
+            setRecordContext(8, topic1);
             driver.process(topic2, "B", "b");
-            driver.setTime(9L);
+            driver.flushState();
+            setRecordContext(9, topic1);
             driver.process(topic2, "C", "c");
-
+            driver.flushState();
             proc1.checkAndClearProcessResult();
             proc2.checkAndClearProcessResult(
                     "[A@0]:0+a+a+a", "[A@5]:0+a",

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index a405da4..ba33d5c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -83,25 +83,63 @@ public class KTableAggregateTest {
         driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(topic1, "A", "1");
+        driver.flushState();
         driver.process(topic1, "B", "2");
+        driver.flushState();
         driver.process(topic1, "A", "3");
+        driver.flushState();
         driver.process(topic1, "B", "4");
+        driver.flushState();
         driver.process(topic1, "C", "5");
+        driver.flushState();
         driver.process(topic1, "D", "6");
+        driver.flushState();
         driver.process(topic1, "B", "7");
+        driver.flushState();
         driver.process(topic1, "C", "8");
+        driver.flushState();
 
         assertEquals(Utils.mkList(
                 "A:0+1",
                 "B:0+2",
-                "A:0+1-1", "A:0+1-1+3",
-                "B:0+2-2", "B:0+2-2+4",
+                "A:0+1-1+3",
+                "B:0+2-2+4",
                 "C:0+5",
                 "D:0+6",
-                "B:0+2-2+4-4", "B:0+2-2+4-4+7",
-                "C:0+5-5", "C:0+5-5+8"), proc.processed);
+                "B:0+2-2+4-4+7",
+                "C:0+5-5+8"), proc.processed);
     }
 
+
+    @Test
+    public void testAggCoalesced() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String topic1 = "topic1";
+        final MockProcessorSupplier<String, String> proc = new 
MockProcessorSupplier<>();
+
+        KTable<String, String> table1 = builder.table(stringSerde, 
stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table2 = 
table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
+            stringSerde,
+            stringSerde
+        ).aggregate(MockInitializer.STRING_INIT,
+            MockAggregator.STRING_ADDER,
+            MockAggregator.STRING_REMOVER,
+            stringSerde,
+            "topic1-Canonized");
+
+        table2.toStream().process(proc);
+
+        driver = new KStreamTestDriver(builder, stateDir);
+
+        driver.process(topic1, "A", "1");
+        driver.process(topic1, "A", "3");
+        driver.process(topic1, "A", "4");
+        driver.flushState();
+        assertEquals(Utils.mkList(
+            "A:0+4"), proc.processed);
+    }
+
+
     @Test
     public void testAggRepartition() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
@@ -135,13 +173,21 @@ public class KTableAggregateTest {
         driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(topic1, "A", "1");
+        driver.flushState();
         driver.process(topic1, "A", null);
+        driver.flushState();
         driver.process(topic1, "A", "1");
+        driver.flushState();
         driver.process(topic1, "B", "2");
+        driver.flushState();
         driver.process(topic1, "null", "3");
+        driver.flushState();
         driver.process(topic1, "B", "4");
+        driver.flushState();
         driver.process(topic1, "NULL", "5");
+        driver.flushState();
         driver.process(topic1, "B", "7");
+        driver.flushState();
 
         assertEquals(Utils.mkList(
                 "1:0+1",
@@ -170,10 +216,17 @@ public class KTableAggregateTest {
         final KStreamTestDriver driver = new KStreamTestDriver(builder, 
stateDir);
 
         driver.process(input, "A", "green");
+        driver.flushState();
         driver.process(input, "B", "green");
+        driver.flushState();
         driver.process(input, "A", "blue");
+        driver.flushState();
         driver.process(input, "C", "yellow");
+        driver.flushState();
         driver.process(input, "D", "green");
+        driver.flushState();
+        driver.flushState();
+
 
         assertEquals(Utils.mkList(
                  "green:1",
@@ -183,6 +236,35 @@ public class KTableAggregateTest {
                  "green:2"
                  ), proc.processed);
     }
+
+    @Test
+    public void testCountCoalesced() throws IOException {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String input = "count-test-input";
+        final MockProcessorSupplier<String, Long> proc = new 
MockProcessorSupplier<>();
+
+        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+            .groupBy(MockKeyValueMapper.<String, 
String>SelectValueKeyValueMapper(), stringSerde, stringSerde)
+            .count("count")
+            .toStream()
+            .process(proc);
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, 
stateDir);
+
+        driver.process(input, "A", "green");
+        driver.process(input, "B", "green");
+        driver.process(input, "A", "blue");
+        driver.process(input, "C", "yellow");
+        driver.process(input, "D", "green");
+        driver.flushState();
+
+
+        assertEquals(Utils.mkList(
+            "blue:1",
+            "yellow:1",
+            "green:2"
+            ), proc.processed);
+    }
     
     @Test
     public void testRemoveOldBeforeAddNew() throws IOException {
@@ -216,22 +298,26 @@ public class KTableAggregateTest {
                     public String apply(String key, String value, String 
aggregate) {
                         return aggregate.replaceAll(value, "");
                     }
-                }, "someStore")
+                }, Serdes.String(), "someStore")
                 .toStream()
                 .process(proc);
 
         final KStreamTestDriver driver = new KStreamTestDriver(builder, 
stateDir);
 
         driver.process(input, "11", "A");
+        driver.flushState();
         driver.process(input, "12", "B");
+        driver.flushState();
         driver.process(input, "11", null);
+        driver.flushState();
         driver.process(input, "12", "C");
+        driver.flushState();
 
         assertEquals(Utils.mkList(
                  "1:1",
                  "1:12",
                  "1:2",
-                 "1:", "1:2"
+                 "1:2"
                  ), proc.processed);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 6837c56..277d6d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -90,8 +90,10 @@ public class KTableFilterTest {
         driver.process(topic1, "B", 2);
         driver.process(topic1, "C", 3);
         driver.process(topic1, "D", 4);
+        driver.flushState();
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
+        driver.flushState();
 
         proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", 
"A:null", "B:null");
         proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", 
"A:null", "B:null");
@@ -203,24 +205,25 @@ public class KTableFilterTest {
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 1);
         driver.process(topic1, "C", 1);
+        driver.flushState();
 
         proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", 
"C:(1<-null)");
         proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", 
"C:(null<-null)");
 
         driver.process(topic1, "A", 2);
         driver.process(topic1, "B", 2);
-
+        driver.flushState();
         proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
         proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
         driver.process(topic1, "A", 3);
-
+        driver.flushState();
         proc1.checkAndClearProcessResult("A:(3<-null)");
         proc2.checkAndClearProcessResult("A:(null<-null)");
 
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
-
+        driver.flushState();
         proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
         proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
     }
@@ -254,24 +257,25 @@ public class KTableFilterTest {
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 1);
         driver.process(topic1, "C", 1);
+        driver.flushState();
 
         proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", 
"C:(1<-null)");
         proc2.checkEmptyAndClearProcessResult();
 
         driver.process(topic1, "A", 2);
         driver.process(topic1, "B", 2);
-
+        driver.flushState();
         proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
         proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
         driver.process(topic1, "A", 3);
-
+        driver.flushState();
         proc1.checkAndClearProcessResult("A:(3<-2)");
         proc2.checkAndClearProcessResult("A:(null<-2)");
 
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
-
+        driver.flushState();
         proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
         proc2.checkAndClearProcessResult("B:(null<-2)");
     }
@@ -305,7 +309,7 @@ public class KTableFilterTest {
         driver.process(topic1, "A", "reject");
         driver.process(topic1, "B", "reject");
         driver.process(topic1, "C", "reject");
-
+        driver.flushState();
         proc1.checkAndClearProcessResult("A:(reject<-null)", 
"B:(reject<-null)", "C:(reject<-null)");
         proc2.checkEmptyAndClearProcessResult();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 6fbce82..791fa28 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -96,6 +96,7 @@ public class KTableForeachTest {
         for (KeyValue<Integer, String> record: inputRecords) {
             driver.process(topicName, record.key, record.value);
         }
+        driver.flushState();
 
         assertEquals(expectedRecords.size(), actualRecords.size());
         for (int i = 0; i < expectedRecords.size(); i++) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 7edaac9..4b9ea06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -113,9 +113,14 @@ public class KTableImplTest {
         driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(topic1, "A", "01");
+        driver.flushState();
         driver.process(topic1, "B", "02");
+        driver.flushState();
         driver.process(topic1, "C", "03");
+        driver.flushState();
         driver.process(topic1, "D", "04");
+        driver.flushState();
+        driver.flushState();
 
         assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), 
proc1.processed);
         assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), 
proc2.processed);
@@ -173,6 +178,7 @@ public class KTableImplTest {
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");
         driver.process(topic1, "C", "01");
+        driver.flushState();
 
         assertEquals("01", getter1.get("A"));
         assertEquals("01", getter1.get("B"));
@@ -192,6 +198,7 @@ public class KTableImplTest {
 
         driver.process(topic1, "A", "02");
         driver.process(topic1, "B", "02");
+        driver.flushState();
 
         assertEquals("02", getter1.get("A"));
         assertEquals("02", getter1.get("B"));
@@ -210,6 +217,7 @@ public class KTableImplTest {
         assertEquals("01", getter4.get("C"));
 
         driver.process(topic1, "A", "03");
+        driver.flushState();
 
         assertEquals("03", getter1.get("A"));
         assertEquals("02", getter1.get("B"));
@@ -228,11 +236,13 @@ public class KTableImplTest {
         assertEquals("01", getter4.get("C"));
 
         driver.process(topic1, "A", null);
+        driver.flushState();
 
         assertNull(getter1.get("A"));
         assertEquals("02", getter1.get("B"));
         assertEquals("01", getter1.get("C"));
 
+
         assertNull(getter2.get("A"));
         assertEquals(new Integer(2), getter2.get("B"));
         assertEquals(new Integer(1), getter2.get("C"));

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 3615b46..ba10668 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -103,6 +103,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
+        driver.flushState();
 
         processor.checkAndClearProcessResult("0:null", "1:null");
         checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, 
null));
@@ -112,6 +113,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
+        driver.flushState();
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
         checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), 
kv(3, null));
@@ -121,6 +123,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
+        driver.flushState();
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", 
"3:null");
         checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), 
kv(3, null));
@@ -129,6 +132,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
+        driver.flushState();
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
         checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
@@ -138,6 +142,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
+        driver.flushState();
 
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
         checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
@@ -147,6 +152,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], null);
         }
+        driver.flushState();
 
         processor.checkAndClearProcessResult("0:null", "1:null");
         checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), 
kv(3, "X3+YY3"));
@@ -156,6 +162,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
+        driver.flushState();
 
         processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", 
"3:XX3+YY3");
         checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), 
kv(3, "XX3+YY3"));
@@ -191,6 +198,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
+        driver.flushState();
 
         proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
@@ -199,6 +207,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
+        driver.flushState();
 
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
@@ -207,6 +216,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
+        driver.flushState();
 
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", 
"2:(null<-null)", "3:(null<-null)");
 
@@ -214,7 +224,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -222,7 +232,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
         // push two items with null to the other stream as deletes. this 
should produce two item.
@@ -230,7 +240,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], null);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -238,7 +248,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", 
"2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
     }
 
@@ -274,7 +284,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
         // push two items to the other stream. this should produce two items.
@@ -282,7 +292,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -290,14 +300,14 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", 
"1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
 
         // push all items to the other stream. this should produce four items.
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", 
"1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -305,7 +315,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", 
"1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
         // push two items with null to the other stream as deletes. this 
should produce two item.
@@ -313,7 +323,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], null);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(null<-X0+YY0)", 
"1:(null<-X1+YY1)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -321,7 +331,7 @@ public class KTableKTableJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", 
"2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index ec07116..5f84678 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -99,6 +99,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
+        driver.flushState();
 
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
         checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, 
null), kv(3, null));
@@ -108,7 +109,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
         checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), 
kv(3, null));
 
@@ -117,7 +118,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", 
"2:X2+null", "3:X3+null");
         checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, 
"X2+null"), kv(3, "X3+null"));
 
@@ -125,7 +126,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
         checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
 
@@ -134,7 +135,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
         checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
 
@@ -143,7 +144,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], null);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
         checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
 
@@ -152,7 +153,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", 
"2:XX2+YY2", "3:XX3+YY3");
         checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, 
"XX2+YY2"), kv(3, "XX3+YY3"));
     }
@@ -187,7 +188,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
         // push two items to the other stream. this should produce two items.
@@ -195,7 +196,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -203,14 +204,14 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", 
"2:(X2+null<-null)", "3:(X3+null<-null)");
 
         // push all items to the other stream. this should produce four items.
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -218,7 +219,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
         // push two items with null to the other stream as deletes. this 
should produce two items.
@@ -226,7 +227,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], null);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -234,7 +235,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+null<-null)", 
"1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
     }
 
@@ -270,7 +271,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
         // push two items to the other stream. this should produce two items.
@@ -278,7 +279,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", 
"1:(X1+Y1<-X1+null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -286,14 +287,14 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", 
"1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
         // push all items to the other stream. this should produce four items.
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", 
"1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -301,7 +302,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", 
"1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
         // push two items with null to the other stream as deletes. this 
should produce two items.
@@ -309,7 +310,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], null);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", 
"1:(X1+null<-X1+YY1)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -317,7 +318,7 @@ public class KTableKTableLeftJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", 
"1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 33dfb04..a6249bc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -93,7 +93,6 @@ public class KTableKTableOuterJoinTest {
         KTableValueGetterSupplier<Integer, String> getterSupplier = 
((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
 
         driver = new KStreamTestDriver(builder, stateDir);
-        driver.setTime(0L);
 
         KTableValueGetter<Integer, String> getter = getterSupplier.get();
         getter.init(driver.context());
@@ -103,7 +102,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
         checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, 
null), kv(3, null));
 
@@ -112,7 +111,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
         checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), 
kv(3, null));
 
@@ -121,7 +120,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", 
"2:X2+null", "3:X3+null");
         checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, 
"X2+null"), kv(3, "X3+null"));
 
@@ -129,7 +128,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
         checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
 
@@ -138,7 +137,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
         checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
 
@@ -147,7 +146,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], null);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
         checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
 
@@ -156,7 +155,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", 
"2:XX2+YY2", "3:XX3+YY3");
         checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, 
"XX2+YY2"), kv(3, "XX3+YY3"));
 
@@ -165,7 +164,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 1; i < 3; i++) {
             driver.process(topic1, expectedKeys[i], null);
         }
-
+        driver.flushState();
         processor.checkAndClearProcessResult("1:null", "2:null+YY2");
         checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, 
"null+YY2"), kv(3, "XX3+YY3"));
     }
@@ -189,7 +188,6 @@ public class KTableKTableOuterJoinTest {
         builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) 
joined).name);
 
         driver = new KStreamTestDriver(builder, stateDir);
-        driver.setTime(0L);
 
         assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
         assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
@@ -200,7 +198,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
         // push two items to the other stream. this should produce two items.
@@ -208,7 +206,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -216,14 +214,14 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", 
"2:(X2+null<-null)", "3:(X3+null<-null)");
 
         // push all items to the other stream. this should produce four items.
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -231,7 +229,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
         // push two items with null to the other stream as deletes. this 
should produce two items.
@@ -239,7 +237,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], null);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -247,7 +245,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+null<-null)", 
"1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
         // push middle two items to the primary stream with null. this should 
produce two items.
@@ -255,7 +253,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 1; i < 3; i++) {
             driver.process(topic1, expectedKeys[i], null);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("1:(null<-null)", 
"2:(null+YY2<-null)");
     }
 
@@ -280,7 +278,6 @@ public class KTableKTableOuterJoinTest {
         builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) 
joined).name);
 
         driver = new KStreamTestDriver(builder, stateDir);
-        driver.setTime(0L);
 
         assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
         assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
@@ -291,7 +288,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
         // push two items to the other stream. this should produce two items.
@@ -299,7 +296,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", 
"1:(X1+Y1<-X1+null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -307,14 +304,14 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", 
"1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
         // push all items to the other stream. this should produce four items.
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", 
"1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -322,7 +319,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", 
"1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
         // push two items with null to the other stream as deletes. this 
should produce two items.
@@ -330,7 +327,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], null);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", 
"1:(X1+null<-X1+YY1)");
 
         // push all four items to the primary stream. this should produce four 
items.
@@ -338,7 +335,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", 
"1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
 
         // push middle two items to the primary stream with null. this should 
produce two items.
@@ -346,7 +343,7 @@ public class KTableKTableOuterJoinTest {
         for (int i = 1; i < 3; i++) {
             driver.process(topic1, expectedKeys[i], null);
         }
-
+        driver.flushState();
         proc.checkAndClearProcessResult("1:(null<-XX1+null)", 
"2:(null+YY2<-XX2+YY2)");
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 78cff18..864b274 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -93,10 +93,10 @@ public class KTableMapKeysTest {
         convertedStream.process(processor);
 
         driver = new KStreamTestDriver(builder, stateDir);
-
         for (int i = 0;  i < originalKeys.length; i++) {
             driver.process(topic1, originalKeys[i], values[i]);
         }
+        driver.flushState();
 
         assertEquals(3, processor.processed.size());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 5739397..1d5b8a9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -82,7 +82,7 @@ public class KTableMapValuesTest {
         driver.process(topic1, "B", "02");
         driver.process(topic1, "C", "03");
         driver.process(topic1, "D", "04");
-
+        driver.flushState();
         assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), 
proc2.processed);
     }
 
@@ -120,7 +120,6 @@ public class KTableMapValuesTest {
         KTableValueGetterSupplier<String, String> getterSupplier4 = 
table4.valueGetterSupplier();
 
         driver = new KStreamTestDriver(builder, stateDir, null, null);
-
         KTableValueGetter<String, String> getter1 = getterSupplier1.get();
         getter1.init(driver.context());
         KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
@@ -133,6 +132,7 @@ public class KTableMapValuesTest {
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");
         driver.process(topic1, "C", "01");
+        driver.flushState();
 
         assertEquals("01", getter1.get("A"));
         assertEquals("01", getter1.get("B"));
@@ -152,6 +152,7 @@ public class KTableMapValuesTest {
 
         driver.process(topic1, "A", "02");
         driver.process(topic1, "B", "02");
+        driver.flushState();
 
         assertEquals("02", getter1.get("A"));
         assertEquals("02", getter1.get("B"));
@@ -170,6 +171,7 @@ public class KTableMapValuesTest {
         assertEquals("01", getter4.get("C"));
 
         driver.process(topic1, "A", "03");
+        driver.flushState();
 
         assertEquals("03", getter1.get("A"));
         assertEquals("02", getter1.get("B"));
@@ -188,6 +190,7 @@ public class KTableMapValuesTest {
         assertEquals("01", getter4.get("C"));
 
         driver.process(topic1, "A", null);
+        driver.flushState();
 
         assertNull(getter1.get("A"));
         assertEquals("02", getter1.get("B"));
@@ -227,26 +230,29 @@ public class KTableMapValuesTest {
         builder.addProcessor("proc", proc, table2.name);
 
         driver = new KStreamTestDriver(builder, stateDir, null, null);
-
         assertFalse(table1.sendingOldValueEnabled());
         assertFalse(table2.sendingOldValueEnabled());
 
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");
         driver.process(topic1, "C", "01");
+        driver.flushState();
 
         proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", 
"C:(1<-null)");
 
         driver.process(topic1, "A", "02");
         driver.process(topic1, "B", "02");
+        driver.flushState();
 
         proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
         driver.process(topic1, "A", "03");
+        driver.flushState();
 
         proc.checkAndClearProcessResult("A:(3<-null)");
 
         driver.process(topic1, "A", null);
+        driver.flushState();
 
         proc.checkAndClearProcessResult("A:(null<-null)");
     }
@@ -274,26 +280,29 @@ public class KTableMapValuesTest {
         builder.addProcessor("proc", proc, table2.name);
 
         driver = new KStreamTestDriver(builder, stateDir, null, null);
-
         assertTrue(table1.sendingOldValueEnabled());
         assertTrue(table2.sendingOldValueEnabled());
 
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");
         driver.process(topic1, "C", "01");
+        driver.flushState();
 
         proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", 
"C:(1<-null)");
 
         driver.process(topic1, "A", "02");
         driver.process(topic1, "B", "02");
+        driver.flushState();
 
         proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
 
         driver.process(topic1, "A", "03");
+        driver.flushState();
 
         proc.checkAndClearProcessResult("A:(3<-2)");
 
         driver.process(topic1, "A", null);
+        driver.flushState();
 
         proc.checkAndClearProcessResult("A:(null<-3)");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index cd1262b..5602555 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -69,13 +69,14 @@ public class KTableSourceTest {
         table1.toStream().process(proc1);
 
         driver = new KStreamTestDriver(builder, stateDir);
-
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 2);
         driver.process(topic1, "C", 3);
         driver.process(topic1, "D", 4);
+        driver.flushState();
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
+        driver.flushState();
 
         assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", 
"B:null"), proc1.processed);
     }
@@ -91,7 +92,6 @@ public class KTableSourceTest {
         KTableValueGetterSupplier<String, String> getterSupplier1 = 
table1.valueGetterSupplier();
 
         driver = new KStreamTestDriver(builder, stateDir, null, null);
-
         KTableValueGetter<String, String> getter1 = getterSupplier1.get();
         getter1.init(driver.context());
 
@@ -138,24 +138,27 @@ public class KTableSourceTest {
         builder.addProcessor("proc1", proc1, table1.name);
 
         driver = new KStreamTestDriver(builder, stateDir, null, null);
-
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");
         driver.process(topic1, "C", "01");
+        driver.flushState();
 
         proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", 
"C:(01<-null)");
 
         driver.process(topic1, "A", "02");
         driver.process(topic1, "B", "02");
+        driver.flushState();
 
         proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
 
         driver.process(topic1, "A", "03");
+        driver.flushState();
 
         proc1.checkAndClearProcessResult("A:(03<-null)");
 
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
+        driver.flushState();
 
         proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
     }
@@ -181,20 +184,24 @@ public class KTableSourceTest {
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");
         driver.process(topic1, "C", "01");
+        driver.flushState();
 
         proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", 
"C:(01<-null)");
 
         driver.process(topic1, "A", "02");
         driver.process(topic1, "B", "02");
+        driver.flushState();
 
         proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
 
         driver.process(topic1, "A", "03");
+        driver.flushState();
 
         proc1.checkAndClearProcessResult("A:(03<-02)");
 
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
+        driver.flushState();
 
         proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java 
b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 5495416..630167a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.Processor;
@@ -435,7 +436,7 @@ public class SimpleBenchmark {
         source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
             public Processor<Integer, byte[]> get() {
-                return new Processor<Integer, byte[]>() {
+                return new AbstractProcessor<Integer, byte[]>() {
 
                     @Override
                     public void init(ProcessorContext context) {
@@ -479,8 +480,7 @@ public class SimpleBenchmark {
         source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
             public Processor<Integer, byte[]> get() {
-                return new Processor<Integer, byte[]>() {
-
+                return new AbstractProcessor<Integer, byte[]>() {
                     @Override
                     public void init(ProcessorContext context) {
                     }
@@ -573,8 +573,7 @@ public class SimpleBenchmark {
         source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
             public Processor<Integer, byte[]> get() {
-                return new Processor<Integer, byte[]>() {
-
+                return new AbstractProcessor<Integer, byte[]>() {
                     KeyValueStore<Integer, byte[]> store;
 
                     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index a4c008a..7fe5170 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -237,13 +237,13 @@ public class TopologyBuilderTest {
         builder.addStateStore(supplier);
         builder.setApplicationId("X");
 
-        assertEquals(0, builder.build(null).stateStoreSuppliers().size());
+        assertEquals(0, builder.build(null).stateStores().size());
 
         builder.addSource("source-1", "topic-1");
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.connectProcessorAndStateStores("processor-1", "store-1");
 
-        List<StateStoreSupplier> suppliers = builder.build(null).stateStoreSuppliers();
+        List<StateStore> suppliers = builder.build(null).stateStores();
         assertEquals(1, suppliers.size());
         assertEquals(supplier.name(), suppliers.get(0).name());
     }
@@ -471,7 +471,7 @@ public class TopologyBuilderTest {
         builder.setApplicationId("appId");
         builder.addSource("source", "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, true, Collections.<String, String>emptyMap()), "processor");
+        builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor");
         final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
         final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 32dce6d..5802b29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockStateStoreSupplier;
@@ -209,7 +210,7 @@ public class ProcessorStateManagerTest {
     public void testNoTopic() throws IOException {
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
         } finally {
@@ -237,7 +238,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
         try {
             restoreConsumer.reset();
 
@@ -286,7 +287,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, null);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
         try {
             restoreConsumer.reset();
 
@@ -359,7 +360,7 @@ public class ProcessorStateManagerTest {
         // if there is an source partition, inherit the partition id
         Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, null); // standby
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap()); // standby
         try {
             restoreConsumer.reset();
 
@@ -393,7 +394,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, null);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
@@ -431,7 +432,7 @@ public class ProcessorStateManagerTest {
         MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
         MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
         try {
             // make sure the checkpoint file is deleted
             assertFalse(checkpointFile.exists());

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 09434c3..54ee43c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -210,6 +210,7 @@ public class ProcessorTopologyTest {
         assertNoOutputRecord(OUTPUT_TOPIC_1);
     }
 
+
     protected void assertNextOutputRecord(String topic, String key, String value) {
         ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());
@@ -269,6 +270,7 @@ public class ProcessorTopologyTest {
                                     .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
+
     protected TopologyBuilder createSimpleMultiSourceTopology(int partition) {
         return new TopologyBuilder().addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
                 .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1")

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 2c7aaeb..63b2b75 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.junit.Test;
 
@@ -60,7 +60,7 @@ public class PunctuationQueueTest {
         assertEquals(2, processor.punctuatedAt.size());
     }
 
-    private static class TestProcessor implements Processor<String, String> {
+    private static class TestProcessor extends AbstractProcessor<String, String> {
 
         public final ArrayList<Long> punctuatedAt = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
new file mode 100644
index 0000000..3d5f3e3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+public class RecordContextStub implements RecordContext {
+
+    private final long offset;
+    private final long timestamp;
+    private final int partition;
+    private final String topic;
+
+    public RecordContextStub() {
+        this(-1, -1, -1, "");
+    }
+    public RecordContextStub(final long offset, final long timestamp, final int partition, final String topic) {
+        this.offset = offset;
+        this.timestamp = timestamp;
+        this.partition = partition;
+        this.topic = topic;
+    }
+
+    @Override
+    public long offset() {
+        return offset;
+    }
+
+    @Override
+    public long timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String topic() {
+        return topic;
+    }
+
+    @Override
+    public int partition() {
+        return partition;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 11058c2..268697c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockStateStoreSupplier;
@@ -74,12 +74,12 @@ public class StandbyTaskTest {
             Collections.<ProcessorNode>emptyList(),
             Collections.<String, SourceNode>emptyMap(),
             Collections.<String, SinkNode>emptyMap(),
-            Utils.<StateStoreSupplier>mkList(
-                    new MockStateStoreSupplier(storeName1, false),
-                    new MockStateStoreSupplier(storeName2, true)
+            Utils.mkList(
+                    new MockStateStoreSupplier(storeName1, false).get(),
+                    new MockStateStoreSupplier(storeName2, true).get()
             ),
-            Collections.<String, String>emptyMap()
-    );
+            Collections.<String, String>emptyMap(),
+            Collections.<StateStore, ProcessorNode>emptyMap());
 
     private final TopicPartition ktable = new TopicPartition("ktable1", 0);
     private final Set<TopicPartition> ktablePartitions = Utils.mkSet(ktable);
@@ -87,15 +87,15 @@ public class StandbyTaskTest {
             Collections.<ProcessorNode>emptyList(),
             Collections.<String, SourceNode>emptyMap(),
             Collections.<String, SinkNode>emptyMap(),
-            Utils.<StateStoreSupplier>mkList(
-                    new MockStateStoreSupplier(ktable.topic(), true, false)
+            Utils.mkList(
+                    new MockStateStoreSupplier(ktable.topic(), true, false).get()
             ),
             new HashMap<String, String>() {
             {
                 put("ktable1", ktable.topic());
             }
-        }
-    );
+        },
+            Collections.<StateStore, ProcessorNode>emptyMap());
     private File baseDir;
     private StateDirectory stateDirectory;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 32d6aa4..2b05e80 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
@@ -75,9 +75,9 @@ public class StreamTaskTest {
                 }
             },
             Collections.<String, SinkNode>emptyMap(),
-            Collections.<StateStoreSupplier>emptyList(),
-            Collections.<String, String>emptyMap()
-    );
+            Collections.<StateStore>emptyList(),
+            Collections.<String, String>emptyMap(),
+            Collections.<StateStore, ProcessorNode>emptyMap());
     private File baseDir;
     private StateDirectory stateDirectory;
 
@@ -119,7 +119,7 @@ public class StreamTaskTest {
     @Test
     public void testProcessOrder() throws Exception {
         StreamsConfig config = createConfig(baseDir);
-        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory);
+        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null);
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -166,7 +166,7 @@ public class StreamTaskTest {
     @Test
     public void testPauseResume() throws Exception {
         StreamsConfig config = createConfig(baseDir);
-        StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory);
+        StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null);
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -225,7 +225,7 @@ public class StreamTaskTest {
     @Test
     public void testMaybePunctuate() throws Exception {
         StreamsConfig config = createConfig(baseDir);
-        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory);
+        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null);
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1da7592..ccbf8d6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -133,7 +133,7 @@ public class StreamThreadTest {
                               Consumer<byte[], byte[]> restoreConsumer,
                               StreamsConfig config,
                               StateDirectory stateDirectory) {
-            super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null, stateDirectory);
+            super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null, stateDirectory, null);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index f1c237e..7ff738f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -43,7 +44,7 @@ public class SmokeTestUtil {
     public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic, final boolean printOffset) {
         return new ProcessorSupplier<String, T>() {
             public Processor<String, T> get() {
-                return new Processor<String, T>() {
+                return new AbstractProcessor<String, T>() {
                     private int numRecordsProcessed = 0;
                     private ProcessorContext context;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 140ea35..e84e9ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
@@ -178,6 +179,8 @@ public class KeyValueStoreTestDriver<K, V> {
     private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>();
     private final MockProcessorContext context;
     private final Map<String, StateStore> storeMap = new HashMap<>();
+    private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
+    private final ThreadCache cache = new ThreadCache(DEFAULT_CACHE_SIZE_BYTES);
     private final StreamsMetrics metrics = new StreamsMetrics() {
         @Override
         public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
@@ -225,7 +228,7 @@ public class KeyValueStoreTestDriver<K, V> {
 
 
 
-        this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector) {
+        this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
             @Override
             public TaskId taskId() {
                 return new TaskId(0, 1);
@@ -266,6 +269,10 @@ public class KeyValueStoreTestDriver<K, V> {
             public Map<String, Object> appConfigsWithPrefix(String prefix) {
                 return new StreamsConfig(props).originalsWithPrefix(prefix);
             }
+            @Override
+            public ThreadCache getCache() {
+                return cache;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 700655e..13f718c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -34,7 +34,6 @@ import org.junit.Test;
 public abstract class AbstractKeyValueStoreTest {
 
 
-
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
                                                                       Class<K> keyClass, Class<V> valueClass,
                                                                       boolean useContextSerdes);

Reply via email to