[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533017#comment-16533017
 ] 

Jouni commented on KAFKA-7125:
------------------------------

Thanks Guozhang. The process method not being called could well be my own 
doing: I'm somewhat of a beginner with Kafka, mess things up all the time, and 
had reverted to Streams 1.1.0. I'll switch back to the current 2.1.0-SNAPSHOT + 
your PR so I can debug, put some breakpoints here and there, see what's really 
happening and double-check myself, so I don't have to bother you with 
unnecessary questions.

It's already late evening here in Finland (GMT+3), so I'm going to sleep after 
a while, but will use the whole next day for tracking this down.

Besides, I've already converted 2 out of 3 of my global tables to streams 
aggregated to tables, and learned a lot of interesting things about when I 
really have to use Consumed, Produced, Materialized, Joined etc. and when it 
just works with the defaults. I also wrote a custom partitioner...

BTW, if you happen to be connected to Confluent, I think the Option 2 example 
in 
[https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step]
 should be changed. I'd recommend something like

{{.groupByKey()}}
{{.aggregate(() -> null, (foo, value, bar) -> value, Materialized...}}

It took me a while to find out that reduce will NOT call the apply method the 
first time, only the second time; I kept wondering why my table was empty until 
I fed the data twice. This was with 1.1.0; things could be different in other 
versions.
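
As a rough illustration of the difference described above, here is a plain-Java 
model of the two update rules. It is only a sketch and not Kafka Streams code: 
the class ReduceVsAggregateModel, its methods and the Aggregator interface are 
hypothetical names, and the reduce rule (first value per key stored directly, 
callback skipped) is an assumption about the behaviour, not something taken 
from the Kafka sources. It only models when the callback is invoked, nothing 
else.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

public class ReduceVsAggregateModel {

    /** Hypothetical three-argument callback, mirroring (key, value, aggregate). */
    public interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    /** Assumed reduce rule: the first value for a key is stored as-is, the reducer is not called. */
    public static <K, V> void reduceStep(Map<K, V> store, K key, V value,
                                         BinaryOperator<V> reducer) {
        V old = store.get(key);
        store.put(key, old == null ? value : reducer.apply(old, value));
    }

    /** Assumed aggregate rule: the callback runs for every record, seeded by the initializer. */
    public static <K, V, A> void aggregateStep(Map<K, A> store, K key, V value,
                                               Supplier<A> initializer,
                                               Aggregator<K, V, A> aggregator) {
        A old = store.get(key);
        if (old == null) {
            old = initializer.get();
        }
        store.put(key, aggregator.apply(key, value, old));
    }

    public static void main(String[] args) {
        int[] reducerCalls = {0};
        int[] aggregatorCalls = {0};
        Map<String, String> reduced = new HashMap<>();
        Map<String, String> aggregated = new HashMap<>();

        // Feed the same key twice and count how often each callback runs.
        for (String value : new String[] {"v1", "v2"}) {
            reduceStep(reduced, "k", value, (a, b) -> { reducerCalls[0]++; return b; });
            aggregateStep(aggregated, "k", value, () -> null,
                    (k, v, agg) -> { aggregatorCalls[0]++; return v; });
            System.out.println("after " + value + ": reducer calls = " + reducerCalls[0]
                    + ", aggregator calls = " + aggregatorCalls[0]);
        }
    }
}
```

In this model the reducer has not run at all after the first record, while the 
aggregator has run once, so any logic placed inside the callback only sees the 
first record in the aggregate variant.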

Being able to add just one processor is enough to solve my use case of doing 
some non-streams-related, per-instance local processing. More than one 
processor per global table should never be needed. Another possibility, maybe 
safer for Kafka itself, would be to prevent users from adding processors 
altogether and instead add something like withChangeStream plus a way to 
subscribe to that stream. That still wouldn't stop us users from doing dumb 
things, though. I don't mind whatever the solution turns out to be and can 
adapt, as long as this use case is covered.

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7125
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7125
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Jouni
>            Assignee: Nikolay Izhikov
>            Priority: Minor
>              Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a global store and 
> then calling StreamsBuilder.build().describe() again (for debugging purposes 
> and to check I got the topology right), I got the following exception and 
> stack trace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) ~[na:1.8.0_171]}}
>  {{    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323) ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306) ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{        GlobalKTable<String, ServiceList> jsonRoutesToServices}}
>  {{                = builder.globalTable("routes-to-services",}}
>  {{                        Consumed.with(Serdes.String(), jsonServiceListSerde),}}
>  {{                        Materialized.<String, ServiceList, KeyValueStore<Bytes, byte[]>>as("routes-to-services"));}}
> {{        TopologyDescription td = builder.build().describe();}}
>  {{        String parent = null;}}
>  {{        // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}}
>  {{        for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{            parent = store.processor().name();}}
>                  }
>  {{        TopologyDescription tdtd = builder.build().describe();}}
>  {{        builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}}
>  {{        builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{        TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet; 
> calling describe again before adding anything works fine.
>  


