The previous link didn't work; here is the corrected one:
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams


On Tue, Apr 22, 2014 at 10:30 AM, Raphael Hsieh <raffihs...@gmail.com> wrote:

> Yes, partially.
> The part I was missing was getting the old values and feeding them through
> the aggregator again, which still doesn't quite make sense to me.
>
> I am using an external datastore, so I am not able to use the vanilla
> MemcachedState, which is why I am implementing my own version of the
> IBackingMap.
>
> So let me try and explain what I am understanding.
> When I do something like
>
> stream
>     .groupBy(new Fields("a"))
>     .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", "d"),
>         new MyAggregator(), new Fields("resultMap"))
>
> What happens (as described here:
> https://github.com/nathanmarz/storm/wiki/Trident-API-Overview)
> is that the stream is split into different groups based on field "a":
> [image: Grouping]
> like so.
> Then partitionPersist will run a multiGet on the fields ("a", "b", "c",
> "d"), since that is what we are using as our keys. So in each of the
> "groups" described above, we would have not only the raw tuples resulting
> from the grouping, but also a single tuple with the result of the previous
> aggregation.
> These would all be run through the aggregator, which should be able to
> handle aggregating with this semi-complete aggregation (the "reduce"
> function in a ReducerAggregator, or the "combine" function in a
> CombinerAggregator).
>
> How does it know not to treat the previous aggregation as a single new
> tuple (and hence not run the "init" function on it)? For example, if I were
> aggregating a count, treating that previous value (say 60) as a single extra
> tuple would only increment the count by 1, instead of 60.
> Would I then need to implement my own "init" function so that it checks
> whether the tuple is a raw new tuple or a previous aggregation?
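
To make the count example concrete, here is a minimal sketch of how a combiner-style count can avoid this problem. It assumes the stored value is merged through "combine" and that "init" only ever sees raw tuples, which matches my reading of the built-in Count. CombinerSketch, CountSketch, and CountAcrossBatches are hypothetical stand-ins written for this example, not Trident classes, and a tuple is modelled as a plain List<Object>.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Trident's CombinerAggregator (not the real interface).
interface CombinerSketch<T> {
    T init(List<Object> tuple); // applied to raw tuples only
    T combine(T a, T b);        // merges two partial aggregates
    T zero();                   // identity value for an empty batch
}

// Count expressed as a combiner: each raw tuple contributes exactly 1.
class CountSketch implements CombinerSketch<Long> {
    public Long init(List<Object> tuple) { return 1L; }
    public Long combine(Long a, Long b) { return a + b; }
    public Long zero() { return 0L; }
}

class CountAcrossBatches {
    // Folds the batch into one partial aggregate, then merges it with the
    // stored value through combine. The stored value never goes through init.
    static <T> T update(CombinerSketch<T> agg, T stored, List<List<Object>> batch) {
        T batchValue = agg.zero();
        for (List<Object> tuple : batch) {
            batchValue = agg.combine(batchValue, agg.init(tuple));
        }
        return stored == null ? batchValue : agg.combine(stored, batchValue);
    }

    public static void main(String[] args) {
        List<List<Object>> batch = Arrays.asList(
            Arrays.<Object>asList("a1"),
            Arrays.<Object>asList("a1"),
            Arrays.<Object>asList("a1"));
        // Stored count of 60 plus three new tuples.
        System.out.println(update(new CountSketch(), 60L, batch)); // prints 63
    }
}
```

Because the previous value (60) is merged with "combine" rather than passed to "init", it adds 60 and not 1, so no special-casing inside "init" should be needed.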
>
>
> On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray <cody.a....@gmail.com> wrote:
>
>> My understanding is that the process is:
>> 1. multiGet from the IBackingMap is called and returns a value for each
>> key (or null if not present).
>> 2. For each key, the old value from the get and the new values in the batch
>> are fed through the aggregator to produce one value per key.
>> 3. This value is then stored back into the state through the multiPut in
>> the IBackingMap.
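
The three steps above can be sketched roughly as follows. This is an illustration under assumptions, not Trident's actual code: BackingMapSketch mirrors the multiGet/multiPut shape of IBackingMap, InMemoryBackingMapSketch stands in for the external datastore, and the aggregator is hard-coded as a sum. All class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in with the same method shapes as IBackingMap.
interface BackingMapSketch<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

// Plain get/put storage. Note that it contains no aggregation logic at all.
class InMemoryBackingMapSketch implements BackingMapSketch<Long> {
    final Map<List<Object>, Long> store = new HashMap<>();

    public List<Long> multiGet(List<List<Object>> keys) {
        List<Long> out = new ArrayList<>();
        for (List<Object> key : keys) {
            out.add(store.get(key)); // null when the key is not present
        }
        return out;
    }

    public void multiPut(List<List<Object>> keys, List<Long> vals) {
        for (int i = 0; i < keys.size(); i++) {
            store.put(keys.get(i), vals.get(i));
        }
    }
}

class BatchUpdateSketch {
    // batchSums holds one partial sum per key for the current batch.
    static void applyBatch(BackingMapSketch<Long> map,
                           List<List<Object>> keys, List<Long> batchSums) {
        List<Long> old = map.multiGet(keys);              // step 1: read old values
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {           // step 2: aggregate per key
            Long prev = old.get(i);
            updated.add(prev == null ? batchSums.get(i) : prev + batchSums.get(i));
        }
        map.multiPut(keys, updated);                      // step 3: write back
    }

    public static void main(String[] args) {
        InMemoryBackingMapSketch map = new InMemoryBackingMapSketch();
        List<List<Object>> keys = Arrays.asList(Arrays.<Object>asList("a1"));
        applyBatch(map, keys, Arrays.asList(5L)); // first batch
        applyBatch(map, keys, Arrays.asList(7L)); // second batch
        System.out.println(map.store.get(keys.get(0))); // prints 12
    }
}
```

The point of the split is that the backing map only reads and writes; the merge of old and new values happens in the caller between multiGet and multiPut.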
>>
>> If you just want to use nathanmarz's trident-memcached integration, you
>> don't have to write an IBackingMap yourself. The MemcachedState itself
>> implements IBackingMap to do the get and put. To use it, just decide what
>> you want to groupBy (these become your keys) and how you want it aggregated
>> (this is the reducer/combiner implementation). You don't have to write the
>> memcached connection logic or the aggregation logic yourself unless you want
>> to change how it's aggregated or stored.
>> I've not used the trident-memcached state in particular, but in general
>> this would look something like this:
>>
>> topology.newStream("spout1", spout1)
>>   .groupBy(new Fields("mykeyfield"))
>>   .persistentAggregate(MemcachedState.opaque(servers),
>>       new Fields("myvaluefield"), new Sum(), new Fields("sum"))
>>
>> (Sorry for any code errors; writing on my phone)
>>
>> Does that answer your question?
>>
>> -Cody
>> On Apr 22, 2014 10:32 AM, "Raphael Hsieh" <raffihs...@gmail.com> wrote:
>>
>>> The Reducer/Combiner aggregators hold the logic to aggregate across
>>> an entire batch; however, they do not have the logic to aggregate between
>>> batches.
>>> For that to happen, something must read the previous transaction ID and
>>> value from the datastore, determine whether the incoming data is in the
>>> right sequence, and then increment the value within the datastore.
>>>
>>> I am asking about this second part: where does the logic go that reads
>>> the previous data from the datastore and adds it to the new incoming
>>> aggregate data?
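
For the transaction ID check described above, here is a minimal sketch of the opaque-style update rule as I understand it from Trident's OpaqueMap: each key stores the transaction ID of the last batch applied, the value before that batch, and the value after it. If the same transaction ID arrives again (a replayed batch), the update starts from the stored previous value; otherwise it starts from the stored current value. OpaqueValueSketch and OpaqueUpdateSketch are hypothetical names, and the aggregation is simplified to a sum.

```java
// Hypothetical stand-in for the value stored per key in an opaque state.
class OpaqueValueSketch {
    final long txid; // transaction ID of the batch that produced curr
    final Long prev; // value before that batch (null if there was none)
    final long curr; // value after that batch

    OpaqueValueSketch(long txid, Long prev, long curr) {
        this.txid = txid;
        this.prev = prev;
        this.curr = curr;
    }
}

class OpaqueUpdateSketch {
    // Applies one batch's partial sum to the stored value for a single key.
    static OpaqueValueSketch update(OpaqueValueSketch stored, long batchTxid, long batchSum) {
        Long base;
        if (stored == null) {
            base = null;          // first time this key is seen
        } else if (stored.txid == batchTxid) {
            base = stored.prev;   // replayed batch: discard the earlier attempt
        } else {
            base = stored.curr;   // new batch: build on the latest value
        }
        long next = (base == null ? 0L : base) + batchSum;
        return new OpaqueValueSketch(batchTxid, base, next);
    }

    public static void main(String[] args) {
        OpaqueValueSketch first = update(null, 1L, 5L);    // txid 1 applied: 5
        OpaqueValueSketch replay = update(first, 1L, 7L);  // txid 1 replayed with new contents: 7
        OpaqueValueSketch next = update(replay, 2L, 3L);   // txid 2 applied on top: 10
        System.out.println(next.curr); // prints 10
    }
}
```

In this sketch the datastore only has to persist the three fields per key; the decision about which base value to use sits in a wrapper around the backing map rather than in multiPut itself.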
>>>
>>>
>>> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray <cody.a....@gmail.com> wrote:
>>>
>>>> It's the ReducerAggregator/CombinerAggregator's job to implement this
>>>> logic. Look at Count and Sum, which are built in to Trident. You can also
>>>> implement your own aggregator.
>>>>
>>>> -Cody
>>>>
>>>>
>>>> On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh <raffihs...@gmail.com> wrote:
>>>>
>>>>> If I am using an opaque spout and doing a persistent aggregate to a
>>>>> MemcachedState, how is it aggregating/incrementing the values across all
>>>>> batches?
>>>>>
>>>>> I want to implement an IBackingMap so that I can use an external
>>>>> datastore. However, I'm unsure where the logic goes that will read the
>>>>> previous data and aggregate it with the new data.
>>>>>
>>>>> From what I've been told, I need to implement the IBackingMap and its
>>>>> multiPut/multiGet functions. So logically, I think it makes sense that I
>>>>> would put this update logic in the multiPut function. However, the
>>>>> OpaqueMap class already has multiGet logic in order to check the TxId of
>>>>> the batch.
>>>>> Instead of using the OpaqueMap class, should I just make my own
>>>>> implementation?
>>>>>
>>>>> Thanks
>>>>> --
>>>>> Raphael Hsieh
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Cody A. Ray, LEED AP
>>>> cody.a....@gmail.com
>>>> 215.501.7891
>>>>
>>>
>>>
>>>
>>> --
>>> Raphael Hsieh
>>>
>>>
>>>
>>>
>>
>
>
> --
> Raphael Hsieh
>
>
>
>



-- 
Raphael Hsieh
