Hi MK,

You may want to have a look at this post [1], or at the code for this paper [2], which you can find here [3]. I'm currently working on an implementation of an idea we presented in this paper [4] (a short video of the idea can be found here [5]). I've also put a bare-bones scheduler skeleton below to get you started.
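
To give you a starting point, here is a minimal, untested sketch of a pluggable scheduler in the style of the post in [1]. Only the IScheduler interface and the Cluster/Topologies calls are Storm's API; the class name and the naive one-component-per-slot packing are placeholders of mine, not the algorithm from [2] or [4]:

    import java.util.List;
    import java.util.Map;

    import backtype.storm.scheduler.Cluster;
    import backtype.storm.scheduler.EvenScheduler;
    import backtype.storm.scheduler.ExecutorDetails;
    import backtype.storm.scheduler.IScheduler;
    import backtype.storm.scheduler.Topologies;
    import backtype.storm.scheduler.TopologyDetails;
    import backtype.storm.scheduler.WorkerSlot;

    public class CoLocatingScheduler implements IScheduler {

        @Override
        public void prepare(Map conf) {
            // nothing to set up; custom placement hints could be read from conf here
        }

        @Override
        public void schedule(Topologies topologies, Cluster cluster) {
            for (TopologyDetails topology : topologies.getTopologies()) {
                if (!cluster.needsScheduling(topology)) {
                    continue;
                }
                // all executors that still need a slot, grouped by component id
                Map<String, List<ExecutorDetails>> byComponent =
                        cluster.getNeedsSchedulingComponentToExecutors(topology);
                List<WorkerSlot> free = cluster.getAvailableSlots();
                int next = 0;
                // naive co-location placeholder: pack every executor of one
                // component into the same worker slot, so tuples grouped to
                // that component stay machine-local
                for (List<ExecutorDetails> executors : byComponent.values()) {
                    if (next >= free.size()) {
                        break; // out of free slots, leave the rest to the default
                    }
                    cluster.assign(free.get(next++), topology.getId(), executors);
                }
            }
            // let Storm's default scheduler place whatever is still unassigned
            new EvenScheduler().schedule(topologies, cluster);
        }
    }

To activate it you would, as [1] describes, put the jar on Nimbus's classpath and point storm.scheduler in Nimbus's storm.yaml at the class (the package/class name above is hypothetical).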
Cheers,
Lorenz

[1] http://xumingming.sinaapp.com/885/twitter-storm-how-to-develop-a-pluggable-scheduler/
[2] http://dl.acm.org/citation.cfm?id=2488267
[3] http://www.dis.uniroma1.it/~midlab/software/storm-adaptive-schedulers.zip
[4] http://www.merlin.uzh.ch/publication/show/8337
[5] https://www.youtube.com/watch?v=dLLOKo9VRag

On Wed, Apr 23, 2014 at 5:40 PM, Karthikeyan Muthukumarasamy
<[email protected]> wrote:
> Hi,
> I use Trident for real-time aggregation over multiple time buckets (5
> min, 15 min, 1 hour, etc.) and write the partial aggregates as batches
> into the corresponding mart tables in the database.
> In the EventTimeBucketer function, I just floor the event_time to the
> starting epoch of the corresponding bucket (5 min, 15 min, or 1 hour).
> Something like this:
>
>     long bucket_fivemin = event_time - (event_time % (1000 * 60 * 5));
>
> The topology looks like below:
>
>     Stream mainStream = topology.newStream("PartitionedTransactionalSpout", spout)
>         .each(new Fields("event_time"), new EventTimeBucketer(),
>               new Fields("bucket_fivemin", "bucket_fifteenmin", "bucket_onehour"))
>         .parallelismHint(NUM_OF_PARTITIONS);
>
>     Stream fiveMinAggr = mainStream
>         .groupBy(new Fields("bucket_fivemin", "dimension1", "dimension2",
>                             "dimension3", "dimension4"))
>         .chainedAgg()
>         .aggregate(new Fields("measure1"), new Sum(), new Fields("measure1_sum"))
>         .aggregate(new Fields("measure2"), new Sum(), new Fields("measure2_sum"))
>         .chainEnd()
>         .partitionAggregate(new Fields("bucket_fivemin", "dimension1",
>                                        "dimension2", "dimension3", "dimension4",
>                                        "measure1_sum", "measure2_sum"),
>                             new AggrPersistor(FIVE_MIN_AGGR), new Fields("done"))
>         .parallelismHint(10);
>
>     Stream fifteenMinAggr = mainStream
>         .groupBy(new Fields("bucket_fifteenmin", "dimension1", "dimension2",
>                             "dimension3", "dimension4"))
>         .chainedAgg()
>         .aggregate(new Fields("measure1"), new Sum(), new Fields("measure1_sum"))
>         .aggregate(new Fields("measure2"), new Sum(), new Fields("measure2_sum"))
>         .chainEnd()
>         .partitionAggregate(new Fields("bucket_fifteenmin", "dimension1",
>                                        "dimension2", "dimension3", "dimension4",
>                                        "measure1_sum", "measure2_sum"),
>                             new AggrPersistor(FIFTEEN_MIN_AGGR), new Fields("done"))
>         .parallelismHint(10);
>
> and so on for one hour, one day, etc.
>
> Left as it is, a single event record will now flow across the network N
> times (once for each groupBy combination), which results in undue network
> traffic.
> How can I optimize network traffic for this use case? Is it possible to
> use Storm's pluggable scheduler feature to co-locate all bolts that handle
> a single combination of dimensions on a single machine, to reduce network
> traffic?
> Any suggestions or code snippets would be very useful.
>
> Thanks
> MK
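
For completeness, the EventTimeBucketer you describe could be a plain Trident function along these lines (a rough sketch assuming the storm.trident BaseFunction API and the field names from your message; I haven't run it):

    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;

    public class EventTimeBucketer extends BaseFunction {
        private static final long FIVE_MIN    = 1000L * 60 * 5;
        private static final long FIFTEEN_MIN = 1000L * 60 * 15;
        private static final long ONE_HOUR    = 1000L * 60 * 60;

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            long eventTime = tuple.getLongByField("event_time");
            // floor the event time to the starting epoch of each bucket,
            // emitting all three bucket fields in one pass
            collector.emit(new Values(
                    eventTime - (eventTime % FIVE_MIN),
                    eventTime - (eventTime % FIFTEEN_MIN),
                    eventTime - (eventTime % ONE_HOUR)));
        }
    }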
