[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529506#comment-16529506
 ] 

Jouni commented on KAFKA-7125:
------------------------------

Sorry, forgot that one relevant call just before getting TopologyDescription... 
bug report updated. And there's a good reason for attaching a downstream 
processor to the statestores processor. I have a need to catch the updates 
Change<> -event, do some totally not-streams-related work to it and forward the 
results again downstream. But my main point is that calling builder.describe() 
again shouldn't throw an exception. Something in handing the internal state of 
Topology clearly isn't totally right.

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7125
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7125
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Jouni
>            Assignee: Nikolay Izhikov
>            Priority: Minor
>              Labels: beginner, newbie
>
> After adding a a processor and a sink to topology after a globalstore and 
> then calling StreamBuilder.build().describe() again (for debugging purposes 
> and to check I got the topology right), had the following exception and 
> stacktrace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snipped of code that caused this:
> {{        GlobalKTable<String, ServiceList> jsonRoutesToServices}}
>  {{                = builder.globalTable("routes-to-services",}}
>  {{                        Consumed.with(Serdes.String(), 
> jsonServiceListSerde),}}
>  {{                        Materialized.<String, ServiceList, 
> KeyValueStore<Bytes, byte[]>>as("routes-to-services"));}}
> {{        TopologyDescription td = builder.build().describe();}}
>  {{        String parent = null;}}
>  {{        // We get an iterator to a TreeSet sorted by processing order, and 
> just want the last one.}}
>  {{        for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{            parent = store.processor().name();}}
>                  }
>  {{        TopologyDescription tdtd = builder.build().describe();}}
>  {{        builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new 
> UnneededCruftSupplier(), parent);}}
>  {{        builder.build().addSink("FST-ROUTES-TO-SERVICES", 
> "fst-routes-to-services", Serdes.String().serializer(), 
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{        TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet, 
> calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to