I wrote the following code:
topology.newStream("log", new LogSpout())
    .each(new Fields("shopLog"), new AddGroupingValueFunction(), new Fields("productId:time"))
    .groupBy(new Fields("productId:time")) // how are the tuples partitioned here?
    .aggregate(new CountAggregator(), new Fields("count")) // 3 instances are created
    .each(new Fields("productId:time", "count"), new CountSumFunction(), new Fields("sum"))
    .each(new Fields("productId:time", "sum"), new ThresholdFilter())
    .parallelismHint(3);
I found that three CountAggregator instances were created.
How does Storm partition the grouped tuples after groupBy and before aggregate?
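
For context, the Trident API overview describes groupBy as a repartitioning on the grouping fields: the field values are hashed and taken modulo the number of target partitions, so equal keys always land in the same partition. The sketch below only illustrates that idea in plain Java. The class PartitionSketch, the helper partitionFor, and the sample keys are hypothetical, and the hash used here is an assumption rather than Storm's actual implementation.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {
    // Hypothetical helper (not a Storm API): choose a target partition
    // by hashing the grouping values and reducing modulo the partition count.
    static int partitionFor(List<?> groupingValues, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(groupingValues.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed to correspond to parallelismHint(3)
        // Made-up "productId:time" keys, for illustration only.
        String[] keys = {"p1:1200", "p2:1200", "p1:1200", "p3:1205"};
        for (String key : keys) {
            int partition = partitionFor(Arrays.asList(key), numPartitions);
            System.out.println(key + " -> partition " + partition);
        }
    }
}
```

Under that assumption, every tuple with the same "productId:time" value would reach the same one of the three partitions, which would be consistent with seeing one CountAggregator instance per partition.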