[
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530209#comment-16530209
]
Guozhang Wang commented on KAFKA-7125:
--------------------------------------
[~Ahto] Thanks for your updated explanation. And yes, your findings are correct:
today we assume that a global store has only two processor nodes
associated with it: the source and the update processor. This is an unintended
limitation we added to the topology building process, and I will provide a PR
to lift it. Please feel free to try it out and let me know
if it resolves your problem. If it does, I will try to merge it into trunk so
that the next minor release includes the fix.
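
To illustrate the kind of assumption involved, here is a minimal, self-contained
Java sketch. It is not the Kafka Streams code: the class GlobalStoreGroupSketch,
both helper methods, and the node names are hypothetical. It only shows how code
that expects a fixed group of nodes and calls iterator().next() unchecked can
throw java.util.NoSuchElementException once the group has a different shape.

```java
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;

class GlobalStoreGroupSketch {

    // Hypothetical helper: assumes the group is exactly {source, processor}.
    static String fragileProcessorOf(Set<String> group, String source) {
        Set<String> rest = new HashSet<>(group);
        rest.remove(source);
        // Throws NoSuchElementException when nothing is left besides the source.
        return rest.iterator().next();
    }

    // Hypothetical helper: makes no assumption about the size of the group.
    static Optional<String> safeProcessorOf(Set<String> group, String source) {
        return group.stream()
                .filter(node -> !node.equals(source))
                .sorted() // deterministic choice if several nodes remain
                .findFirst();
    }

    public static void main(String[] args) {
        Set<String> expected = new HashSet<>();
        expected.add("global-source");
        expected.add("global-update-processor");
        System.out.println(fragileProcessorOf(expected, "global-source"));

        // A group that does not match the assumed shape.
        Set<String> unexpected = new HashSet<>();
        unexpected.add("global-source");
        try {
            fragileProcessorOf(unexpected, "global-source");
        } catch (NoSuchElementException e) {
            System.out.println("fragile lookup failed: " + e);
        }
        System.out.println(safeProcessorOf(unexpected, "global-source").isPresent());
    }
}
```

The second helper returns an empty Optional instead of throwing when the group
contains no node other than the source.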
> Calling StreamsBuilderbuilder.build().describe() causes
> java.util.NoSuchElementException: null
> ----------------------------------------------------------------------------------------------
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Jouni
> Assignee: Nikolay Izhikov
> Priority: Minor
> Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a global store and
> then calling StreamsBuilder.build().describe() again (for debugging purposes
> and to check I got the topology right), I got the following exception and
> stack trace:
> {{Caused by: java.util.NoSuchElementException: null}}
> {{ at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444)
> ~[na:1.8.0_171]}}
> {{ at java.util.HashMap$KeyIterator.next(HashMap.java:1466)
> ~[na:1.8.0_171]}}
> {{ at
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
> ~[kafka-streams-1.1.0.jar:na]}}
> {{ at
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
> ~[kafka-streams-1.1.0.jar:na]}}
> {{ at org.apache.kafka.streams.Topology.describe(Topology.java:647)
> ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{ GlobalKTable<String, ServiceList> jsonRoutesToServices}}
> {{ = builder.globalTable("routes-to-services",}}
> {{ Consumed.with(Serdes.String(),
> jsonServiceListSerde),}}
> {{ Materialized.<String, ServiceList,
> KeyValueStore<Bytes, byte[]>>as("routes-to-services"));}}
> {{ TopologyDescription td = builder.build().describe();}}
> {{ String parent = null;}}
> {{ // We get an iterator to a TreeSet sorted by processing order, and
> just want the last one.}}
> {{ for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
> {{ parent = store.processor().name();}}
> }
> {{ TopologyDescription tdtd = builder.build().describe();}}
> {{ builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new
> UnneededCruftSupplier(), parent);}}
> {{ builder.build().addSink("FST-ROUTES-TO-SERVICES",
> "fst-routes-to-services", Serdes.String().serializer(),
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
> {{ TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet;
> calling describe again before adding anything works fine.
>