Do I understand this correctly: you need a MapState-based approach to get
at the key values?
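
If I read the Trident model right, the point is that a map-backed state is handed the groupBy key together with each aggregated value. Below is a rough plain-Java simulation of that get/combine/put cycle. It is not Storm code: GroupedSumSketch, InMemoryBackingMap, Event and processBatch are names made up for illustration, and the multiGet/multiPut shape is only an assumption about how such a map state looks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation only; nothing here is a Storm class (all names hypothetical).
class GroupedSumSketch {

    // Stand-in for a map-backed state. Each key is the list of groupBy
    // field values, here [ad_id, zone]; each value is
    // [impressions_sum, clicks_sum].
    static class InMemoryBackingMap {
        final Map<List<Object>, long[]> store = new HashMap<>();

        // Returns the stored value per key, or null where nothing is stored yet.
        List<long[]> multiGet(List<List<Object>> keys) {
            List<long[]> result = new ArrayList<>();
            for (List<Object> key : keys) {
                result.add(store.get(key));
            }
            return result;
        }

        // Overwrites the stored value for each key.
        void multiPut(List<List<Object>> keys, List<long[]> vals) {
            for (int i = 0; i < keys.size(); i++) {
                store.put(keys.get(i), vals.get(i));
            }
        }
    }

    // One event with the four fields emitted by the function in the topology.
    static class Event {
        final String adId;
        final String zone;
        final long impressions;
        final long clicks;

        Event(String adId, String zone, long impressions, long clicks) {
            this.adId = adId;
            this.zone = zone;
            this.impressions = impressions;
            this.clicks = clicks;
        }
    }

    // Groups one batch by (ad_id, zone), sums both counters per group, then
    // merges the batch sums into the stored totals. The keys travel with the
    // values all the way into the state.
    static void processBatch(List<Event> batch, InMemoryBackingMap state) {
        Map<List<Object>, long[]> sums = new LinkedHashMap<>();
        for (Event e : batch) {
            List<Object> key = Arrays.<Object>asList(e.adId, e.zone);
            long[] s = sums.computeIfAbsent(key, k -> new long[2]);
            s[0] += e.impressions;
            s[1] += e.clicks;
        }

        List<List<Object>> keys = new ArrayList<>(sums.keySet());
        List<long[]> stored = state.multiGet(keys);
        List<long[]> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            long[] delta = sums.get(keys.get(i));
            long[] old = stored.get(i) != null ? stored.get(i) : new long[2];
            updated.add(new long[] {old[0] + delta[0], old[1] + delta[1]});
        }
        state.multiPut(keys, updated);
    }

    public static void main(String[] args) {
        InMemoryBackingMap state = new InMemoryBackingMap();
        processBatch(Arrays.asList(
                new Event("ad1", "top", 3, 1),
                new Event("ad1", "top", 2, 0),
                new Event("ad2", "side", 5, 2)), state);
        for (Map.Entry<List<Object>, long[]> entry : state.store.entrySet()) {
            System.out.println(entry.getKey() + " -> " + Arrays.toString(entry.getValue()));
        }
    }
}
```

In this sketch the store is keyed by the (ad_id, zone) pair itself, so the "bucket" never has to be recovered after the fact.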


On Wed, Feb 19, 2014 at 9:41 AM, Svend Vanderveken <
[email protected]> wrote:

> Hi Matti,
>
> Use a persistent aggregate. It does precisely what you are describing:
> it gives the result of an Aggregator to a Trident State so that we can
> save it somewhere based on the groupBy "bucket" it belongs to.
>
> Here's a blog post where I explain my understanding of how it works:
> http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
>
> Cheers,
>
> Svend
>
> On Wed, Feb 19, 2014 at 8:09 AM, Matti Dahlbom <[email protected]> wrote:
>
>> Hello, a beginner question coming up. I'm trying to build analytics
>> crunching with Storm Trident: from a continuous stream of events I need
>> to group and aggregate values, then write the aggregated results for each
>> time slice into a database for quick access later on. I am starting with
>> the following topology:
>>
>>         TridentState state = topology.newStream("logspout", spout)
>>             .parallelismHint(8).each(new Fields("json"),
>>                                      new ProcessJsonFunction(),
>>                                      new Fields("ad_id", "zone",
>>                                                 "impressions", "clicks"))
>>             .groupBy(new Fields("ad_id", "zone"))
>>             .chainedAgg()
>>             .aggregate(new Fields("impressions"), new Sum(),
>>                        new Fields("impressions_sum"))
>>             .aggregate(new Fields("clicks"), new Sum(),
>>                        new Fields("clicks_sum"))
>>             .chainEnd()
>>             .partitionPersist(new AnalyticsStateFactory(),
>>                               new Fields("impressions_sum", "clicks_sum"),
>>                               new AnalyticsStateUpdater());
>>
>> Then, in the updateState() method of my AnalyticsStateUpdater class, I
>> would like to store the aggregated values ("impressions_sum",
>> "clicks_sum") per key "bucket", and here I ran into problems: how do I
>> know, in the StateUpdater, which groupBy() "bucket" the aggregated data
>> belongs to? In other words, I need the key formed from the values of the
>> fields ("ad_id", "zone"). The aggregated values themselves arrive properly
>> summed in the StateUpdater.
>>
>> I am aware this is probably trivial but from the Trident documentation
>> (or the lack of it) I cannot seem to figure out how to do this.
>>
>> BR,
>>
>> - Matti
>>
>
>
