[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511455#comment-16511455
 ] 

ASF GitHub Bot commented on KAFKA-6474:
---------------------------------------

guozhangwang closed pull request #4986: KAFKA-6474: Rewrite tests to use new 
public TopologyTestDriver [part 2]
URL: https://github.com/apache/kafka/pull/4986
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
new file mode 100644
index 00000000000..bec4b5f79ff
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+
+import java.util.Properties;
+
+/**
+ * This class provides access to {@link TopologyTestDriver} protected methods.
+ * It should only be used for internal testing, in the rare occasions where the
+ * necessary functionality is not supported by {@link TopologyTestDriver}.
+ */
+public class TopologyTestDriverWrapper extends TopologyTestDriver {
+
+
+    public TopologyTestDriverWrapper(final Topology topology,
+                                     final Properties config) {
+        super(topology, config);
+    }
+
+    /**
+     * Get the processor context, setting the processor whose name is given as 
current node
+     *
+     * @param processorName processor name to set as current node
+     * @return the processor context
+     */
+    public ProcessorContext setCurrentNodeForProcessorContext(final String 
processorName) {
+        final ProcessorContext context = task.context();
+        ((ProcessorContextImpl) 
context).setCurrentNode(getProcessor(processorName));
+        return context;
+    }
+
+    /**
+     * Get a processor by name
+     *
+     * @param name the name to search for
+     * @return the processor matching the search name
+     */
+    public ProcessorNode getProcessor(final String name) {
+        for (final ProcessorNode node : processorTopology.processors()) {
+            if (node.name().equals(name)) {
+                return node;
+            }
+        }
+        for (final ProcessorNode node : globalTopology.processors()) {
+            if (node.name().equals(name)) {
+                return node;
+            }
+        }
+        throw new StreamsException("Could not find a processor named '" + name 
+ "'");
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index c37078df99c..2cf192b9f45 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -16,45 +16,40 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.List;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class KTableFilterTest {
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    private final Consumed<String, Integer> consumed = 
Consumed.with(stringSerde, intSerde);
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final Consumed<String, Integer> consumed = 
Consumed.with(Serdes.String(), Serdes.Integer());
+    private final ConsumerRecordFactory<String, Integer> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
+    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
 
     private void doTestKTable(final StreamsBuilder builder,
                               final KTable<String, Integer> table2,
@@ -64,16 +59,14 @@ private void doTestKTable(final StreamsBuilder builder,
         table2.toStream().process(supplier);
         table3.toStream().process(supplier);
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
-
-        driver.process(topic, "A", 1);
-        driver.process(topic, "B", 2);
-        driver.process(topic, "C", 3);
-        driver.process(topic, "D", 4);
-        driver.flushState();
-        driver.process(topic, "A", null);
-        driver.process(topic, "B", null);
-        driver.flushState();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic, "A", 1));
+            driver.pipeInput(recordFactory.create(topic, "B", 2));
+            driver.pipeInput(recordFactory.create(topic, "C", 3));
+            driver.pipeInput(recordFactory.create(topic, "D", 4));
+            driver.pipeInput(recordFactory.create(topic, "A", null));
+            driver.pipeInput(recordFactory.create(topic, "B", null));
+        }
 
         final List<MockProcessor<String, Integer>> processors = 
supplier.capturedProcessors(2);
 
@@ -136,63 +129,68 @@ private void doTestValueGetter(final StreamsBuilder 
builder,
                                    final KTableImpl<String, Integer, Integer> 
table2,
                                    final KTableImpl<String, Integer, Integer> 
table3,
                                    final String topic1) {
+
+        final Topology topology = builder.build();
+
         KTableValueGetterSupplier<String, Integer> getterSupplier2 = 
table2.valueGetterSupplier();
         KTableValueGetterSupplier<String, Integer> getterSupplier3 = 
table3.valueGetterSupplier();
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
+        final InternalTopologyBuilder topologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+        topologyBuilder.connectProcessorAndStateStores(table2.name, 
getterSupplier2.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table3.name, 
getterSupplier3.storeNames());
 
-        KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
-        KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+        try (final TopologyTestDriverWrapper driver = new 
TopologyTestDriverWrapper(topology, props)) {
 
-        getter2.init(driver.context());
-        getter3.init(driver.context());
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
 
-        driver.process(topic1, "A", 1);
-        driver.process(topic1, "B", 1);
-        driver.process(topic1, "C", 1);
+            
getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+            
getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
 
-        assertNull(getter2.get("A"));
-        assertNull(getter2.get("B"));
-        assertNull(getter2.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", 1));
+            driver.pipeInput(recordFactory.create(topic1, "B", 1));
+            driver.pipeInput(recordFactory.create(topic1, "C", 1));
 
-        assertEquals(1, (int) getter3.get("A"));
-        assertEquals(1, (int) getter3.get("B"));
-        assertEquals(1, (int) getter3.get("C"));
+            assertNull(getter2.get("A"));
+            assertNull(getter2.get("B"));
+            assertNull(getter2.get("C"));
 
-        driver.process(topic1, "A", 2);
-        driver.process(topic1, "B", 2);
-        driver.flushState();
+            assertEquals(1, (int) getter3.get("A"));
+            assertEquals(1, (int) getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
 
-        assertEquals(2, (int) getter2.get("A"));
-        assertEquals(2, (int) getter2.get("B"));
-        assertNull(getter2.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", 2));
+            driver.pipeInput(recordFactory.create(topic1, "B", 2));
 
-        assertNull(getter3.get("A"));
-        assertNull(getter3.get("B"));
-        assertEquals(1, (int) getter3.get("C"));
+            assertEquals(2, (int) getter2.get("A"));
+            assertEquals(2, (int) getter2.get("B"));
+            assertNull(getter2.get("C"));
 
-        driver.process(topic1, "A", 3);
-        driver.flushState();
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
 
-        assertNull(getter2.get("A"));
-        assertEquals(2, (int) getter2.get("B"));
-        assertNull(getter2.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", 3));
 
-        assertEquals(3, (int) getter3.get("A"));
-        assertNull(getter3.get("B"));
-        assertEquals(1, (int) getter3.get("C"));
+            assertNull(getter2.get("A"));
+            assertEquals(2, (int) getter2.get("B"));
+            assertNull(getter2.get("C"));
 
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
+            assertEquals(3, (int) getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
 
-        assertNull(getter2.get("A"));
-        assertNull(getter2.get("B"));
-        assertNull(getter2.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", null));
+            driver.pipeInput(recordFactory.create(topic1, "B", null));
 
-        assertNull(getter3.get("A"));
-        assertNull(getter3.get("B"));
-        assertEquals(1, (int) getter3.get("C"));
+            assertNull(getter2.get("A"));
+            assertNull(getter2.get("B"));
+            assertNull(getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertEquals(1, (int) getter3.get("C"));
+        }
     }
 
     @Test
@@ -259,34 +257,34 @@ private void doTestNotSendingOldValue(final 
StreamsBuilder builder,
         builder.build().addProcessor("proc1", supplier, table1.name);
         builder.build().addProcessor("proc2", supplier, table2.name);
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
 
-        driver.process(topic1, "A", 1);
-        driver.process(topic1, "B", 1);
-        driver.process(topic1, "C", 1);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", 1));
+            driver.pipeInput(recordFactory.create(topic1, "B", 1));
+            driver.pipeInput(recordFactory.create(topic1, "C", 1));
 
-        final List<MockProcessor<String, Integer>> processors = 
supplier.capturedProcessors(2);
+            final List<MockProcessor<String, Integer>> processors = 
supplier.capturedProcessors(2);
+
+            processors.get(0).checkAndClearProcessResult("A:(1<-null)", 
"B:(1<-null)", "C:(1<-null)");
+            processors.get(1).checkAndClearProcessResult("A:(null<-null)", 
"B:(null<-null)", "C:(null<-null)");
 
-        processors.get(0).checkAndClearProcessResult("A:(1<-null)", 
"B:(1<-null)", "C:(1<-null)");
-        processors.get(1).checkAndClearProcessResult("A:(null<-null)", 
"B:(null<-null)", "C:(null<-null)");
-
-        driver.process(topic1, "A", 2);
-        driver.process(topic1, "B", 2);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(2<-null)", 
"B:(2<-null)");
-        processors.get(1).checkAndClearProcessResult("A:(2<-null)", 
"B:(2<-null)");
-
-        driver.process(topic1, "A", 3);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(3<-null)");
-        processors.get(1).checkAndClearProcessResult("A:(null<-null)");
-
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(null<-null)", 
"B:(null<-null)");
-        processors.get(1).checkAndClearProcessResult("A:(null<-null)", 
"B:(null<-null)");
+            driver.pipeInput(recordFactory.create(topic1, "A", 2));
+            driver.pipeInput(recordFactory.create(topic1, "B", 2));
+
+            processors.get(0).checkAndClearProcessResult("A:(2<-null)", 
"B:(2<-null)");
+            processors.get(1).checkAndClearProcessResult("A:(2<-null)", 
"B:(2<-null)");
+
+            driver.pipeInput(recordFactory.create(topic1, "A", 3));
+
+            processors.get(0).checkAndClearProcessResult("A:(3<-null)");
+            processors.get(1).checkAndClearProcessResult("A:(null<-null)");
+
+            driver.pipeInput(recordFactory.create(topic1, "A", null));
+            driver.pipeInput(recordFactory.create(topic1, "B", null));
+
+            processors.get(0).checkAndClearProcessResult("A:(null<-null)", 
"B:(null<-null)");
+            processors.get(1).checkAndClearProcessResult("A:(null<-null)", 
"B:(null<-null)");
+        }
     }
 
 
@@ -340,34 +338,34 @@ private void doTestSendingOldValue(final StreamsBuilder 
builder,
         topology.addProcessor("proc1", supplier, table1.name);
         topology.addProcessor("proc2", supplier, table2.name);
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, props)) {
 
-        driver.process(topic1, "A", 1);
-        driver.process(topic1, "B", 1);
-        driver.process(topic1, "C", 1);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", 1));
+            driver.pipeInput(recordFactory.create(topic1, "B", 1));
+            driver.pipeInput(recordFactory.create(topic1, "C", 1));
 
-        final List<MockProcessor<String, Integer>> processors = 
supplier.capturedProcessors(2);
+            final List<MockProcessor<String, Integer>> processors = 
supplier.capturedProcessors(2);
 
-        processors.get(0).checkAndClearProcessResult("A:(1<-null)", 
"B:(1<-null)", "C:(1<-null)");
-        processors.get(1).checkEmptyAndClearProcessResult();
+            processors.get(0).checkAndClearProcessResult("A:(1<-null)", 
"B:(1<-null)", "C:(1<-null)");
+            processors.get(1).checkEmptyAndClearProcessResult();
+
+            driver.pipeInput(recordFactory.create(topic1, "A", 2));
+            driver.pipeInput(recordFactory.create(topic1, "B", 2));
+
+            processors.get(0).checkAndClearProcessResult("A:(2<-1)", 
"B:(2<-1)");
+            processors.get(1).checkAndClearProcessResult("A:(2<-null)", 
"B:(2<-null)");
+
+            driver.pipeInput(recordFactory.create(topic1, "A", 3));
+
+            processors.get(0).checkAndClearProcessResult("A:(3<-2)");
+            processors.get(1).checkAndClearProcessResult("A:(null<-2)");
+
+            driver.pipeInput(recordFactory.create(topic1, "A", null));
+            driver.pipeInput(recordFactory.create(topic1, "B", null));
 
-        driver.process(topic1, "A", 2);
-        driver.process(topic1, "B", 2);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
-        processors.get(1).checkAndClearProcessResult("A:(2<-null)", 
"B:(2<-null)");
-
-        driver.process(topic1, "A", 3);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(3<-2)");
-        processors.get(1).checkAndClearProcessResult("A:(null<-2)");
-
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
-        processors.get(0).checkAndClearProcessResult("A:(null<-3)", 
"B:(null<-2)");
-        processors.get(1).checkAndClearProcessResult("B:(null<-2)");
+            processors.get(0).checkAndClearProcessResult("A:(null<-3)", 
"B:(null<-2)");
+            processors.get(1).checkAndClearProcessResult("B:(null<-2)");
+        }
     }
 
     @Test
@@ -418,12 +416,13 @@ private void doTestSkipNullOnMaterialization(final 
StreamsBuilder builder,
         topology.addProcessor("proc1", supplier, table1.name);
         topology.addProcessor("proc2", supplier, table2.name);
 
-        driver.setUp(builder, stateDir, stringSerde, stringSerde);
+        final ConsumerRecordFactory<String, String> stringRecordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, props)) {
 
-        driver.process(topic1, "A", "reject");
-        driver.process(topic1, "B", "reject");
-        driver.process(topic1, "C", "reject");
-        driver.flushState();
+            driver.pipeInput(stringRecordFactory.create(topic1, "A", 
"reject"));
+            driver.pipeInput(stringRecordFactory.create(topic1, "B", 
"reject"));
+            driver.pipeInput(stringRecordFactory.create(topic1, "C", 
"reject"));
+        }
 
         final List<MockProcessor<String, String>> processors = 
supplier.capturedProcessors(2);
         processors.get(0).checkAndClearProcessResult("A:(reject<-null)", 
"B:(reject<-null)", "C:(reject<-null)");
@@ -437,7 +436,7 @@ public void testSkipNullOnMaterialization() {
 
         String topic1 = "topic1";
 
-        final Consumed<String, String> consumed = Consumed.with(stringSerde, 
stringSerde);
+        final Consumed<String, String> consumed = 
Consumed.with(Serdes.String(), Serdes.String());
         KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, 
consumed);
         KTableImpl<String, String, String> table2 = (KTableImpl<String, 
String, String>) table1.filter(
@@ -459,7 +458,7 @@ public void testQueryableSkipNullOnMaterialization() {
 
         String topic1 = "topic1";
 
-        final Consumed<String, String> consumed = Consumed.with(stringSerde, 
stringSerde);
+        final Consumed<String, String> consumed = 
Consumed.with(Serdes.String(), Serdes.String());
         KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, 
consumed);
         KTableImpl<String, String, String> table2 = (KTableImpl<String, 
String, String>) table1.filter(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 399e519bb4c..0d194e44816 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -16,12 +16,17 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyDescription;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -30,10 +35,11 @@
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
@@ -41,36 +47,31 @@
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
 import java.lang.reflect.Field;
 import java.util.List;
+import java.util.Properties;
 
 import static org.easymock.EasyMock.mock;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 public class KTableImplTest {
 
-    private final Serde<String> stringSerde = Serdes.String();
-    private final Consumed<String, String> consumed = 
Consumed.with(stringSerde, stringSerde);
-    private final Produced<String, String> produced = 
Produced.with(stringSerde, stringSerde);
+    private final Consumed<String, String> consumed = 
Consumed.with(Serdes.String(), Serdes.String());
+    private final Produced<String, String> produced = 
Produced.with(Serdes.String(), Serdes.String());
+    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
     private StreamsBuilder builder;
     private KTable<String, String> table;
 
     @Before
     public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
         builder = new StreamsBuilder();
         table = builder.table("test");
     }
@@ -110,17 +111,12 @@ public boolean test(String key, Integer value) {
 
         table4.toStream().process(supplier);
 
-        driver.setUp(builder, stateDir);
-
-        driver.process(topic1, "A", "01");
-        driver.flushState();
-        driver.process(topic1, "B", "02");
-        driver.flushState();
-        driver.process(topic1, "C", "03");
-        driver.flushState();
-        driver.process(topic1, "D", "04");
-        driver.flushState();
-        driver.flushState();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "03"));
+            driver.pipeInput(recordFactory.create(topic1, "D", "04"));
+        }
 
         final List<MockProcessor<String, Object>> processors = 
supplier.capturedProcessors(4);
         assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), 
processors.get(0).processed);
@@ -156,104 +152,109 @@ public boolean test(String key, Integer value) {
         table1.toStream().to(topic2, produced);
         final KTableImpl<String, String, String> table4 = (KTableImpl<String, 
String, String>) builder.table(topic2, consumed);
 
+        final Topology topology = builder.build();
+
         final KTableValueGetterSupplier<String, String> getterSupplier1 = 
table1.valueGetterSupplier();
         final KTableValueGetterSupplier<String, Integer> getterSupplier2 = 
table2.valueGetterSupplier();
         final KTableValueGetterSupplier<String, Integer> getterSupplier3 = 
table3.valueGetterSupplier();
         final KTableValueGetterSupplier<String, String> getterSupplier4 = 
table4.valueGetterSupplier();
 
-        driver.setUp(builder, stateDir, null, null);
+        final InternalTopologyBuilder topologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+        topologyBuilder.connectProcessorAndStateStores(table1.name, 
getterSupplier1.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table2.name, 
getterSupplier2.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table3.name, 
getterSupplier3.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table4.name, 
getterSupplier4.storeNames());
+
+        try (final TopologyTestDriverWrapper driver = new 
TopologyTestDriverWrapper(topology, props)) {
 
-        // two state store should be created
-        assertEquals(2, driver.allStateStores().size());
+            assertEquals(2, driver.getAllStateStores().size());
 
-        final KTableValueGetter<String, String> getter1 = 
getterSupplier1.get();
-        getter1.init(driver.context());
-        final KTableValueGetter<String, Integer> getter2 = 
getterSupplier2.get();
-        getter2.init(driver.context());
-        final KTableValueGetter<String, Integer> getter3 = 
getterSupplier3.get();
-        getter3.init(driver.context());
-        final KTableValueGetter<String, String> getter4 = 
getterSupplier4.get();
-        getter4.init(driver.context());
+            final KTableValueGetter<String, String> getter1 = 
getterSupplier1.get();
+            final KTableValueGetter<String, Integer> getter2 = 
getterSupplier2.get();
+            final KTableValueGetter<String, Integer> getter3 = 
getterSupplier3.get();
+            final KTableValueGetter<String, String> getter4 = 
getterSupplier4.get();
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
+            
getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
+            
getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+            
getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
+            
getter4.init(driver.setCurrentNodeForProcessorContext(table4.name));
 
-        assertEquals("01", getter1.get("A"));
-        assertEquals("01", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        assertEquals(new Integer(1), getter2.get("A"));
-        assertEquals(new Integer(1), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        assertNull(getter3.get("A"));
-        assertNull(getter3.get("B"));
-        assertNull(getter3.get("C"));
+            assertEquals(new Integer(1), getter2.get("A"));
+            assertEquals(new Integer(1), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
 
-        assertEquals("01", getter4.get("A"));
-        assertEquals("01", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertNull(getter3.get("C"));
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
+            assertEquals("01", getter4.get("A"));
+            assertEquals("01", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
 
-        assertEquals("02", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        assertEquals(new Integer(2), getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        assertEquals(new Integer(2), getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
+            assertEquals(new Integer(2), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
 
-        assertEquals("02", getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
+            assertEquals(new Integer(2), getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
 
-        driver.process(topic1, "A", "03");
-        driver.flushState();
+            assertEquals("02", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
 
-        assertEquals("03", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        assertEquals(new Integer(3), getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        assertNull(getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
+            assertEquals(new Integer(3), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
 
-        assertEquals("03", getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
 
-        driver.process(topic1, "A", null);
-        driver.flushState();
+            assertEquals("03", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
 
-        assertNull(getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
 
+            assertNull(getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        assertNull(getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
 
-        assertNull(getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
+            assertNull(getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
 
-        assertNull(getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertNull(getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+        }
     }
 
     @Test
@@ -282,11 +283,9 @@ public boolean test(String key, Integer value) {
                     }
                 });
 
-        driver.setUp(builder, stateDir, null, null);
-        driver.setTime(0L);
-
-        // two state stores should be created
-        assertEquals(2, driver.allStateStores().size());
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            assertEquals(2, driver.getAllStateStores().size());
+        }
     }
 
     @Test
@@ -323,15 +322,25 @@ public String apply(String v1, Integer v2) {
                     }
                 });
 
-        driver.setUp(builder, stateDir, null, null);
-        driver.setTime(0L);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            assertEquals(2, driver.getAllStateStores().size());
+        }
+    }
 
-        // two state store should be created
-        assertEquals(2, driver.allStateStores().size());
+    private void assertTopologyContainsProcessor(final Topology topology, 
final String processorName) {
+        for (final TopologyDescription.Subtopology subtopology: 
topology.describe().subtopologies()) {
+            for (final TopologyDescription.Node node: subtopology.nodes()) {
+                if (node.name().equals(processorName)) {
+                    return;
+                }
+            }
+        }
+        throw new AssertionError("No processor named '" + processorName + "'"
+                + "found in the provided Topology:\n" + topology.describe());
     }
 
     @Test
-    public void testRepartition() throws NoSuchFieldException, 
IllegalAccessException {
+    public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws 
NoSuchFieldException, IllegalAccessException {
         final String topic1 = "topic1";
         final String storeName1 = "storeName1";
 
@@ -341,8 +350,8 @@ public void testRepartition() throws NoSuchFieldException, 
IllegalAccessExceptio
                 (KTableImpl<String, String, String>) builder.table(topic1,
                                                                    consumed,
                                                                    
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
-                                                                           
.withKeySerde(stringSerde)
-                                                                           
.withValueSerde(stringSerde)
+                                                                           
.withKeySerde(Serdes.String())
+                                                                           
.withValueSerde(Serdes.String())
                 );
 
         table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
@@ -352,27 +361,26 @@ public void testRepartition() throws 
NoSuchFieldException, IllegalAccessExceptio
         table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
             .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, 
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mock-result2"));
 
-        driver.setUp(builder, stateDir, stringSerde, stringSerde);
-        driver.setTime(0L);
+        final Topology topology = builder.build();
+        try (final TopologyTestDriverWrapper driver = new 
TopologyTestDriverWrapper(topology, props)) {
 
-        // three state store should be created, one for source, one for 
aggregate and one for reduce
-        assertEquals(3, driver.allStateStores().size());
+            assertEquals(3, driver.getAllStateStores().size());
 
-        // contains the corresponding repartition source / sink nodes
-        
assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003"));
-        
assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
-        
assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
-        
assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
+            assertTopologyContainsProcessor(topology, 
"KSTREAM-SINK-0000000003");
+            assertTopologyContainsProcessor(topology, 
"KSTREAM-SOURCE-0000000004");
+            assertTopologyContainsProcessor(topology, 
"KSTREAM-SINK-0000000007");
+            assertTopologyContainsProcessor(topology, 
"KSTREAM-SOURCE-0000000008");
 
-        Field valSerializerField  = ((SinkNode) 
driver.processor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
-        Field valDeserializerField  = ((SourceNode) 
driver.processor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
-        valSerializerField.setAccessible(true);
-        valDeserializerField.setAccessible(true);
+            Field valSerializerField = ((SinkNode) 
driver.getProcessor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
+            Field valDeserializerField = ((SourceNode) 
driver.getProcessor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
+            valSerializerField.setAccessible(true);
+            valDeserializerField.setAccessible(true);
 
-        assertNotNull(((ChangedSerializer) 
valSerializerField.get(driver.processor("KSTREAM-SINK-0000000003"))).inner());
-        assertNotNull(((ChangedDeserializer) 
valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000004"))).inner());
-        assertNotNull(((ChangedSerializer) 
valSerializerField.get(driver.processor("KSTREAM-SINK-0000000007"))).inner());
-        assertNotNull(((ChangedDeserializer) 
valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000008"))).inner());
+            assertNotNull(((ChangedSerializer) 
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
+            assertNotNull(((ChangedDeserializer) 
valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
+            assertNotNull(((ChangedSerializer) 
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
+            assertNotNull(((ChangedDeserializer) 
valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
+        }
     }
 
     @Test(expected = NullPointerException.class)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index a54e43e92c3..a01d5cb215a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -16,27 +16,30 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -45,27 +48,19 @@
 
 public class KTableMapValuesTest {
 
-    private final Serde<String> stringSerde = Serdes.String();
-    private final Consumed<String, String> consumed = 
Consumed.with(stringSerde, stringSerde);
-    private final Produced<String, String> produced = 
Produced.with(stringSerde, stringSerde);
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final Consumed<String, String> consumed = 
Consumed.with(Serdes.String(), Serdes.String());
+    private final Produced<String, String> produced = 
Produced.with(Serdes.String(), Serdes.String());
+    private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     private void doTestKTable(final StreamsBuilder builder, final String 
topic1, final MockProcessorSupplier<String, Integer> supplier) {
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
-
-        driver.process(topic1, "A", "1");
-        driver.process(topic1, "B", "2");
-        driver.process(topic1, "C", "3");
-        driver.process(topic1, "D", "4");
-        driver.flushState();
-        assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), 
supplier.theCapturedProcessor().processed);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic1, "A", "1"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "3"));
+            driver.pipeInput(recordFactory.create(topic1, "D", "4"));
+            assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), 
supplier.theCapturedProcessor().processed);
+        }
     }
 
     @Test
@@ -114,99 +109,106 @@ private void doTestValueGetter(final StreamsBuilder 
builder,
                                    final KTableImpl<String, String, Integer> 
table2,
                                    final KTableImpl<String, Integer, Integer> 
table3,
                                    final KTableImpl<String, String, String> 
table4) {
+
+        final Topology topology = builder.build();
+
         final KTableValueGetterSupplier<String, String> getterSupplier1 = 
table1.valueGetterSupplier();
         final KTableValueGetterSupplier<String, Integer> getterSupplier2 = 
table2.valueGetterSupplier();
         final KTableValueGetterSupplier<String, Integer> getterSupplier3 = 
table3.valueGetterSupplier();
         final KTableValueGetterSupplier<String, String> getterSupplier4 = 
table4.valueGetterSupplier();
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
-        final KTableValueGetter<String, String> getter1 = 
getterSupplier1.get();
-        getter1.init(driver.context());
-        final KTableValueGetter<String, Integer> getter2 = 
getterSupplier2.get();
-        getter2.init(driver.context());
-        final KTableValueGetter<String, Integer> getter3 = 
getterSupplier3.get();
-        getter3.init(driver.context());
-        final KTableValueGetter<String, String> getter4 = 
getterSupplier4.get();
-        getter4.init(driver.context());
-
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
-
-        assertEquals("01", getter1.get("A"));
-        assertEquals("01", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
-
-        assertEquals(new Integer(1), getter2.get("A"));
-        assertEquals(new Integer(1), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
-
-        assertNull(getter3.get("A"));
-        assertNull(getter3.get("B"));
-        assertNull(getter3.get("C"));
-
-        assertEquals("01", getter4.get("A"));
-        assertEquals("01", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
-
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
-
-        assertEquals("02", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
-
-        assertEquals(new Integer(2), getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
-
-        assertEquals(new Integer(2), getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
-
-        assertEquals("02", getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
-
-        driver.process(topic1, "A", "03");
-        driver.flushState();
-
-        assertEquals("03", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
-
-        assertEquals(new Integer(3), getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
-
-        assertNull(getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
-
-        assertEquals("03", getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
-
-        driver.process(topic1, "A", null);
-        driver.flushState();
-
-        assertNull(getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
-
-        assertNull(getter2.get("A"));
-        assertEquals(new Integer(2), getter2.get("B"));
-        assertEquals(new Integer(1), getter2.get("C"));
-
-        assertNull(getter3.get("A"));
-        assertEquals(new Integer(2), getter3.get("B"));
-        assertNull(getter3.get("C"));
-
-        assertNull(getter4.get("A"));
-        assertEquals("02", getter4.get("B"));
-        assertEquals("01", getter4.get("C"));
+        final InternalTopologyBuilder topologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+        topologyBuilder.connectProcessorAndStateStores(table1.name, 
getterSupplier1.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table2.name, 
getterSupplier2.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table3.name, 
getterSupplier3.storeNames());
+        topologyBuilder.connectProcessorAndStateStores(table4.name, 
getterSupplier4.storeNames());
+
+        try (final TopologyTestDriverWrapper driver = new 
TopologyTestDriverWrapper(builder.build(), props)) {
+            KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+            KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+
+            
getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
+            
getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+            
getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
+            
getter4.init(driver.setCurrentNodeForProcessorContext(table4.name));
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
+
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(1), getter2.get("A"));
+            assertEquals(new Integer(1), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertNull(getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("01", getter4.get("A"));
+            assertEquals("01", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
+
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(2), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertEquals(new Integer(2), getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("02", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
+
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertEquals(new Integer(3), getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertEquals("03", getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+
+            assertNull(getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+
+            assertNull(getter2.get("A"));
+            assertEquals(new Integer(2), getter2.get("B"));
+            assertEquals(new Integer(1), getter2.get("C"));
+
+            assertNull(getter3.get("A"));
+            assertEquals(new Integer(2), getter3.get("B"));
+            assertNull(getter3.get("C"));
+
+            assertNull(getter4.get("A"));
+            assertEquals("02", getter4.get("B"));
+            assertEquals("01", getter4.get("C"));
+        }
     }
 
     @Test
@@ -244,6 +246,8 @@ public void testQueryableValueGetter() {
 
         final String topic1 = "topic1";
         final String topic2 = "topic2";
+        final String storeName2 = "anyMapName";
+        final String storeName3 = "anyFilterName";
 
         final KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, 
consumed);
@@ -253,14 +257,14 @@ public void testQueryableValueGetter() {
                 public Integer apply(String value) {
                     return new Integer(value);
                 }
-            }, Materialized.<String, Integer, KeyValueStore<Bytes, 
byte[]>>as("anyMapName").withValueSerde(Serdes.Integer()));
+            }, Materialized.<String, Integer, KeyValueStore<Bytes, 
byte[]>>as(storeName2).withValueSerde(Serdes.Integer()));
         final KTableImpl<String, Integer, Integer> table3 = 
(KTableImpl<String, Integer, Integer>) table2.filter(
             new Predicate<String, Integer>() {
                 @Override
                 public boolean test(String key, Integer value) {
                     return (value % 2) == 0;
                 }
-            }, Materialized.<String, Integer, KeyValueStore<Bytes, 
byte[]>>as("anyFilterName").withValueSerde(Serdes.Integer()));
+            }, Materialized.<String, Integer, KeyValueStore<Bytes, 
byte[]>>as(storeName3).withValueSerde(Serdes.Integer()));
         table1.toStream().to(topic2, produced);
         final KTableImpl<String, String, String> table4 = (KTableImpl<String, 
String, String>) builder.table(topic2, consumed);
 
@@ -285,37 +289,34 @@ public Integer apply(String value) {
 
         final MockProcessorSupplier<String, Integer> supplier = new 
MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc", supplier, table2.name);
+        final Topology topology = builder.build().addProcessor("proc", 
supplier, table2.name);
 
-        driver.setUp(builder, stateDir);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, props)) {
 
-        final MockProcessor<String, Integer> proc = 
supplier.theCapturedProcessor();
+            final MockProcessor<String, Integer> proc = 
supplier.theCapturedProcessor();
 
-        assertFalse(table1.sendingOldValueEnabled());
-        assertFalse(table2.sendingOldValueEnabled());
+            assertFalse(table1.sendingOldValueEnabled());
+            assertFalse(table2.sendingOldValueEnabled());
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", 
"C:(1<-null)");
+            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", 
"C:(1<-null)");
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+            proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
-        driver.process(topic1, "A", "03");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        proc.checkAndClearProcessResult("A:(3<-null)");
+            proc.checkAndClearProcessResult("A:(3<-null)");
 
-        driver.process(topic1, "A", null);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
 
-        proc.checkAndClearProcessResult("A:(null<-null)");
+            proc.checkAndClearProcessResult("A:(null<-null)");
+        }
     }
 
     @Test
@@ -340,34 +341,31 @@ public Integer apply(String value) {
 
         builder.build().addProcessor("proc", supplier, table2.name);
 
-        driver.setUp(builder, stateDir);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
 
-        final MockProcessor<String, Integer> proc = 
supplier.theCapturedProcessor();
+            final MockProcessor<String, Integer> proc = 
supplier.theCapturedProcessor();
 
-        assertTrue(table1.sendingOldValueEnabled());
-        assertTrue(table2.sendingOldValueEnabled());
+            assertTrue(table1.sendingOldValueEnabled());
+            assertTrue(table2.sendingOldValueEnabled());
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", 
"C:(1<-null)");
+            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", 
"C:(1<-null)");
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+            proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
 
-        driver.process(topic1, "A", "03");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        proc.checkAndClearProcessResult("A:(3<-2)");
+            proc.checkAndClearProcessResult("A:(3<-2)");
 
-        driver.process(topic1, "A", null);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
 
-        proc.checkAndClearProcessResult("A:(null<-3)");
+            proc.checkAndClearProcessResult("A:(null<-3)");
+        }
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 504d8414aae..80a60ab2f20 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -16,22 +16,26 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
+import java.util.Properties;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -42,17 +46,9 @@
 
 public class KTableSourceTest {
 
-    final private Serde<String> stringSerde = Serdes.String();
-    private final Consumed<String, String> stringConsumed = 
Consumed.with(stringSerde, stringSerde);
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-    private File stateDir = null;
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final Consumed<String, String> stringConsumed = 
Consumed.with(Serdes.String(), Serdes.String());
+    private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
     @Test
     public void testKTable() {
@@ -60,38 +56,38 @@ public void testKTable() {
 
         final String topic1 = "topic1";
 
-        final KTable<String, Integer> table1 = builder.table(topic1, 
Consumed.with(stringSerde, intSerde));
+        final KTable<String, Integer> table1 = builder.table(topic1, 
Consumed.with(Serdes.String(), Serdes.Integer()));
 
         final MockProcessorSupplier<String, Integer> supplier = new 
MockProcessorSupplier<>();
         table1.toStream().process(supplier);
 
-        driver.setUp(builder, stateDir);
-        driver.process(topic1, "A", 1);
-        driver.process(topic1, "B", 2);
-        driver.process(topic1, "C", 3);
-        driver.process(topic1, "D", 4);
-        driver.flushState();
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
+        final ConsumerRecordFactory<String, Integer> integerFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(integerFactory.create(topic1, "A", 1));
+            driver.pipeInput(integerFactory.create(topic1, "B", 2));
+            driver.pipeInput(integerFactory.create(topic1, "C", 3));
+            driver.pipeInput(integerFactory.create(topic1, "D", 4));
+            driver.pipeInput(integerFactory.create(topic1, "A", null));
+            driver.pipeInput(integerFactory.create(topic1, "B", null));
+        }
 
         assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", 
"B:null"), supplier.theCapturedProcessor().processed);
     }
 
     @Test
     public void kTableShouldLogAndMeterOnSkippedRecords() {
-        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-        streamsBuilder.table(topic, Consumed.with(stringSerde, intSerde));
+        builder.table(topic, stringConsumed);
 
         final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
-        driver.setUp(streamsBuilder, stateDir);
-        driver.process(topic, null, "value");
-        driver.flushState();
-        LogCaptureAppender.unregister(appender);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(topic, null, "value"));
+            LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, 
getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", 
"stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to 
null key. topic=[topic] partition=[-1] offset=[-1]"));
+            assertEquals(1.0, getMetricByName(driver.metrics(), 
"skipped-records-total", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to 
null key. topic=[topic] partition=[0] offset=[0]"));
+        }
     }
 
     @Test
@@ -102,39 +98,45 @@ public void testValueGetter() {
 
         final KTableImpl<String, String, String> table1 = (KTableImpl<String, 
String, String>) builder.table(topic1, stringConsumed);
 
+        final Topology topology = builder.build();
+
         final KTableValueGetterSupplier<String, String> getterSupplier1 = 
table1.valueGetterSupplier();
 
-        driver.setUp(builder, stateDir);
-        final KTableValueGetter<String, String> getter1 = 
getterSupplier1.get();
-        getter1.init(driver.context());
+        final InternalTopologyBuilder topologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+        topologyBuilder.connectProcessorAndStateStores(table1.name, 
getterSupplier1.storeNames());
+
+        try (final TopologyTestDriverWrapper driver = new 
TopologyTestDriverWrapper(builder.build(), props)) {
+            final KTableValueGetter<String, String> getter1 = 
getterSupplier1.get();
+            
getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        assertEquals("01", getter1.get("A"));
-        assertEquals("01", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            assertEquals("01", getter1.get("A"));
+            assertEquals("01", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        assertEquals("02", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            assertEquals("02", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        driver.process(topic1, "A", "03");
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        assertEquals("03", getter1.get("A"));
-        assertEquals("02", getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            assertEquals("03", getter1.get("A"));
+            assertEquals("02", getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
 
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+            driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
 
-        assertNull(getter1.get("A"));
-        assertNull(getter1.get("B"));
-        assertEquals("01", getter1.get("C"));
+            assertNull(getter1.get("A"));
+            assertNull(getter1.get("B"));
+            assertEquals("01", getter1.get("C"));
+        }
 
     }
 
@@ -148,35 +150,32 @@ public void testNotSendingOldValue() {
 
         final MockProcessorSupplier<String, Integer> supplier = new 
MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc1", supplier, table1.name);
+        final Topology topology = builder.build().addProcessor("proc1", 
supplier, table1.name);
 
-        driver.setUp(builder, stateDir);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, props)) {
 
-        final MockProcessor<String, Integer> proc1 = 
supplier.theCapturedProcessor();
+            final MockProcessor<String, Integer> proc1 = 
supplier.theCapturedProcessor();
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", 
"C:(01<-null)");
+            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", 
"C:(01<-null)");
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
+            proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
 
-        driver.process(topic1, "A", "03");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        proc1.checkAndClearProcessResult("A:(03<-null)");
+            proc1.checkAndClearProcessResult("A:(03<-null)");
 
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+            driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
 
-        proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(null<-null)", 
"B:(null<-null)");
+        }
     }
 
     @Test
@@ -193,34 +192,31 @@ public void testSendingOldValue() {
 
         final MockProcessorSupplier<String, Integer> supplier = new 
MockProcessorSupplier<>();
 
-        builder.build().addProcessor("proc1", supplier, table1.name);
+        final Topology topology = builder.build().addProcessor("proc1", 
supplier, table1.name);
 
-        driver.setUp(builder, stateDir);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, props)) {
 
-        final MockProcessor<String, Integer> proc1 = 
supplier.theCapturedProcessor();
+            final MockProcessor<String, Integer> proc1 = 
supplier.theCapturedProcessor();
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "01");
-        driver.process(topic1, "C", "01");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+            driver.pipeInput(recordFactory.create(topic1, "C", "01"));
 
-        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", 
"C:(01<-null)");
+            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", 
"C:(01<-null)");
 
-        driver.process(topic1, "A", "02");
-        driver.process(topic1, "B", "02");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+            driver.pipeInput(recordFactory.create(topic1, "B", "02"));
 
-        proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
+            proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
 
-        driver.process(topic1, "A", "03");
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", "03"));
 
-        proc1.checkAndClearProcessResult("A:(03<-02)");
+            proc1.checkAndClearProcessResult("A:(03<-02)");
 
-        driver.process(topic1, "A", null);
-        driver.process(topic1, "B", null);
-        driver.flushState();
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+            driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
 
-        proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
+            proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
+        }
     }
 }
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index e46ec6a35d0..83471ce61c0 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -175,13 +175,14 @@
 
     private final static int PARTITION_ID = 0;
     private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
-    private final StreamTask task;
+    final StreamTask task;
     private final GlobalStateUpdateTask globalStateTask;
     private final GlobalStateManager globalStateManager;
 
     private final StateDirectory stateDirectory;
     private final Metrics metrics;
-    private final ProcessorTopology processorTopology;
+    final ProcessorTopology processorTopology;
+    final ProcessorTopology globalTopology;
 
     private final MockProducer<byte[], byte[]> producer;
 
@@ -220,18 +221,6 @@ public TopologyTestDriver(final Topology topology,
         this(topology.internalTopologyBuilder, config, initialWallClockTimeMs);
     }
 
-    /**
-     * Create a new test diver instance.
-     *
-     * @param builder builder for the topology to be tested
-     * @param config the configuration for the topology
-     */
-    TopologyTestDriver(final InternalTopologyBuilder builder,
-                       final Properties config) {
-        this(builder, config,  System.currentTimeMillis());
-
-    }
-
     /**
      * Create a new test diver instance.
      *
@@ -249,7 +238,7 @@ private TopologyTestDriver(final InternalTopologyBuilder 
builder,
         
internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
 
         processorTopology = internalTopologyBuilder.build(null);
-        final ProcessorTopology globalTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
+        globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
 
         final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
         producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, 
bytesSerializer) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Rewrite test to use new public TopologyTestDriver
> -------------------------------------------------
>
>                 Key: KAFKA-6474
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6474
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams, unit tests
>    Affects Versions: 1.1.0
>            Reporter: Matthias J. Sax
>            Assignee: Filipe Agapito
>            Priority: Major
>              Labels: beginner, newbie
>
> With KIP-247 we added public TopologyTestDriver. We should rewrite out own 
> test to use this new test driver and remove the two classes 
> ProcessorTopoogyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to