Yes, partially.
The part I was missing was getting the old values and feeding them through the
aggregator again, which still doesn't quite make sense to me.

I am using an external datastore, so I am not able to use the vanilla
MemcachedState, hence why I am implementing my own version of the
IBackingMap.

So let me try and explain what I am understanding.
When I do something like

Stream
    .groupBy(new Fields("a"))
    .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", "d"),
        new MyAggregator(), new Fields("resultMap"))

What happens (as described
here<https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>)
is that the stream is split into different groups based on field "a", like so:
[image: Grouping]
Then PartitionPersist will run a MultiGet on the fields ("a", "b", "c",
"d"), since that is what we are using as our keys. So in each of the
"groups" described above, we would have not only the raw tuples resulting
from the grouping, but also a single tuple with the result of the previous
aggregation.
These would all be run through the aggregator, which should be able to
handle aggregating with this semi-complete aggregation (The "Reduce"
function in a ReducerAggregator, or the "Combine" function in the
CombinerAggregator).
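
A minimal sketch of the read, combine, write cycle described above, with the
store reduced to an in-memory map. InMemoryBackingMap and ReadCombineWrite are
hypothetical names; only the multiGet/multiPut method shapes are modeled on
IBackingMap. As far as I understand it, in real Trident the wrapper around the
backing map (for example OpaqueMap) drives this cycle, so the map itself only
needs to read and write values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory store shaped like IBackingMap<Long>:
// it only reads and writes values and contains no aggregation logic.
class InMemoryBackingMap {
    private final Map<List<Object>, Long> store = new HashMap<>();

    List<Long> multiGet(List<List<Object>> keys) {
        List<Long> values = new ArrayList<>();
        for (List<Object> key : keys) {
            values.add(store.get(key)); // null when the key is absent
        }
        return values;
    }

    void multiPut(List<List<Object>> keys, List<Long> values) {
        for (int i = 0; i < keys.size(); i++) {
            store.put(keys.get(i), values.get(i));
        }
    }
}

class ReadCombineWrite {
    // One batch update: read old values, add the per-key batch sums, write back.
    static void applyBatch(InMemoryBackingMap map,
                           List<List<Object>> keys,
                           List<Long> batchSums) {
        List<Long> old = map.multiGet(keys);
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            Long previous = old.get(i);
            Long batch = batchSums.get(i);
            updated.add(previous == null ? batch : Long.valueOf(previous + batch));
        }
        map.multiPut(keys, updated);
    }

    public static void main(String[] args) {
        InMemoryBackingMap map = new InMemoryBackingMap();
        List<List<Object>> keys = Arrays.asList(
                Arrays.<Object>asList("x"), Arrays.<Object>asList("y"));
        applyBatch(map, keys, Arrays.asList(2L, 5L)); // first batch
        applyBatch(map, keys, Arrays.asList(3L, 1L)); // second batch
        System.out.println(map.multiGet(keys)); // [5, 6]
    }
}
```

The sketch ignores transaction ids entirely; an opaque state would also store
the previous value and the txid alongside each current value.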

How does it know not to treat the previous aggregation as a single new
tuple (and hence not run the "init" function on it)? For example, if I were
aggregating a count, treating that previous value (say 60) as a single extra
tuple would only increment the count by 1 instead of 60.
Would I then need to implement my own "init" function so that it checks
whether the tuple value is a raw new tuple or a previous aggregation?


On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray <cody.a....@gmail.com> wrote:

> My understanding is that the process is
> 1. multiGet from the IBackingMap  is called and returns a value for each
> key (or null if not present)
> 2. For each key, the old value from the get and new values in the batch
> are fed through the aggregator to produce one value per key
> 3. This value is then stored back into the state through the multiPut in
> the IBackingMap.
>
> If you just want to use nathanmarz's trident-memcached integration, you
> don't have to write an IBackingMap yourself. The MemcachedState itself
> implements IBackingMap to do the get and put. To use it, just decide what
> you want to groupBy (these become your keys) and how you want it aggregated
> (this is the reducer/combiner implementation). You don't have to write the
> memcache connection logic or the aggregation logic yourself unless you want
> to change how it's aggregated or stored.
> I've not used the trident-memcached state in particular, but in general
> this would look something like this:
>
> topology.newStream("spout1", spout1)
>   .groupBy(new Fields("mykeyfield"))
>   .persistentAggregate(MemcachedState.opaque(servers),
>       new Fields("myvaluefield"), new Sum(), new Fields("sum"))
>
> (Sorry for any code errors; writing on my phone)
>
> Does that answer your question?
>
> -Cody
> On Apr 22, 2014 10:32 AM, "Raphael Hsieh" <raffihs...@gmail.com> wrote:
>
>> The Reducer/Combiner Aggregators hold the logic to aggregate across
>> an entire batch; however, they do not have the logic to aggregate between
>> batches.
>> In order for this to happen, it must read the previous TransactionId and
>> value from the datastore, determine whether this incoming data is in the
>> right sequence, then increment the value within the datastore.
>>
>> I am asking about this second part. Where the logic goes in order to read
>> previous data from the datastore, and add it to the new incoming aggregate
>> data.
>>
>>
>> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray <cody.a....@gmail.com> wrote:
>>
>>> It's the ReducerAggregator/CombinerAggregator's job to implement this
>>> logic. Look at Count and Sum that are built-in to Trident. You can also
>>> implement your own aggregator.
>>>
>>> -Cody
>>>
>>>
>>> On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh <raffihs...@gmail.com> wrote:
>>>
>>>> If I am using an opaque spout and doing a persistent aggregate to a
>>>> MemcachedState, how is it aggregating/incrementing the values across all
>>>> batches ?
>>>>
>>>> I'm wanting to implement an IBackingMap so that I can use an external
>>>> datastore. However, I'm unsure where the logic goes that will read the
>>>> previous data, and aggregate it with the new data.
>>>>
>>>> From what I've been told, I need to implement the IBackingMap and the
>>>> multiPut/multiGet functions. So logically, I think it makes sense that I
>>>> would put this update logic in the multiPut function. However, the
>>>> OpaqueMap class already has multiGet logic in order to check the TxId of
>>>> the batch.
>>>> Instead of using an OpaqueMap class, should I just make my own
>>>> implementation ?
>>>>
>>>> Thanks
>>>> --
>>>> Raphael Hsieh
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Cody A. Ray, LEED AP
>>> cody.a....@gmail.com
>>> 215.501.7891
>>>
>>
>>
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>


-- 
Raphael Hsieh
