My understanding is that the process is
1. multiGet from the IBackingMap is called and returns a value for each
key (or null if not present)
2. For each key, the old value from the get and new values in the batch are
fed through the aggregator to produce one value per key
3. This value is then stored back into the state through the multiPut in
the IBackingMap.
If you just want to use nathanmarz's trident-memcached integration, you
don't have to write an IBackingMap yourself. The MemcachedState itself
implements IBackingMap to do the get and put. To use it, just decide what
you want to groupBy (these become your keys) and how you want it aggregated
(this is the reduced/combiner implementation). You don't have to write the
memcache connection logic or the aggregation logic yourself unless you want
to change how it's aggregated or stored.
I've not used the trident-memcached state in particular, but in general
this would look something like this:
topology.newStream("spout1", spout1)
.groupBy(new Fields("mykeyfield"))
.persistentAggregate(MemcachedState.opaque(servers), new
Fields("myvaluefield"), new Sum(), new Fields("sum"))
(Sorry for any code errors; writing in my phone)
Does that answer your question?
-Cody
On Apr 22, 2014 10:32 AM, "Raphael Hsieh" <[email protected]> wrote:
> The Reducer/Combiner Aggregators hold logic in order to aggregate across
> an entire batch, however it does not have the logic to aggregate between
> batches.
> In order for this to happen, it must read the previous TransactionId and
> value from the datastore, determine whether this incoming data is in the
> right sequence, then increment the value within the datastore.
>
> I am asking about this second part. Where the logic goes in order to read
> previous data from the datastore, and add it to the new incoming aggregate
> data.
>
>
> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray <[email protected]> wrote:
>
>> Its the ReducerAggregate/CombinerAggregator's job to implement this
>> logic. Look at Count and Sum that are built-in to Trident. You can also
>> implement your own aggregator.
>>
>> -Cody
>>
>>
>> On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh <[email protected]>wrote:
>>
>>> If I am using an opaque spout and doing a persistent aggregate to a
>>> MemcachedState, how is it aggregating/incrementing the values across all
>>> batches ?
>>>
>>> I'm wanting to implement an IBackingMap so that I can use an external
>>> datastore. However, I'm unsure where the logic goes that will read the
>>> previous data, and aggregate it with the new data.
>>>
>>> From what I've been told, I need to implement the IBackingMap and the
>>> multiput/multiget functions. So logically, I think it makes sense that I
>>> would put this update logiv in the multiput function. However, the
>>> OpaqueMap class already has multiGet logic in order to check the TxId of
>>> the batch.
>>> Instead of using an OpaqueMap class, should I just make my own
>>> implementation ?
>>>
>>> Thanks
>>> --
>>> Raphael Hsieh
>>>
>>>
>>>
>>>
>>
>>
>>
>> --
>> Cody A. Ray, LEED AP
>> [email protected]
>> 215.501.7891
>>
>
>
>
> --
> Raphael Hsieh
> Amazon.com
> Software Development Engineer I
> (978) 764-9014
>
>
>
>