Hello, a beginner question coming up. I'm trying to build analytics
crunching with Storm Trident: I have a continuous stream of events that I
need to group and aggregate, and then write the aggregated results for each
time slice into a database for quick access later on. I am starting with
the following topology:
    TridentState state = topology.newStream("logspout", spout)
        .parallelismHint(8)
        .each(new Fields("json"),
              new ProcessJsonFunction(),
              new Fields("ad_id", "zone", "impressions", "clicks"))
        .groupBy(new Fields("ad_id", "zone"))
        .chainedAgg()
        .aggregate(new Fields("impressions"), new Sum(),
                   new Fields("impressions_sum"))
        .aggregate(new Fields("clicks"), new Sum(),
                   new Fields("clicks_sum"))
        .chainEnd()
        .partitionPersist(new AnalyticsStateFactory(),
                          new Fields("impressions_sum", "clicks_sum"),
                          new AnalyticsStateUpdater());
Then, in the updateState() method of my AnalyticsStateUpdater class, I would
like to store the aggregated values ("impressions_sum", "clicks_sum") per
key "bucket", and this is where I ran into problems: how do I know, inside
the StateUpdater, which groupBy() "bucket" the aggregated data belongs to?
In other words, I need the key formed from the values of the fields
("ad_id", "zone"). The aggregated values themselves arrive in the
StateUpdater properly summed by Sum().
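To make the goal concrete, here is a rough plain-Java sketch of the result I
am after: one row per ("ad_id", "zone") bucket, carrying both the key and
the two sums. It deliberately uses no Storm or Trident classes; the names
BucketSketch, Event and sumByBucket are made up for illustration only and
are not part of my topology.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BucketSketch {

    // Hypothetical stand-in for one parsed event
    // (the fields emitted by ProcessJsonFunction in the topology above).
    static final class Event {
        final String adId;
        final String zone;
        final long impressions;
        final long clicks;

        Event(String adId, String zone, long impressions, long clicks) {
            this.adId = adId;
            this.zone = zone;
            this.impressions = impressions;
            this.clicks = clicks;
        }
    }

    // Sums impressions and clicks per (ad_id, zone) bucket.
    // Map key:   [ad_id, zone]
    // Map value: {impressions_sum, clicks_sum}
    static Map<List<String>, long[]> sumByBucket(List<Event> events) {
        Map<List<String>, long[]> sums = new LinkedHashMap<>();
        for (Event e : events) {
            List<String> key = Arrays.asList(e.adId, e.zone);
            long[] s = sums.computeIfAbsent(key, k -> new long[2]);
            s[0] += e.impressions;
            s[1] += e.clicks;
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> batch = Arrays.asList(
                new Event("ad1", "top", 3, 1),
                new Event("ad1", "top", 2, 0),
                new Event("ad2", "side", 5, 2));

        // Each row pairs the bucket key with its sums; this pairing is
        // what I would like to have available inside updateState().
        for (Map.Entry<List<String>, long[]> row : sumByBucket(batch).entrySet()) {
            System.out.println(row.getKey()
                    + " impressions_sum=" + row.getValue()[0]
                    + " clicks_sum=" + row.getValue()[1]);
        }
    }
}
```

In the sketch the key travels together with the sums in the same map entry;
what I am missing is the equivalent way of getting at that key in Trident.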
I am aware this is probably trivial, but from the Trident documentation (or
the lack of it) I cannot figure out how to do this.
BR,
- Matti