Repository: kafka Updated Branches: refs/heads/trunk 75070bdb5 -> 51af6fb65
MINOR: Allow timestamp parameter in `ProcessorTopologyTestDriver.process` All current implementations process records using the same timestamp. This makes it difficult to test operations that require time windows, like `KStream-KStream joins`. This change would allow tests to simulate records created at different times, thus making it possible to test operations like the above mentioned joins. Author: Sebastian Gavril <sgav...@wehkamp.nl> Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #3753 from sebigavril/allow-timestamps-in-test-driver Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51af6fb6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51af6fb6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51af6fb6 Branch: refs/heads/trunk Commit: 51af6fb654c482fb6fdd85126991149238640fd7 Parents: 75070bd Author: Sebastian Gavril <sgav...@wehkamp.nl> Authored: Thu Aug 31 14:57:54 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Thu Aug 31 14:57:54 2017 -0700 ---------------------------------------------------------------------- .../internals/ProcessorTopologyTest.java | 46 +++++++++++++++----- .../kafka/test/ProcessorTopologyTestDriver.java | 23 +++++++++- 2 files changed, 56 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/51af6fb6/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index fd3afa8..1ea09cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -63,7 +63,6 @@ public class ProcessorTopologyTest { private static final String OUTPUT_TOPIC_2 = "output-topic-2"; private static final String THROUGH_TOPIC_1 = "through-topic-1"; - private static long timestamp = 1000L; private final TopologyBuilder builder = new TopologyBuilder(); private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier(); @@ -315,20 +314,43 @@ public class ProcessorTopologyTest { assertThat(result, containsString("child-two:\n\t\tchildren:\t[child-two-one]")); } - private void assertNextOutputRecord(String topic, String key, String value) { - ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER); - assertEquals(topic, record.topic()); - assertEquals(key, record.key()); - assertEquals(value, record.value()); - assertNull(record.partition()); + @Test + public void shouldConsiderTimeStamps() throws Exception { + final int partition = 10; + driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder); + driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L); + driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L); + driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER, 30L); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 10L); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 20L); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 30L); } - private void assertNextOutputRecord(String topic, String key, String value, Integer partition) { + + private void assertNextOutputRecord(final String topic, + final String key, + final String value) { + assertNextOutputRecord(topic, key, value, null, 0L); + } + + private void assertNextOutputRecord(final String topic, + final String key, + final String value, + final Integer partition) { + assertNextOutputRecord(topic, key, value, partition, 0L); + } + + private void assertNextOutputRecord(final String topic, + final String key, + final String value, + final Integer partition, + final Long timestamp) { ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER); assertEquals(topic, record.topic()); assertEquals(key, record.key()); assertEquals(value, record.value()); assertEquals(partition, record.partition()); + assertEquals(timestamp, record.timestamp()); } private void assertNoOutputRecord(String topic) { @@ -543,18 +565,20 @@ public class ProcessorTopologyTest { /** * A custom timestamp extractor that extracts the timestamp from the record's value if the value is in ".*@[0-9]+" - * format. Otherwise, it returns the record's timestamp or the default timestamp if the record's timestamp is zero. + * format. Otherwise, it returns the record's timestamp or the default timestamp if the record's timestamp is negative. */ public static class CustomTimestampExtractor implements TimestampExtractor { + private static final long DEFAULT_TIMESTAMP = 1000L; + @Override public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { if (record.value().toString().matches(".*@[0-9]+")) return Long.parseLong(record.value().toString().split("@")[1]); - if (record.timestamp() > 0L) + if (record.timestamp() >= 0L) return record.timestamp(); - return timestamp; + return DEFAULT_TIMESTAMP; } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/51af6fb6/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 47124ed..deba860 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -240,7 +240,7 @@ public class ProcessorTopologyTestDriver { * @param value the raw message value * @param timestamp the raw message timestamp */ - private void process(final String topicName, + public void process(final String topicName, final byte[] key, final byte[] value, final long timestamp) { @@ -309,7 +309,26 @@ public class ProcessorTopologyTestDriver { final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { - process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value)); + process(topicName, key, value, keySerializer, valueSerializer, 0L); + } + + /** + * Send an input message with the given key and value and timestamp on the specified topic to the topology. + * + * @param topicName the name of the topic on which the message is to be sent + * @param key the raw message key + * @param value the raw message value + * @param keySerializer the serializer for the key + * @param valueSerializer the serializer for the value + * @param timestamp the raw message timestamp + */ + public <K, V> void process(final String topicName, + final K key, + final V value, + final Serializer<K> keySerializer, + final Serializer<V> valueSerializer, + final long timestamp) { + process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value), timestamp); } /**