This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c140f09  KAFKA-6474: remove KStreamTestDriver (#6732)
c140f09 is described below

commit c140f09406b119d86d43bba6248d17fe0120a4dd
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Sat May 18 21:18:14 2019 -0500

    KAFKA-6474: remove KStreamTestDriver (#6732)
    
    The implementation of KIP-258 broke the state store methods in KStreamTestDriver.
    These methods were unused in this project, so the breakage was not detected.
    Since this is an internal testing utility, and it was deprecated and partially removed in favor of TopologyTestDriver, I opted to just complete the removal of the class.

    Reviewers: A. Sophie Blee-Goldman <ableegold...@gmail.com>, Boyang Chen <boy...@confluent.io>, Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
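
    For quick reference, below is a minimal sketch of the TopologyTestDriver pattern the
    migrated tests now follow. The pass-through topology and the "input"/"output" topic
    names are illustrative assumptions for this sketch, not code taken from this commit:

        import java.util.Properties;

        import org.apache.kafka.common.serialization.IntegerSerializer;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.TopologyTestDriver;
        import org.apache.kafka.streams.test.ConsumerRecordFactory;

        public class TopologyTestDriverSketch {
            public static void main(final String[] args) {
                // Hypothetical pass-through topology: copies "input" to "output".
                final StreamsBuilder builder = new StreamsBuilder();
                builder.stream("input").to("output");

                final Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");

                // Start the driver's mock wall-clock time at 0L, as the migrated tests do.
                try (final TopologyTestDriver driver =
                         new TopologyTestDriver(builder.build(), props, 0L)) {
                    final ConsumerRecordFactory<Integer, Integer> factory =
                        new ConsumerRecordFactory<>("input", new IntegerSerializer(), new IntegerSerializer());
                    // Pipe a record in with an explicit event-time timestamp.
                    driver.pipeInput(factory.create(1, 10, 0L));
                    // Advancing mock wall-clock time fires WALL_CLOCK_TIME punctuators, if any.
                    driver.advanceWallClockTime(2);
                }
            }
        }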
---
 .../kstream/internals/KStreamTransformTest.java    |  67 ++--
 .../kstream/internals/KTableAggregateTest.java     | 411 ++++++++-------------
 .../org/apache/kafka/test/KStreamTestDriver.java   | 277 --------------
 .../apache/kafka/streams/TopologyTestDriver.java   |   1 -
 4 files changed, 186 insertions(+), 570 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 8f87d40..fcf6aea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
@@ -30,24 +31,22 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.time.Duration;
 import java.util.Properties;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamTransformTest {
-    private final String topicName = "topic";
+    private static final String TOPIC_NAME = "topic";
     private final ConsumerRecordFactory<Integer, Integer> recordFactory =
         new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer(), 0L);
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
 
-    @SuppressWarnings("deprecation")
-    @Rule
-    public final org.apache.kafka.test.KStreamTestDriver kstreamDriver = new org.apache.kafka.test.KStreamTestDriver();
-
     @Test
     public void testTransform() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -57,7 +56,13 @@ public class KStreamTransformTest {
                 private int total = 0;
 
                 @Override
-                public void init(final ProcessorContext context) {}
+                public void init(final ProcessorContext context) {
+                    context.schedule(
+                        Duration.ofMillis(1),
+                        PunctuationType.WALL_CLOCK_TIME,
+                        timestamp -> context.forward(-1, (int) timestamp)
+                    );
+                }
 
                 @Override
                 public KeyValue<Integer, Integer> transform(final Number key, final Number value) {
@@ -72,27 +77,39 @@ public class KStreamTransformTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         final MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+        final KStream<Integer, Integer> stream = builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transform(transformerSupplier).process(processor);
 
-        kstreamDriver.setUp(builder);
-        for (final int expectedKey : expectedKeys) {
-            kstreamDriver.setTime(expectedKey / 2L);
-            kstreamDriver.process(topicName, expectedKey, expectedKey * 10);
-        }
-
-        // TODO: un-comment after replaced with TopologyTestDriver
-        //kstreamDriver.punctuate(2);
-        //kstreamDriver.punctuate(3);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(
+            builder.build(),
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test")
+            )),
+            0L)) {
+            final ConsumerRecordFactory<Integer, Integer> recordFactory =
+                new ConsumerRecordFactory<>(TOPIC_NAME, new IntegerSerializer(), new IntegerSerializer());
 
-        //assertEquals(6, processor.theCapturedProcessor().processed.size());
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(expectedKey, expectedKey * 10, expectedKey / 2L));
+            }
 
-        //String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
+            driver.advanceWallClockTime(2);
+            driver.advanceWallClockTime(1);
 
-        final String[] expected = {"2:10 (ts: 0)", "20:110 (ts: 5)", "200:1110 (ts: 50)", "2000:11110 (ts: 500)"};
+            final String[] expected = {
+                "2:10 (ts: 0)",
+                "20:110 (ts: 5)",
+                "200:1110 (ts: 50)",
+                "2000:11110 (ts: 500)",
+                "-1:2 (ts: 2)",
+                "-1:3 (ts: 3)"
+            };
 
-        for (int i = 0; i < expected.length; i++) {
-            assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
+            assertEquals(expected.length, processor.theCapturedProcessor().processed.size());
+            for (int i = 0; i < expected.length; i++) {
+                assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
+            }
         }
     }
 
@@ -125,15 +142,15 @@ public class KStreamTransformTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         final MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        final KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+        final KStream<Integer, Integer> stream = builder.stream(TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.transform(transformerSupplier).process(processor);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
             for (final int expectedKey : expectedKeys) {
-                driver.pipeInput(recordFactory.create(topicName, expectedKey, expectedKey * 10, 0L));
+                driver.pipeInput(recordFactory.create(TOPIC_NAME, expectedKey, expectedKey * 10, 0L));
             }
 
-            // This tick will yield yields the "-1:2" result
+            // This tick yields the "-1:2" result
             driver.advanceWallClockTime(2);
             // This tick further advances the clock to 3, which leads to the "-1:3" result
             driver.advanceWallClockTime(1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 6144051..b704e13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -18,52 +18,38 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.junit.Assert.assertEquals;
 
-@SuppressWarnings("deprecation")
 public class KTableAggregateTest {
     private final Serde<String> stringSerde = Serdes.String();
     private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
-    private final Grouped<String, String> stringSerialzied = Grouped.with(stringSerde, stringSerde);
+    private final Grouped<String, String> stringSerialized = Grouped.with(stringSerde, stringSerde);
     private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
 
-    private File stateDir = null;
-
-    @Rule
-    public EmbeddedKafkaCluster cluster = null;
-    @Rule
-    public final org.apache.kafka.test.KStreamTestDriver driver = new org.apache.kafka.test.KStreamTestDriver();
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
-
     @Test
     public void testAggBasic() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -73,7 +59,7 @@ public class KTableAggregateTest {
         final KTable<String, String> table2 = table1
             .groupBy(
                 MockMapper.noOpKeyValueMapper(),
-                stringSerialzied)
+                stringSerialized)
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
@@ -83,76 +69,43 @@ public class KTableAggregateTest {
 
         table2.toStream().process(supplier);
 
-        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
-
-        driver.setTime(10L);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        driver.setTime(15L);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        driver.setTime(20L);
-        driver.process(topic1, "A", "3");
-        driver.flushState();
-        driver.setTime(18L);
-        driver.process(topic1, "B", "4");
-        driver.flushState();
-        driver.setTime(5L);
-        driver.process(topic1, "C", "5");
-        driver.flushState();
-        driver.setTime(25L);
-        driver.process(topic1, "D", "6");
-        driver.flushState();
-        driver.setTime(15L);
-        driver.process(topic1, "B", "7");
-        driver.flushState();
-        driver.setTime(10L);
-        driver.process(topic1, "C", "8");
-        driver.flushState();
-
-        assertEquals(
-            asList(
-                "A:0+1 (ts: 10)",
-                "B:0+2 (ts: 15)",
-                "A:0+1-1+3 (ts: 20)",
-                "B:0+2-2+4 (ts: 18)",
-                "C:0+5 (ts: 5)",
-                "D:0+6 (ts: 25)",
-                "B:0+2-2+4-4+7 (ts: 18)",
-                "C:0+5-5+8 (ts: 10)"),
-            supplier.theCapturedProcessor().processed);
-    }
-
-
-    @Test
-    public void testAggCoalesced() {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final String topic1 = "topic1";
-
-        final KTable<String, String> table1 = builder.table(topic1, consumed);
-        final KTable<String, String> table2 = table1
-            .groupBy(
-                MockMapper.noOpKeyValueMapper(),
-                stringSerialzied)
-            .aggregate(MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                MockAggregator.TOSTRING_REMOVER,
-                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic1-Canonized")
-                    .withValueSerde(stringSerde));
-
-        table2.toStream().process(supplier);
-
-        driver.setUp(builder, stateDir);
-
-        driver.setTime(10L);
-        driver.process(topic1, "A", "1");
-        driver.setTime(20L);
-        driver.process(topic1, "A", "3");
-        driver.setTime(15L);
-        driver.process(topic1, "A", "4");
-        driver.flushState();
-
-        assertEquals(Collections.singletonList("A:0+4 (ts: 15)"), supplier.theCapturedProcessor().processed);
+        try (
+            final TopologyTestDriver driver = new TopologyTestDriver(
+                builder.build(),
+                mkProperties(mkMap(
+                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
+                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
+                    mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
+                )),
+                0L)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L);
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 15L));
+            driver.pipeInput(recordFactory.create(topic1, "A", "3", 20L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "4", 18L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "5", 5L));
+            driver.pipeInput(recordFactory.create(topic1, "D", "6", 25L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "7", 15L));
+            driver.pipeInput(recordFactory.create(topic1, "C", "8", 10L));
+
+            assertEquals(
+                asList(
+                    "A:0+1 (ts: 10)",
+                    "B:0+2 (ts: 15)",
+                    "A:0+1-1 (ts: 20)",
+                    "A:0+1-1+3 (ts: 20)",
+                    "B:0+2-2 (ts: 18)",
+                    "B:0+2-2+4 (ts: 18)",
+                    "C:0+5 (ts: 5)",
+                    "D:0+6 (ts: 25)",
+                    "B:0+2-2+4-4 (ts: 18)",
+                    "B:0+2-2+4-4+7 (ts: 18)",
+                    "C:0+5-5 (ts: 10)",
+                    "C:0+5-5+8 (ts: 10)"),
+                supplier.theCapturedProcessor().processed);
+        }
     }
 
     @Test
@@ -173,7 +126,7 @@ public class KTableAggregateTest {
                             return KeyValue.pair(value, value);
                     }
                 },
-                stringSerialzied)
+                stringSerialized)
             .aggregate(
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
@@ -183,78 +136,74 @@ public class KTableAggregateTest {
 
         table2.toStream().process(supplier);
 
-        driver.setUp(builder, stateDir);
-
-        driver.setTime(10L);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        driver.setTime(15L);
-        driver.process(topic1, "A", null);
-        driver.flushState();
-        driver.setTime(12L);
-        driver.process(topic1, "A", "1");
-        driver.flushState();
-        driver.setTime(20L);
-        driver.process(topic1, "B", "2");
-        driver.flushState();
-        driver.setTime(25L);
-        driver.process(topic1, "null", "3");
-        driver.flushState();
-        driver.setTime(23L);
-        driver.process(topic1, "B", "4");
-        driver.flushState();
-        driver.setTime(24L);
-        driver.process(topic1, "NULL", "5");
-        driver.flushState();
-        driver.setTime(22L);
-        driver.process(topic1, "B", "7");
-        driver.flushState();
-
-        assertEquals(
-            asList(
-                "1:0+1 (ts: 10)",
-                "1:0+1-1 (ts: 15)",
-                "1:0+1-1+1 (ts: 15)",
-                "2:0+2 (ts: 20)",
-                  //noop
-                "2:0+2-2 (ts: 23)", "4:0+4 (ts: 23)",
-                  //noop
-                "4:0+4-4 (ts: 23)", "7:0+7 (ts: 22)"),
-            supplier.theCapturedProcessor().processed);
+        try (
+            final TopologyTestDriver driver = new TopologyTestDriver(
+                builder.build(),
+                mkProperties(mkMap(
+                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
+                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
+                    mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
+                )),
+                0L)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L);
+
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 10L));
+            driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 15L));
+            driver.pipeInput(recordFactory.create(topic1, "A", "1", 12L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "2", 20L));
+            driver.pipeInput(recordFactory.create(topic1, "null", "3", 25L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "4", 23L));
+            driver.pipeInput(recordFactory.create(topic1, "NULL", "5", 24L));
+            driver.pipeInput(recordFactory.create(topic1, "B", "7", 22L));
+
+            assertEquals(
+                asList(
+                    "1:0+1 (ts: 10)",
+                    "1:0+1-1 (ts: 15)",
+                    "1:0+1-1+1 (ts: 15)",
+                    "2:0+2 (ts: 20)",
+                    //noop
+                    "2:0+2-2 (ts: 23)", "4:0+4 (ts: 23)",
+                    //noop
+                    "4:0+4-4 (ts: 23)", "7:0+7 (ts: 22)"),
+                supplier.theCapturedProcessor().processed);
+        }
     }
 
-    private void testCountHelper(final StreamsBuilder builder,
-                                 final String input,
-                                 final MockProcessorSupplier<String, Object> supplier) {
-        driver.setUp(builder, stateDir);
-
-        driver.setTime(10L);
-        driver.process(input, "A", "green");
-        driver.flushState();
-        driver.setTime(9L);
-        driver.process(input, "B", "green");
-        driver.flushState();
-        driver.setTime(12L);
-        driver.process(input, "A", "blue");
-        driver.flushState();
-        driver.setTime(15L);
-        driver.process(input, "C", "yellow");
-        driver.flushState();
-        driver.setTime(11L);
-        driver.process(input, "D", "green");
-        driver.flushState();
-        driver.flushState();
-
-        assertEquals(
-            asList(
-                "green:1 (ts: 10)",
-                "green:2 (ts: 10)",
-                "green:1 (ts: 12)", "blue:1 (ts: 12)",
-                "yellow:1 (ts: 15)",
-                "green:2 (ts: 12)"),
-            supplier.theCapturedProcessor().processed);
+    private static void testCountHelper(final StreamsBuilder builder,
+                                        final String input,
+                                        final MockProcessorSupplier<String, Object> supplier) {
+        try (
+            final TopologyTestDriver driver = new TopologyTestDriver(
+                builder.build(),
+                mkProperties(mkMap(
+                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
+                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
+                    mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
+                )),
+                0L)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L);
+
+            driver.pipeInput(recordFactory.create(input, "A", "green", 10L));
+            driver.pipeInput(recordFactory.create(input, "B", "green", 9L));
+            driver.pipeInput(recordFactory.create(input, "A", "blue", 12L));
+            driver.pipeInput(recordFactory.create(input, "C", "yellow", 15L));
+            driver.pipeInput(recordFactory.create(input, "D", "green", 11L));
+
+            assertEquals(
+                asList(
+                    "green:1 (ts: 10)",
+                    "green:2 (ts: 10)",
+                    "green:1 (ts: 12)", "blue:1 (ts: 12)",
+                    "yellow:1 (ts: 15)",
+                    "green:2 (ts: 12)"),
+                supplier.theCapturedProcessor().processed);
+        }
     }
 
+
     @Test
     public void testCount() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -262,7 +211,7 @@ public class KTableAggregateTest {
 
         builder
             .table(input, consumed)
-            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied)
+            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialized)
             .count(Materialized.as("count"))
             .toStream()
             .process(supplier);
@@ -277,7 +226,7 @@ public class KTableAggregateTest {
 
         builder
             .table(input, consumed)
-            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied)
+            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialized)
             .count()
             .toStream()
             .process(supplier);
@@ -286,43 +235,6 @@ public class KTableAggregateTest {
     }
 
     @Test
-    public void testCountCoalesced() {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final String input = "count-test-input";
-        final MockProcessorSupplier<String, Long> supplier = new MockProcessorSupplier<>();
-
-        builder
-            .table(input, consumed)
-            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied)
-            .count(Materialized.as("count"))
-            .toStream()
-            .process(supplier);
-
-        driver.setUp(builder, stateDir);
-
-        final MockProcessor<String, Long> proc = supplier.theCapturedProcessor();
-
-        driver.setTime(10L);
-        driver.process(input, "A", "green");
-        driver.setTime(8L);
-        driver.process(input, "B", "green");
-        driver.setTime(9L);
-        driver.process(input, "A", "blue");
-        driver.setTime(10L);
-        driver.process(input, "C", "yellow");
-        driver.setTime(15L);
-        driver.process(input, "D", "green");
-        driver.flushState();
-
-        assertEquals(
-            asList(
-                "blue:1 (ts: 9)",
-                "yellow:1 (ts: 10)",
-                "green:2 (ts: 15)"),
-            proc.processed);
-    }
-
-    @Test
     public void testRemoveOldBeforeAddNew() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "count-test-input";
@@ -334,7 +246,7 @@ public class KTableAggregateTest {
                 (key, value) -> KeyValue.pair(
                     String.valueOf(key.charAt(0)),
                     String.valueOf(key.charAt(1))),
-                stringSerialzied)
+                stringSerialized)
             .aggregate(
                 () -> "",
                 (aggKey, value, aggregate) -> aggregate + value,
@@ -344,70 +256,35 @@ public class KTableAggregateTest {
             .toStream()
             .process(supplier);
 
-        driver.setUp(builder, stateDir);
-
-        final MockProcessor<String, String> proc = supplier.theCapturedProcessor();
-
-        driver.setTime(10L);
-        driver.process(input, "11", "A");
-        driver.flushState();
-        driver.setTime(8L);
-        driver.process(input, "12", "B");
-        driver.flushState();
-        driver.setTime(12L);
-        driver.process(input, "11", null);
-        driver.flushState();
-        driver.setTime(6L);
-        driver.process(input, "12", "C");
-        driver.flushState();
-
-        assertEquals(
-            asList(
-                "1:1 (ts: 10)",
-                "1:12 (ts: 10)",
-                "1:2 (ts: 12)",
-                "1:2 (ts: 12)"),
-            proc.processed);
-    }
-
-    @Test
-    public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() {
-        final String tableOne = "tableOne";
-        final String tableTwo = "tableTwo";
-        final StreamsBuilder builder = new StreamsBuilder();
-        final String reduceTopic = "TestDriver-reducer-store-repartition";
-        final Map<String, Long> reduceResults = new HashMap<>();
-
-        final KTable<String, String> one = builder.table(tableOne, consumed);
-        final KTable<Long, String> two = builder.table(tableTwo, Consumed.with(Serdes.Long(), Serdes.String()));
-
-        final KTable<String, Long> reduce = two
-            .groupBy(
-                (key, value) -> new KeyValue<>(value, key),
-                Grouped.with(Serdes.String(), Serdes.Long()))
-            .reduce(
-                (value1, value2) -> value1 + value2,
-                (value1, value2) -> value1 - value2,
-                Materialized.as("reducer-store"));
-
-        reduce.toStream().foreach(reduceResults::put);
-
-        one.leftJoin(reduce, (value1, value2) -> value1 + ":" + value2)
-            .mapValues(value -> value);
-
-        driver.setUp(builder, stateDir, 111);
-        driver.process(reduceTopic, "1", new Change<>(1L, null));
-        driver.process("tableOne", "2", "2");
-        // this should trigger eviction on the reducer-store topic
-        driver.process(reduceTopic, "2", new Change<>(2L, null));
-        // this wont as it is the same value
-        driver.process(reduceTopic, "2", new Change<>(2L, null));
-        assertEquals(Long.valueOf(2L), reduceResults.get("2"));
-
-        // this will trigger eviction on the tableOne topic
-        // that in turn will cause an eviction on reducer-topic. It will flush
-        // key 2 as it is the only dirty entry in the cache
-        driver.process("tableOne", "1", "5");
-        assertEquals(Long.valueOf(4L), reduceResults.get("2"));
+        try (
+            final TopologyTestDriver driver = new TopologyTestDriver(
+                builder.build(),
+                mkProperties(mkMap(
+                    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"),
+                    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"),
+                    mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())
+                )),
+                0L)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L, 0L);
+
+            final MockProcessor<String, String> proc = supplier.theCapturedProcessor();
+
+            driver.pipeInput(recordFactory.create(input, "11", "A", 10L));
+            driver.pipeInput(recordFactory.create(input, "12", "B", 8L));
+            driver.pipeInput(recordFactory.create(input, "11", (String) null, 12L));
+            driver.pipeInput(recordFactory.create(input, "12", "C", 6L));
+
+            assertEquals(
+                asList(
+                    "1:1 (ts: 10)",
+                    "1:12 (ts: 10)",
+                    "1:2 (ts: 12)",
+                    "1: (ts: 12)",
+                    "1:2 (ts: 12)"
+                ),
+                proc.processed
+            );
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
deleted file mode 100644
index b83936b..0000000
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.TopologyWrapper;
-import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.junit.rules.ExternalResource;
-
-import java.io.File;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-/**
- * KStreamTestDriver
- *
- * @deprecated please use {@link org.apache.kafka.streams.TopologyTestDriver} instead
- */
-@Deprecated
-public class KStreamTestDriver extends ExternalResource {
-
-    private static final long DEFAULT_CACHE_SIZE_BYTES = 1024 * 1024L;
-
-    private ProcessorTopology topology;
-    private InternalMockProcessorContext context;
-    private ProcessorTopology globalTopology;
-    private final LogContext logContext = new LogContext("testCache ");
-
-    public void setUp(final StreamsBuilder builder) {
-        setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
-    }
-
-    public void setUp(final StreamsBuilder builder, final File stateDir) {
-        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
-    }
-
-    public void setUp(final StreamsBuilder builder, final File stateDir, final long cacheSize) {
-        setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
-    }
-
-    public void setUp(final StreamsBuilder builder,
-                      final File stateDir,
-                      final Serde<?> keySerde,
-                      final Serde<?> valSerde) {
-        setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
-    }
-
-    public void setUp(final StreamsBuilder builder,
-                      final File stateDir,
-                      final Serde<?> keySerde,
-                      final Serde<?> valSerde,
-                      final long cacheSize) {
-        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
-
-        internalTopologyBuilder.setApplicationId("TestDriver");
-        topology = internalTopologyBuilder.build(null);
-        globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
-
-        final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic", null));
-
-        // init global topology first as it will add stores to the
-        // store map that are required for joins etc.
-        if (globalTopology != null) {
-            initTopology(globalTopology, globalTopology.globalStateStores());
-        }
-        initTopology(topology, topology.stateStores());
-    }
-
-    @Override
-    protected void after() {
-        if (topology != null) {
-            close();
-        }
-    }
-
-    private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) {
-        for (final StateStore store : stores) {
-            try {
-                store.init(context, store);
-            } catch (final RuntimeException e) {
-                new RuntimeException("Fatal exception initializing store.", e).printStackTrace();
-                throw e;
-            }
-        }
-
-        for (final ProcessorNode node : topology.processors()) {
-            context.setCurrentNode(node);
-            try {
-                node.init(context);
-            } finally {
-                context.setCurrentNode(null);
-            }
-        }
-    }
-
-    public ProcessorTopology topology() {
-        return topology;
-    }
-
-    public ProcessorContext context() {
-        return context;
-    }
-
-    public void process(final String topicName, final Object key, final Object value) {
-        final ProcessorNode prevNode = context.currentNode();
-        final ProcessorNode currNode = sourceNodeByTopicName(topicName);
-
-        if (currNode != null) {
-            context.setRecordContext(createRecordContext(topicName, context.timestamp()));
-            context.setCurrentNode(currNode);
-            try {
-                context.forward(key, value);
-            } finally {
-                context.setCurrentNode(prevNode);
-            }
-        }
-    }
-
-    private ProcessorNode sourceNodeByTopicName(final String topicName) {
-        ProcessorNode topicNode = topology.source(topicName);
-        if (topicNode == null) {
-            for (final String sourceTopic : topology.sourceTopics()) {
-                if (Pattern.compile(sourceTopic).matcher(topicName).matches()) {
-                    return topology.source(sourceTopic);
-                }
-            }
-            if (globalTopology != null) {
-                topicNode = globalTopology.source(topicName);
-            }
-        }
-
-        return topicNode;
-    }
-
-    public void setTime(final long timestamp) {
-        context.setTime(timestamp);
-    }
-
-    public void close() {
-        // close all processors
-        for (final ProcessorNode node : topology.processors()) {
-            context.setCurrentNode(node);
-            try {
-                node.close();
-            } finally {
-                context.setCurrentNode(null);
-            }
-        }
-
-        closeState();
-    }
-
-    public Set<String> allProcessorNames() {
-        final Set<String> names = new HashSet<>();
-
-        final List<ProcessorNode> nodes = topology.processors();
-
-        for (final ProcessorNode node : nodes) {
-            names.add(node.name());
-        }
-
-        return names;
-    }
-
-    public ProcessorNode processor(final String name) {
-        final List<ProcessorNode> nodes = topology.processors();
-
-        for (final ProcessorNode node : nodes) {
-            if (node.name().equals(name)) {
-                return node;
-            }
-        }
-
-        return null;
-    }
-
-    public Map<String, StateStore> allStateStores() {
-        return context.allStateStores();
-    }
-
-    public void flushState() {
-        for (final StateStore stateStore : context.allStateStores().values()) {
-            stateStore.flush();
-        }
-    }
-
-    private void closeState() {
-        // we need to first flush all stores before trying to close any one
-        // of them since the flushing could cause eviction and hence tries to access other stores
-        flushState();
-
-        for (final StateStore stateStore : context.allStateStores().values()) {
-            stateStore.close();
-        }
-    }
-
-    private ProcessorRecordContext createRecordContext(final String topicName, final long timestamp) {
-        return new ProcessorRecordContext(timestamp, -1, -1, topicName, null);
-    }
-
-    private class MockRecordCollector extends RecordCollectorImpl {
-        MockRecordCollector() {
-            super("KStreamTestDriver", new LogContext("KStreamTestDriver "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
-        }
-
-        @Override
-        public <K, V> void send(final String topic,
-                                final K key,
-                                final V value,
-                                final Headers headers,
-                                final Long timestamp,
-                                final Serializer<K> keySerializer,
-                                final Serializer<V> valueSerializer,
-                                final StreamPartitioner<? super K, ? super V> partitioner) {
-            // The serialization is skipped.
-            if (sourceNodeByTopicName(topic) != null) {
-                process(topic, key, value);
-            }
-        }
-
-        @Override
-        public <K, V> void send(final String topic,
-                                final K key,
-                                final V value,
-                                final Headers headers,
-                                final Integer partition,
-                                final Long timestamp,
-                                final Serializer<K> keySerializer,
-                                final Serializer<V> valueSerializer) {
-            // The serialization is skipped.
-            if (sourceNodeByTopicName(topic) != null) {
-                process(topic, key, value);
-            }
-        }
-
-        @Override
-        public void flush() {}
-
-        @Override
-        public void close() {}
-    }
-}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 23dbf30..38da0d8 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -587,7 +587,6 @@ public class TopologyTestDriver implements Closeable {
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
      */
-    @SuppressWarnings("WeakerAccess")
     public Map<String, StateStore> getAllStateStores() {
         final Map<String, StateStore> allStores = new HashMap<>();
         for (final String storeName : internalTopologyBuilder.allStateStoreName()) {

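For context on the getAllStateStores() hunk above: after piping input through a TopologyTestDriver, tests typically read state back out of the driver. A brief illustrative sketch (the store name "topic1-Canonized" comes from the KTableAggregateTest changes above; the surrounding test setup is assumed):

    // All stores registered in the topology, keyed by store name.
    final Map<String, StateStore> stores = driver.getAllStateStores();
    // Typed access to a single key-value store.
    final KeyValueStore<String, String> store = driver.getKeyValueStore("topic1-Canonized");
    final String latestAggregateForA = store.get("A"); // most recent aggregate for key "A"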