[jira] [Commented] (KAFKA-5368) Kafka Streams skipped-records-rate sensor produces nonzero values when the timestamps are valid
[ https://issues.apache.org/jira/browse/KAFKA-5368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16034367#comment-16034367 ]

Hamidreza Afzali commented on KAFKA-5368:
-----------------------------------------

*Problem:*

The skipped-records sensor uses a {{Rate}} over a {{Count}} for the skipped-records-rate metric. In {{org.apache.kafka.streams.processor.internals.StreamThread#addRecordsToTasks}} the Count value is incremented by one regardless of the number of skipped records, i.e. the value increments even if no record is skipped.

{code}
skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records.", this.tags), new Rate(new Count()));

...

private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
    if (records != null && !records.isEmpty()) {
        ...
        streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
    }
}
{code}

{{org.apache.kafka.streams.processor.internals.StreamThread#addRecordsToTasks}} is called in {{org.apache.kafka.streams.processor.internals.StreamThread#runLoop}} after each successful poll request.

{code}
private void runLoop() {
    ...
    while (stillRunning()) {
        ...
        final ConsumerRecords<byte[], byte[]> records = pollRequests(pollTimeMs);
        if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
            streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
            addRecordsToTasks(records);
            ...
        }
        ...
    }
    ...
}
{code}

This can explain why skipped-records-rate is equal to poll-rate.

*Solution:*

The sensor should keep a sum of all skipped records.

> Kafka Streams skipped-records-rate sensor produces nonzero values when the timestamps are valid
> -----------------------------------------------------------------------------------------------
>
> Key: KAFKA-5368
> URL: https://issues.apache.org/jira/browse/KAFKA-5368
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Hamidreza Afzali
> Assignee: Hamidreza Afzali
>
> Kafka Streams skipped-records-rate sensor produces nonzero values even when the timestamps are valid and records are processed. The values are equal to poll-rate.
> Related issue: KAFKA-5055

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
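The arithmetic behind the KAFKA-5368 comment above can be reproduced without Kafka. The following is a minimal sketch, not the Kafka metrics code: `SkippedRecordsSketch`, `countLike`, and `sumLike` are hypothetical names that model a statistic adding one per `record()` call versus one adding the recorded value. Each array element stands for `records.count() - numAddedRecords` after one poll.

```java
// Hypothetical stand-ins for the two accumulation styles discussed above.
// These are NOT the Kafka metrics classes; they only model the arithmetic.
final class SkippedRecordsSketch {

    // Count-like accumulation: every record() call adds 1, the value is ignored.
    static double countLike(int[] skippedPerPoll) {
        double total = 0.0;
        for (int ignored : skippedPerPoll) {
            total += 1.0;
        }
        return total;
    }

    // Sum-like accumulation: every record() call adds the recorded value.
    static double sumLike(int[] skippedPerPoll) {
        double total = 0.0;
        for (int skipped : skippedPerPoll) {
            total += skipped;
        }
        return total;
    }

    public static void main(String[] args) {
        // Three successful polls, none of which skipped a record.
        int[] skippedPerPoll = {0, 0, 0};
        System.out.println("count-like: " + countLike(skippedPerPoll));
        System.out.println("sum-like:   " + sumLike(skippedPerPoll));
    }
}
```

With three polls and no skipped records, the count-like total grows to 3.0 (one per poll, which is why it tracks poll-rate), while the sum-like total stays at 0.0.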
[jira] [Created] (KAFKA-5368) Kafka Streams skipped-records-rate sensor produces nonzero values when the timestamps are valid
Hamidreza Afzali created KAFKA-5368:
---------------------------------------

Summary: Kafka Streams skipped-records-rate sensor produces nonzero values when the timestamps are valid
Key: KAFKA-5368
URL: https://issues.apache.org/jira/browse/KAFKA-5368
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Hamidreza Afzali
Assignee: Hamidreza Afzali

Kafka Streams skipped-records-rate sensor produces nonzero values even when the timestamps are valid and records are processed. The values are equal to poll-rate.

Related issue: KAFKA-5055
[jira] [Resolved] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
[ https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hamidreza Afzali resolved KAFKA-4408.
-------------------------------------
Resolution: Fixed
Fix Version/s: 0.11.0.0

Issue resolved by pull request 2629
https://github.com/apache/kafka/pull/2629

> KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
> ----------------------------------------------------------------------
>
> Key: KAFKA-4408
> URL: https://issues.apache.org/jira/browse/KAFKA-4408
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Environment: Linux
> Reporter: Byron Nikolaidis
> Assignee: Hamidreza Afzali
> Labels: newbie, unit-test
> Fix For: 0.11.0.0
>
> In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with KTables. The below test code worked fine under Kafka 0.10.0.1 but now produces this error:
>
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: task [0_0] Could not find partition info for topic: alertInputTopic
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>     at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:174)
>     at mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)
>
> {code}
> package mil.navy.icap.kafka.streams.processor.track;
>
> import java.io.IOException;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serdes.StringSerde;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.test.ProcessorTopologyTestDriver;
>
> public class ProcessorDriverTest2 {
>
>     public static void main(String[] args) throws IOException, InterruptedException {
>         System.out.println("ProcessorDriverTest2");
>
>         Properties props = new Properties();
>         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ProcessorDriverTest2");
>         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>
>         StreamsConfig streamsConfig = new StreamsConfig(props);
>
>         // topology
>         KStreamBuilder kstreamBuilder = new KStreamBuilder();
>         StringSerde stringSerde = new StringSerde();
>         KTable<String, String> table = kstreamBuilder.table(stringSerde, stringSerde, "alertInputTopic");
>         table.to(stringSerde, stringSerde, "alertOutputTopic");
>
>         // create test driver
>         ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
>             streamsConfig,
>             kstreamBuilder,
>             "alertStore");
>
>         StringSerializer serializer = new StringSerializer();
>         StringDeserializer deserializer = new StringDeserializer();
>
>         // send data to input topic
>         testDriver.process("alertInputTopic", "the Key", "the Value", serializer, serializer);
>
>         // read data from output topic
>         ProducerRecord<String, String> rec = testDriver.readOutput("alertOutputTopic", deserializer, deserializer);
>
>         System.out.println("rec: " + rec);
>     }
> }
> {code}
[jira] [Work started] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
[ https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-4408 started by Hamidreza Afzali.
-----------------------------------------------
> KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
> ----------------------------------------------------------------------
>
> Key: KAFKA-4408
> URL: https://issues.apache.org/jira/browse/KAFKA-4408
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Environment: Linux
> Reporter: Byron Nikolaidis
> Assignee: Hamidreza Afzali
> Labels: newbie, unit-test
[jira] [Assigned] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
[ https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hamidreza Afzali reassigned KAFKA-4408:
---------------------------------------

Assignee: Hamidreza Afzali

> KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
> ----------------------------------------------------------------------
>
> Key: KAFKA-4408
> URL: https://issues.apache.org/jira/browse/KAFKA-4408
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Environment: Linux
> Reporter: Byron Nikolaidis
> Assignee: Hamidreza Afzali
> Labels: newbie, unit-test
[jira] [Commented] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
[ https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896843#comment-15896843 ]

Hamidreza Afzali commented on KAFKA-4408:
-----------------------------------------

This should resolve the issue: https://github.com/apache/kafka/pull/2629

> KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
> ----------------------------------------------------------------------
>
> Key: KAFKA-4408
> URL: https://issues.apache.org/jira/browse/KAFKA-4408
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Environment: Linux
> Reporter: Byron Nikolaidis
> Labels: newbie, unit-test
[jira] [Work started] (KAFKA-4828) ProcessorTopologyTestDriver does not work when using .through()
[ https://issues.apache.org/jira/browse/KAFKA-4828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-4828 started by Hamidreza Afzali.
-----------------------------------------------
> ProcessorTopologyTestDriver does not work when using .through()
> ---------------------------------------------------------------
>
> Key: KAFKA-4828
> URL: https://issues.apache.org/jira/browse/KAFKA-4828
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Hamidreza Afzali
> Assignee: Hamidreza Afzali
> Labels: unit-test
[jira] [Commented] (KAFKA-4828) ProcessorTopologyTestDriver does not work when using .through()
[ https://issues.apache.org/jira/browse/KAFKA-4828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893105#comment-15893105 ]

Hamidreza Afzali commented on KAFKA-4828:
-----------------------------------------

There are two issues:

- ProcessorTopologyTestDriver creates an internal changelog topic for through()
- ProcessorTopologyTestDriver does not forward the produced record back into the topology if it is sent to a source topic

> ProcessorTopologyTestDriver does not work when using .through()
> ---------------------------------------------------------------
>
> Key: KAFKA-4828
> URL: https://issues.apache.org/jira/browse/KAFKA-4828
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Hamidreza Afzali
> Assignee: Hamidreza Afzali
> Labels: unit-test
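The second point in the KAFKA-4828 comment above, forwarding a produced record back into the topology when its topic is also a source topic, can be sketched in isolation. This is a simplified illustration under stated assumptions, not the actual ProcessorTopologyTestDriver code: `LoopbackDriverSketch`, `Rec`, `addSource`, and `produced` are hypothetical names, and each source topic is reduced to a single step that maps one input record to one output record.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch of a test driver that loops produced records back in.
// It is NOT the real ProcessorTopologyTestDriver.
final class LoopbackDriverSketch {

    // A record is reduced to a topic name plus a value.
    static final class Rec {
        final String topic;
        final String value;

        Rec(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    // Source topic -> step that turns an input record into an output record.
    private final Map<String, UnaryOperator<Rec>> sources = new HashMap<>();
    // Every record the topology produced, in order.
    private final List<Rec> produced = new ArrayList<>();

    void addSource(String topic, UnaryOperator<Rec> step) {
        sources.put(topic, step);
    }

    void process(String topic, String value) {
        Deque<Rec> pending = new ArrayDeque<>();
        pending.add(new Rec(topic, value));
        while (!pending.isEmpty()) {
            Rec in = pending.poll();
            UnaryOperator<Rec> step = sources.get(in.topic);
            if (step == null) {
                continue; // not a source topic: nothing consumes this record
            }
            Rec out = step.apply(in);
            produced.add(out);
            // The through() case: the output topic is itself a source topic,
            // so the record has to be fed back into the topology.
            if (sources.containsKey(out.topic)) {
                pending.add(out);
            }
        }
    }

    List<Rec> produced() {
        return produced;
    }
}
```

A step that writes to its own source topic would loop forever in this sketch; a real driver would need to guard against that.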
[jira] [Created] (KAFKA-4828) ProcessorTopologyTestDriver does not work when using .through()
Hamidreza Afzali created KAFKA-4828:
---------------------------------------

Summary: ProcessorTopologyTestDriver does not work when using .through()
Key: KAFKA-4828
URL: https://issues.apache.org/jira/browse/KAFKA-4828
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.2.0
Reporter: Hamidreza Afzali
Assignee: Hamidreza Afzali

*Problem:*

ProcessorTopologyTestDriver does not work when testing a topology that uses through().

{code}
org.apache.kafka.streams.errors.StreamsException: Store count2's change log (count2-topic) does not contain partition 1
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
{code}

*Example:*

{code}
object Topology1 {

  def main(args: Array[String]): Unit = {

    val inputTopic = "input"
    val stateStore = "count"
    val stateStore2 = "count2"
    val outputTopic2 = "count2-topic"
    val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))

    val props = new Properties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

    val builder = new KStreamBuilder
    builder.stream(Serdes.String, Serdes.Integer, inputTopic)
      .groupByKey(Serdes.String, Serdes.Integer)
      .count(stateStore)
      .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)

    val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStore, stateStore2)
    inputs.foreach {
      case (key, value) => {
        driver.process(inputTopic, key, value, Serdes.String.serializer, Serdes.Integer.serializer)
        val record = driver.readOutput(outputTopic2, Serdes.String.deserializer, Serdes.Long.deserializer)
        println(record)
      }
    }
  }
}
{code}
[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics
[ https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888941#comment-15888941 ]

Hamidreza Afzali commented on KAFKA-4789:
-----------------------------------------

Thanks!

> ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Hamidreza Afzali
> Assignee: Hamidreza Afzali
> Labels: unit-test
> Fix For: 0.10.3.0
[jira] [Created] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics
Hamidreza Afzali created KAFKA-4789:
---------------------------------------

Summary: ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics
Key: KAFKA-4789
URL: https://issues.apache.org/jira/browse/KAFKA-4789
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.2.0
Reporter: Hamidreza Afzali

*Problem:*

When using ProcessorTopologyTestDriver, the extracted timestamp is not forwarded with the produced record to the internal topics.

*Example:*

{code}
object Topology1 {

  def main(args: Array[String]): Unit = {

    val inputTopic = "input"
    val outputTopic = "output"
    val stateStore = "count"
    val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 2))

    val props = new Properties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[MyTimestampExtractor].getName)

    val windowedStringSerde = Serdes.serdeFrom(new WindowedSerializer(Serdes.String.serializer),
      new WindowedDeserializer(Serdes.String.deserializer))

    val builder = new KStreamBuilder
    builder.stream(Serdes.String, Serdes.Integer, inputTopic)
      .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
      .groupByKey(Serdes.String, Serdes.Integer)
      .count(TimeWindows.of(1000L), stateStore)
      .to(windowedStringSerde, Serdes.Long, outputTopic)

    val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStore)
    inputs.foreach {
      case (key, value) => {
        driver.process(inputTopic, key, value, Serdes.String.serializer, Serdes.Integer.serializer)
        val record = driver.readOutput(outputTopic, Serdes.String.deserializer, Serdes.Long.deserializer)
        println(record)
      }
    }
  }
}
{code}
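The KAFKA-4789 example above references `MyTimestampExtractor` without showing it. Assuming the input keys follow a `key@timestamp` convention (an assumption suggested by the `k.split("@")(0)` call, not confirmed by the report), the parsing step might look like the sketch below. `KeyTimestampSketch`, `keyOf`, and `timestampOf` are hypothetical names; a real extractor would implement the Kafka Streams `TimestampExtractor` interface and could delegate to logic of this kind.

```java
// Hypothetical helper for keys of the assumed form "key@timestamp".
// It only illustrates the parsing; it is not the reporter's MyTimestampExtractor.
final class KeyTimestampSketch {

    // Returns the part before the first '@', or the whole key if there is none.
    static String keyOf(String rawKey) {
        int at = rawKey.indexOf('@');
        return at < 0 ? rawKey : rawKey.substring(0, at);
    }

    // Returns the number after the first '@', or the fallback if the key
    // has no '@' or the suffix is not a valid long.
    static long timestampOf(String rawKey, long fallback) {
        int at = rawKey.indexOf('@');
        if (at < 0) {
            return fallback;
        }
        try {
            return Long.parseLong(rawKey.substring(at + 1));
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        String rawKey = "A@145000";
        System.out.println(keyOf(rawKey) + " at " + timestampOf(rawKey, -1L));
    }
}
```

Returning a fallback instead of throwing is a design choice of this sketch; how the reporter's extractor treats malformed keys is not stated in the issue.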
[jira] [Updated] (KAFKA-4461) When using ProcessorTopologyTestDriver, the combination of map and .groupByKey does not produce any result
[ https://issues.apache.org/jira/browse/KAFKA-4461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hamidreza Afzali updated KAFKA-4461:
------------------------------------
Description:
*Problem*

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the combination of map and .groupByKey does not produce any result. However, it works fine when using KStreamTestDriver.

The topology looks like this:

{code}
builder.stream(Serdes.String, Serdes.Integer, inputTopic)
  .map((k, v) => new KeyValue(fn(k), v))
  .groupByKey(Serdes.String, Serdes.Integer)
  .count(stateStore)
{code}

*Full examples*

Examples for ProcessorTopologyTestDriver and KStreamTestDriver:
https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

*Additional info*

kafka-users mailing list:
http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E

was:
*Problem*

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the combination of map and .groupByKey does not produce any result. However, it works fine when using KStreamTestDriver.

The topology looks like this:

{code}
builder.stream(Serdes.String, Serdes.Integer, inputTopic)
  .map((k, v) => new KeyValue(fn(k), v))
  .groupByKey(Serdes.String, Serdes.Integer)
  .count(stateStore)
{code}

*Full example*

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

*Additional info*

kafka-users mailing list:
http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E

> When using ProcessorTopologyTestDriver, the combination of map and .groupByKey does not produce any result
> ----------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-4461
> URL: https://issues.apache.org/jira/browse/KAFKA-4461
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Reporter: Hamidreza Afzali
> Priority: Blocker
>
> *Problem*
> When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the combination of map and .groupByKey does not produce any result. However, it works fine when using KStreamTestDriver.
> The topology looks like this:
> {code}
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map((k, v) => new KeyValue(fn(k), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(stateStore)
> {code}
> *Full examples*
> Examples for ProcessorTopologyTestDriver and KStreamTestDriver:
> https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13
> *Additional info*
> kafka-users mailing list:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (KAFKA-4461) When using ProcessorTopologyTestDriver, the combination of map and .groupByKey does not produce any result
Hamidreza Afzali created KAFKA-4461:
---------------------------------------

Summary: When using ProcessorTopologyTestDriver, the combination of map and .groupByKey does not produce any result
Key: KAFKA-4461
URL: https://issues.apache.org/jira/browse/KAFKA-4461
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.1.0
Reporter: Hamidreza Afzali
Priority: Blocker

*Problem*

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the combination of map and .groupByKey does not produce any result. However, it works fine when using KStreamTestDriver.

The topology looks like this:

{code}
builder.stream(Serdes.String, Serdes.Integer, inputTopic)
  .map((k, v) => new KeyValue(fn(k), v))
  .groupByKey(Serdes.String, Serdes.Integer)
  .count(stateStore)
{code}

*Full example*

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

*Additional info*

kafka-users mailing list:
http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E