Hi all,
I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka.
I'm doing some filtering and aggregation, and then calling
persistentAggregate to maintain a map state:
stream
    .each(new Fields("msg"), new MyFilter())
    .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))
    .groupBy(new Fields("id"))
    .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))
    .parallelismHint(1);
It works fine for the first batch, but after that I'm seeing a very strange
problem: the map state is no longer called at all (i.e. there is a call to
multiGet and multiPut for the first batch only).
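For comparison, here is a stripped-down, Storm-free sketch of the call pattern I'd expect from a non-transactional map state with a count. It is only my rough model of what persistentAggregate does, not Trident's actual code: BackingMap is a stand-in interface with the same multiGet/multiPut methods as Trident's IBackingMap (so this compiles without Storm), and LoggingBackingMap and applyBatch are hypothetical names. Each batch is grouped by id, the old counts are read with one multiGet, and the new totals are written back with one multiPut.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapStateCallSketch {

    // Stand-in with the same two methods as Trident's IBackingMap (assumption: not the real interface).
    interface BackingMap<T> {
        List<T> multiGet(List<List<Object>> keys);
        void multiPut(List<List<Object>> keys, List<T> vals);
    }

    // Hypothetical in-memory backing map that logs and counts every call it receives.
    static class LoggingBackingMap implements BackingMap<Long> {
        final Map<List<Object>, Long> store = new HashMap<>();
        int gets = 0;
        int puts = 0;

        public List<Long> multiGet(List<List<Object>> keys) {
            gets++;
            System.out.println("multiGet " + keys);
            List<Long> out = new ArrayList<>();
            for (List<Object> k : keys) {
                out.add(store.get(k)); // null when the key has never been written
            }
            return out;
        }

        public void multiPut(List<List<Object>> keys, List<Long> vals) {
            puts++;
            System.out.println("multiPut " + keys + " -> " + vals);
            for (int i = 0; i < keys.size(); i++) {
                store.put(keys.get(i), vals.get(i));
            }
        }
    }

    // Rough model of one batch through a non-transactional count:
    // count ids within the batch, read the old totals, add, write them back.
    static void applyBatch(BackingMap<Long> map, List<String> ids) {
        Map<List<Object>, Long> partial = new LinkedHashMap<>();
        for (String id : ids) {
            partial.merge(List.<Object>of(id), 1L, Long::sum);
        }
        List<List<Object>> keys = new ArrayList<>(partial.keySet());
        List<Long> old = map.multiGet(keys);
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            Long prev = old.get(i);
            updated.add((prev == null ? 0L : prev) + partial.get(keys.get(i)));
        }
        map.multiPut(keys, updated);
    }

    public static void main(String[] args) {
        LoggingBackingMap map = new LoggingBackingMap();
        applyBatch(map, List.of("a", "b", "a")); // batch 1
        applyBatch(map, List.of("b", "c"));      // batch 2
        System.out.println("multiGet calls: " + map.gets + ", multiPut calls: " + map.puts);
    }
}
```

Running main prints one multiGet/multiPut pair for each of the two batches, which is the pattern I'd expect to see for every batch in the real topology, not just the first.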
The spout is still providing tuples: when I debug, I can see that the filter
and the function both keep processing input tuples indefinitely.
But the map state never gets called again!
Why would this happen?
I found a couple of threads describing a very similar problem, but they have
gone unanswered:
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
Thanks for any help on this,
Josh