Hi,
I use Trident for real-time aggregation over multiple time buckets (5 min,
15 min, 1 hour, etc.) and write the partial aggregates, batch by batch, into
the corresponding mart tables in the database.
In the EventTimeBucketer function, I simply floor event_time to the starting
epoch of the corresponding 5 min (or 15 min, or 1 hour) bucket, something
like this:
long bucket_fivemin = event_time - (event_time % (1000*60*5));
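For reference, the flooring step described above can be sketched in plain Java. This is an illustrative sketch, not the actual EventTimeBucketer: the class name EventTimeBuckets, the floorToBucket helper and the bucket-width constants are hypothetical. It uses Math.floorMod instead of %, which gives the same result for non-negative epoch milliseconds but also rounds negative (pre-1970) timestamps down rather than toward zero.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: floors epoch-millisecond timestamps to the start of their bucket. */
public final class EventTimeBuckets {
    // Bucket widths in milliseconds (assumed to match the topology's granularities).
    static final long FIVE_MIN = TimeUnit.MINUTES.toMillis(5);
    static final long FIFTEEN_MIN = TimeUnit.MINUTES.toMillis(15);
    static final long ONE_HOUR = TimeUnit.HOURS.toMillis(1);

    private EventTimeBuckets() {}

    /** Returns the start of the bucket of the given width that contains eventTime. */
    static long floorToBucket(long eventTime, long widthMillis) {
        // floorMod equals eventTime % widthMillis for eventTime >= 0,
        // and stays non-negative for negative timestamps.
        return eventTime - Math.floorMod(eventTime, widthMillis);
    }

    /** All bucket starts, in the same order as the fields the function emits. */
    static long[] buckets(long eventTime) {
        return new long[] {
            floorToBucket(eventTime, FIVE_MIN),
            floorToBucket(eventTime, FIFTEEN_MIN),
            floorToBucket(eventTime, ONE_HOUR)
        };
    }

    public static void main(String[] args) {
        long[] b = buckets(1_400_000_123_456L);
        System.out.println(b[0] + " " + b[1] + " " + b[2]);
    }
}
```

Inside a Trident function, a call like buckets(event_time) would supply the three values emitted as bucket_fivemin, bucket_fifteenmin and bucket_onehour.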
The topology looks like below:
Stream mainStream = topology.newStream("PartitionedTransactionalSpout", spout)
    // Compute the start of each time bucket from event_time
    .each(new Fields("event_time"), new EventTimeBucketer(),
          new Fields("bucket_fivemin", "bucket_fifteenmin", "bucket_onehour"))
    .parallelismHint(NUM_OF_PARTITIONS);
// 5-minute aggregates: group by bucket + dimensions, sum both measures, persist
Stream fiveMinAggr = mainStream
    .groupBy(new Fields("bucket_fivemin",
                        "dimension1", "dimension2", "dimension3", "dimension4"))
    .chainedAgg()
        .aggregate(new Fields("measure1"), new Sum(), new Fields("measure1_sum"))
        .aggregate(new Fields("measure2"), new Sum(), new Fields("measure2_sum"))
    .chainEnd()
    .partitionAggregate(new Fields("bucket_fivemin", "dimension1", "dimension2",
                                   "dimension3", "dimension4",
                                   "measure1_sum", "measure2_sum"),
                        new AggrPersistor(FIVE_MIN_AGGR), new Fields("done"))
    .parallelismHint(10);
// 15-minute aggregates: same pipeline, keyed on the 15-minute bucket
Stream fifteenMinAggr = mainStream
    .groupBy(new Fields("bucket_fifteenmin",
                        "dimension1", "dimension2", "dimension3", "dimension4"))
    .chainedAgg()
        .aggregate(new Fields("measure1"), new Sum(), new Fields("measure1_sum"))
        .aggregate(new Fields("measure2"), new Sum(), new Fields("measure2_sum"))
    .chainEnd()
    .partitionAggregate(new Fields("bucket_fifteenmin", "dimension1", "dimension2",
                                   "dimension3", "dimension4",
                                   "measure1_sum", "measure2_sum"),
                        new AggrPersistor(FIFTEEN_MIN_AGGR), new Fields("done"))
    .parallelismHint(10);
And so on for one hour, one day, etc.
Left as is, a single event record will flow across the network N times
(once per groupBy combination), which results in unnecessary network
traffic.
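To make the fan-out concrete, here is a small model in plain Java (no Storm dependency) of what the topology groups on: every event produces one grouping key per bucket granularity, and each key is repartitioned independently. The GroupingFanOut class, its list of widths and the hash-mod-partitions assignment are hypothetical simplifications of a fields grouping, not Storm's exact partitioning code, and the tuple count ignores any per-partition pre-aggregation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the per-event fan-out caused by one groupBy per time bucket. */
public final class GroupingFanOut {
    // Assumed bucket widths in ms: 5 min, 15 min, 1 hour, 1 day.
    static final long[] WIDTHS = {300_000L, 900_000L, 3_600_000L, 86_400_000L};

    private GroupingFanOut() {}

    /**
     * One grouping key per granularity: [bucketWidth, bucketStart, dimensions...].
     * In the real topology each granularity is a separate groupBy; the width
     * entry only keeps the keys distinct in this model.
     */
    static List<List<Object>> groupKeysFor(long eventTime, List<String> dimensions) {
        List<List<Object>> keys = new ArrayList<>();
        for (long width : WIDTHS) {
            List<Object> key = new ArrayList<>();
            key.add(width);
            key.add(eventTime - Math.floorMod(eventTime, width));
            key.addAll(dimensions);
            keys.add(key);
        }
        return keys;
    }

    /** Simplified stand-in for a fields grouping: hash of the key modulo partitions. */
    static int partitionOf(List<Object> key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Tuples crossing the repartitioning step, before any pre-aggregation. */
    static long tuplesShipped(long events) {
        return events * WIDTHS.length;
    }

    public static void main(String[] args) {
        List<String> dims = Arrays.asList("d1", "d2", "d3", "d4");
        for (List<Object> key : groupKeysFor(1_400_000_123_456L, dims)) {
            System.out.println(key + " -> partition " + partitionOf(key, 10));
        }
        System.out.println("tuples for 1000 events: " + tuplesShipped(1000));
    }
}
```

Because the keys differ per granularity, they can hash to different partitions (and therefore different workers), which is where the N-fold traffic comes from.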
How can I reduce network traffic for this use case? Is it possible to use
Storm's pluggable scheduler to co-locate, on a single machine, all the bolts
that handle a given combination of dimensions, so that less data crosses
the network?
Any suggestions or code snippets will be very useful.
Thanks
MK