[ https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511455#comment-16511455 ]
ASF GitHub Bot commented on KAFKA-6474: --------------------------------------- guozhangwang closed pull request #4986: KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2] URL: https://github.com/apache/kafka/pull/4986 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java new file mode 100644 index 00000000000..bec4b5f79ff --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.ProcessorNode; + +import java.util.Properties; + +/** + * This class provides access to {@link TopologyTestDriver} protected methods. + * It should only be used for internal testing, in the rare occasions where the + * necessary functionality is not supported by {@link TopologyTestDriver}. + */ +public class TopologyTestDriverWrapper extends TopologyTestDriver { + + + public TopologyTestDriverWrapper(final Topology topology, + final Properties config) { + super(topology, config); + } + + /** + * Get the processor context, setting the processor whose name is given as current node + * + * @param processorName processor name to set as current node + * @return the processor context + */ + public ProcessorContext setCurrentNodeForProcessorContext(final String processorName) { + final ProcessorContext context = task.context(); + ((ProcessorContextImpl) context).setCurrentNode(getProcessor(processorName)); + return context; + } + + /** + * Get a processor by name + * + * @param name the name to search for + * @return the processor matching the search name + */ + public ProcessorNode getProcessor(final String name) { + for (final ProcessorNode node : processorTopology.processors()) { + if (node.name().equals(name)) { + return node; + } + } + for (final ProcessorNode node : globalTopology.processors()) { + if (node.name().equals(name)) { + return node; + } + } + throw new StreamsException("Could not find a processor named '" + name + "'"); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index c37078df99c..2cf192b9f45 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -16,45 +16,40 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyTestDriverWrapper; +import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; -import org.apache.kafka.test.MockMapper; -import org.apache.kafka.test.TestUtils; -import org.junit.Before; -import org.junit.Rule; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; -import java.io.File; import java.util.List; +import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; public class KTableFilterTest { - final private Serde<Integer> intSerde = Serdes.Integer(); - final private Serde<String> stringSerde = Serdes.String(); - private final Consumed<String, Integer> consumed = Consumed.with(stringSerde, intSerde); - @Rule - public final KStreamTestDriver driver = new KStreamTestDriver(); - private File stateDir = null; - - @Before - public void setUp() { - stateDir = TestUtils.tempDirectory("kafka-test"); - } + private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer()); + private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer()); + private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer()); private void doTestKTable(final StreamsBuilder builder, final KTable<String, Integer> table2, @@ -64,16 +59,14 @@ private void doTestKTable(final StreamsBuilder builder, table2.toStream().process(supplier); table3.toStream().process(supplier); - driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer()); - - driver.process(topic, "A", 1); - driver.process(topic, "B", 2); - driver.process(topic, "C", 3); - driver.process(topic, "D", 4); - driver.flushState(); - driver.process(topic, "A", null); - driver.process(topic, "B", null); - driver.flushState(); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + driver.pipeInput(recordFactory.create(topic, "A", 1)); + driver.pipeInput(recordFactory.create(topic, "B", 2)); + driver.pipeInput(recordFactory.create(topic, "C", 3)); + driver.pipeInput(recordFactory.create(topic, "D", 4)); + driver.pipeInput(recordFactory.create(topic, "A", null)); + driver.pipeInput(recordFactory.create(topic, "B", null)); + } final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2); @@ -136,63 +129,68 @@ private void doTestValueGetter(final StreamsBuilder builder, final KTableImpl<String, Integer, Integer> table2, final KTableImpl<String, Integer, Integer> table3, final String topic1) { + + final Topology topology = builder.build(); + KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); - driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer()); + final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); + topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames()); + topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames()); - KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); - KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); + try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) { - getter2.init(driver.context()); - getter3.init(driver.context()); + KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); + KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); - driver.process(topic1, "A", 1); - driver.process(topic1, "B", 1); - driver.process(topic1, "C", 1); + getter2.init(driver.setCurrentNodeForProcessorContext(table2.name)); + getter3.init(driver.setCurrentNodeForProcessorContext(table3.name)); - assertNull(getter2.get("A")); - assertNull(getter2.get("B")); - assertNull(getter2.get("C")); + driver.pipeInput(recordFactory.create(topic1, "A", 1)); + driver.pipeInput(recordFactory.create(topic1, "B", 1)); + driver.pipeInput(recordFactory.create(topic1, "C", 1)); - assertEquals(1, (int) getter3.get("A")); - assertEquals(1, (int) getter3.get("B")); - assertEquals(1, (int) getter3.get("C")); + assertNull(getter2.get("A")); + assertNull(getter2.get("B")); + assertNull(getter2.get("C")); - driver.process(topic1, "A", 2); - driver.process(topic1, "B", 2); - driver.flushState(); + assertEquals(1, (int) getter3.get("A")); + assertEquals(1, (int) getter3.get("B")); + assertEquals(1, (int) getter3.get("C")); - assertEquals(2, (int) getter2.get("A")); - assertEquals(2, (int) getter2.get("B")); - assertNull(getter2.get("C")); + driver.pipeInput(recordFactory.create(topic1, "A", 2)); + driver.pipeInput(recordFactory.create(topic1, "B", 2)); - assertNull(getter3.get("A")); - assertNull(getter3.get("B")); - assertEquals(1, (int) getter3.get("C")); + assertEquals(2, (int) getter2.get("A")); + assertEquals(2, (int) getter2.get("B")); + assertNull(getter2.get("C")); - driver.process(topic1, "A", 3); - driver.flushState(); + assertNull(getter3.get("A")); + assertNull(getter3.get("B")); + assertEquals(1, (int) getter3.get("C")); - assertNull(getter2.get("A")); - assertEquals(2, (int) getter2.get("B")); - assertNull(getter2.get("C")); + driver.pipeInput(recordFactory.create(topic1, "A", 3)); - assertEquals(3, (int) getter3.get("A")); - assertNull(getter3.get("B")); - assertEquals(1, (int) getter3.get("C")); + assertNull(getter2.get("A")); + assertEquals(2, (int) getter2.get("B")); + assertNull(getter2.get("C")); - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); - driver.flushState(); + assertEquals(3, (int) getter3.get("A")); + assertNull(getter3.get("B")); + assertEquals(1, (int) getter3.get("C")); - assertNull(getter2.get("A")); - assertNull(getter2.get("B")); - assertNull(getter2.get("C")); + driver.pipeInput(recordFactory.create(topic1, "A", null)); + driver.pipeInput(recordFactory.create(topic1, "B", null)); - assertNull(getter3.get("A")); - assertNull(getter3.get("B")); - assertEquals(1, (int) getter3.get("C")); + assertNull(getter2.get("A")); + assertNull(getter2.get("B")); + assertNull(getter2.get("C")); + + assertNull(getter3.get("A")); + assertNull(getter3.get("B")); + assertEquals(1, (int) getter3.get("C")); + } } @Test @@ -259,34 +257,34 @@ private void doTestNotSendingOldValue(final StreamsBuilder builder, builder.build().addProcessor("proc1", supplier, table1.name); builder.build().addProcessor("proc2", supplier, table2.name); - driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer()); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { - driver.process(topic1, "A", 1); - driver.process(topic1, "B", 1); - driver.process(topic1, "C", 1); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", 1)); + driver.pipeInput(recordFactory.create(topic1, "B", 1)); + driver.pipeInput(recordFactory.create(topic1, "C", 1)); - final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2); + final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2); + + processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); - processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); - - driver.process(topic1, "A", 2); - driver.process(topic1, "B", 2); - driver.flushState(); - processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); - processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); - - driver.process(topic1, "A", 3); - driver.flushState(); - processors.get(0).checkAndClearProcessResult("A:(3<-null)"); - processors.get(1).checkAndClearProcessResult("A:(null<-null)"); - - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); - driver.flushState(); - processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); - processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + driver.pipeInput(recordFactory.create(topic1, "A", 2)); + driver.pipeInput(recordFactory.create(topic1, "B", 2)); + + processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + + driver.pipeInput(recordFactory.create(topic1, "A", 3)); + + processors.get(0).checkAndClearProcessResult("A:(3<-null)"); + processors.get(1).checkAndClearProcessResult("A:(null<-null)"); + + driver.pipeInput(recordFactory.create(topic1, "A", null)); + driver.pipeInput(recordFactory.create(topic1, "B", null)); + + processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + } } @@ -340,34 +338,34 @@ private void doTestSendingOldValue(final StreamsBuilder builder, topology.addProcessor("proc1", supplier, table1.name); topology.addProcessor("proc2", supplier, table2.name); - driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer()); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { - driver.process(topic1, "A", 1); - driver.process(topic1, "B", 1); - driver.process(topic1, "C", 1); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", 1)); + driver.pipeInput(recordFactory.create(topic1, "B", 1)); + driver.pipeInput(recordFactory.create(topic1, "C", 1)); - final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2); + final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2); - processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - processors.get(1).checkEmptyAndClearProcessResult(); + processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + processors.get(1).checkEmptyAndClearProcessResult(); + + driver.pipeInput(recordFactory.create(topic1, "A", 2)); + driver.pipeInput(recordFactory.create(topic1, "B", 2)); + + processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); + processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + + driver.pipeInput(recordFactory.create(topic1, "A", 3)); + + processors.get(0).checkAndClearProcessResult("A:(3<-2)"); + processors.get(1).checkAndClearProcessResult("A:(null<-2)"); + + driver.pipeInput(recordFactory.create(topic1, "A", null)); + driver.pipeInput(recordFactory.create(topic1, "B", null)); - driver.process(topic1, "A", 2); - driver.process(topic1, "B", 2); - driver.flushState(); - processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); - processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); - - driver.process(topic1, "A", 3); - driver.flushState(); - processors.get(0).checkAndClearProcessResult("A:(3<-2)"); - processors.get(1).checkAndClearProcessResult("A:(null<-2)"); - - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); - driver.flushState(); - processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)"); - processors.get(1).checkAndClearProcessResult("B:(null<-2)"); + processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)"); + processors.get(1).checkAndClearProcessResult("B:(null<-2)"); + } } @Test @@ -418,12 +416,13 @@ private void doTestSkipNullOnMaterialization(final StreamsBuilder builder, topology.addProcessor("proc1", supplier, table1.name); topology.addProcessor("proc2", supplier, table2.name); - driver.setUp(builder, stateDir, stringSerde, stringSerde); + final ConsumerRecordFactory<String, String> stringRecordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { - driver.process(topic1, "A", "reject"); - driver.process(topic1, "B", "reject"); - driver.process(topic1, "C", "reject"); - driver.flushState(); + driver.pipeInput(stringRecordFactory.create(topic1, "A", "reject")); + driver.pipeInput(stringRecordFactory.create(topic1, "B", "reject")); + driver.pipeInput(stringRecordFactory.create(topic1, "C", "reject")); + } final List<MockProcessor<String, String>> processors = supplier.capturedProcessors(2); processors.get(0).checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)"); @@ -437,7 +436,7 @@ public void testSkipNullOnMaterialization() { String topic1 = "topic1"; - final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); + final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String()); KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, consumed); KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter( @@ -459,7 +458,7 @@ public void testQueryableSkipNullOnMaterialization() { String topic1 = "topic1"; - final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); + final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String()); KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, consumed); KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter( diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 399e519bb4c..0d194e44816 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -16,12 +16,17 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyTestDriverWrapper; +import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; @@ -30,10 +35,11 @@ import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockMapper; @@ -41,36 +47,31 @@ import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.MockValueJoiner; -import org.apache.kafka.test.TestUtils; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import java.io.File; import java.lang.reflect.Field; import java.util.List; +import java.util.Properties; import static org.easymock.EasyMock.mock; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; public class KTableImplTest { - private final Serde<String> stringSerde = Serdes.String(); - private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); - private final Produced<String, String> produced = Produced.with(stringSerde, stringSerde); + private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String()); + private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String()); + private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); - @Rule - public final KStreamTestDriver driver = new KStreamTestDriver(); - private File stateDir = null; private StreamsBuilder builder; private KTable<String, String> table; @Before public void setUp() { - stateDir = TestUtils.tempDirectory("kafka-test"); builder = new StreamsBuilder(); table = builder.table("test"); } @@ -110,17 +111,12 @@ public boolean test(String key, Integer value) { table4.toStream().process(supplier); - driver.setUp(builder, stateDir); - - driver.process(topic1, "A", "01"); - driver.flushState(); - driver.process(topic1, "B", "02"); - driver.flushState(); - driver.process(topic1, "C", "03"); - driver.flushState(); - driver.process(topic1, "D", "04"); - driver.flushState(); - driver.flushState(); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + driver.pipeInput(recordFactory.create(topic1, "A", "01")); + driver.pipeInput(recordFactory.create(topic1, "B", "02")); + driver.pipeInput(recordFactory.create(topic1, "C", "03")); + driver.pipeInput(recordFactory.create(topic1, "D", "04")); + } final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4); assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), processors.get(0).processed); @@ -156,104 +152,109 @@ public boolean test(String key, Integer value) { table1.toStream().to(topic2, produced); final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed); + final Topology topology = builder.build(); + final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier(); - driver.setUp(builder, stateDir, null, null); + final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); + topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames()); + topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames()); + topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames()); + topologyBuilder.connectProcessorAndStateStores(table4.name, getterSupplier4.storeNames()); + + try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) { - // two state store should be created - assertEquals(2, driver.allStateStores().size()); + assertEquals(2, driver.getAllStateStores().size()); - final KTableValueGetter<String, String> getter1 = getterSupplier1.get(); - getter1.init(driver.context()); - final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); - getter2.init(driver.context()); - final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); - getter3.init(driver.context()); - final KTableValueGetter<String, String> getter4 = getterSupplier4.get(); - getter4.init(driver.context()); + final KTableValueGetter<String, String> getter1 = getterSupplier1.get(); + final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); + final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); + final KTableValueGetter<String, String> getter4 = getterSupplier4.get(); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - driver.flushState(); + getter1.init(driver.setCurrentNodeForProcessorContext(table1.name)); + getter2.init(driver.setCurrentNodeForProcessorContext(table2.name)); + getter3.init(driver.setCurrentNodeForProcessorContext(table3.name)); + getter4.init(driver.setCurrentNodeForProcessorContext(table4.name)); - assertEquals("01", getter1.get("A")); - assertEquals("01", getter1.get("B")); - assertEquals("01", getter1.get("C")); + driver.pipeInput(recordFactory.create(topic1, "A", "01")); + driver.pipeInput(recordFactory.create(topic1, "B", "01")); + driver.pipeInput(recordFactory.create(topic1, "C", "01")); - assertEquals(new Integer(1), getter2.get("A")); - assertEquals(new Integer(1), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); + assertEquals("01", getter1.get("A")); + assertEquals("01", getter1.get("B")); + assertEquals("01", getter1.get("C")); - assertNull(getter3.get("A")); - assertNull(getter3.get("B")); - assertNull(getter3.get("C")); + assertEquals(new Integer(1), getter2.get("A")); + assertEquals(new Integer(1), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); - assertEquals("01", getter4.get("A")); - assertEquals("01", getter4.get("B")); - assertEquals("01", getter4.get("C")); + assertNull(getter3.get("A")); + assertNull(getter3.get("B")); + assertNull(getter3.get("C")); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - driver.flushState(); + assertEquals("01", getter4.get("A")); + assertEquals("01", getter4.get("B")); + assertEquals("01", getter4.get("C")); - assertEquals("02", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); + driver.pipeInput(recordFactory.create(topic1, "A", "02")); + driver.pipeInput(recordFactory.create(topic1, "B", "02")); - assertEquals(new Integer(2), getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); + assertEquals("02", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); - assertEquals(new Integer(2), getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); + assertEquals(new Integer(2), getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); - assertEquals("02", getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); + assertEquals(new Integer(2), getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); - driver.process(topic1, "A", "03"); - driver.flushState(); + assertEquals("02", getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); - assertEquals("03", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); + driver.pipeInput(recordFactory.create(topic1, "A", "03")); - assertEquals(new Integer(3), getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); + assertEquals("03", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); - assertNull(getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); + assertEquals(new Integer(3), getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); - assertEquals("03", getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); + assertNull(getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); - driver.process(topic1, "A", null); - driver.flushState(); + assertEquals("03", getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); - assertNull(getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); + driver.pipeInput(recordFactory.create(topic1, "A", (String) null)); + assertNull(getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); - assertNull(getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - assertNull(getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); + assertNull(getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); - assertNull(getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); + assertNull(getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertNull(getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + } } @Test @@ -282,11 +283,9 @@ public boolean test(String key, Integer value) { } }); - driver.setUp(builder, stateDir, null, null); - driver.setTime(0L); - - // two state stores should be created - assertEquals(2, driver.allStateStores().size()); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + assertEquals(2, driver.getAllStateStores().size()); + } } @Test @@ -323,15 +322,25 @@ public String apply(String v1, Integer v2) { } }); - driver.setUp(builder, stateDir, null, null); - driver.setTime(0L); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + assertEquals(2, driver.getAllStateStores().size()); + } + } - // two state store should be created - assertEquals(2, driver.allStateStores().size()); + private void assertTopologyContainsProcessor(final Topology topology, final String processorName) { + for (final TopologyDescription.Subtopology subtopology: topology.describe().subtopologies()) { + for (final TopologyDescription.Node node: subtopology.nodes()) { + if (node.name().equals(processorName)) { + return; + } + } + } + throw new AssertionError("No processor named '" + processorName + "'" + + "found in the provided Topology:\n" + topology.describe()); } @Test - public void testRepartition() throws NoSuchFieldException, IllegalAccessException { + public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws NoSuchFieldException, IllegalAccessException { final String topic1 = "topic1"; final String storeName1 = "storeName1"; @@ -341,8 +350,8 @@ public void testRepartition() throws NoSuchFieldException, IllegalAccessExceptio (KTableImpl<String, String, String>) builder.table(topic1, consumed, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1) - .withKeySerde(stringSerde) - .withValueSerde(stringSerde) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String()) ); table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper()) @@ -352,27 +361,26 @@ public void testRepartition() throws NoSuchFieldException, IllegalAccessExceptio table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper()) .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mock-result2")); - driver.setUp(builder, stateDir, stringSerde, stringSerde); - driver.setTime(0L); + final Topology topology = builder.build(); + try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) { - // three state store should be created, one for source, one for aggregate and one for reduce - assertEquals(3, driver.allStateStores().size()); + assertEquals(3, driver.getAllStateStores().size()); - // contains the corresponding repartition source / sink nodes - assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003")); - assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004")); - assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007")); - assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008")); + assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000003"); + assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000004"); + assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000007"); + assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000008"); - Field valSerializerField = ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer"); - Field valDeserializerField = ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer"); - valSerializerField.setAccessible(true); - valDeserializerField.setAccessible(true); + Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer"); + Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer"); + valSerializerField.setAccessible(true); + valDeserializerField.setAccessible(true); - assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000003"))).inner()); - assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000004"))).inner()); - assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000007"))).inner()); - assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000008"))).inner()); + assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner()); + assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner()); + assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner()); + assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner()); + } } @Test(expected = NullPointerException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index a54e43e92c3..a01d5cb215a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -16,27 +16,30 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyTestDriverWrapper; +import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.TestUtils; -import org.junit.Before; -import org.junit.Rule; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; -import java.io.File; +import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -45,27 +48,19 @@ public class KTableMapValuesTest { - private final Serde<String> stringSerde = Serdes.String(); - private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde); - private final Produced<String, String> produced = Produced.with(stringSerde, stringSerde); - @Rule - public final KStreamTestDriver driver = new KStreamTestDriver(); - private File stateDir = null; - - @Before - public void setUp() { - stateDir = TestUtils.tempDirectory("kafka-test"); - } + private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String()); + private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String()); + private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); + private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> supplier) { - driver.setUp(builder, stateDir, Serdes.String(), Serdes.String()); - - driver.process(topic1, "A", "1"); - driver.process(topic1, "B", "2"); - driver.process(topic1, "C", "3"); - driver.process(topic1, "D", "4"); - driver.flushState(); - assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), supplier.theCapturedProcessor().processed); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + driver.pipeInput(recordFactory.create(topic1, "A", "1")); + driver.pipeInput(recordFactory.create(topic1, "B", "2")); + driver.pipeInput(recordFactory.create(topic1, "C", "3")); + driver.pipeInput(recordFactory.create(topic1, "D", "4")); + assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), supplier.theCapturedProcessor().processed); + } } @Test @@ -114,99 +109,106 @@ private void doTestValueGetter(final StreamsBuilder builder, final KTableImpl<String, String, Integer> table2, final KTableImpl<String, Integer, Integer> table3, final KTableImpl<String, String, String> table4) { + + final Topology topology = builder.build(); + final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier(); - driver.setUp(builder, stateDir, Serdes.String(), Serdes.String()); - final KTableValueGetter<String, String> getter1 = getterSupplier1.get(); - getter1.init(driver.context()); - final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); - getter2.init(driver.context()); - final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); - getter3.init(driver.context()); - final KTableValueGetter<String, String> getter4 = getterSupplier4.get(); - getter4.init(driver.context()); - - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - driver.flushState(); - - assertEquals("01", getter1.get("A")); - assertEquals("01", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(1), getter2.get("A")); - assertEquals(new Integer(1), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertNull(getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("01", getter4.get("A")); - assertEquals("01", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - driver.flushState(); - - assertEquals("02", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(2), getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertEquals(new Integer(2), getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("02", getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", "03"); - driver.flushState(); - - assertEquals("03", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(3), getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("03", getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", null); - driver.flushState(); - - assertNull(getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertNull(getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertNull(getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); + final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); + topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames()); + topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames()); + topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames()); + topologyBuilder.connectProcessorAndStateStores(table4.name, getterSupplier4.storeNames()); + + try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), props)) { + KTableValueGetter<String, String> getter1 = getterSupplier1.get(); + KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); + KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); + KTableValueGetter<String, String> getter4 = getterSupplier4.get(); + + getter1.init(driver.setCurrentNodeForProcessorContext(table1.name)); + getter2.init(driver.setCurrentNodeForProcessorContext(table2.name)); + getter3.init(driver.setCurrentNodeForProcessorContext(table3.name)); + getter4.init(driver.setCurrentNodeForProcessorContext(table4.name)); + + driver.pipeInput(recordFactory.create(topic1, "A", "01")); + driver.pipeInput(recordFactory.create(topic1, "B", "01")); + driver.pipeInput(recordFactory.create(topic1, "C", "01")); + + assertEquals("01", getter1.get("A")); + assertEquals("01", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(1), getter2.get("A")); + assertEquals(new Integer(1), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertNull(getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("01", getter4.get("A")); + assertEquals("01", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.pipeInput(recordFactory.create(topic1, "A", "02")); + driver.pipeInput(recordFactory.create(topic1, "B", "02")); + + assertEquals("02", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(2), getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertEquals(new Integer(2), getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("02", getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.pipeInput(recordFactory.create(topic1, "A", "03")); + + assertEquals("03", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(3), getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("03", getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.pipeInput(recordFactory.create(topic1, "A", (String) null)); + + assertNull(getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertNull(getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertNull(getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + } } @Test @@ -244,6 +246,8 @@ public void testQueryableValueGetter() { final String topic1 = "topic1"; final String topic2 = "topic2"; + final String storeName2 = "anyMapName"; + final String storeName3 = "anyFilterName"; final KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, consumed); @@ -253,14 +257,14 @@ public void testQueryableValueGetter() { public Integer apply(String value) { return new Integer(value); } - }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyMapName").withValueSerde(Serdes.Integer())); + }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName2).withValueSerde(Serdes.Integer())); final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter( new Predicate<String, Integer>() { @Override public boolean test(String key, Integer value) { return (value % 2) == 0; } - }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyFilterName").withValueSerde(Serdes.Integer())); + }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName3).withValueSerde(Serdes.Integer())); table1.toStream().to(topic2, produced); final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed); @@ -285,37 +289,34 @@ public Integer apply(String value) { final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", supplier, table2.name); + final Topology topology = builder.build().addProcessor("proc", supplier, table2.name); - driver.setUp(builder, stateDir); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { - final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor(); + final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor(); - assertFalse(table1.sendingOldValueEnabled()); - assertFalse(table2.sendingOldValueEnabled()); + assertFalse(table1.sendingOldValueEnabled()); + assertFalse(table2.sendingOldValueEnabled()); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "01")); + driver.pipeInput(recordFactory.create(topic1, "B", "01")); + driver.pipeInput(recordFactory.create(topic1, "C", "01")); - proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "02")); + driver.pipeInput(recordFactory.create(topic1, "B", "02")); - proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); - driver.process(topic1, "A", "03"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "03")); - proc.checkAndClearProcessResult("A:(3<-null)"); + proc.checkAndClearProcessResult("A:(3<-null)"); - driver.process(topic1, "A", null); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", (String) null)); - proc.checkAndClearProcessResult("A:(null<-null)"); + proc.checkAndClearProcessResult("A:(null<-null)"); + } } @Test @@ -340,34 +341,31 @@ public Integer apply(String value) { builder.build().addProcessor("proc", supplier, table2.name); - driver.setUp(builder, stateDir); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { - final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor(); + final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor(); - assertTrue(table1.sendingOldValueEnabled()); - assertTrue(table2.sendingOldValueEnabled()); + assertTrue(table1.sendingOldValueEnabled()); + assertTrue(table2.sendingOldValueEnabled()); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "01")); + driver.pipeInput(recordFactory.create(topic1, "B", "01")); + driver.pipeInput(recordFactory.create(topic1, "C", "01")); - proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "02")); + driver.pipeInput(recordFactory.create(topic1, "B", "02")); - proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); + proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); - driver.process(topic1, "A", "03"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "03")); - proc.checkAndClearProcessResult("A:(3<-2)"); + proc.checkAndClearProcessResult("A:(3<-2)"); - driver.process(topic1, "A", null); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", (String) null)); - proc.checkAndClearProcessResult("A:(null<-3)"); + proc.checkAndClearProcessResult("A:(null<-3)"); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index 504d8414aae..80a60ab2f20 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -16,22 +16,26 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyTestDriverWrapper; +import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; -import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.TestUtils; -import org.junit.Before; -import org.junit.Rule; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; -import java.io.File; +import java.util.Properties; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; import static org.hamcrest.CoreMatchers.hasItem; @@ -42,17 +46,9 @@ public class KTableSourceTest { - final private Serde<String> stringSerde = Serdes.String(); - private final Consumed<String, String> stringConsumed = Consumed.with(stringSerde, stringSerde); - final private Serde<Integer> intSerde = Serdes.Integer(); - @Rule - public final KStreamTestDriver driver = new KStreamTestDriver(); - private File stateDir = null; - - @Before - public void setUp() { - stateDir = TestUtils.tempDirectory("kafka-test"); - } + private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String()); + private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); + private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); @Test public void testKTable() { @@ -60,38 +56,38 @@ public void testKTable() { final String topic1 = "topic1"; - final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde)); + final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(Serdes.String(), Serdes.Integer())); final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>(); table1.toStream().process(supplier); - driver.setUp(builder, stateDir); - driver.process(topic1, "A", 1); - driver.process(topic1, "B", 2); - driver.process(topic1, "C", 3); - driver.process(topic1, "D", 4); - driver.flushState(); - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); - driver.flushState(); + final ConsumerRecordFactory<String, Integer> integerFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer()); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + driver.pipeInput(integerFactory.create(topic1, "A", 1)); + driver.pipeInput(integerFactory.create(topic1, "B", 2)); + driver.pipeInput(integerFactory.create(topic1, "C", 3)); + driver.pipeInput(integerFactory.create(topic1, "D", 4)); + driver.pipeInput(integerFactory.create(topic1, "A", null)); + driver.pipeInput(integerFactory.create(topic1, "B", null)); + } assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), supplier.theCapturedProcessor().processed); } @Test public void kTableShouldLogAndMeterOnSkippedRecords() { - final StreamsBuilder streamsBuilder = new StreamsBuilder(); + final StreamsBuilder builder = new StreamsBuilder(); final String topic = "topic"; - streamsBuilder.table(topic, Consumed.with(stringSerde, intSerde)); + builder.table(topic, stringConsumed); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); - driver.setUp(streamsBuilder, stateDir); - driver.process(topic, null, "value"); - driver.flushState(); - LogCaptureAppender.unregister(appender); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + driver.pipeInput(recordFactory.create(topic, null, "value")); + LogCaptureAppender.unregister(appender); - assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue()); - assertThat(appender.getMessages(), hasItem("Skipping record due to null key. topic=[topic] partition=[-1] offset=[-1]")); + assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue()); + assertThat(appender.getMessages(), hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]")); + } } @Test @@ -102,39 +98,45 @@ public void testValueGetter() { final KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed); + final Topology topology = builder.build(); + final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); - driver.setUp(builder, stateDir); - final KTableValueGetter<String, String> getter1 = getterSupplier1.get(); - getter1.init(driver.context()); + final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); + topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames()); + + try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), props)) { + final KTableValueGetter<String, String> getter1 = getterSupplier1.get(); + getter1.init(driver.setCurrentNodeForProcessorContext(table1.name)); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); + driver.pipeInput(recordFactory.create(topic1, "A", "01")); + driver.pipeInput(recordFactory.create(topic1, "B", "01")); + driver.pipeInput(recordFactory.create(topic1, "C", "01")); - assertEquals("01", getter1.get("A")); - assertEquals("01", getter1.get("B")); - assertEquals("01", getter1.get("C")); + assertEquals("01", getter1.get("A")); + assertEquals("01", getter1.get("B")); + assertEquals("01", getter1.get("C")); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); + driver.pipeInput(recordFactory.create(topic1, "A", "02")); + driver.pipeInput(recordFactory.create(topic1, "B", "02")); - assertEquals("02", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); + assertEquals("02", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); - driver.process(topic1, "A", "03"); + driver.pipeInput(recordFactory.create(topic1, "A", "03")); - assertEquals("03", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); + assertEquals("03", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); + driver.pipeInput(recordFactory.create(topic1, "A", (String) null)); + driver.pipeInput(recordFactory.create(topic1, "B", (String) null)); - assertNull(getter1.get("A")); - assertNull(getter1.get("B")); - assertEquals("01", getter1.get("C")); + assertNull(getter1.get("A")); + assertNull(getter1.get("B")); + assertEquals("01", getter1.get("C")); + } } @@ -148,35 +150,32 @@ public void testNotSendingOldValue() { final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc1", supplier, table1.name); + final Topology topology = builder.build().addProcessor("proc1", supplier, table1.name); - driver.setUp(builder, stateDir); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { - final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor(); + final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor(); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "01")); + driver.pipeInput(recordFactory.create(topic1, "B", "01")); + driver.pipeInput(recordFactory.create(topic1, "C", "01")); - proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); + proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "02")); + driver.pipeInput(recordFactory.create(topic1, "B", "02")); - proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)"); + proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)"); - driver.process(topic1, "A", "03"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "03")); - proc1.checkAndClearProcessResult("A:(03<-null)"); + proc1.checkAndClearProcessResult("A:(03<-null)"); - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", (String) null)); + driver.pipeInput(recordFactory.create(topic1, "B", (String) null)); - proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + } } @Test @@ -193,34 +192,31 @@ public void testSendingOldValue() { final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc1", supplier, table1.name); + final Topology topology = builder.build().addProcessor("proc1", supplier, table1.name); - driver.setUp(builder, stateDir); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { - final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor(); + final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor(); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "01")); + driver.pipeInput(recordFactory.create(topic1, "B", "01")); + driver.pipeInput(recordFactory.create(topic1, "C", "01")); - proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); + proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "02")); + driver.pipeInput(recordFactory.create(topic1, "B", "02")); - proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)"); + proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)"); - driver.process(topic1, "A", "03"); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", "03")); - proc1.checkAndClearProcessResult("A:(03<-02)"); + proc1.checkAndClearProcessResult("A:(03<-02)"); - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); - driver.flushState(); + driver.pipeInput(recordFactory.create(topic1, "A", (String) null)); + driver.pipeInput(recordFactory.create(topic1, "B", (String) null)); - proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)"); + proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)"); + } } } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index e46ec6a35d0..83471ce61c0 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -175,13 +175,14 @@ private final static int PARTITION_ID = 0; private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID); - private final StreamTask task; + final StreamTask task; private final GlobalStateUpdateTask globalStateTask; private final GlobalStateManager globalStateManager; private final StateDirectory stateDirectory; private final Metrics metrics; - private final ProcessorTopology processorTopology; + final ProcessorTopology processorTopology; + final ProcessorTopology globalTopology; private final MockProducer<byte[], byte[]> producer; @@ -220,18 +221,6 @@ public TopologyTestDriver(final Topology topology, this(topology.internalTopologyBuilder, config, initialWallClockTimeMs); } - /** - * Create a new test diver instance. - * - * @param builder builder for the topology to be tested - * @param config the configuration for the topology - */ - TopologyTestDriver(final InternalTopologyBuilder builder, - final Properties config) { - this(builder, config, System.currentTimeMillis()); - - } - /** * Create a new test diver instance. * @@ -249,7 +238,7 @@ private TopologyTestDriver(final InternalTopologyBuilder builder, internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)); processorTopology = internalTopologyBuilder.build(null); - final ProcessorTopology globalTopology = internalTopologyBuilder.buildGlobalStateTopology(); + globalTopology = internalTopologyBuilder.buildGlobalStateTopology(); final Serializer<byte[]> bytesSerializer = new ByteArraySerializer(); producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Rewrite test to use new public TopologyTestDriver > ------------------------------------------------- > > Key: KAFKA-6474 > URL: https://issues.apache.org/jira/browse/KAFKA-6474 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests > Affects Versions: 1.1.0 > Reporter: Matthias J. Sax > Assignee: Filipe Agapito > Priority: Major > Labels: beginner, newbie > > With KIP-247 we added public TopologyTestDriver. We should rewrite out own > test to use this new test driver and remove the two classes > ProcessorTopoogyTestDriver and KStreamTestDriver. -- This message was sent by Atlassian JIRA (v7.6.3#76005)