[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529393#comment-16529393
 ] 

Guozhang Wang commented on KAFKA-7125:
--------------------------------------

[~NIzhikov] Note that you do not need to link a global store to any processor, 
as global stores are "ubiquitously accessible" from any processors, only local 
stores needs to be linked to processor so that they can be retrieved from the 
`ProcessorContext`.

And for a global store, its `store.processor()` 's corresponding processor is 
actually a special processor node that is only used for updating the global 
store, and should NOT be linked to any other processors as predecessors.

I think we can tighten this a bit more as we did in the DSL layer, will submit 
a quick PR for that.

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7125
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7125
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Jouni
>            Assignee: Nikolay Izhikov
>            Priority: Minor
>              Labels: beginner, newbie
>
> After adding a a processor and a sink to topology after a globalstore and 
> then calling StreamBuilder.build().describe() again (for debugging purposes 
> and to check I got the topology right), had the following exception and 
> stacktrace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snipped of code that caused this:
> {{        TopologyDescription td = builder.build().describe();}}
>  {{        String parent = null;}}
>  {{        // We get an iterator to a TreeSet sorted by processing order, and 
> just want the last one.}}
>  {{        for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{            parent = store.processor().name();}}
>                 }
>  {{        TopologyDescription tdtd = builder.build().describe();}}
>  {{        builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new 
> UnneededCruftSupplier(), parent);}}
>  {{        builder.build().addSink("FST-ROUTES-TO-SERVICES", 
> "fst-routes-to-services", Serdes.String().serializer(), 
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{        TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet, 
> calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to