[
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533017#comment-16533017
]
Jouni commented on KAFKA-7125:
------------------------------
Thanks Guozhang. The process method not being called could just be because I'm
somewhat of a beginner with Kafka, mess everything up all the time, and had
reverted to Streams 1.1.0. I'll switch back to the current 2.1.0-SNAPSHOT plus
your PR so I can debug, put some breakpoints here and there, see what's really
happening and double-check myself, so I don't have to bother you with
unnecessary questions. It's already late evening here in Finland (GMT+3), so
I'm going to sleep soon, but I'll spend the whole of tomorrow tracking this
down.
Besides, I've already converted 2 out of 3 of my GlobalKTables to streams
aggregated into tables, and learned a lot of interesting things about when I
really have to use Consumed, Produced, Materialized, Joined etc. and when the
defaults just work. I also wrote a custom partitioner...
BTW, if you happen to be connected to Confluent, I think the Option 2 example in
[https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step]
should be changed. I'd recommend something like
{{.groupByKey()}}
{{.aggregate(() -> null, (foo, value, bar) -> value, Materialized...}}
It took me a while to find out that reduce() does NOT call the Reducer's apply()
method for the first record of a key, only from the second one on, so I was
left wondering why my table was empty until I fed the data twice. This was with
1.1.0; things could be different in other versions.
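The difference between the two operators can be modeled in plain Java. This is a hypothetical in-memory sketch, not the Kafka Streams implementation: `ReduceVsAggregate`, `reduce`, `aggregate` and the nested `Aggregator` interface are illustrative names. It assumes, as described above for 1.1.0, that `reduce()` stores the first value for a key as-is and only invokes the reducer once an old value exists, while `aggregate()` seeds a missing key from the initializer and invokes the aggregator for every record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Hypothetical in-memory model of reduce() vs aggregate() on a grouped stream.
// NOT Kafka Streams code: it only mimics when the user function gets invoked.
public class ReduceVsAggregate {

    // Same (key, value, aggregate) shape as the aggregator lambda above.
    @FunctionalInterface
    interface Aggregator {
        String apply(String key, String value, String aggregate);
    }

    // Invocation counters for the user-supplied functions.
    static int reducerCalls = 0;
    static int aggregatorCalls = 0;

    // Models reduce(): the first value per key is stored directly;
    // the reducer only runs once an old value exists for that key.
    static Map<String, String> reduce(List<String[]> records, BinaryOperator<String> reducer) {
        Map<String, String> table = new HashMap<>();
        for (String[] kv : records) {
            String old = table.get(kv[0]);
            table.put(kv[0], old == null ? kv[1] : reducer.apply(old, kv[1]));
        }
        return table;
    }

    // Models aggregate(): a missing key is seeded from the initializer,
    // and the aggregator runs for every record, including the first one.
    static Map<String, String> aggregate(List<String[]> records, Supplier<String> initializer,
                                         Aggregator aggregator) {
        Map<String, String> table = new HashMap<>();
        for (String[] kv : records) {
            String old = table.get(kv[0]);
            if (old == null) {
                old = initializer.get();
            }
            table.put(kv[0], aggregator.apply(kv[0], kv[1], old));
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"route-a", "v1"});
        records.add(new String[] {"route-b", "v1"});
        records.add(new String[] {"route-a", "v2"});

        Map<String, String> reduced = reduce(records, (oldValue, newValue) -> {
            reducerCalls++;
            return newValue;
        });
        Map<String, String> aggregated = aggregate(records, () -> null, (key, value, agg) -> {
            aggregatorCalls++;
            return value;
        });

        // prints "reduce:    v2, reducer calls = 1"
        System.out.println("reduce:    " + reduced.get("route-a") + ", reducer calls = " + reducerCalls);
        // prints "aggregate: v2, aggregator calls = 3"
        System.out.println("aggregate: " + aggregated.get("route-a") + ", aggregator calls = " + aggregatorCalls);
    }
}
```

In this model both maps hold `v2` for `route-a`; what differs is how often the user-supplied function runs, which matters when that function is where the side effects or breakpoints live.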
Being able to add just one processor is enough to solve my use case of doing
some non-Streams-related, per-instance local processing. More than one
processor per GlobalKTable should never be needed. Another possibility, maybe
safer for Kafka itself, would be to disallow this for users altogether and
instead add something like withChangeStream, plus a way to subscribe to that
stream. That still wouldn't stop us users from doing dumb things, though. I
don't mind what the solution ends up being and can adapt, as long as this use
case is covered.
> Calling StreamsBuilder.build().describe() causes
> java.util.NoSuchElementException: null
> ----------------------------------------------------------------------------------------------
>
> Key: KAFKA-7125
> URL: https://issues.apache.org/jira/browse/KAFKA-7125
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Jouni
> Assignee: Nikolay Izhikov
> Priority: Minor
> Labels: beginner, newbie
>
> After adding a processor and a sink to the topology after a global store and
> then calling StreamsBuilder.build().describe() again (for debugging purposes
> and to check I got the topology right), I got the following exception and
> stack trace:
> {{Caused by: java.util.NoSuchElementException: null}}
> {{ at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) ~[na:1.8.0_171]}}
> {{ at java.util.HashMap$KeyIterator.next(HashMap.java:1466) ~[na:1.8.0_171]}}
> {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323) ~[kafka-streams-1.1.0.jar:na]}}
> {{ at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306) ~[kafka-streams-1.1.0.jar:na]}}
> {{ at org.apache.kafka.streams.Topology.describe(Topology.java:647) ~[kafka-streams-1.1.0.jar:na]}}
> Snippet of code that caused this:
> {{ GlobalKTable<String, ServiceList> jsonRoutesToServices}}
> {{ = builder.globalTable("routes-to-services",}}
> {{ Consumed.with(Serdes.String(), jsonServiceListSerde),}}
> {{ Materialized.<String, ServiceList, KeyValueStore<Bytes, byte[]>>as("routes-to-services"));}}
> {{ TopologyDescription td = builder.build().describe();}}
> {{ String parent = null;}}
> {{ // We get an iterator to a TreeSet sorted by processing order, and just want the last one.}}
> {{ for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
> {{ parent = store.processor().name();}}
> }
> {{ TopologyDescription tdtd = builder.build().describe();}}
> {{ builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new UnneededCruftSupplier(), parent);}}
> {{ builder.build().addSink("FST-ROUTES-TO-SERVICES", "fst-routes-to-services", Serdes.String().serializer(), fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
> {{ TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet;
> calling describe() again before adding anything works fine.
>
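The two innermost frames of the stack trace in the issue come from the JDK: a `HashSet` iterator is a `HashMap$KeyIterator`, whose `next()` throws `NoSuchElementException` once no element is left. The sketch below only reproduces that JDK behaviour in isolation, with a `hasNext()`-guarded variant for contrast; it makes no claim about which set `describeGlobalStore` iterates over, and `ExhaustedIteratorDemo` and `firstOrNull` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

// Reproduces the JDK failure mode from the top stack frames in isolation.
// This is NOT InternalTopologyBuilder code.
public class ExhaustedIteratorDemo {

    // Hypothetical guarded helper: returns the first element, or null
    // instead of throwing when the set is empty.
    static String firstOrNull(Set<String> nodes) {
        Iterator<String> it = nodes.iterator();
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        Set<String> nodes = new HashSet<>();
        nodes.add("source-node");

        Iterator<String> it = nodes.iterator();
        it.next(); // consumes the only element
        try {
            it.next(); // nothing left: HashMap$KeyIterator.next() throws
        } catch (NoSuchElementException e) {
            System.out.println("NoSuchElementException, as in the stack trace");
        }

        nodes.clear();
        System.out.println("firstOrNull on empty set: " + firstOrNull(nodes));
    }
}
```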
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)