[jira] [Updated] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-4791: - Resolution: Fixed Fix Version/s: 0.10.2.1 0.11.0.0 Status: Resolved (was: Patch Available) Issue resolved by pull request 2618 [https://github.com/apache/kafka/pull/2618] > Kafka Streams - unable to add state stores when using wildcard topics on the > source > --- > > Key: KAFKA-4791 > URL: https://issues.apache.org/jira/browse/KAFKA-4791 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1, 0.10.2.0 > Environment: Java 8 >Reporter: Bart Vercammen >Assignee: Bill Bejeck > Fix For: 0.11.0.0, 0.10.2.1 > > > I'm trying to build up a topology (using TopologyBuilder) with following > components : > {code} > new TopologyBuilder() > .addSource("ingest", Pattern.compile( ... )) > .addProcessor("myprocessor", ..., "ingest") > .addStateStore(dataStore, "myprocessor") > {code} > Somehow this does not seem to work. > When creating the topology with exact topic names, all works fine, but it > seems not possible to attach state stores when using wildcard topics on the > sources. > Inside {{addStateStore}}, the processor gets connected to the state store > with {{connectProcessorAndStateStore}}, and there it will try to connect the > state store with the source topics from the processor: > {{connectStateStoreNameToSourceTopics}} > Here lies the problem: > {code} > private Set findSourceTopicsForProcessorParents(String [] > parents) { > final Set sourceTopics = new HashSet<>(); > for (String parent : parents) { > NodeFactory nodeFactory = nodeFactories.get(parent); > if (nodeFactory instanceof SourceNodeFactory) { > sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics())); > } else if (nodeFactory instanceof ProcessorNodeFactory) { > > sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) > nodeFactory).parents)); > } > } > return sourceTopics; > } > {code} > The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics()))}} will fail as there are no topics inside the > {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null) > I also tried to search for some unit tests inside the Kafka Streams project > that cover this scenario, but alas, I was not able to find any. > Only some tests on state stores with exact topic names, and some tests on > wildcard topics, but no combination of both ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Noll updated KAFKA-4791: Affects Version/s: 0.10.2.0 > Kafka Streams - unable to add state stores when using wildcard topics on the > source > --- > > Key: KAFKA-4791 > URL: https://issues.apache.org/jira/browse/KAFKA-4791 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1, 0.10.2.0 > Environment: Java 8 >Reporter: Bart Vercammen >Assignee: Bill Bejeck > > I'm trying to build up a topology (using TopologyBuilder) with following > components : > {code} > new TopologyBuilder() > .addSource("ingest", Pattern.compile( ... )) > .addProcessor("myprocessor", ..., "ingest") > .addStateStore(dataStore, "myprocessor") > {code} > Somehow this does not seem to work. > When creating the topology with exact topic names, all works fine, but it > seems not possible to attach state stores when using wildcard topics on the > sources. > Inside {{addStateStore}}, the processor gets connected to the state store > with {{connectProcessorAndStateStore}}, and there it will try to connect the > state store with the source topics from the processor: > {{connectStateStoreNameToSourceTopics}} > Here lies the problem: > {code} > private Set findSourceTopicsForProcessorParents(String [] > parents) { > final Set sourceTopics = new HashSet<>(); > for (String parent : parents) { > NodeFactory nodeFactory = nodeFactories.get(parent); > if (nodeFactory instanceof SourceNodeFactory) { > sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics())); > } else if (nodeFactory instanceof ProcessorNodeFactory) { > > sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) > nodeFactory).parents)); > } > } > return sourceTopics; > } > {code} > The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics()))}} will fail as there are no topics inside the > {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null) > I also tried to search for some unit tests inside the Kafka Streams project > that cover this scenario, but alas, I was not able to find any. > Only some tests on state stores with exact topic names, and some tests on > wildcard topics, but no combination of both ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-4791: --- Status: Patch Available (was: In Progress) > Kafka Streams - unable to add state stores when using wildcard topics on the > source > --- > > Key: KAFKA-4791 > URL: https://issues.apache.org/jira/browse/KAFKA-4791 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1 > Environment: Java 8 >Reporter: Bart Vercammen >Assignee: Bill Bejeck > > I'm trying to build up a topology (using TopologyBuilder) with following > components : > {code} > new TopologyBuilder() > .addSource("ingest", Pattern.compile( ... )) > .addProcessor("myprocessor", ..., "ingest") > .addStateStore(dataStore, "myprocessor") > {code} > Somehow this does not seem to work. > When creating the topology with exact topic names, all works fine, but it > seems not possible to attach state stores when using wildcard topics on the > sources. > Inside {{addStateStore}}, the processor gets connected to the state store > with {{connectProcessorAndStateStore}}, and there it will try to connect the > state store with the source topics from the processor: > {{connectStateStoreNameToSourceTopics}} > Here lies the problem: > {code} > private Set findSourceTopicsForProcessorParents(String [] > parents) { > final Set sourceTopics = new HashSet<>(); > for (String parent : parents) { > NodeFactory nodeFactory = nodeFactories.get(parent); > if (nodeFactory instanceof SourceNodeFactory) { > sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics())); > } else if (nodeFactory instanceof ProcessorNodeFactory) { > > sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) > nodeFactory).parents)); > } > } > return sourceTopics; > } > {code} > The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics()))}} will fail as there are no topics inside the > {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null) > I also tried to search for some unit tests inside the Kafka Streams project > that cover this scenario, but alas, I was not able to find any. > Only some tests on state stores with exact topic names, and some tests on > wildcard topics, but no combination of both ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)