[jira] [Commented] (KAFKA-5368) Kafka Streams skipped-records-rate sensor produces nonzero values when the timestamps are valid

2017-06-02 Thread Hamidreza Afzali (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16034367#comment-16034367
 ] 

Hamidreza Afzali commented on KAFKA-5368:
-

*Problem:*

Skipped records sensor is using a {{Rate}} of type {{Count}} for 
skipped-records-rate metric.

In 
{{org.apache.kafka.streams.processor.internals.StreamThread#addRecordsToTasks}} 
the Count value is incremented by one regardless of the number of skipped 
records, i.e. the value increments even if no record is skipped.


{code}
skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", 
this.groupName, "The average per-second number of skipped records.", 
this.tags), new Rate(new Count()));

...

private void addRecordsToTasks(final ConsumerRecords records) {
if (records != null && !records.isEmpty()) {
...
streamsMetrics.skippedRecordsSensor.record(records.count() - 
numAddedRecords, timerStartedMs);
}
}
{code}

{{org.apache.kafka.streams.processor.internals.StreamThread#addRecordsToTasks}} 
is called in 
{{org.apache.kafka.streams.processor.internals.StreamThread#runLoop}} after 
each successful poll request.

{code}

private void runLoop() {
...
while (stillRunning()) {
...
final ConsumerRecords records = 
pollRequests(pollTimeMs);
if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
streamsMetrics.pollTimeSensor.record(computeLatency(), 
timerStartedMs);
addRecordsToTasks(records);
...
}
...
}
...
}
{code}

This can explain why skipped-records-rate is equal to poll-rate.

*Solution:*

The sensor should keep a sum of all skipped records.



> Kafka Streams skipped-records-rate sensor produces nonzero values when the 
> timestamps are valid
> ---
>
> Key: KAFKA-5368
> URL: https://issues.apache.org/jira/browse/KAFKA-5368
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Hamidreza Afzali
>Assignee: Hamidreza Afzali
>
> Kafka Streams skipped-records-rate sensor produces nonzero values even when 
> the timestamps are valid and records are processed. The values are equal to 
> poll-rate.
> Related issue: KAFKA-5055 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5368) Kafka Streams skipped-records-rate sensor produces nonzero values when the timestamps are valid

2017-06-02 Thread Hamidreza Afzali (JIRA)
Hamidreza Afzali created KAFKA-5368:
---

 Summary: Kafka Streams skipped-records-rate sensor produces 
nonzero values when the timestamps are valid
 Key: KAFKA-5368
 URL: https://issues.apache.org/jira/browse/KAFKA-5368
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Hamidreza Afzali
Assignee: Hamidreza Afzali


Kafka Streams skipped-records-rate sensor produces nonzero values even when the 
timestamps are valid and records are processed. The values are equal to 
poll-rate.

Related issue: KAFKA-5055 




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0

2017-03-06 Thread Hamidreza Afzali (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hamidreza Afzali resolved KAFKA-4408.
-
   Resolution: Fixed
Fix Version/s: 0.11.0.0

Issue resolved by pull request 2629
https://github.com/apache/kafka/pull/2629

> KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
> --
>
> Key: KAFKA-4408
> URL: https://issues.apache.org/jira/browse/KAFKA-4408
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Linux
>Reporter: Byron Nikolaidis
>Assignee: Hamidreza Afzali
>  Labels: newbie, unit-test
> Fix For: 0.11.0.0
>
>
> In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with 
> KTables.  The below test code worked fine under Kafka 0.10.0.1 but now 
> produces this error:
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Could not find partition info for topic: alertInputTopic
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:120)
> at 
> org.apache.kafka.test.ProcessorTopologyTestDriver.(ProcessorTopologyTestDriver.java:174)
> at 
> mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)
> {code}
> package mil.navy.icap.kafka.streams.processor.track;
> import java.io.IOException;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serdes.StringSerde;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.test.ProcessorTopologyTestDriver;
> public class ProcessorDriverTest2 {
>  
>  public static void main(String[] args) throws IOException, 
> InterruptedException {
>  System.out.println("ProcessorDriverTest2");
>  
>  Properties props = new Properties();
>  props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
> "ProcessorDriverTest2");
>  props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>  props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  
>  StreamsConfig streamsConfig = new StreamsConfig(props);
>  
>  // topology
>  KStreamBuilder kstreamBuilder = new KStreamBuilder();
>  StringSerde stringSerde = new StringSerde();
>  KTable table = kstreamBuilder.table(stringSerde,
>  stringSerde, "alertInputTopic");
>  table.to(stringSerde, stringSerde, "alertOutputTopic");
>  
>  // create test driver
>  ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
>  streamsConfig, 
>  kstreamBuilder, 
>  "alertStore");
>  StringSerializer serializer = new StringSerializer();
>  StringDeserializer deserializer = new StringDeserializer();
>  // send data to input topic
>  testDriver.process("alertInputTopic", 
>  "the Key", "the Value", serializer, serializer);
>  
>  // read data from output topic
>  ProducerRecord rec = 
> testDriver.readOutput("alertOutputTopic", 
>  deserializer, deserializer);
>  
>  System.out.println("rec: " + rec);
>  }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0

2017-03-05 Thread Hamidreza Afzali (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4408 started by Hamidreza Afzali.
---
> KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
> --
>
> Key: KAFKA-4408
> URL: https://issues.apache.org/jira/browse/KAFKA-4408
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Linux
>Reporter: Byron Nikolaidis
>Assignee: Hamidreza Afzali
>  Labels: newbie, unit-test
>
> In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with 
> KTables.  The below test code worked fine under Kafka 0.10.0.1 but now 
> produces this error:
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Could not find partition info for topic: alertInputTopic
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:120)
> at 
> org.apache.kafka.test.ProcessorTopologyTestDriver.(ProcessorTopologyTestDriver.java:174)
> at 
> mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)
> {code}
> package mil.navy.icap.kafka.streams.processor.track;
> import java.io.IOException;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serdes.StringSerde;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.test.ProcessorTopologyTestDriver;
> public class ProcessorDriverTest2 {
>  
>  public static void main(String[] args) throws IOException, 
> InterruptedException {
>  System.out.println("ProcessorDriverTest2");
>  
>  Properties props = new Properties();
>  props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
> "ProcessorDriverTest2");
>  props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>  props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  
>  StreamsConfig streamsConfig = new StreamsConfig(props);
>  
>  // topology
>  KStreamBuilder kstreamBuilder = new KStreamBuilder();
>  StringSerde stringSerde = new StringSerde();
>  KTable table = kstreamBuilder.table(stringSerde,
>  stringSerde, "alertInputTopic");
>  table.to(stringSerde, stringSerde, "alertOutputTopic");
>  
>  // create test driver
>  ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
>  streamsConfig, 
>  kstreamBuilder, 
>  "alertStore");
>  StringSerializer serializer = new StringSerializer();
>  StringDeserializer deserializer = new StringDeserializer();
>  // send data to input topic
>  testDriver.process("alertInputTopic", 
>  "the Key", "the Value", serializer, serializer);
>  
>  // read data from output topic
>  ProducerRecord rec = 
> testDriver.readOutput("alertOutputTopic", 
>  deserializer, deserializer);
>  
>  System.out.println("rec: " + rec);
>  }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0

2017-03-05 Thread Hamidreza Afzali (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hamidreza Afzali reassigned KAFKA-4408:
---

Assignee: Hamidreza Afzali

> KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
> --
>
> Key: KAFKA-4408
> URL: https://issues.apache.org/jira/browse/KAFKA-4408
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Linux
>Reporter: Byron Nikolaidis
>Assignee: Hamidreza Afzali
>  Labels: newbie, unit-test
>
> In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with 
> KTables.  The below test code worked fine under Kafka 0.10.0.1 but now 
> produces this error:
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Could not find partition info for topic: alertInputTopic
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:120)
> at 
> org.apache.kafka.test.ProcessorTopologyTestDriver.(ProcessorTopologyTestDriver.java:174)
> at 
> mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)
> {code}
> package mil.navy.icap.kafka.streams.processor.track;
> import java.io.IOException;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serdes.StringSerde;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.test.ProcessorTopologyTestDriver;
> public class ProcessorDriverTest2 {
>  
>  public static void main(String[] args) throws IOException, 
> InterruptedException {
>  System.out.println("ProcessorDriverTest2");
>  
>  Properties props = new Properties();
>  props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
> "ProcessorDriverTest2");
>  props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>  props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  
>  StreamsConfig streamsConfig = new StreamsConfig(props);
>  
>  // topology
>  KStreamBuilder kstreamBuilder = new KStreamBuilder();
>  StringSerde stringSerde = new StringSerde();
>  KTable table = kstreamBuilder.table(stringSerde,
>  stringSerde, "alertInputTopic");
>  table.to(stringSerde, stringSerde, "alertOutputTopic");
>  
>  // create test driver
>  ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
>  streamsConfig, 
>  kstreamBuilder, 
>  "alertStore");
>  StringSerializer serializer = new StringSerializer();
>  StringDeserializer deserializer = new StringDeserializer();
>  // send data to input topic
>  testDriver.process("alertInputTopic", 
>  "the Key", "the Value", serializer, serializer);
>  
>  // read data from output topic
>  ProducerRecord rec = 
> testDriver.readOutput("alertOutputTopic", 
>  deserializer, deserializer);
>  
>  System.out.println("rec: " + rec);
>  }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0

2017-03-05 Thread Hamidreza Afzali (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896843#comment-15896843
 ] 

Hamidreza Afzali commented on KAFKA-4408:
-

This should resolve the issue: https://github.com/apache/kafka/pull/2629

> KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
> --
>
> Key: KAFKA-4408
> URL: https://issues.apache.org/jira/browse/KAFKA-4408
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Linux
>Reporter: Byron Nikolaidis
>  Labels: newbie, unit-test
>
> In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with 
> KTables.  The below test code worked fine under Kafka 0.10.0.1 but now 
> produces this error:
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Could not find partition info for topic: alertInputTopic
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:120)
> at 
> org.apache.kafka.test.ProcessorTopologyTestDriver.(ProcessorTopologyTestDriver.java:174)
> at 
> mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)
> {code}
> package mil.navy.icap.kafka.streams.processor.track;
> import java.io.IOException;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serdes.StringSerde;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.test.ProcessorTopologyTestDriver;
> public class ProcessorDriverTest2 {
>  
>  public static void main(String[] args) throws IOException, 
> InterruptedException {
>  System.out.println("ProcessorDriverTest2");
>  
>  Properties props = new Properties();
>  props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
> "ProcessorDriverTest2");
>  props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
>  props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
>  
>  StreamsConfig streamsConfig = new StreamsConfig(props);
>  
>  // topology
>  KStreamBuilder kstreamBuilder = new KStreamBuilder();
>  StringSerde stringSerde = new StringSerde();
>  KTable table = kstreamBuilder.table(stringSerde,
>  stringSerde, "alertInputTopic");
>  table.to(stringSerde, stringSerde, "alertOutputTopic");
>  
>  // create test driver
>  ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
>  streamsConfig, 
>  kstreamBuilder, 
>  "alertStore");
>  StringSerializer serializer = new StringSerializer();
>  StringDeserializer deserializer = new StringDeserializer();
>  // send data to input topic
>  testDriver.process("alertInputTopic", 
>  "the Key", "the Value", serializer, serializer);
>  
>  // read data from output topic
>  ProducerRecord rec = 
> testDriver.readOutput("alertOutputTopic", 
>  deserializer, deserializer);
>  
>  System.out.println("rec: " + rec);
>  }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-4828) ProcessorTopologyTestDriver does not work when using .through()

2017-03-04 Thread Hamidreza Afzali (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4828 started by Hamidreza Afzali.
---
> ProcessorTopologyTestDriver does not work when using .through()
> ---
>
> Key: KAFKA-4828
> URL: https://issues.apache.org/jira/browse/KAFKA-4828
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>Assignee: Hamidreza Afzali
>  Labels: unit-test
>
> *Problem:*
> ProcessorTopologyTestDriver does not work when testing a topology that uses 
> through().
> {code}
> org.apache.kafka.streams.errors.StreamsException: Store count2's change log 
> (count2-topic) does not contain partition 1
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
> {code}
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val stateStore = "count"
> val stateStore2 = "count2"
> val outputTopic2 = "count2-topic"
> val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(stateStore)
>   .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore, stateStore2)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic2, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4828) ProcessorTopologyTestDriver does not work when using .through()

2017-03-02 Thread Hamidreza Afzali (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893105#comment-15893105
 ] 

Hamidreza Afzali commented on KAFKA-4828:
-

There are two issues:
- ProcessorTopologyTestDriver creates an internal changelog topic for through()
- ProcessorTopologyTestDriver does not forward the produced record back into 
the topology if it is to a source topic


> ProcessorTopologyTestDriver does not work when using .through()
> ---
>
> Key: KAFKA-4828
> URL: https://issues.apache.org/jira/browse/KAFKA-4828
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>Assignee: Hamidreza Afzali
>  Labels: unit-test
>
> *Problem:*
> ProcessorTopologyTestDriver does not work when testing a topology that uses 
> through().
> {code}
> org.apache.kafka.streams.errors.StreamsException: Store count2's change log 
> (count2-topic) does not contain partition 1
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
> {code}
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val stateStore = "count"
> val stateStore2 = "count2"
> val outputTopic2 = "count2-topic"
> val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(stateStore)
>   .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore, stateStore2)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic2, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4828) ProcessorTopologyTestDriver does not work when using .through()

2017-03-01 Thread Hamidreza Afzali (JIRA)
Hamidreza Afzali created KAFKA-4828:
---

 Summary: ProcessorTopologyTestDriver does not work when using 
.through()
 Key: KAFKA-4828
 URL: https://issues.apache.org/jira/browse/KAFKA-4828
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Hamidreza Afzali
Assignee: Hamidreza Afzali


*Problem:*

ProcessorTopologyTestDriver does not work when testing a topology that uses 
through().

{code}
org.apache.kafka.streams.errors.StreamsException: Store count2's change log 
(count2-topic) does not contain partition 1
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
{code}

*Example:*

{code}
object Topology1 {

  def main(args: Array[String]): Unit = {

val inputTopic = "input"
val stateStore = "count"
val stateStore2 = "count2"
val outputTopic2 = "count2-topic"
val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))

val props = new Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val builder = new KStreamBuilder
builder.stream(Serdes.String, Serdes.Integer, inputTopic)
  .groupByKey(Serdes.String, Serdes.Integer)
  .count(stateStore)
  .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)

val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
builder, stateStore, stateStore2)
inputs.foreach {
  case (key, value) => {
driver.process(inputTopic, key, value, Serdes.String.serializer, 
Serdes.Integer.serializer)
val record = driver.readOutput(outputTopic2, 
Serdes.String.deserializer, Serdes.Long.deserializer)
println(record)
  }
}
  }
}
{code}




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-28 Thread Hamidreza Afzali (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888941#comment-15888941
 ] 

Hamidreza Afzali commented on KAFKA-4789:
-

Thanks!

> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>Assignee: Hamidreza Afzali
>  Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-23 Thread Hamidreza Afzali (JIRA)
Hamidreza Afzali created KAFKA-4789:
---

 Summary: ProcessorTopologyTestDriver does not forward extracted 
timestamps to internal topics
 Key: KAFKA-4789
 URL: https://issues.apache.org/jira/browse/KAFKA-4789
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Hamidreza Afzali


*Problem:*

When using ProcessorTopologyTestDriver, the extracted timestamp is not 
forwarded with the produced record to the internal topics.

*Example:*

{code}
object Topology1 {

  def main(args: Array[String]): Unit = {

val inputTopic = "input"
val outputTopic = "output"
val stateStore = "count"
val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
2))

val props = new Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
classOf[MyTimestampExtractor].getName)

val windowedStringSerde = Serdes.serdeFrom(new 
WindowedSerializer(Serdes.String.serializer),
  new WindowedDeserializer(Serdes.String.deserializer))

val builder = new KStreamBuilder
builder.stream(Serdes.String, Serdes.Integer, inputTopic)
  .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
  .groupByKey(Serdes.String, Serdes.Integer)
  .count(TimeWindows.of(1000L), stateStore)
  .to(windowedStringSerde, Serdes.Long, outputTopic)

val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
builder, stateStore)
inputs.foreach {
  case (key, value) => {
driver.process(inputTopic, key, value, Serdes.String.serializer, 
Serdes.Integer.serializer)
val record = driver.readOutput(outputTopic, Serdes.String.deserializer, 
Serdes.Long.deserializer)
println(record)
  }
}

  }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4461) When using ProcessorTopologyTestDriver, the combination of map and .groupByKey does not produce any result

2016-12-01 Thread Hamidreza Afzali (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hamidreza Afzali updated KAFKA-4461:

Description: 
*Problem*

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the 
combination of map and .groupByKey does not produce any result. However, it 
works fine when using KStreamTestDriver.

The topology looks like this:

{code}
builder.stream(Serdes.String, Serdes.Integer, inputTopic)
 .map((k, v) => new KeyValue(fn(k), v))
 .groupByKey(Serdes.String, Serdes.Integer)
 .count(stateStore)
{code}

*Full examples*

Examples for ProcessorTopologyTestDriver and KStreamTestDriver:

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

*Additional info*

kafka-users mailing list:

http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E



  was:
*Problem*

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the 
combination of map and .groupByKey does not produce any result. However, it 
works fine when using KStreamTestDriver.

The topology looks like this:

{code}
builder.stream(Serdes.String, Serdes.Integer, inputTopic)
 .map((k, v) => new KeyValue(fn(k), v))
 .groupByKey(Serdes.String, Serdes.Integer)
 .count(stateStore)
{code}

*Full example*
https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

*Additional info*

kafka-users mailing list:

http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E




> When using ProcessorTopologyTestDriver, the combination of map and 
> .groupByKey does not produce any result
> --
>
> Key: KAFKA-4461
> URL: https://issues.apache.org/jira/browse/KAFKA-4461
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Hamidreza Afzali
>Priority: Blocker
>
> *Problem*
> When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the 
> combination of map and .groupByKey does not produce any result. However, it 
> works fine when using KStreamTestDriver.
> The topology looks like this:
> {code}
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>  .map((k, v) => new KeyValue(fn(k), v))
>  .groupByKey(Serdes.String, Serdes.Integer)
>  .count(stateStore)
> {code}
> *Full examples*
> Examples for ProcessorTopologyTestDriver and KStreamTestDriver:
> https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13
> *Additional info*
> kafka-users mailing list:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4461) When using ProcessorTopologyTestDriver, the combination of map and .groupByKey does not produce any result

2016-11-29 Thread Hamidreza Afzali (JIRA)
Hamidreza Afzali created KAFKA-4461:
---

 Summary: When using ProcessorTopologyTestDriver, the combination 
of map and .groupByKey does not produce any result
 Key: KAFKA-4461
 URL: https://issues.apache.org/jira/browse/KAFKA-4461
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Hamidreza Afzali
Priority: Blocker


*Problem*

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the 
combination of map and .groupByKey does not produce any result. However, it 
works fine when using KStreamTestDriver.

The topology looks like this:

{code}
builder.stream(Serdes.String, Serdes.Integer, inputTopic)
 .map((k, v) => new KeyValue(fn(k), v))
 .groupByKey(Serdes.String, Serdes.Integer)
 .count(stateStore)
{code}

*Full example*
https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

*Additional info*

kafka-users mailing list:

http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)