Hi MK

You may want to have a look at this post [1], or at the code [3] for
this paper [2]. I'm currently working on an implementation of an idea
we presented in this paper [4] (a short video of the idea can be found
here [5]).
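
In case it helps to see the shape of it: with the pluggable scheduler
described in [1], you implement backtype.storm.scheduler.IScheduler and
point Nimbus at your class. A minimal sketch along those lines (the
topology name and the single-slot placement are placeholders, not a
drop-in solution):

    import java.util.List;
    import java.util.Map;
    import backtype.storm.scheduler.Cluster;
    import backtype.storm.scheduler.EvenScheduler;
    import backtype.storm.scheduler.IScheduler;
    import backtype.storm.scheduler.Topologies;
    import backtype.storm.scheduler.TopologyDetails;
    import backtype.storm.scheduler.WorkerSlot;

    // Sketch only: pins every unassigned executor of one topology into a
    // single worker slot (one JVM, one machine) and delegates the rest to
    // the default even scheduler. "my-topology" is a placeholder name.
    public class ColocatingScheduler implements IScheduler {

        public void prepare(Map conf) {
            // receives the Storm configuration at startup; nothing to do here
        }

        public void schedule(Topologies topologies, Cluster cluster) {
            TopologyDetails topology = topologies.getByName("my-topology");
            if (topology != null && cluster.needsScheduling(topology)) {
                List<WorkerSlot> slots = cluster.getAvailableSlots();
                if (!slots.isEmpty()) {
                    // all executors into one slot => no inter-machine traffic
                    cluster.assign(slots.get(0), topology.getId(),
                            cluster.getUnassignedExecutors(topology));
                }
            }
            // let the stock scheduler place everything else
            new EvenScheduler().schedule(topologies, cluster);
        }
    }

You then set storm.scheduler: "ColocatingScheduler" in the storm.yaml on
Nimbus (with the class on its classpath); the post in [1] walks through
a complete example.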

Cheers,
Lorenz

[1] http://xumingming.sinaapp.com/885/twitter-storm-how-to-develop-a-pluggable-scheduler/
[2] http://dl.acm.org/citation.cfm?id=2488267
[3] http://www.dis.uniroma1.it/~midlab/software/storm-adaptive-schedulers.zip
[4] http://www.merlin.uzh.ch/publication/show/8337
[5] https://www.youtube.com/watch?v=dLLOKo9VRag

On Wed, Apr 23, 2014 at 5:40 PM, Karthikeyan Muthukumarasamy
<[email protected]> wrote:
> Hi,
> I use Trident for real-time aggregation over multiple time buckets (5
> min, 15 min, 1 hour, etc.) and write the partial aggregates as batches
> into the corresponding mart tables in the database.
> In the EventTimeBucketer function, I just floor the event_time to the
> starting epoch of the corresponding 5-min (or 15-min or 1-hour) bucket.
> Something like this: long bucket_fivemin = event_time - (event_time % (1000 * 60 * 5));
>
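
A minimal sketch of what such a bucketing function might look like as a
Trident BaseFunction, assuming the field names from the snippet above
(the actual implementation is of course a guess):

    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;

    // Sketch: floors an epoch-millis event_time to the start of its
    // 5-minute, 15-minute and 1-hour windows in a single pass, matching
    // the three output fields declared in the topology below.
    public class EventTimeBucketer extends BaseFunction {
        private static final long FIVE_MIN = 1000L * 60 * 5;
        private static final long FIFTEEN_MIN = 1000L * 60 * 15;
        private static final long ONE_HOUR = 1000L * 60 * 60;

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            long eventTime = tuple.getLongByField("event_time");
            collector.emit(new Values(
                    eventTime - (eventTime % FIVE_MIN),
                    eventTime - (eventTime % FIFTEEN_MIN),
                    eventTime - (eventTime % ONE_HOUR)));
        }
    }
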
> The topology looks like below:
>
> Stream mainStream = topology.newStream("PartitionedTransactionalSpout", spout)
>         .each(new Fields("event_time"), new EventTimeBucketer(),
>                 new Fields("bucket_fivemin", "bucket_fifteenmin", "bucket_onehour"))
>         .parallelismHint(NUM_OF_PARTITIONS);
>
> Stream fiveMinAggr = mainStream
>         .groupBy(new Fields("bucket_fivemin", "dimension1", "dimension2",
>                 "dimension3", "dimension4"))
>         .chainedAgg()
>         .aggregate(new Fields("measure1"), new Sum(), new Fields("measure1_sum"))
>         .aggregate(new Fields("measure2"), new Sum(), new Fields("measure2_sum"))
>         .chainEnd()
>         .partitionAggregate(new Fields("bucket_fivemin", "dimension1",
>                 "dimension2", "dimension3", "dimension4",
>                 "measure1_sum", "measure2_sum"),
>                 new AggrPersistor(FIVE_MIN_AGGR), new Fields("done"))
>         .parallelismHint(10);
>
> Stream fifteenMinAggr = mainStream
>         .groupBy(new Fields("bucket_fifteenmin", "dimension1", "dimension2",
>                 "dimension3", "dimension4"))
>         .chainedAgg()
>         .aggregate(new Fields("measure1"), new Sum(), new Fields("measure1_sum"))
>         .aggregate(new Fields("measure2"), new Sum(), new Fields("measure2_sum"))
>         .chainEnd()
>         .partitionAggregate(new Fields("bucket_fifteenmin", "dimension1",
>                 "dimension2", "dimension3", "dimension4",
>                 "measure1_sum", "measure2_sum"),
>                 new AggrPersistor(FIFTEEN_MIN_AGGR), new Fields("done"))
>         .parallelismHint(10);
>
> and so on for one hour, one day, etc.
>
> Left as it is, a single event record will now flow N times across the
> network (once per groupBy combination), resulting in undue network
> traffic.
> How can I optimize network traffic for this use case? Is it possible to
> use Storm's pluggable scheduler feature to co-locate all bolts handling
> a single combination of dimensions on one machine to reduce network
> traffic?
> Any suggestions or code snippets will be very useful.
>
> Thanks
> MK
