[jira] [Updated] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-03-30 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4791:
--------------------------------
       Resolution: Fixed
    Fix Version/s: 0.10.2.1
                   0.11.0.0
           Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2618
[https://github.com/apache/kafka/pull/2618]

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
>                 Key: KAFKA-4791
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4791
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1, 0.10.2.0
>         Environment: Java 8
>            Reporter: Bart Vercammen
>            Assignee: Bill Bejeck
>             Fix For: 0.11.0.0, 0.10.2.1
>
>
> I'm trying to build a topology (using TopologyBuilder) with the following
> components:
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, everything works fine, but
> it does not seem possible to attach state stores when using wildcard topics on
> the sources.
> Inside {{addStateStore}}, the processor gets connected to the state store
> with {{connectProcessorAndStateStore}}, which in turn tries to connect the
> state store with the processor's source topics via
> {{connectStateStoreNameToSourceTopics}}.
> This is where the problem lies:
> {code}
> private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
>     final Set<String> sourceTopics = new HashSet<>();
>     for (String parent : parents) {
>         NodeFactory nodeFactory = nodeFactories.get(parent);
>         if (nodeFactory instanceof SourceNodeFactory) {
>             sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()));
>         } else if (nodeFactory instanceof ProcessorNodeFactory) {
>             sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
>         }
>     }
>     return sourceTopics;
> }
> {code}
> The call to
> {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()))}}
> fails because the {{SourceNodeFactory}} object holds no topics, only a
> pattern ({{getTopics()}} returns null).
> I also searched the Kafka Streams project for unit tests that cover this
> scenario, but could not find any: there are tests for state stores with exact
> topic names and tests for wildcard topics, but none that combine the two.
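
For readers following the thread, the failure mode described in the report can be reproduced without Kafka. The sketch below is only an illustration: {{WildcardSourceSketch}}, {{SourceNode}}, {{collectTopicsUnsafe}} and {{collect}} are hypothetical stand-ins, not Kafka Streams classes, and the null-aware variant is one possible approach, not necessarily what pull request 2618 does. It assumes, as the report states, that a pattern-based source exposes a null topic array.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical stand-ins for illustration; these are NOT the Kafka Streams classes.
class WildcardSourceSketch {

    // Models a source node defined either by topic names or by a pattern.
    static final class SourceNode {
        final String[] topics;  // null when the source was defined with a pattern
        final Pattern pattern;  // null when the source was defined with topic names

        SourceNode(String[] topics, Pattern pattern) {
            this.topics = topics;
            this.pattern = pattern;
        }
    }

    // Mirrors the failing call from the report: Arrays.asList rejects a null array.
    static Set<String> collectTopicsUnsafe(SourceNode node) {
        Set<String> sourceTopics = new HashSet<>();
        sourceTopics.addAll(Arrays.asList(node.topics)); // throws if topics == null
        return sourceTopics;
    }

    // Null-aware variant (an assumption): names and patterns are gathered separately.
    static void collect(SourceNode node, Set<String> topics, Set<String> patterns) {
        if (node.topics != null) {
            topics.addAll(Arrays.asList(node.topics));
        } else if (node.pattern != null) {
            // Stored as a string because Pattern does not define equals/hashCode.
            patterns.add(node.pattern.pattern());
        }
    }

    public static void main(String[] args) {
        SourceNode byName = new SourceNode(new String[] {"topic-a", "topic-b"}, null);
        SourceNode byPattern = new SourceNode(null, Pattern.compile("topic-.*"));

        System.out.println("exact names: " + collectTopicsUnsafe(byName));

        try {
            collectTopicsUnsafe(byPattern);
        } catch (NullPointerException e) {
            System.out.println("pattern source: NullPointerException");
        }

        Set<String> topics = new HashSet<>();
        Set<String> patterns = new HashSet<>();
        collect(byName, topics, patterns);
        collect(byPattern, topics, patterns);
        System.out.println("topics=" + topics + " patterns=" + patterns);
    }
}
```

The point of the sketch is only that a source defined by a pattern has to be handled on a separate path from one defined by topic names; how the state store is then associated with topics matching the pattern is left open here.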



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-03-21 Thread Michael Noll (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Noll updated KAFKA-4791:

Affects Version/s: 0.10.2.0






[jira] [Updated] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-28 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-4791:
---
Status: Patch Available  (was: In Progress)



