This is correct.

Here is a copy/paste of a comment I posted following a question on the same
topic 13 days ago:

"
The point of a map state is to keep a "state" somewhere. You can think of a
Storm state as a big map of key/value pairs: the keys come from the groupBy
and the values are the results of the aggregations. Conceptually, when your
topology talks to a State, you can imagine it is actually talking to a
big HashMap (only with a DB behind it for persistence, plus opaque logic for
error handling).
"
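
To make the "big HashMap" picture concrete, here is a minimal plain-Java
sketch. It does not use Storm at all: the class name GroupedStateSketch, the
update() helper and the sample values are made up for illustration. It only
assumes the key is the pair of groupBy fields (ad_id, zone) and the value
holds the two running sums.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only: no Storm classes are used here.
// All names in this class are hypothetical.
class GroupedStateSketch {

    // Adds one event's counters to the running sums of its (ad_id, zone) bucket.
    // The key plays the role of the groupBy fields; the long[] holds
    // {impressions_sum, clicks_sum}.
    static void update(Map<List<Object>, long[]> state,
                       String adId, String zone,
                       long impressions, long clicks) {
        List<Object> key = Arrays.<Object>asList(adId, zone);
        long[] sums = state.computeIfAbsent(key, k -> new long[2]);
        sums[0] += impressions;
        sums[1] += clicks;
    }

    public static void main(String[] args) {
        Map<List<Object>, long[]> state = new HashMap<>();
        update(state, "ad-1", "top", 10, 1);
        update(state, "ad-1", "top", 5, 0);
        update(state, "ad-2", "side", 7, 2);

        // Iteration order of a HashMap is unspecified.
        for (Map.Entry<List<Object>, long[]> e : state.entrySet()) {
            System.out.println(e.getKey()
                    + " -> impressions_sum=" + e.getValue()[0]
                    + ", clicks_sum=" + e.getValue()[1]);
        }
    }
}
```

In a real topology you would not write this map yourself. As far as I
understand it, a persistent aggregate backed by a MapState keeps this
key-to-aggregate shape for you, with the DB behind it.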

Cheers,
S


On Wed, Feb 19, 2014 at 12:13 PM, Matti Dahlbom <[email protected]> wrote:

> Do I understand this correctly: you need a MapState approach to get the
> key values?
>
>
> On Wed, Feb 19, 2014 at 9:41 AM, Svend Vanderveken <
> [email protected]> wrote:
>
>> Hi Matti,
>>
>> Use a persistent aggregate. It does precisely what you are describing:
>> it gives the result of an Aggregator to a Trident State so we can save it
>> somewhere based on the groupBy "bucket" it belongs to.
>>
>> Here's a blog post where I explain my understanding of how it works:
>> http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
>>
>> Cheers,
>>
>> Svend
>>
>> On Wed, Feb 19, 2014 at 8:09 AM, Matti Dahlbom <[email protected]> wrote:
>>
>>> Hello, a beginner question coming up. I'm trying to build analytics
>>> crunching with Storm Trident: from a continuous stream of events I need
>>> to group/aggregate things and then write the aggregated results for a
>>> time slice into a database for quick access later on. I am starting with
>>> the following topology:
>>>
>>>         TridentState state = topology.newStream("logspout", spout)
>>>             .parallelismHint(8).each(new Fields("json"),
>>>                                      new ProcessJsonFunction(),
>>>                                      new Fields("ad_id", "zone",
>>>                                                 "impressions", "clicks"))
>>>             .groupBy(new Fields("ad_id", "zone"))
>>>             .chainedAgg()
>>>             .aggregate(new Fields("impressions"), new Sum(),
>>>                        new Fields("impressions_sum"))
>>>             .aggregate(new Fields("clicks"), new Sum(),
>>>                        new Fields("clicks_sum"))
>>>             .chainEnd()
>>>             .partitionPersist(new AnalyticsStateFactory(),
>>>                               new Fields("impressions_sum", "clicks_sum"),
>>>                               new AnalyticsStateUpdater());
>>>
>>> And then, in the updateState() method of my AnalyticsStateUpdater class, I
>>> would like to store the aggregated values ("impressions_sum", "clicks_sum")
>>> per key "bucket". Here I ran into problems: how do I know, in the
>>> StateUpdater, which groupBy() "bucket" the aggregated data belongs to?
>>> In other words, I need to get the key formed from the values of the
>>> fields ("ad_id", "zone"). The aggregated values themselves arrive properly
>>> summed by Sum() in the StateUpdater.
>>>
>>> I am aware this is probably trivial but from the Trident documentation
>>> (or the lack of it) I cannot seem to figure out how to do this.
>>>
>>> BR,
>>>
>>> - Matti
>>>
>>
>>
>
