This compiles to 3 bolt instances because you set .parallelismHint(3) after aggregate (an important operation, where batch size matters).
See https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams

The .each() operations are not affected by batch size, so they are included in the next bolt that is generated.

2014-06-12 10:52 GMT+09:00 최범균 <[email protected]>:
> I wrote the following code:
>
> topology.newStream("log", new LogSpout())
>     .each(new Fields("shopLog"), new AddGroupingValueFunction(),
>           new Fields("productId:time"))
>     .groupBy(new Fields("productId:time")) // how are tuples partitioned?
>     .aggregate(new CountAggregator(), new Fields("count")) // 3 instances
>     .each(new Fields("productId:time", "count"), new CountSumFunction(),
>           new Fields("sum"))
>     .each(new Fields("productId:time", "sum"), new ThresholdFilter())
>     .parallelismHint(3);
>
> And I found that three CountAggregator instances were created.
>
> How does Storm partition grouped tuples after groupBy and before aggregate?
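
On the quoted question about partitioning between groupBy and aggregate: as the Trident API overview linked in this thread describes, groupBy repartitions the stream with a partitionBy on the group fields, so tuples with equal group fields always end up in the same partition. With a parallelism of 3, each aggregator instance therefore sees a disjoint set of "productId:time" values. Below is a rough plain-Java sketch of that idea, not Storm's actual code. The class GroupPartitionSketch and its methods are hypothetical names, and String.hashCode() is only a stand-in for whatever hash Storm applies to the group fields.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names come from Storm.
class GroupPartitionSketch {

    // Maps a group key to one of numPartitions partitions.
    // Assumption: a simple hash-modulo scheme stands in for Storm's hashing.
    static int partitionFor(String groupKey, int numPartitions) {
        return Math.floorMod(groupKey.hashCode(), numPartitions);
    }

    // Counts tuples per group key inside each partition, roughly what each
    // of the aggregator instances would do for the keys routed to it.
    static Map<Integer, Map<String, Long>> countPerPartition(List<String> keys,
                                                             int numPartitions) {
        Map<Integer, Map<String, Long>> result = new TreeMap<>();
        for (String key : keys) {
            int partition = partitionFor(key, numPartitions);
            result.computeIfAbsent(partition, ignored -> new TreeMap<>())
                  .merge(key, 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up "productId:time" values, three partitions as in parallelismHint(3).
        List<String> keys = Arrays.asList(
                "p1:10", "p2:10", "p1:10", "p3:11", "p2:10", "p1:10");
        System.out.println(countPerPartition(keys, 3));
    }
}
```

The point of the sketch is that a given key can never appear under two different partitions, which is why per-group counts stay correct even though three aggregator instances run in parallel.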
