Hi,
I use Trident for real-time aggregation over multiple time buckets (5 min,
15 min, 1 hour, etc.) and write the partial aggregates as batches into the
corresponding mart tables in a database.
In the EventTimeBucketer function, I just floor the event_time to the
starting epoch of the corresponding 5-min (or 15-min or 1-hour) bucket.
Something like this:

long bucket_fivemin = event_time - (event_time % (1000 * 60 * 5));
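
For completeness, the whole function looks roughly like this (a sketch of
what I described above; I'm assuming the pre-1.0 package layout,
storm.trident.* and backtype.storm.*):

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;

// Floors event_time (epoch millis) to the start of each bucket and emits
// the three output fields declared in the topology below.
public class EventTimeBucketer extends BaseFunction {
    private static final long FIVE_MIN    = 1000L * 60 * 5;
    private static final long FIFTEEN_MIN = 1000L * 60 * 15;
    private static final long ONE_HOUR    = 1000L * 60 * 60;

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        long eventTime = tuple.getLong(0); // the "event_time" input field
        collector.emit(new Values(eventTime - (eventTime % FIVE_MIN),
                                  eventTime - (eventTime % FIFTEEN_MIN),
                                  eventTime - (eventTime % ONE_HOUR)));
    }
}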

The topology looks like below:

Stream mainStream = topology.newStream("PartitionedTransactionalSpout", spout)
        .each(new Fields("event_time"), new EventTimeBucketer(),
              new Fields("bucket_fivemin", "bucket_fifteenmin", "bucket_onehour"))
        .parallelismHint(NUM_OF_PARTITIONS);

Stream fiveMinAggr = mainStream
        .groupBy(new Fields("bucket_fivemin", "dimension1", "dimension2",
                "dimension3", "dimension4"))
        .chainedAgg()
        .aggregate(new Fields("measure1"), new Sum(), new Fields("measure1_sum"))
        .aggregate(new Fields("measure2"), new Sum(), new Fields("measure2_sum"))
        .chainEnd()
        .partitionAggregate(new Fields("bucket_fivemin", "dimension1", "dimension2",
                "dimension3", "dimension4", "measure1_sum", "measure2_sum"),
                new AggrPersistor(FIVE_MIN_AGGR), new Fields("done"))
        .parallelismHint(10);

Stream fifteenMinAggr = mainStream
        .groupBy(new Fields("bucket_fifteenmin", "dimension1", "dimension2",
                "dimension3", "dimension4"))
        .chainedAgg()
        .aggregate(new Fields("measure1"), new Sum(), new Fields("measure1_sum"))
        .aggregate(new Fields("measure2"), new Sum(), new Fields("measure2_sum"))
        .chainEnd()
        .partitionAggregate(new Fields("bucket_fifteenmin", "dimension1", "dimension2",
                "dimension3", "dimension4", "measure1_sum", "measure2_sum"),
                new AggrPersistor(FIFTEEN_MIN_AGGR), new Fields("done"))
        .parallelismHint(10);

and so on for one hour, one day, etc.

Left as it is, a single event record will now flow across the network N
times (once for each groupBy combination), which results in undue network
traffic; with 5-min, 15-min, 1-hour, and 1-day buckets, for example, every
event is shuffled four times.
How can I optimize network traffic for this use case? Is it possible to use
Storm's pluggable scheduler feature to co-locate all the bolts that handle a
single combination of dimensions on one machine, so that less traffic
crosses the network?
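
For reference, the hook I have in mind is backtype.storm.scheduler.IScheduler,
registered via the storm.scheduler entry in Nimbus's storm.yaml. A bare
skeleton (the class name is a placeholder, and the co-location logic is
exactly the part I'm unsure how to write):

import java.util.Map;
import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.EvenScheduler;
import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.Topologies;

public class CoLocatingScheduler implements IScheduler {
    @Override
    public void prepare(Map conf) { }

    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        // TODO: pin the executors of the fiveMinAggr/fifteenMinAggr/... bolts
        // onto slots of the same supervisor, then let the stock scheduler
        // place whatever remains.
        new EvenScheduler().schedule(topologies, cluster);
    }
}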
Any suggestions or code snippets would be very useful.

Thanks
MK
