Hi all,

I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka.
I then do some filtering and aggregation and call persistentAggregate
to maintain a map state:

stream
      .each(new Fields("msg"), new MyFilter())
      .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))
      .groupBy(new Fields("id"))
      .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))
      .parallelismHint(1)

It works fine for the first batch, but then I hit a very strange problem:
after the first batch my map state is no longer called (i.e. there is a
call to multiGet and multiPut for the first batch only).

The spout is still providing tuples: when I debug, I can see that the
filter and function both continue to process input tuples (indefinitely).
But the map state never gets called again!
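
One way to confirm exactly which batches reach the state is to wrap the backing map in a delegate that counts and logs every call. The following is only a minimal sketch under assumptions: BackingMap is a hypothetical stand-in with the same multiGet/multiPut shape as Trident's IBackingMap, so that it runs without Storm on the classpath, and InMemoryBackingMap and CountingBackingMap are illustrative names, not part of my topology or of Storm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in mirroring the method shape of Trident's IBackingMap<T>.
interface BackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

// Illustrative in-memory store; a real state would talk to a database.
class InMemoryBackingMap<T> implements BackingMap<T> {
    private final Map<List<Object>, T> store = new HashMap<>();

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<T> out = new ArrayList<>();
        for (List<Object> key : keys) {
            out.add(store.get(key)); // null when the key has never been written
        }
        return out;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        for (int i = 0; i < keys.size(); i++) {
            store.put(keys.get(i), vals.get(i));
        }
    }
}

// Delegating wrapper that counts and logs each multiGet/multiPut call.
class CountingBackingMap<T> implements BackingMap<T> {
    private final BackingMap<T> delegate;
    int gets = 0;
    int puts = 0;

    CountingBackingMap(BackingMap<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        gets++;
        System.err.println("multiGet #" + gets + " keys=" + keys);
        return delegate.multiGet(keys);
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        puts++;
        System.err.println("multiPut #" + puts + " keys=" + keys + " vals=" + vals);
        delegate.multiPut(keys, vals);
    }
}
```

If the counters stop at 1 while the filter and function keep logging tuples, the later batches are not reaching the state at all, which is a different situation from the state being called with empty key lists.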

Why would this happen?

I found a couple of very similar threads to this, but they have gone
unanswered:
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ


Thanks for any help on this,

Josh
