http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 1a5de5f..37aac0c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -26,9 +26,11 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -81,40 +83,56 @@ public class KStreamWindowAggregateTest { MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + driver = new KStreamTestDriver(builder, baseDir); - driver.setTime(0L); + setRecordContext(0, topic1); driver.process(topic1, "A", "1"); - driver.setTime(1L); + driver.flushState(); + setRecordContext(1, topic1); driver.process(topic1, "B", "2"); - driver.setTime(2L); + driver.flushState(); + setRecordContext(2, topic1); driver.process(topic1, "C", "3"); - driver.setTime(3L); + driver.flushState(); + setRecordContext(3, topic1); driver.process(topic1, "D", "4"); - driver.setTime(4L); + driver.flushState(); + setRecordContext(4, topic1); driver.process(topic1, "A", "1"); + driver.flushState(); - driver.setTime(5L); + setRecordContext(5, topic1); driver.process(topic1, "A", "1"); - driver.setTime(6L); + driver.flushState(); + setRecordContext(6, topic1); driver.process(topic1, "B", "2"); - driver.setTime(7L); + driver.flushState(); + setRecordContext(7, topic1); driver.process(topic1, "D", "4"); - driver.setTime(8L); + driver.flushState(); + setRecordContext(8, topic1); driver.process(topic1, "B", "2"); - driver.setTime(9L); + driver.flushState(); + setRecordContext(9, topic1); driver.process(topic1, "C", "3"); - - driver.setTime(10L); + driver.flushState(); + setRecordContext(10, topic1); driver.process(topic1, "A", "1"); - driver.setTime(11L); + driver.flushState(); + setRecordContext(11, topic1); driver.process(topic1, "B", "2"); - driver.setTime(12L); + driver.flushState(); + setRecordContext(12, topic1); + driver.flushState(); driver.process(topic1, "D", "4"); - driver.setTime(13L); + driver.flushState(); + setRecordContext(13, topic1); driver.process(topic1, "B", "2"); - driver.setTime(14L); + driver.flushState(); + setRecordContext(14, topic1); driver.process(topic1, "C", "3"); + driver.flushState(); + assertEquals(Utils.mkList( "[A@0]:0+1", @@ -140,6 +158,10 @@ public class KStreamWindowAggregateTest { } } + private void setRecordContext(final long time, final String topic) { + ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic)); + } + @Test public void testJoin() throws Exception { final File baseDir = Files.createTempDirectory("test").toFile(); @@ -180,18 +202,23 @@ public class KStreamWindowAggregateTest { } }).toStream().process(proc3); - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + driver = new KStreamTestDriver(builder, baseDir); - driver.setTime(0L); + setRecordContext(0, topic1); driver.process(topic1, "A", "1"); - driver.setTime(1L); + driver.flushState(); + setRecordContext(1, topic1); driver.process(topic1, "B", "2"); - driver.setTime(2L); + driver.flushState(); + setRecordContext(2, topic1); driver.process(topic1, "C", "3"); - driver.setTime(3L); + driver.flushState(); + setRecordContext(3, topic1); driver.process(topic1, "D", "4"); - driver.setTime(4L); + driver.flushState(); + setRecordContext(4, topic1); driver.process(topic1, "A", "1"); + driver.flushState(); proc1.checkAndClearProcessResult( "[A@0]:0+1", @@ -209,16 +236,21 @@ public class KStreamWindowAggregateTest { "[A@0]:null" ); - driver.setTime(5L); + setRecordContext(5, topic1); driver.process(topic1, "A", "1"); - driver.setTime(6L); + driver.flushState(); + setRecordContext(6, topic1); driver.process(topic1, "B", "2"); - driver.setTime(7L); + driver.flushState(); + setRecordContext(7, topic1); driver.process(topic1, "D", "4"); - driver.setTime(8L); + driver.flushState(); + setRecordContext(8, topic1); driver.process(topic1, "B", "2"); - driver.setTime(9L); + driver.flushState(); + setRecordContext(9, topic1); driver.process(topic1, "C", "3"); + driver.flushState(); proc1.checkAndClearProcessResult( "[A@0]:0+1+1+1", "[A@5]:0+1", @@ -236,16 +268,21 @@ public class KStreamWindowAggregateTest { "[C@0]:null", "[C@5]:null" ); - driver.setTime(0L); + setRecordContext(0, topic1); driver.process(topic2, "A", "a"); - driver.setTime(1L); + driver.flushState(); + setRecordContext(1, topic1); driver.process(topic2, "B", "b"); - driver.setTime(2L); + driver.flushState(); + setRecordContext(2, topic1); driver.process(topic2, "C", "c"); - driver.setTime(3L); + driver.flushState(); + setRecordContext(3, topic1); driver.process(topic2, "D", "d"); - driver.setTime(4L); + driver.flushState(); + setRecordContext(4, topic1); driver.process(topic2, "A", "a"); + driver.flushState(); proc1.checkAndClearProcessResult(); proc2.checkAndClearProcessResult( @@ -262,17 +299,21 @@ public class KStreamWindowAggregateTest { "[D@0]:0+4+4%0+d", "[A@0]:0+1+1+1%0+a+a"); - driver.setTime(5L); + setRecordContext(5, topic1); driver.process(topic2, "A", "a"); - driver.setTime(6L); + driver.flushState(); + setRecordContext(6, topic1); driver.process(topic2, "B", "b"); - driver.setTime(7L); + driver.flushState(); + setRecordContext(7, topic1); driver.process(topic2, "D", "d"); - driver.setTime(8L); + driver.flushState(); + setRecordContext(8, topic1); driver.process(topic2, "B", "b"); - driver.setTime(9L); + driver.flushState(); + setRecordContext(9, topic1); driver.process(topic2, "C", "c"); - + driver.flushState(); proc1.checkAndClearProcessResult(); proc2.checkAndClearProcessResult( "[A@0]:0+a+a+a", "[A@5]:0+a",
http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index a405da4..ba33d5c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -83,25 +83,63 @@ public class KTableAggregateTest { driver = new KStreamTestDriver(builder, stateDir); driver.process(topic1, "A", "1"); + driver.flushState(); driver.process(topic1, "B", "2"); + driver.flushState(); driver.process(topic1, "A", "3"); + driver.flushState(); driver.process(topic1, "B", "4"); + driver.flushState(); driver.process(topic1, "C", "5"); + driver.flushState(); driver.process(topic1, "D", "6"); + driver.flushState(); driver.process(topic1, "B", "7"); + driver.flushState(); driver.process(topic1, "C", "8"); + driver.flushState(); assertEquals(Utils.mkList( "A:0+1", "B:0+2", - "A:0+1-1", "A:0+1-1+3", - "B:0+2-2", "B:0+2-2+4", + "A:0+1-1+3", + "B:0+2-2+4", "C:0+5", "D:0+6", - "B:0+2-2+4-4", "B:0+2-2+4-4+7", - "C:0+5-5", "C:0+5-5+8"), proc.processed); + "B:0+2-2+4-4+7", + "C:0+5-5+8"), proc.processed); } + + @Test + public void testAggCoalesced() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); + final String topic1 = "topic1"; + final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>(); + + KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName"); + KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), + stringSerde, + stringSerde + ).aggregate(MockInitializer.STRING_INIT, + MockAggregator.STRING_ADDER, + MockAggregator.STRING_REMOVER, + stringSerde, + "topic1-Canonized"); + + table2.toStream().process(proc); + + driver = new KStreamTestDriver(builder, stateDir); + + driver.process(topic1, "A", "1"); + driver.process(topic1, "A", "3"); + driver.process(topic1, "A", "4"); + driver.flushState(); + assertEquals(Utils.mkList( + "A:0+4"), proc.processed); + } + + @Test public void testAggRepartition() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); @@ -135,13 +173,21 @@ public class KTableAggregateTest { driver = new KStreamTestDriver(builder, stateDir); driver.process(topic1, "A", "1"); + driver.flushState(); driver.process(topic1, "A", null); + driver.flushState(); driver.process(topic1, "A", "1"); + driver.flushState(); driver.process(topic1, "B", "2"); + driver.flushState(); driver.process(topic1, "null", "3"); + driver.flushState(); driver.process(topic1, "B", "4"); + driver.flushState(); driver.process(topic1, "NULL", "5"); + driver.flushState(); driver.process(topic1, "B", "7"); + driver.flushState(); assertEquals(Utils.mkList( "1:0+1", @@ -170,10 +216,17 @@ public class KTableAggregateTest { final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir); driver.process(input, "A", "green"); + driver.flushState(); driver.process(input, "B", "green"); + driver.flushState(); driver.process(input, "A", "blue"); + driver.flushState(); driver.process(input, "C", "yellow"); + driver.flushState(); driver.process(input, "D", "green"); + driver.flushState(); + driver.flushState(); + assertEquals(Utils.mkList( "green:1", @@ -183,6 +236,35 @@ public class KTableAggregateTest { "green:2" ), proc.processed); } + + @Test + public void testCountCoalesced() throws IOException { + final KStreamBuilder builder = new KStreamBuilder(); + final String input = "count-test-input"; + final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); + + builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName") + .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde) + .count("count") + .toStream() + .process(proc); + + final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir); + + driver.process(input, "A", "green"); + driver.process(input, "B", "green"); + driver.process(input, "A", "blue"); + driver.process(input, "C", "yellow"); + driver.process(input, "D", "green"); + driver.flushState(); + + + assertEquals(Utils.mkList( + "blue:1", + "yellow:1", + "green:2" + ), proc.processed); + } @Test public void testRemoveOldBeforeAddNew() throws IOException { @@ -216,22 +298,26 @@ public class KTableAggregateTest { public String apply(String key, String value, String aggregate) { return aggregate.replaceAll(value, ""); } - }, "someStore") + }, Serdes.String(), "someStore") .toStream() .process(proc); final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir); driver.process(input, "11", "A"); + driver.flushState(); driver.process(input, "12", "B"); + driver.flushState(); driver.process(input, "11", null); + driver.flushState(); driver.process(input, "12", "C"); + driver.flushState(); assertEquals(Utils.mkList( "1:1", "1:12", "1:2", - "1:", "1:2" + "1:2" ), proc.processed); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index 6837c56..277d6d2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -90,8 +90,10 @@ public class KTableFilterTest { driver.process(topic1, "B", 2); driver.process(topic1, "C", 3); driver.process(topic1, "D", 4); + driver.flushState(); driver.process(topic1, "A", null); driver.process(topic1, "B", null); + driver.flushState(); proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null"); proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null"); @@ -203,24 +205,25 @@ public class KTableFilterTest { driver.process(topic1, "A", 1); driver.process(topic1, "B", 1); driver.process(topic1, "C", 1); + driver.flushState(); proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); driver.process(topic1, "A", 2); driver.process(topic1, "B", 2); - + driver.flushState(); proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); driver.process(topic1, "A", 3); - + driver.flushState(); proc1.checkAndClearProcessResult("A:(3<-null)"); proc2.checkAndClearProcessResult("A:(null<-null)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); - + driver.flushState(); proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); } @@ -254,24 +257,25 @@ public class KTableFilterTest { driver.process(topic1, "A", 1); driver.process(topic1, "B", 1); driver.process(topic1, "C", 1); + driver.flushState(); proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); proc2.checkEmptyAndClearProcessResult(); driver.process(topic1, "A", 2); driver.process(topic1, "B", 2); - + driver.flushState(); proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); driver.process(topic1, "A", 3); - + driver.flushState(); proc1.checkAndClearProcessResult("A:(3<-2)"); proc2.checkAndClearProcessResult("A:(null<-2)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); - + driver.flushState(); proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)"); proc2.checkAndClearProcessResult("B:(null<-2)"); } @@ -305,7 +309,7 @@ public class KTableFilterTest { driver.process(topic1, "A", "reject"); driver.process(topic1, "B", "reject"); driver.process(topic1, "C", "reject"); - + driver.flushState(); proc1.checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)"); proc2.checkEmptyAndClearProcessResult(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java index 6fbce82..791fa28 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java @@ -96,6 +96,7 @@ public class KTableForeachTest { for (KeyValue<Integer, String> record: inputRecords) { driver.process(topicName, record.key, record.value); } + driver.flushState(); assertEquals(expectedRecords.size(), actualRecords.size()); for (int i = 0; i < expectedRecords.size(); i++) { http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 7edaac9..4b9ea06 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -113,9 +113,14 @@ public class KTableImplTest { driver = new KStreamTestDriver(builder, stateDir); driver.process(topic1, "A", "01"); + driver.flushState(); driver.process(topic1, "B", "02"); + driver.flushState(); driver.process(topic1, "C", "03"); + driver.flushState(); driver.process(topic1, "D", "04"); + driver.flushState(); + driver.flushState(); assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc1.processed); assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed); @@ -173,6 +178,7 @@ public class KTableImplTest { driver.process(topic1, "A", "01"); driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); + driver.flushState(); assertEquals("01", getter1.get("A")); assertEquals("01", getter1.get("B")); @@ -192,6 +198,7 @@ public class KTableImplTest { driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); + driver.flushState(); assertEquals("02", getter1.get("A")); assertEquals("02", getter1.get("B")); @@ -210,6 +217,7 @@ public class KTableImplTest { assertEquals("01", getter4.get("C")); driver.process(topic1, "A", "03"); + driver.flushState(); assertEquals("03", getter1.get("A")); assertEquals("02", getter1.get("B")); @@ -228,11 +236,13 @@ public class KTableImplTest { assertEquals("01", getter4.get("C")); driver.process(topic1, "A", null); + driver.flushState(); assertNull(getter1.get("A")); assertEquals("02", getter1.get("B")); assertEquals("01", getter1.get("C")); + assertNull(getter2.get("A")); assertEquals(new Integer(2), getter2.get("B")); assertEquals(new Integer(1), getter2.get("C")); http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java index 3615b46..ba10668 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java @@ -103,6 +103,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } + driver.flushState(); processor.checkAndClearProcessResult("0:null", "1:null"); checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null)); @@ -112,6 +113,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } + driver.flushState(); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); @@ -121,6 +123,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } + driver.flushState(); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); @@ -129,6 +132,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } + driver.flushState(); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); @@ -138,6 +142,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } + driver.flushState(); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); @@ -147,6 +152,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], null); } + driver.flushState(); processor.checkAndClearProcessResult("0:null", "1:null"); checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3")); @@ -156,6 +162,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } + driver.flushState(); processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3"); checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); @@ -191,6 +198,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } + driver.flushState(); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); @@ -199,6 +207,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); @@ -207,6 +216,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)"); @@ -214,7 +224,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -222,7 +232,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -230,7 +240,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], null); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); // push all four items to the primary stream. this should produce four items. @@ -238,7 +248,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); } @@ -274,7 +284,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); // push two items to the other stream. this should produce two items. @@ -282,7 +292,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); // push all four items to the primary stream. this should produce four items. @@ -290,14 +300,14 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -305,7 +315,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -313,7 +323,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], null); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)"); // push all four items to the primary stream. this should produce four items. @@ -321,7 +331,7 @@ public class KTableKTableJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index ec07116..5f84678 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -99,6 +99,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } + driver.flushState(); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null)); @@ -108,7 +109,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); @@ -117,7 +118,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null")); @@ -125,7 +126,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); @@ -134,7 +135,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); @@ -143,7 +144,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], null); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); @@ -152,7 +153,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); } @@ -187,7 +188,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push two items to the other stream. this should produce two items. @@ -195,7 +196,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); // push all four items to the primary stream. this should produce four items. @@ -203,14 +204,14 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -218,7 +219,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -226,7 +227,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], null); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push all four items to the primary stream. this should produce four items. @@ -234,7 +235,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); } @@ -270,7 +271,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push two items to the other stream. this should produce two items. @@ -278,7 +279,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); // push all four items to the primary stream. this should produce four items. @@ -286,14 +287,14 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); // push all four items to the primary stream. this should produce four items. @@ -301,7 +302,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -309,7 +310,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], null); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); // push all four items to the primary stream. this should produce four items. @@ -317,7 +318,7 @@ public class KTableKTableLeftJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index 33dfb04..a6249bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -93,7 +93,6 @@ public class KTableKTableOuterJoinTest { KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier(); driver = new KStreamTestDriver(builder, stateDir); - driver.setTime(0L); KTableValueGetter<Integer, String> getter = getterSupplier.get(); getter.init(driver.context()); @@ -103,7 +102,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null)); @@ -112,7 +111,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); @@ -121,7 +120,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null")); @@ -129,7 +128,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); @@ -138,7 +137,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); @@ -147,7 +146,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], null); } - + driver.flushState(); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); @@ -156,7 +155,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - + driver.flushState(); processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); @@ -165,7 +164,7 @@ public class KTableKTableOuterJoinTest { for (int i = 1; i < 3; i++) { driver.process(topic1, expectedKeys[i], null); } - + driver.flushState(); processor.checkAndClearProcessResult("1:null", "2:null+YY2"); checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3")); } @@ -189,7 +188,6 @@ public class KTableKTableOuterJoinTest { builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name); driver = new KStreamTestDriver(builder, stateDir); - driver.setTime(0L); assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); @@ -200,7 +198,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push two items to the other stream. this should produce two items. @@ -208,7 +206,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); // push all four items to the primary stream. this should produce four items. @@ -216,14 +214,14 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -231,7 +229,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -239,7 +237,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], null); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push all four items to the primary stream. this should produce four items. @@ -247,7 +245,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); // push middle two items to the primary stream with null. this should produce two items. @@ -255,7 +253,7 @@ public class KTableKTableOuterJoinTest { for (int i = 1; i < 3; i++) { driver.process(topic1, expectedKeys[i], null); } - + driver.flushState(); proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)"); } @@ -280,7 +278,6 @@ public class KTableKTableOuterJoinTest { builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name); driver = new KStreamTestDriver(builder, stateDir); - driver.setTime(0L); assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); @@ -291,7 +288,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push two items to the other stream. this should produce two items. @@ -299,7 +296,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); // push all four items to the primary stream. this should produce four items. @@ -307,14 +304,14 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); // push all four items to the primary stream. this should produce four items. @@ -322,7 +319,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -330,7 +327,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], null); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); // push all four items to the primary stream. this should produce four items. @@ -338,7 +335,7 @@ public class KTableKTableOuterJoinTest { for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - + driver.flushState(); proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); // push middle two items to the primary stream with null. this should produce two items. @@ -346,7 +343,7 @@ public class KTableKTableOuterJoinTest { for (int i = 1; i < 3; i++) { driver.process(topic1, expectedKeys[i], null); } - + driver.flushState(); proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)"); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java index 78cff18..864b274 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java @@ -93,10 +93,10 @@ public class KTableMapKeysTest { convertedStream.process(processor); driver = new KStreamTestDriver(builder, stateDir); - for (int i = 0; i < originalKeys.length; i++) { driver.process(topic1, originalKeys[i], values[i]); } + driver.flushState(); assertEquals(3, processor.processed.size()); http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index 5739397..1d5b8a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -82,7 +82,7 @@ public class KTableMapValuesTest { driver.process(topic1, "B", "02"); driver.process(topic1, "C", "03"); driver.process(topic1, "D", "04"); - + driver.flushState(); assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed); } @@ -120,7 +120,6 @@ public class KTableMapValuesTest { KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier(); driver = new KStreamTestDriver(builder, stateDir, null, null); - KTableValueGetter<String, String> getter1 = getterSupplier1.get(); getter1.init(driver.context()); KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); @@ -133,6 +132,7 @@ public class KTableMapValuesTest { driver.process(topic1, "A", "01"); driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); + driver.flushState(); assertEquals("01", getter1.get("A")); assertEquals("01", getter1.get("B")); @@ -152,6 +152,7 @@ public class KTableMapValuesTest { driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); + driver.flushState(); assertEquals("02", getter1.get("A")); assertEquals("02", getter1.get("B")); @@ -170,6 +171,7 @@ public class KTableMapValuesTest { assertEquals("01", getter4.get("C")); driver.process(topic1, "A", "03"); + driver.flushState(); assertEquals("03", getter1.get("A")); assertEquals("02", getter1.get("B")); @@ -188,6 +190,7 @@ public class KTableMapValuesTest { assertEquals("01", getter4.get("C")); driver.process(topic1, "A", null); + driver.flushState(); assertNull(getter1.get("A")); assertEquals("02", getter1.get("B")); @@ -227,26 +230,29 @@ public class KTableMapValuesTest { builder.addProcessor("proc", proc, table2.name); driver = new KStreamTestDriver(builder, stateDir, null, null); - assertFalse(table1.sendingOldValueEnabled()); assertFalse(table2.sendingOldValueEnabled()); driver.process(topic1, "A", "01"); driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); + driver.flushState(); proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); + driver.flushState(); proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); driver.process(topic1, "A", "03"); + driver.flushState(); proc.checkAndClearProcessResult("A:(3<-null)"); driver.process(topic1, "A", null); + driver.flushState(); proc.checkAndClearProcessResult("A:(null<-null)"); } @@ -274,26 +280,29 @@ public class KTableMapValuesTest { builder.addProcessor("proc", proc, table2.name); driver = new KStreamTestDriver(builder, stateDir, null, null); - assertTrue(table1.sendingOldValueEnabled()); assertTrue(table2.sendingOldValueEnabled()); driver.process(topic1, "A", "01"); driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); + driver.flushState(); proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); + driver.flushState(); proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); driver.process(topic1, "A", "03"); + driver.flushState(); proc.checkAndClearProcessResult("A:(3<-2)"); driver.process(topic1, "A", null); + driver.flushState(); proc.checkAndClearProcessResult("A:(null<-3)"); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index cd1262b..5602555 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -69,13 +69,14 @@ public class KTableSourceTest { table1.toStream().process(proc1); driver = new KStreamTestDriver(builder, stateDir); - driver.process(topic1, "A", 1); driver.process(topic1, "B", 2); driver.process(topic1, "C", 3); driver.process(topic1, "D", 4); + driver.flushState(); driver.process(topic1, "A", null); driver.process(topic1, "B", null); + driver.flushState(); assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), proc1.processed); } @@ -91,7 +92,6 @@ public class KTableSourceTest { KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); driver = new KStreamTestDriver(builder, stateDir, null, null); - KTableValueGetter<String, String> getter1 = getterSupplier1.get(); getter1.init(driver.context()); @@ -138,24 +138,27 @@ public class KTableSourceTest { builder.addProcessor("proc1", proc1, table1.name); driver = new KStreamTestDriver(builder, stateDir, null, null); - driver.process(topic1, "A", "01"); driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); + driver.flushState(); proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); + driver.flushState(); proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)"); driver.process(topic1, "A", "03"); + driver.flushState(); proc1.checkAndClearProcessResult("A:(03<-null)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); + driver.flushState(); proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); } @@ -181,20 +184,24 @@ public class KTableSourceTest { driver.process(topic1, "A", "01"); driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); + driver.flushState(); proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); + driver.flushState(); proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)"); driver.process(topic1, "A", "03"); + driver.flushState(); proc1.checkAndClearProcessResult("A:(03<-02)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); + driver.flushState(); proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)"); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index 5495416..630167a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.Processor; @@ -435,7 +436,7 @@ public class SimpleBenchmark { source.process(new ProcessorSupplier<Integer, byte[]>() { @Override public Processor<Integer, byte[]> get() { - return new Processor<Integer, byte[]>() { + return new AbstractProcessor<Integer, byte[]>() { @Override public void init(ProcessorContext context) { @@ -479,8 +480,7 @@ public class SimpleBenchmark { source.process(new ProcessorSupplier<Integer, byte[]>() { @Override public Processor<Integer, byte[]> get() { - return new Processor<Integer, byte[]>() { - + return new AbstractProcessor<Integer, byte[]>() { @Override public void init(ProcessorContext context) { } @@ -573,8 +573,7 @@ public class SimpleBenchmark { source.process(new ProcessorSupplier<Integer, byte[]>() { @Override public Processor<Integer, byte[]> get() { - return new Processor<Integer, byte[]>() { - + return new AbstractProcessor<Integer, byte[]>() { KeyValueStore<Integer, byte[]> store; @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index a4c008a..7fe5170 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -237,13 +237,13 @@ public class TopologyBuilderTest { builder.addStateStore(supplier); builder.setApplicationId("X"); - assertEquals(0, builder.build(null).stateStoreSuppliers().size()); + assertEquals(0, builder.build(null).stateStores().size()); builder.addSource("source-1", "topic-1"); builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); builder.connectProcessorAndStateStores("processor-1", "store-1"); - List<StateStoreSupplier> suppliers = builder.build(null).stateStoreSuppliers(); + List<StateStore> suppliers = builder.build(null).stateStores(); assertEquals(1, suppliers.size()); assertEquals(supplier.name(), suppliers.get(0).name()); } @@ -471,7 +471,7 @@ public class TopologyBuilderTest { builder.setApplicationId("appId"); builder.addSource("source", "topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, true, Collections.<String, String>emptyMap()), "processor"); + builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor"); final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); final TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 32dce6d..5802b29 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockStateStoreSupplier; @@ -209,7 +210,7 @@ public class ProcessorStateManagerTest { public void testNoTopic() throws IOException { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap()); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); } finally { @@ -237,7 +238,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap()); try { restoreConsumer.reset(); @@ -286,7 +287,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, null); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap()); try { restoreConsumer.reset(); @@ -359,7 +360,7 @@ public class ProcessorStateManagerTest { // if there is an source partition, inherit the partition id Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1)); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, null); // standby + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap()); // standby try { restoreConsumer.reset(); @@ -393,7 +394,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, null); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap()); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); @@ -431,7 +432,7 @@ public class ProcessorStateManagerTest { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap()); try { // make sure the checkpoint file is deleted assertFalse(checkpointFile.exists()); http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 09434c3..54ee43c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -210,6 +210,7 @@ public class ProcessorTopologyTest { assertNoOutputRecord(OUTPUT_TOPIC_1); } + protected void assertNextOutputRecord(String topic, String key, String value) { ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER); assertEquals(topic, record.topic()); @@ -269,6 +270,7 @@ public class ProcessorTopologyTest { .addSink("counts", OUTPUT_TOPIC_1, "processor"); } + protected TopologyBuilder createSimpleMultiSourceTopology(int partition) { return new TopologyBuilder().addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1") http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java index 2c7aaeb..63b2b75 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; import org.junit.Test; @@ -60,7 +60,7 @@ public class PunctuationQueueTest { assertEquals(2, processor.punctuatedAt.size()); } - private static class TestProcessor implements Processor<String, String> { + private static class TestProcessor extends AbstractProcessor<String, String> { public final ArrayList<Long> punctuatedAt = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java new file mode 100644 index 0000000..3d5f3e3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +public class RecordContextStub implements RecordContext { + + private final long offset; + private final long timestamp; + private final int partition; + private final String topic; + + public RecordContextStub() { + this(-1, -1, -1, ""); + } + public RecordContextStub(final long offset, final long timestamp, final int partition, final String topic) { + this.offset = offset; + this.timestamp = timestamp; + this.partition = partition; + this.topic = topic; + } + + @Override + public long offset() { + return offset; + } + + @Override + public long timestamp() { + return timestamp; + } + + @Override + public String topic() { + return topic; + } + + @Override + public int partition() { + return partition; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 11058c2..268697c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockStateStoreSupplier; @@ -74,12 +74,12 @@ public class StandbyTaskTest { Collections.<ProcessorNode>emptyList(), Collections.<String, SourceNode>emptyMap(), Collections.<String, SinkNode>emptyMap(), - Utils.<StateStoreSupplier>mkList( - new MockStateStoreSupplier(storeName1, false), - new MockStateStoreSupplier(storeName2, true) + Utils.mkList( + new MockStateStoreSupplier(storeName1, false).get(), + new MockStateStoreSupplier(storeName2, true).get() ), - Collections.<String, String>emptyMap() - ); + Collections.<String, String>emptyMap(), + Collections.<StateStore, ProcessorNode>emptyMap()); private final TopicPartition ktable = new TopicPartition("ktable1", 0); private final Set<TopicPartition> ktablePartitions = Utils.mkSet(ktable); @@ -87,15 +87,15 @@ public class StandbyTaskTest { Collections.<ProcessorNode>emptyList(), Collections.<String, SourceNode>emptyMap(), Collections.<String, SinkNode>emptyMap(), - Utils.<StateStoreSupplier>mkList( - new MockStateStoreSupplier(ktable.topic(), true, false) + Utils.mkList( + new MockStateStoreSupplier(ktable.topic(), true, false).get() ), new HashMap<String, String>() { { put("ktable1", ktable.topic()); } - } - ); + }, + Collections.<StateStore, ProcessorNode>emptyMap()); private File baseDir; private StateDirectory stateDirectory; http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 32d6aa4..2b05e80 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -30,7 +30,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.test.MockProcessorNode; import org.apache.kafka.test.MockSourceNode; @@ -75,9 +75,9 @@ public class StreamTaskTest { } }, Collections.<String, SinkNode>emptyMap(), - Collections.<StateStoreSupplier>emptyList(), - Collections.<String, String>emptyMap() - ); + Collections.<StateStore>emptyList(), + Collections.<String, String>emptyMap(), + Collections.<StateStore, ProcessorNode>emptyMap()); private File baseDir; private StateDirectory stateDirectory; @@ -119,7 +119,7 @@ public class StreamTaskTest { @Test public void testProcessOrder() throws Exception { StreamsConfig config = createConfig(baseDir); - StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory); + StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null); task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), @@ -166,7 +166,7 @@ public class StreamTaskTest { @Test public void testPauseResume() throws Exception { StreamsConfig config = createConfig(baseDir); - StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory); + StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null); task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), @@ -225,7 +225,7 @@ public class StreamTaskTest { @Test public void testMaybePunctuate() throws Exception { StreamsConfig config = createConfig(baseDir); - StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory); + StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, null); task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 1da7592..ccbf8d6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -133,7 +133,7 @@ public class StreamThreadTest { Consumer<byte[], byte[]> restoreConsumer, StreamsConfig config, StateDirectory stateDirectory) { - super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null, stateDirectory); + super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null, stateDirectory, null); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java index f1c237e..7ff738f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -43,7 +44,7 @@ public class SmokeTestUtil { public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic, final boolean printOffset) { return new ProcessorSupplier<String, T>() { public Processor<String, T> get() { - return new Processor<String, T>() { + return new AbstractProcessor<String, T>() { private int numRecordsProcessed = 0; private ProcessorContext context; http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 140ea35..e84e9ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; @@ -178,6 +179,8 @@ public class KeyValueStoreTestDriver<K, V> { private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>(); private final MockProcessorContext context; private final Map<String, StateStore> storeMap = new HashMap<>(); + private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L; + private final ThreadCache cache = new ThreadCache(DEFAULT_CACHE_SIZE_BYTES); private final StreamsMetrics metrics = new StreamsMetrics() { @Override public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) { @@ -225,7 +228,7 @@ public class KeyValueStoreTestDriver<K, V> { - this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector) { + this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) { @Override public TaskId taskId() { return new TaskId(0, 1); @@ -266,6 +269,10 @@ public class KeyValueStoreTestDriver<K, V> { public Map<String, Object> appConfigsWithPrefix(String prefix) { return new StreamsConfig(props).originalsWithPrefix(prefix); } + @Override + public ThreadCache getCache() { + return cache; + } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/86aa0eb0/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 700655e..13f718c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -34,7 +34,6 @@ import org.junit.Test; public abstract class AbstractKeyValueStoreTest { - protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass, boolean useContextSerdes);