Repository: kafka
Updated Branches:
  refs/heads/trunk 75070bdb5 -> 51af6fb65


MINOR: Allow timestamp parameter in `ProcessorTopologyTestDriver.process`

All current implementations process records using the same timestamp. This
makes it difficult to test operations that require time windows, such as
`KStream`-`KStream` joins.

This change allows tests to simulate records created at different times,
making it possible to test operations such as the joins mentioned above.
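
For example, with the new overload a test can feed records carrying distinct
timestamps into the driver and assert on the timestamps of the forwarded
records. This is only a minimal sketch reusing the topic, serializer and
deserializer constants from the updated `ProcessorTopologyTest`; the driver
setup itself is assumed:

    // Sketch only: assumes `driver` was built for a topology that forwards
    // records from INPUT_TOPIC_1 to OUTPUT_TOPIC_1, as in the test below.
    driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
    driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L);

    ProducerRecord<String, String> first = driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
    assertEquals(Long.valueOf(10L), first.timestamp());
    ProducerRecord<String, String> second = driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
    assertEquals(Long.valueOf(20L), second.timestamp());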

Author: Sebastian Gavril <sgav...@wehkamp.nl>

Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com>

Closes #3753 from sebigavril/allow-timestamps-in-test-driver


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51af6fb6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51af6fb6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51af6fb6

Branch: refs/heads/trunk
Commit: 51af6fb654c482fb6fdd85126991149238640fd7
Parents: 75070bd
Author: Sebastian Gavril <sgav...@wehkamp.nl>
Authored: Thu Aug 31 14:57:54 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Thu Aug 31 14:57:54 2017 -0700

----------------------------------------------------------------------
 .../internals/ProcessorTopologyTest.java        | 46 +++++++++++++++-----
 .../kafka/test/ProcessorTopologyTestDriver.java | 23 +++++++++-
 2 files changed, 56 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/51af6fb6/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index fd3afa8..1ea09cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -63,7 +63,6 @@ public class ProcessorTopologyTest {
     private static final String OUTPUT_TOPIC_2 = "output-topic-2";
     private static final String THROUGH_TOPIC_1 = "through-topic-1";
 
-    private static long timestamp = 1000L;
     private final TopologyBuilder builder = new TopologyBuilder();
     private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
 
@@ -315,20 +314,43 @@ public class ProcessorTopologyTest {
         assertThat(result, containsString("child-two:\n\t\tchildren:\t[child-two-one]"));
     }
 
-    private void assertNextOutputRecord(String topic, String key, String value) {
-        ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
-        assertEquals(topic, record.topic());
-        assertEquals(key, record.key());
-        assertEquals(value, record.value());
-        assertNull(record.partition());
+    @Test
+    public void shouldConsiderTimeStamps() throws Exception {
+        final int partition = 10;
+        driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 10L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 30L);
     }
 
-    private void assertNextOutputRecord(String topic, String key, String value, Integer partition) {
+
+    private void assertNextOutputRecord(final String topic,
+                                        final String key,
+                                        final String value) {
+        assertNextOutputRecord(topic, key, value, null, 0L);
+    }
+
+    private void assertNextOutputRecord(final String topic,
+                                        final String key,
+                                        final String value,
+                                        final Integer partition) {
+        assertNextOutputRecord(topic, key, value, partition, 0L);
+    }
+
+    private void assertNextOutputRecord(final String topic,
+                                        final String key,
+                                        final String value,
+                                        final Integer partition,
+                                        final Long timestamp) {
         ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());
         assertEquals(key, record.key());
         assertEquals(value, record.value());
         assertEquals(partition, record.partition());
+        assertEquals(timestamp, record.timestamp());
     }
 
     private void assertNoOutputRecord(String topic) {
@@ -543,18 +565,20 @@ public class ProcessorTopologyTest {
 
     /**
      * A custom timestamp extractor that extracts the timestamp from the record's value if the value is in ".*@[0-9]+"
-     * format. Otherwise, it returns the record's timestamp or the default timestamp if the record's timestamp is zero.
+     * format. Otherwise, it returns the record's timestamp or the default timestamp if the record's timestamp is negative.
     */
     public static class CustomTimestampExtractor implements TimestampExtractor {
+        private static final long DEFAULT_TIMESTAMP = 1000L;
+
         @Override
         public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
             if (record.value().toString().matches(".*@[0-9]+"))
                 return Long.parseLong(record.value().toString().split("@")[1]);
 
-            if (record.timestamp() > 0L)
+            if (record.timestamp() >= 0L)
                 return record.timestamp();
 
-            return timestamp;
+            return DEFAULT_TIMESTAMP;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/51af6fb6/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 47124ed..deba860 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -240,7 +240,7 @@ public class ProcessorTopologyTestDriver {
      * @param value the raw message value
      * @param timestamp the raw message timestamp
      */
-    private void process(final String topicName,
+    public void process(final String topicName,
                          final byte[] key,
                          final byte[] value,
                          final long timestamp) {
@@ -309,7 +309,26 @@ public class ProcessorTopologyTestDriver {
                                final V value,
                                final Serializer<K> keySerializer,
                                final Serializer<V> valueSerializer) {
-        process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value));
+        process(topicName, key, value, keySerializer, valueSerializer, 0L);
+    }
+
+    /**
+     * Send an input message with the given key and value and timestamp on the specified topic to the topology.
+     *
+     * @param topicName the name of the topic on which the message is to be sent
+     * @param key the raw message key
+     * @param value the raw message value
+     * @param keySerializer the serializer for the key
+     * @param valueSerializer the serializer for the value
+     * @param timestamp the raw message timestamp
+     */
+    public <K, V> void process(final String topicName,
+                               final K key,
+                               final V value,
+                               final Serializer<K> keySerializer,
+                               final Serializer<V> valueSerializer,
+                               final long timestamp) {
+        process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value), timestamp);
     }
 
     /**
