[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-03-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950005#comment-15950005
 ] 

ASF GitHub Bot commented on KAFKA-4791:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2618


> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.0
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889382#comment-15889382
 ] 

ASF GitHub Bot commented on KAFKA-4791:
---

GitHub user bbejeck opened a pull request:

https://github.com/apache/kafka/pull/2618

KAFKA-4791: unable to add state store with regex matched topics

Fix for adding state stores with regex defined sources

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbejeck/kafka 
KAFKA-4791_unable_to_add_statestore_regex_topics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2618


commit 828928905e393113e2a4f0148b8f1edc48d6e077
Author: bbejeck 
Date:   2017-03-01T02:27:50Z

KAFKA-4791: unable to add state store with regex matched topics




> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881456#comment-15881456
 ] 

Matthias J. Sax commented on KAFKA-4791:


Ack. My bad. I thought [~clouTrix] calls 
{{connectStateStoreNameToSourceTopics}}, but he doesn't... Calling 
{{addStateStore}} is of course fine. [~bbejeck] feel free to start working on 
this. :)

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881280#comment-15881280
 ] 

Damian Guy commented on KAFKA-4791:
---

[~mjsax] this is a bug. Bart is calling {{addStateStore}} which is, and will 
remain, a public API. He has just kindly pointed out where the problem lies.

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Bart Vercammen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881043#comment-15881043
 ] 

Bart Vercammen commented on KAFKA-4791:
---

This then means that {{addStateStore}} should not be used from TopologyBuilder ?
So, in order to connect my stores to the processors I'll need to create them 
inside the processor's {{init}} block and register them manually to the 
processor-context?
Or am I seeing this wrong?

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881018#comment-15881018
 ] 

Bill Bejeck commented on KAFKA-4791:


Fair enough.  I did not look into the issue at all, at first blush it seemed 
like a big that needed to be fixed asap.  But considering your comments and the 
forthcoming changes with KIP-120, I'll hold off.

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880971#comment-15880971
 ] 

Matthias J. Sax commented on KAFKA-4791:


\cc [~damianguy]

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880966#comment-15880966
 ] 

Matthias J. Sax commented on KAFKA-4791:


You should not call {{connectStateStoreNameToSourceTopics}}. It's an leaking 
internal API that will be removed. Cf. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API
I would close this as "not a problem". Right now, {{KStreamBuilder}} uses this 
for source {{KTable}} and a {{KTable}} is always read from a single topic, so 
there is not need to all multiple topics or topic patterns. \cc [~guozhang]

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880782#comment-15880782
 ] 

Bill Bejeck commented on KAFKA-4791:


picking this one up.

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)