Not understanding why SendKafka(BaseFunction) and Count(CombinerAggregator)
are not executed. Anyone have any idea why?
topologyTrident.newStream("lines", new RandomDataJsonSpout())
.each(new Fields("line"), new ExtractField(new Fields("id", "speed")), new
Fields("id", "field"))
.aggregate(new Fields("id", "field"), new Average(), new Fields("objAvg"))
.parallelismHint(5)
.each(new Fields("objAvg"), new SendKafka(), fields)
.parallelismHint(5)
.persistentAggregate(new MemoryMapState.Factory(), fields, new Count(), new
Fields());
--
Thomas Cristanis