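You get three CountAggregator instances because you set .parallelismHint(3)
after aggregate (aggregate is the important operation here, because it works
on whole batches). groupBy + aggregate repartitions the stream on
"productId:time", so every tuple with the same productId:time value is sent
to the same one of the three partitions. parallelismHint(3) then gives the
bolt that runs the aggregate three tasks, each with its own CountAggregator.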

See
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams
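To make the partitioning concrete, here is a small plain-Java simulation (not
Storm code). It assumes the partition is chosen by hashing the grouping-field
values modulo the number of partitions, in the spirit of Storm's fields
grouping; the class name, the sample keys and the per-partition maps are
hypothetical stand-ins for the three CountAggregator instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation only: models how grouped tuples could be spread over 3 aggregator tasks.
public class GroupByPartitionSketch {

    // Assumption: partition index = hash of the grouping values mod partition count.
    static int choosePartition(List<Object> groupValues, int numPartitions) {
        return Math.floorMod(groupValues.hashCode(), numPartitions);
    }

    // Each map stands in for one CountAggregator instance counting the keys it receives.
    static List<Map<String, Integer>> countPerPartition(List<String> keys, int numPartitions) {
        List<Map<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new HashMap<>());
        }
        for (String key : keys) {
            int p = choosePartition(Arrays.asList((Object) key), numPartitions);
            partitions.get(p).merge(key, 1, Integer::sum);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Hypothetical "productId:time" values from one batch.
        List<String> batch = Arrays.asList("p1:1052", "p2:1052", "p1:1052", "p3:1053", "p2:1052");
        List<Map<String, Integer>> result = countPerPartition(batch, 3);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("CountAggregator instance " + i + ": " + result.get(i));
        }
    }
}
```

Whatever the exact hash function, the property that matters is that a given
key always maps to the same partition, so each group's count is complete
inside a single aggregator instance.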

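.each() operations do not depend on batch boundaries, so they never force a
new bolt; Trident fuses them into a neighboring bolt. Here the two .each()
calls after aggregate are included in the bolt generated for the aggregate
and share its parallelism of 3.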
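A simplified model of that fusing rule (not Trident's actual planner): walk
the chained calls and start a new bolt only at a repartitioning step;
everything else joins the current bolt. The Op class and the operation labels
are hypothetical and only mirror the topology in the quoted question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model: split a chain of operations into bolts at repartitioning steps.
public class BoltFusionSketch {

    // Hypothetical description of one chained Trident call.
    static final class Op {
        final String name;
        final boolean repartitions; // true for steps such as groupBy + aggregate

        Op(String name, boolean repartitions) {
            this.name = name;
            this.repartitions = repartitions;
        }
    }

    static List<List<String>> planBolts(List<Op> ops) {
        List<List<String>> bolts = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Op op : ops) {
            if (op.repartitions && !current.isEmpty()) {
                bolts.add(current);          // close the bolt before the repartition
                current = new ArrayList<>();
            }
            current.add(op.name);            // each() calls simply join the current bolt
        }
        if (!current.isEmpty()) {
            bolts.add(current);
        }
        return bolts;
    }

    public static void main(String[] args) {
        List<Op> chain = Arrays.asList(
            new Op("spout(LogSpout)", false),
            new Op("each(AddGroupingValueFunction)", false),
            new Op("groupBy+aggregate(CountAggregator)", true),
            new Op("each(CountSumFunction)", false),
            new Op("each(ThresholdFilter)", false));
        // In this model, parallelismHint(3) at the end applies to the last bolt,
        // i.e. the aggregate plus the two trailing each() calls.
        System.out.println(planBolts(chain));
    }
}
```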


2014-06-12 10:52 GMT+09:00 최범균:

> I wrote the following code:
>
> topology.newStream("log", new LogSpout())
>     .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))
>     .groupBy(new Fields("productId:time"))                    // how are tuples partitioned here?
>     .aggregate(new CountAggregator(), new Fields("count"))    // 3 instances
>     .each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
>     .each(new Fields("productId:time", "sum"), new ThresholdFilter())
>     .parallelismHint(3);
>
> I found that three CountAggregator instances were created.
>
> How does Storm partition the grouped tuples after groupBy and before aggregate?
>
>
