Do I understand this correctly: you need a MapState approach to get the key values?
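To make the question concrete, here is a plain-Java model of Svend's persistent-aggregate suggestion (quoted below). It is not Storm code. Tuples are bucketed by the groupBy values and each bucket is summed. The state is then read and written with the group values themselves as the key, so the key never has to be recovered inside an updater. The `BackingMap` interface is a hypothetical stand-in loosely modeled on Trident's `IBackingMap`, which has `multiGet`/`multiPut` over `List<List<Object>>` keys. The class names, the in-memory table, and the single `impressions` field are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a grouped persistent aggregate; NOT the Storm/Trident API.
public class PersistentAggregateModel {

    // Hypothetical interface loosely mirroring Trident's IBackingMap:
    // each key is the list of groupBy values, e.g. [ad_id, zone].
    interface BackingMap<T> {
        List<T> multiGet(List<List<Object>> keys);
        void multiPut(List<List<Object>> keys, List<T> vals);
    }

    // In-memory stand-in for a database table keyed by [ad_id, zone].
    static class InMemoryBackingMap implements BackingMap<Long> {
        final Map<List<Object>, Long> store = new HashMap<>();

        public List<Long> multiGet(List<List<Object>> keys) {
            List<Long> out = new ArrayList<>();
            for (List<Object> k : keys) {
                out.add(store.get(k)); // null when the bucket has no stored total yet
            }
            return out;
        }

        public void multiPut(List<List<Object>> keys, List<Long> vals) {
            for (int i = 0; i < keys.size(); i++) {
                store.put(keys.get(i), vals.get(i));
            }
        }
    }

    // Hypothetical input row: {ad_id, zone, impressions}.
    static Object[] row(String adId, String zone, long impressions) {
        return new Object[]{adId, zone, impressions};
    }

    // Processes one batch: group, sum, then read-combine-write per key.
    static void updateBatch(BackingMap<Long> state, List<Object[]> batch) {
        // 1. Group by [ad_id, zone] and sum impressions within the batch.
        Map<List<Object>, Long> partial = new LinkedHashMap<>();
        for (Object[] r : batch) {
            List<Object> key = Arrays.asList(r[0], r[1]);
            partial.merge(key, ((Number) r[2]).longValue(), Long::sum);
        }
        // 2. Fetch the stored totals for exactly these keys.
        List<List<Object>> keys = new ArrayList<>(partial.keySet());
        List<Long> current = state.multiGet(keys);
        // 3. Add this batch's sums to the stored totals and write them back by key.
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            Long prev = current.get(i);
            updated.add((prev == null ? 0L : prev) + partial.get(keys.get(i)));
        }
        state.multiPut(keys, updated);
    }

    public static void main(String[] args) {
        InMemoryBackingMap state = new InMemoryBackingMap();
        updateBatch(state, Arrays.asList(
                row("ad1", "top", 3), row("ad1", "top", 2), row("ad2", "side", 5)));
        updateBatch(state, Collections.singletonList(row("ad1", "top", 1)));
        System.out.println(state.store);
    }
}
```

In Trident itself, the counterpart would presumably be `groupBy(new Fields("ad_id", "zone")).persistentAggregate(...)` with a map-based `StateFactory`. In that case the framework, rather than a hand-written updater, performs the read-combine-write step modeled in `updateBatch()`.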
On Wed, Feb 19, 2014 at 9:41 AM, Svend Vanderveken <[email protected]> wrote:

> Hi Matti,
>
> Use a persistent aggregate; it does precisely what you are describing:
> it gives the result of an Aggregator to a Trident State so we can save it
> somewhere based on the groupBy "bucket" it belongs to.
>
> Here's a blog post where I explain my understanding of how it works:
> http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
>
> Cheers,
>
> Svend
>
>
> On Wed, Feb 19, 2014 at 8:09 AM, Matti Dahlbom <[email protected]> wrote:
>
>> Hello, a beginner question coming up. I'm trying to build analytics
>> crunching with Storm Trident: a continuous stream of events which I need
>> to group and aggregate, and then write the aggregated results over a
>> time slice into a database for quick access later on. I am starting with
>> the following topology:
>>
>>     TridentState state = topology.newStream("logspout", spout)
>>         .parallelismHint(8)
>>         .each(new Fields("json"),
>>               new ProcessJsonFunction(),
>>               new Fields("ad_id", "zone", "impressions", "clicks"))
>>         .groupBy(new Fields("ad_id", "zone"))
>>         .chainedAgg()
>>         .aggregate(new Fields("impressions"), new Sum(),
>>                    new Fields("impressions_sum"))
>>         .aggregate(new Fields("clicks"), new Sum(),
>>                    new Fields("clicks_sum"))
>>         .chainEnd()
>>         .partitionPersist(new AnalyticsStateFactory(),
>>                           new Fields("impressions_sum", "clicks_sum"),
>>                           new AnalyticsStateUpdater());
>>
>> Then, in the updateState() method of my AnalyticsStateUpdater class, I would
>> like to store the aggregated values ("impressions_sum", "clicks_sum") per
>> key "bucket", and this is where I ran into problems: how do I, in the
>> StateUpdater, know which groupBy() "bucket" the aggregated data belongs
>> to? In other words, I need the key formed from the values of the
>> fields ("ad_id", "zone"). The aggregated values themselves arrive properly
>> summed by Sum() in the StateUpdater.
>>
>> I am aware this is probably trivial, but from the Trident documentation
>> (or the lack of it) I cannot figure out how to do this.
>>
>> BR,
>>
>> - Matti
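Besides the persistent aggregate suggested above, there may be a second option, under an assumption worth checking against the Trident docs. `partitionPersist` hands the updater only the fields listed as its input fields. The output tuples of a grouped aggregation are understood to contain the groupBy fields followed by the aggregate fields. If both hold, listing `new Fields("ad_id", "zone", "impressions_sum", "clicks_sum")` in the `partitionPersist` call should make the bucket key available in `updateState()` alongside the sums. The plain-Java model below is not Storm code, and its class and helper names are hypothetical. It shows the updater side of that idea. It builds the key from ad_id and zone, then adds each batch's sums to the stored totals, because a non-persistent aggregate presumably sums only within one batch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a StateUpdater that reads the group key from each tuple;
// NOT Storm code: a "tuple" here is just a field-name -> value map.
public class KeyedUpdaterModel {

    // Stand-in for the analytics table: [ad_id, zone] -> {impressions, clicks}.
    final Map<List<Object>, long[]> table = new HashMap<>();

    // Models updateState(): receives one batch of per-batch sums, each tuple
    // assumed to carry ad_id and zone next to impressions_sum and clicks_sum.
    void updateState(List<Map<String, Object>> tuples) {
        for (Map<String, Object> t : tuples) {
            List<Object> key = Arrays.asList(t.get("ad_id"), t.get("zone"));
            long[] batchSums = {
                ((Number) t.get("impressions_sum")).longValue(),
                ((Number) t.get("clicks_sum")).longValue()
            };
            // Add this batch's sums to whatever is already stored for the bucket.
            table.merge(key, batchSums,
                    (old, add) -> new long[]{old[0] + add[0], old[1] + add[1]});
        }
    }

    // Hypothetical helper that builds one aggregated "tuple".
    static Map<String, Object> tuple(String adId, String zone, long impressions, long clicks) {
        Map<String, Object> t = new HashMap<>();
        t.put("ad_id", adId);
        t.put("zone", zone);
        t.put("impressions_sum", impressions);
        t.put("clicks_sum", clicks);
        return t;
    }

    public static void main(String[] args) {
        KeyedUpdaterModel updater = new KeyedUpdaterModel();
        updater.updateState(Arrays.asList(tuple("ad1", "top", 5, 1), tuple("ad2", "side", 7, 0)));
        updater.updateState(Arrays.asList(tuple("ad1", "top", 2, 1)));
        long[] row = updater.table.get(Arrays.asList("ad1", "top"));
        System.out.println(row[0] + " impressions, " + row[1] + " clicks");
    }
}
```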
