This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new dc54c0e KAFKA-3625: TopologyTestDriver must process output for wall-clock-time punctuations and on close() (#4502) dc54c0e is described below commit dc54c0e24b3f7ca27990b1d576b8b8fd6d740ca1 Author: Matthias J. Sax <mj...@apache.org> AuthorDate: Fri Feb 9 15:29:21 2018 -0800 KAFKA-3625: TopologyTestDriver must process output for wall-clock-time punctuations and on close() (#4502) Author: Matthias J. Sax <matth...@confluent.io> Reviewer: Damian Guy <dam...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io> --- .../apache/kafka/streams/TopologyTestDriver.java | 64 +++++---- .../kafka/streams/TopologyTestDriverTest.java | 159 ++++++++++++++++++++- 2 files changed, 188 insertions(+), 35 deletions(-) diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index ff63554..a108f22 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -328,42 +328,14 @@ public class TopologyTestDriver { consumerRecord.serializedValueSize(), consumerRecord.key(), consumerRecord.value()))); - producer.clear(); // Process the record ... ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(consumerRecord.timestamp(), offset, topicPartition.partition(), topicName)); task.process(); task.maybePunctuateStreamTime(); task.commit(); + captureOutputRecords(); - // Capture all the records sent to the producer ... - for (final ProducerRecord<byte[], byte[]> record : producer.history()) { - Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic()); - if (outputRecords == null) { - outputRecords = new LinkedList<>(); - outputRecordsByTopic.put(record.topic(), outputRecords); - } - outputRecords.add(record); - - // Forward back into the topology if the produced record is to an internal or a source topic ... - final String outputTopicName = record.topic(); - if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) { - final byte[] serializedKey = record.key(); - final byte[] serializedValue = record.value(); - - pipeInput(new ConsumerRecord<>( - outputTopicName, - -1, - -1L, - record.timestamp(), - TimestampType.CREATE_TIME, - 0L, - serializedKey == null ? 0 : serializedKey.length, - serializedValue == null ? 0 : serializedValue.length, - serializedKey, - serializedValue)); - } - } } else { final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName); if (globalTopicPartition == null) { @@ -385,6 +357,38 @@ public class TopologyTestDriver { } } + private void captureOutputRecords() { + // Capture all the records sent to the producer ... + final List<ProducerRecord<byte[], byte[]>> output = producer.history(); + producer.clear(); + for (final ProducerRecord<byte[], byte[]> record : output) { + Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic()); + if (outputRecords == null) { + outputRecords = new LinkedList<>(); + outputRecordsByTopic.put(record.topic(), outputRecords); + } + outputRecords.add(record); + + // Forward back into the topology if the produced record is to an internal or a source topic ... + final String outputTopicName = record.topic(); + if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) { + final byte[] serializedKey = record.key(); + final byte[] serializedValue = record.value(); + + pipeInput(new ConsumerRecord<>( + outputTopicName, + -1, + -1L, + record.timestamp(), + TimestampType.CREATE_TIME, + 0L, + serializedKey == null ? 0 : serializedKey.length, + serializedValue == null ? 0 : serializedValue.length, + serializedKey, + serializedValue)); + } + } + } /** * Send input messages to the topology and then commit each message individually. * @@ -407,6 +411,7 @@ public class TopologyTestDriver { mockTime.sleep(advanceMs); task.maybePunctuateSystemTime(); task.commit(); + captureOutputRecords(); } /** @@ -558,6 +563,7 @@ public class TopologyTestDriver { // ignore } } + captureOutputRecords(); } static class MockTime implements Time { diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 921f6d6..17d5e02 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -19,8 +19,12 @@ package org.apache.kafka.streams; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.streams.processor.Processor; @@ -28,12 +32,15 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.streams.test.OutputVerifier; import org.apache.kafka.test.TestUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; @@ -81,6 +88,14 @@ public class TopologyTestDriverTest { put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); } }; + private KeyValueStore<String, Long> store; + + private StringDeserializer stringDeserializer = new StringDeserializer(); + private LongDeserializer longDeserializer = new LongDeserializer(); + private ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>( + new StringSerializer(), + new LongSerializer()); + private final static class Record { private Object key; @@ -223,7 +238,9 @@ public class TopologyTestDriverTest { @After public void tearDown() { - testDriver.close(); + if (testDriver != null) { + testDriver.close(); + } } private Topology setupSourceSinkTopology() { @@ -417,7 +434,7 @@ public class TopologyTestDriverTest { SINK_TOPIC_1, new Serializer() { @Override - public byte[] serialize(String topic, Object data) { + public byte[] serialize(final String topic, final Object data) { if (data instanceof Long) { return Serdes.Long().serializer().serialize(topic, (Long) data); } @@ -426,11 +443,11 @@ public class TopologyTestDriverTest { @Override public void close() {} @Override - public void configure(Map configs, boolean isKey) {} + public void configure(final Map configs, final boolean isKey) {} }, new Serializer() { @Override - public byte[] serialize(String topic, Object data) { + public byte[] serialize(final String topic, final Object data) { if (data instanceof String) { return Serdes.String().serializer().serialize(topic, (String) data); } @@ -439,7 +456,7 @@ public class TopologyTestDriverTest { @Override public void close() {} @Override - public void configure(Map configs, boolean isKey) {} + public void configure(final Map configs, final boolean isKey) {} }, processor); @@ -476,7 +493,7 @@ public class TopologyTestDriverTest { } @Test - public void shouldUseSinkeSpecificSerializers() { + public void shouldUseSinkSpecificSerializers() { final Topology topology = new Topology(); final String sourceName1 = "source-1"; @@ -691,4 +708,134 @@ public class TopologyTestDriverTest { expectedStoreNames.add("globalStore"); assertThat(testDriver.getAllStateStores().keySet(), equalTo(expectedStoreNames)); } + + private void setup() { + Topology topology = new Topology(); + topology.addSource("sourceProcessor", "input-topic"); + topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor"); + topology.addStateStore(Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("aggStore"), + Serdes.String(), + Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating + "aggregator"); + topology.addSink("sinkProcessor", "result-topic", "aggregator"); + + config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); + testDriver = new TopologyTestDriver(topology, config); + + store = testDriver.getKeyValueStore("aggStore"); + store.put("a", 21L); + } + + @Test + public void shouldFlushStoreForFirstInput() { + setup(); + testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L)); + OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); + Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + } + + @Test + public void shouldNotUpdateStoreForSmallerValue() { + setup(); + testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L)); + Assert.assertThat(store.get("a"), equalTo(21L)); + OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); + Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + } + + @Test + public void shouldNotUpdateStoreForLargerValue() { + setup(); + testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L)); + Assert.assertThat(store.get("a"), equalTo(42L)); + OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L); + Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + } + + @Test + public void shouldUpdateStoreForNewKey() { + setup(); + testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L)); + Assert.assertThat(store.get("b"), equalTo(21L)); + OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); + OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L); + Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + } + + @Test + public void shouldPunctuateIfEvenTimeAdvances() { + setup(); + testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L)); + OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); + + testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L)); + Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + + testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L)); + OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); + Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + } + + @Test + public void shouldPunctuateIfWallClockTimeAdvances() { + setup(); + testDriver.advanceWallClockTime(60000); + OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L); + Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer)); + } + + private class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> { + @Override + public Processor<String, Long> get() { + return new CustomMaxAggregator(); + } + } + + private class CustomMaxAggregator implements Processor<String, Long> { + ProcessorContext context; + private KeyValueStore<String, Long> store; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + this.context = context; + context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() { + @Override + public void punctuate(final long timestamp) { + flushStore(); + } + }); + context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() { + @Override + public void punctuate(final long timestamp) { + flushStore(); + } + }); + store = (KeyValueStore<String, Long>) context.getStateStore("aggStore"); + } + + @Override + public void process(final String key, final Long value) { + final Long oldValue = store.get(key); + if (oldValue == null || value > oldValue) { + store.put(key, value); + } + } + + private void flushStore() { + final KeyValueIterator<String, Long> it = store.all(); + while (it.hasNext()) { + final KeyValue<String, Long> next = it.next(); + context.forward(next.key, next.value); + } + } + + @Override + public void punctuate(final long timestamp) {} + + @Override + public void close() {} + } } -- To stop receiving notification emails like this one, please contact mj...@apache.org.