So,, Which partition strategy Storm choose when no partion strategy
specified between groupBy and aggregate?
shuffle? partitionBy(fields)? or else?

*.groupBy(new Fields("productId:time")) // no partition strategy*
*.aggregate(new CountAggregator(), new Fields("count")) // how storm choose
partition which receive grouped tuples?*
.each(new Fields("productId:time", "count"), new CountSumFunction(), new
Fields("sum"))
.each(new Fields("productId:time", "sum"), new ThresholdFilter())
.*parallelismHint(3)*;





2014-06-13 0:11 GMT+09:00 Romain Leroux <[email protected]>:

> This compiles 3 bolts because you set .parallelismHint(3); after aggregate
> (important operation, batch size matters)
>
> See
> https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams
>
> .each() operations are not concerned by batch size and thus they are going
> to be included in the next bolt generated
>
>
> 2014-06-12 10:52 GMT+09:00 최범균 <[email protected]>:
>
> I wrote code following:
>>
>> topology.newStream("log", new LogSpout())
>> .each(new Fields("shopLog"), new AddGroupingValueFunction(), new
>> Fields("productId:time"))
>>  *.groupBy(new Fields("productId:time")) // how partition tuple*
>> *.aggregate(new CountAggregator(), new Fields("count")) // 3 instance*
>>  .each(new Fields("productId:time", "count"), new CountSumFunction(),
>> new Fields("sum"))
>> .each(new Fields("productId:time", "sum"), new ThresholdFilter())
>>  .*parallelismHint(3)*;
>>
>> And, I found that three CountAggregator instance was created.
>>
>> How storm partition grouped tuple after groupBy and before aggregate?
>>
>>
>

Reply via email to