By the way, given the requirement of aggregating values into "time slices",
what would be a proper approach for achieving this? I was thinking of two
options:

a) Make the spout collect data for the duration of the time slice and then
emit everything at once. The result of
persistentAggregate().newValuesStream().each(new Fields("foo"), new
ProcessTheResultHere()) could then be used as-is for one time slice. This
approach would only work for an on-the-fly stream spout; we have a
requirement that we must be able to re-emit() all the raw data, and in that
case it would not work properly.
b) Have some kind of timer in the "ProcessTheResultHere" function that is
run after persistentAggregate(); there we would write the time slices into
the database.

Yet both of these fail to address one thing: who is responsible for flushing
the MapState / IBackingMap when the time slice is written into the
database?
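
A third possibility, sketched below in plain Java with no Storm classes, is
to make the time slice part of the grouping key. This is only a minimal
sketch under assumptions that are not stated anywhere in this thread: that
every event carries its own timestamp, and that a slice may be written out
once some "watermark" time has passed its end. The names
TimeSliceAggregator, SliceKey, Counts and drainClosedSlices are
hypothetical and are not part of Trident.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

/** Plain-Java model of time-sliced aggregation; hypothetical, not a Trident API. */
final class TimeSliceAggregator {

    /** Grouping key: the groupBy fields plus the start of the time slice. */
    static final class SliceKey {
        final String adId;
        final String zone;
        final long sliceStartMs;

        SliceKey(String adId, String zone, long sliceStartMs) {
            this.adId = adId;
            this.zone = zone;
            this.sliceStartMs = sliceStartMs;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SliceKey)) return false;
            SliceKey other = (SliceKey) o;
            return sliceStartMs == other.sliceStartMs
                    && adId.equals(other.adId)
                    && zone.equals(other.zone);
        }

        @Override
        public int hashCode() {
            return Objects.hash(adId, zone, sliceStartMs);
        }
    }

    /** Running sums for one key and one slice. */
    static final class Counts {
        long impressions;
        long clicks;
    }

    private final long sliceLengthMs;
    private final Map<SliceKey, Counts> state = new HashMap<>();

    TimeSliceAggregator(long sliceLengthMs) {
        if (sliceLengthMs <= 0) {
            throw new IllegalArgumentException("sliceLengthMs must be positive");
        }
        this.sliceLengthMs = sliceLengthMs;
    }

    /** The slice is derived from the event's own timestamp, not from wall-clock time. */
    void add(String adId, String zone, long eventTimeMs, long impressions, long clicks) {
        long sliceStart = Math.floorDiv(eventTimeMs, sliceLengthMs) * sliceLengthMs;
        Counts c = state.computeIfAbsent(new SliceKey(adId, zone, sliceStart), k -> new Counts());
        c.impressions += impressions;
        c.clicks += clicks;
    }

    /** Removes and returns every slice that ended at or before watermarkMs. */
    Map<SliceKey, Counts> drainClosedSlices(long watermarkMs) {
        Map<SliceKey, Counts> closed = new HashMap<>();
        Iterator<Map.Entry<SliceKey, Counts>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<SliceKey, Counts> e = it.next();
            if (e.getKey().sliceStartMs + sliceLengthMs <= watermarkMs) {
                closed.put(e.getKey(), e.getValue());
                it.remove();
            }
        }
        return closed;
    }

    int openSlices() {
        return state.size();
    }

    public static void main(String[] args) {
        TimeSliceAggregator agg = new TimeSliceAggregator(60_000L);
        agg.add("ad1", "zoneA", 5_000L, 3, 1);
        agg.add("ad1", "zoneA", 65_000L, 7, 2);
        System.out.println("closed slices: " + agg.drainClosedSlices(60_000L).size());
        System.out.println("open slices: " + agg.openSlices());
    }
}
```

Because the slice comes from the event timestamp, the same raw event lands
in the same bucket whenever it is processed, which may help with the
re-emit() requirement. In this model "flushing" is just the drain step; how
that would map onto a MapState is exactly the open question above.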

- M



On Wed, Feb 19, 2014 at 1:57 PM, Matti Dahlbom <[email protected]> wrote:

> Okay then, a rewrite awaits I guess.
>
> Although, wouldn't this work by using a custom Java class together with a
> custom Sum()-style aggregator? That way you could pass the key values in
> the objects that pass through the aggregator.
>
> For some reason I had thought we were limited to basic types for the
> Values that we emit(), and I didn't think to try this at first.
>
> - M
>
>
> On Wed, Feb 19, 2014 at 1:26 PM, Svend Vanderveken <
> [email protected]> wrote:
>
>> This is correct.
>>
>> Here is a copy/paste of a comment I posted following a question on the
>> same topic 13 days ago:
>>
>> "
>> The logic of a map state is to keep a "state" somewhere, you can think of
>> a Storm state as a big Map of key values, the keys come from the groupBy
>> and the values are the result of the aggregations. Conceptually, when your
>> topology is talking to a State, you can imagine it's actually talking to a
>> big HashMap (only there's a DB behind for persistence + opaque logic for
>> error handling).
>> "
>>
>> Cheers,
>>  S
>>
>>
>>> On Wed, Feb 19, 2014 at 12:13 PM, Matti Dahlbom <[email protected]> wrote:
>>
>>> Do I understand this correctly: you need a MapState approach to get the
>>> key values?
>>>
>>>
>>> On Wed, Feb 19, 2014 at 9:41 AM, Svend Vanderveken <
>>> [email protected]> wrote:
>>>
>>>> Hi Matti,
>>>>
>>>> Use a persistentAggregate; it does precisely what you are describing:
>>>> it hands the result of an Aggregator to a Trident State so we can
>>>> save it somewhere based on the groupBy "bucket" it belongs to.
>>>>
>>>> Here's a blog post where I explain my understanding of how it works:
>>>> http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
>>>>
>>>> Cheers,
>>>>
>>>> Svend
>>>>
>>>>
>>>> On Wed, Feb 19, 2014 at 8:09 AM, Matti Dahlbom <[email protected]> wrote:
>>>>
>>>>> Hello, a beginner question coming up. I'm trying to build analytics
>>>>> crunching with Storm Trident: a continuous stream of events that I need
>>>>> to group and aggregate, then write the aggregated results for each
>>>>> time slice into a database for quick access later on. I am starting with
>>>>> the following topology:
>>>>>
>>>>>         TridentState state = topology.newStream("logspout", spout)
>>>>>             .parallelismHint(8)
>>>>>             .each(new Fields("json"),
>>>>>                   new ProcessJsonFunction(),
>>>>>                   new Fields("ad_id", "zone", "impressions", "clicks"))
>>>>>             .groupBy(new Fields("ad_id", "zone"))
>>>>>             .chainedAgg()
>>>>>             .aggregate(new Fields("impressions"), new Sum(),
>>>>>                        new Fields("impressions_sum"))
>>>>>             .aggregate(new Fields("clicks"), new Sum(),
>>>>>                        new Fields("clicks_sum"))
>>>>>             .chainEnd()
>>>>>             .partitionPersist(new AnalyticsStateFactory(),
>>>>>                               new Fields("impressions_sum", "clicks_sum"),
>>>>>                               new AnalyticsStateUpdater());
>>>>>
>>>>> And then in the updateState() method of my AnalyticsStateUpdater class I
>>>>> would like to store the aggregated values ("impressions_sum",
>>>>> "clicks_sum") per key "bucket", and here I ran into problems: how do I
>>>>> know, in the StateUpdater, which groupBy() "bucket" the aggregated data
>>>>> belongs to? In other words, I need to get the key formed from the values
>>>>> of the fields ("ad_id", "zone"). The aggregated values themselves arrive
>>>>> properly Sum()-ed in the StateUpdater.
>>>>>
>>>>> I am aware this is probably trivial but from the Trident documentation
>>>>> (or the lack of it) I cannot seem to figure out how to do this.
>>>>>
>>>>> BR,
>>>>>
>>>>> - Matti
>>>>>
>>>>
>>>>
>>>
>>
>
