[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950005#comment-15950005 ]

ASF GitHub Bot commented on KAFKA-4791:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/kafka/pull/2618

> Kafka Streams - unable to add state stores when using wildcard topics on the source
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-4791
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4791
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1, 0.10.2.0
>         Environment: Java 8
>            Reporter: Bart Vercammen
>            Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it seems not possible to attach state stores when using wildcard topics on the sources.
> Inside {{addStateStore}}, the processor gets connected to the state store with {{connectProcessorAndStateStore}}, and there it will try to connect the state store with the source topics from the processor: {{connectStateStoreNameToSourceTopics}}
> Here lies the problem:
> {code}
> private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
>     final Set<String> sourceTopics = new HashSet<>();
>     for (String parent : parents) {
>         NodeFactory nodeFactory = nodeFactories.get(parent);
>         if (nodeFactory instanceof SourceNodeFactory) {
>             sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()));
>         } else if (nodeFactory instanceof ProcessorNodeFactory) {
>             sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
>         }
>     }
>     return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()))}} will fail as there are no topics inside the {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on wildcard topics, but no combination of both ...

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
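The failure described in the report can be reproduced outside Kafka. The sketch below is only an illustration: `SourceTopicsSketch`, its nested `Source` class, and both `collect...` methods are hypothetical stand-ins, not Kafka classes. It assumes, as the report states, that a pattern-defined source holds no topic array. Passing a null array to `Arrays.asList` throws a `NullPointerException`, and a null check avoids that exception (without, by itself, connecting the store to any pattern-matched topics).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

public class SourceTopicsSketch {

    // Hypothetical stand-in for a source node definition: it is declared
    // either with explicit topic names or with a pattern.
    static final class Source {
        final String[] topics;  // null when the source was declared with a pattern
        final Pattern pattern;  // null when the source was declared with topic names

        Source(String[] topics, Pattern pattern) {
            this.topics = topics;
            this.pattern = pattern;
        }
    }

    // Mirrors the call quoted in the report: Arrays.asList is handed the
    // topic array directly, so a null array raises NullPointerException.
    static Set<String> collectUnsafe(Source source) {
        Set<String> sourceTopics = new HashSet<>();
        sourceTopics.addAll(Arrays.asList(source.topics));
        return sourceTopics;
    }

    // Guarded variant (an assumption, not the actual fix): skip sources
    // that carry no explicit topic names.
    static Set<String> collectGuarded(Source source) {
        Set<String> sourceTopics = new HashSet<>();
        if (source.topics != null) {
            sourceTopics.addAll(Arrays.asList(source.topics));
        }
        return sourceTopics;
    }

    public static void main(String[] args) {
        Source byPattern = new Source(null, Pattern.compile("ingest-.*"));
        try {
            collectUnsafe(byPattern);
        } catch (NullPointerException e) {
            System.out.println("pattern source: NullPointerException");
        }
        System.out.println("guarded result: " + collectGuarded(byPattern));
    }
}
```

The guard only removes the exception; a pattern-defined source would still need its matching topics resolved some other way before a state store could be associated with them.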
[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889382#comment-15889382 ]

ASF GitHub Bot commented on KAFKA-4791:
---------------------------------------

GitHub user bbejeck opened a pull request:

    https://github.com/apache/kafka/pull/2618

    KAFKA-4791: unable to add state store with regex matched topics

    Fix for adding state stores with regex defined sources

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbejeck/kafka KAFKA-4791_unable_to_add_statestore_regex_topics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2618

commit 828928905e393113e2a4f0148b8f1edc48d6e077
Author: bbejeck
Date:   2017-03-01T02:27:50Z

    KAFKA-4791: unable to add state store with regex matched topics
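The pull request text does not say how the fix works, so the following is not taken from it. As a rough illustration of what "regex defined sources" implies, this sketch shows one way a source pattern could be resolved to concrete topic names once a list of existing topics is known. `PatternTopicResolver`, `resolve`, and the topic names are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

public class PatternTopicResolver {

    // Returns the known topics whose full name matches the source pattern.
    // A TreeSet keeps the result in a stable, sorted order.
    static Set<String> resolve(Pattern sourcePattern, List<String> knownTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : knownTopics) {
            if (sourcePattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Hypothetical topic names, for illustration only.
        List<String> knownTopics = Arrays.asList("ingest-a", "ingest-b", "audit");
        System.out.println(resolve(Pattern.compile("ingest-.*"), knownTopics));
    }
}
```

Because the set of matching topics can change while an application runs, any real implementation would presumably have to repeat this resolution whenever the known topics change; that part is outside the scope of the sketch.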
[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881456#comment-15881456 ]

Matthias J. Sax commented on KAFKA-4791:
----------------------------------------

Ack. My bad. I thought [~clouTrix] calls {{connectStateStoreNameToSourceTopics}}, but he doesn't... Calling {{addStateStore}} is of course fine.

[~bbejeck] feel free to start working on this. :)
[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881280#comment-15881280 ]

Damian Guy commented on KAFKA-4791:
-----------------------------------

[~mjsax] this is a bug. Bart is calling {{addStateStore}} which is, and will remain, a public API. He has just kindly pointed out where the problem lies.
[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881043#comment-15881043 ]

Bart Vercammen commented on KAFKA-4791:
---------------------------------------

This then means that {{addStateStore}} should not be used from TopologyBuilder?
So, in order to connect my stores to the processors I'll need to create them inside the processor's {{init}} block and register them manually to the processor-context?

Or am I seeing this wrong?
[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881018#comment-15881018 ]

Bill Bejeck commented on KAFKA-4791:
------------------------------------

Fair enough. I did not look into the issue at all; at first blush it seemed like a bug that needed to be fixed ASAP. But considering your comments and the forthcoming changes with KIP-120, I'll hold off.
[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880971#comment-15880971 ]

Matthias J. Sax commented on KAFKA-4791:
----------------------------------------

\cc [~damianguy]
[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880966#comment-15880966 ]

Matthias J. Sax commented on KAFKA-4791:
----------------------------------------

You should not call {{connectStateStoreNameToSourceTopics}}. It's a leaking internal API that will be removed. Cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API

I would close this as "not a problem". Right now, {{KStreamBuilder}} uses this for source {{KTable}}, and a {{KTable}} is always read from a single topic, so there is no need to allow multiple topics or topic patterns.

\cc [~guozhang]
[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880782#comment-15880782 ]

Bill Bejeck commented on KAFKA-4791:
------------------------------------

picking this one up.