This is correct. Here is a copy/paste of a comment I posted following a question on the same topic 13 days ago:
"
The logic of a map state is to keep a "state" somewhere, you can think of a Storm state as a big Map of key values, the keys come from the groupBy and the values are the result of the aggregations.

Conceptually, when your topology is talking to a State, you can imagine it's actually talking to a big HashMap (only there's a DB behind for persistence + opaque logic for error handling).
"

Cheers,

S


On Wed, Feb 19, 2014 at 12:13 PM, Matti Dahlbom <[email protected]> wrote:

> Do I understand this correctly, you need a MapState approach to get the
> key values..?
>
>
> On Wed, Feb 19, 2014 at 9:41 AM, Svend Vanderveken <
> [email protected]> wrote:
>
>> Hi Matti,
>>
>> Use a persistent aggregate, it's doing precisely what you are describing:
>> giving the result of an Aggregator to a Trident State so we can save it
>> somewhere based on the groupby "bucket" it belongs to.
>>
>> Here's a blog post where I explain my understanding of how it works:
>> http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
>>
>> Cheers,
>>
>> Svend
>>
>>
>> On Wed, Feb 19, 2014 at 8:09 AM, Matti Dahlbom <[email protected]> wrote:
>>
>>> Hello, a beginner question coming up. I'm trying to build analytics
>>> crunching with Storm Trident; a continuous stream of events of which I need
>>> to group/aggregate things and then write the aggregated results over a
>>> time-slice into a database for quick access later on. I am starting with
>>> the following topology:
>>>
>>>         TridentState state = topology.newStream("logspout", spout)
>>>             .parallelismHint(8).each(new Fields("json"),
>>>                                      new ProcessJsonFunction(),
>>>                                      new Fields("ad_id", "zone",
>>>                                                 "impressions", "clicks"))
>>>             .groupBy(new Fields("ad_id", "zone"))
>>>             .chainedAgg()
>>>             .aggregate(new Fields("impressions"), new Sum(),
>>>                        new Fields("impressions_sum"))
>>>             .aggregate(new Fields("clicks"), new Sum(),
>>>                        new Fields("clicks_sum"))
>>>             .chainEnd()
>>>             .partitionPersist(new AnalyticsStateFactory(),
>>>                               new Fields("impressions_sum",
>>>                                          "clicks_sum"),
>>>                               new AnalyticsStateUpdater());
>>>
>>> And then in my class AnalyticsStateUpdater:s method updateState() I
>>> would like to store the aggregated values ("impressions_sum", "clicks_sum")
>>> per key-"bucket" -- and here I ran into problems; how do I - in the
>>> StateUpdater - know to which groupBy() "bucket" the aggregated data belongs
>>> to? In other words I would need to get the key formed of the values of the
>>> fields ("ad_id", "zone"). The aggregated values themselves end up properly
>>> Sum():ed in the StateUpdater.
>>>
>>> I am aware this is probably trivial but from the Trident documentation
>>> (or the lack of it) I cannot seem to figure out how to do this.
>>>
>>> BR,
>>>
>>> - Matti
>>>
>>
>>
>
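
To make the "big HashMap" picture from the top of this message a bit more concrete, here is a minimal sketch in plain Java. It deliberately uses no Storm or Trident classes: the class name GroupedStateSketch and its methods are hypothetical, made up for illustration, and the in-memory HashMap only stands in for whatever database would sit behind a real state. It just shows the idea that the key is built from the groupBy fields ("ad_id", "zone") and the value holds the aggregated sums.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is NOT the Trident API.
class GroupedStateSketch {

    // Hypothetical stand-in for the store behind a map state:
    // key   = values of the groupBy fields (ad_id, zone)
    // value = running aggregates {impressions_sum, clicks_sum}
    private final Map<List<String>, long[]> state = new HashMap<>();

    // Folds one event into the bucket identified by (adId, zone).
    void update(String adId, String zone, long impressions, long clicks) {
        long[] sums = state.computeIfAbsent(Arrays.asList(adId, zone), k -> new long[2]);
        sums[0] += impressions;
        sums[1] += clicks;
    }

    // Returns the impressions sum for a bucket, or 0 if the bucket is unknown.
    long impressionsSum(String adId, String zone) {
        long[] sums = state.get(Arrays.asList(adId, zone));
        return sums == null ? 0L : sums[0];
    }

    // Returns the clicks sum for a bucket, or 0 if the bucket is unknown.
    long clicksSum(String adId, String zone) {
        long[] sums = state.get(Arrays.asList(adId, zone));
        return sums == null ? 0L : sums[1];
    }

    // Number of distinct (ad_id, zone) buckets seen so far.
    int bucketCount() {
        return state.size();
    }

    public static void main(String[] args) {
        GroupedStateSketch s = new GroupedStateSketch();
        s.update("ad-1", "top", 10, 1);
        s.update("ad-1", "top", 5, 0);
        s.update("ad-2", "side", 7, 2);
        System.out.println("ad-1/top impressions: " + s.impressionsSum("ad-1", "top"));
        System.out.println("ad-1/top clicks: " + s.clicksSum("ad-1", "top"));
        System.out.println("buckets: " + s.bucketCount());
    }
}
```

The point of the sketch is only that the bucket key travels together with the aggregated values, which is the information that was missing inside the StateUpdater in the original question.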
