[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-28 Thread Hamidreza Afzali (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888941#comment-15888941
 ] 

Hamidreza Afzali commented on KAFKA-4789:
-

Thanks!

> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>Assignee: Hamidreza Afzali
>  Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888903#comment-15888903
 ] 

Guozhang Wang commented on KAFKA-4789:
--

Thanks [~hrafzali] for the patch! I have added you to the contributor list so 
you can assign JIRAs to yourself in the future.

> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>Assignee: Hamidreza Afzali
>  Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888901#comment-15888901
 ] 

ASF GitHub Bot commented on KAFKA-4789:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2590


> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>  Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880641#comment-15880641
 ] 

ASF GitHub Bot commented on KAFKA-4789:
---

GitHub user hrafzali opened a pull request:

https://github.com/apache/kafka/pull/2590

KAFKA-4789: Added support to ProcessorTopologyTestDriver to forward 
timestamps to internal topics

This resolves the issue in the ProcessorTopologyTestDriver that the 
extracted timestamp is not forwarded with the produced record to the internal 
topics.

JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-4789

The contribution is my original work and I license the work to the project 
under the project's open source license.

@guozhangwang @dguy


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hrafzali/kafka 
KAFKA-4789_ProcessorTopologyTestDriver_timestamp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2590.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2590


commit 579a65a691dd9d07e4bf945badfe18c261da8540
Author: Hamidreza Afzali 
Date:   2017-02-23T10:53:18Z

KAFKA-4789: Added support to ProcessorTopologyTestDriver to forward 
extracted timestamps to internal topics




> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>  Labels: unit-test
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880637#comment-15880637
 ] 

ASF GitHub Bot commented on KAFKA-4789:
---

Github user hrafzali closed the pull request at:

https://github.com/apache/kafka/pull/2587


> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>  Labels: unit-test
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)