stream.aggregate(...) computes aggregations per batch.
stream.groupBy(...).aggregate(...) computes aggregations per key per batch.
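
For example, a minimal sketch using the built-in Count aggregator ("spout1",
the spout itself, and the field names are placeholders):

    // storm.trident.TridentTopology, storm.trident.Stream,
    // storm.trident.operation.builtin.Count, backtype.storm.tuple.Fields
    TridentTopology topology = new TridentTopology();
    Stream stream = topology.newStream("spout1", spout);

    // global count, computed once per batch
    stream.aggregate(new Count(), new Fields("count"));

    // one count per key, per batch
    stream.groupBy(new Fields("key"))
          .aggregate(new Count(), new Fields("count"));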

stream.persistentAggregate(...) computes partial aggregations per batch; the
DB is updated with the partial aggregates once per batch.
stream.groupBy(...).persistentAggregate(...) computes partial aggregations
per key per batch; the DB is updated with the partial aggregates once per batch.

You can also think of persistentAggregate as computing DB deltas, and only
sending the deltas to the DB.
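
A minimal sketch of the persistent variant (MemoryMapState is just the
in-memory example state that ships with Trident; you'd swap in your own
state factory for a real DB, and the field names are placeholders):

    // one partial aggregate per key per batch is written to the state
    TridentState counts = stream
        .groupBy(new Fields("key"))
        .persistentAggregate(new MemoryMapState.Factory(),
                             new Count(), new Fields("count"));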

It's not strictly guaranteed that increasing the batch size reduces the
total number of writes to the persistent store, but in practice it does.
E.g. imagine our stream has 5 unique keys, you do a
groupBy.persistentAggregate, and the throughput is 1B items/sec. If you
have batches of 5B items, then after 10 seconds you've sent ~10 key/val
updates to the DB; if you have batches of 0.5B items, then after 10 seconds
you've sent ~100 key/vals to the DB.
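
Spelled out with those same assumed numbers (one partial aggregate written
per key per batch):

    5B-item batches:   1 batch every 5s   -> over 10s:  2 batches x 5 keys = ~10 writes
    0.5B-item batches: 1 batch every 0.5s -> over 10s: 20 batches x 5 keys = ~100 writes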

Some of the tradeoffs here are that if a batch fails you have to do more
recomputation, and there's a greater chance of a batch failing since there
are more tuples per batch. This should help though, definitely give it a shot.






On Thu, Apr 10, 2014 at 1:33 PM, Raphael Hsieh <[email protected]> wrote:

> Thanks for your reply, Jason.
> So what I'm hearing is that there is no nice way of doing temporal flushes
> to a database. My main reason for wanting to do this is because I want to
> use DynamoDB for my external datastore, but it gets expensive. I would like
> to limit my reads and writes as much as I can so that the cost does not add
> up.
>
> Increasing the batch size seems like the best solution so far, however
> from my understanding doing an aggregation in storm/trident does a global
> aggregation, so do batch sizes really make a difference? Or is my
> understanding of the aggregation process wrong? I had thought that
> aggregation is global across all partitions (and Storm nodes).
>
>
> On Thu, Apr 10, 2014 at 1:58 AM, Jason Jackson <[email protected]> wrote:
>
>> Trident doesn't expose tick tuples in its API yet, even though they were
>> added to Storm a while ago.
>>
>> There are two problems I think you're talking about: (1) windowed
>> aggregations, and (2) reducing DB load.
>>
>> For (1)
>> Trident can do aggregations at the batch level, but this doesn't really
>> help you with aggregations over a range of timestamps. The way to do
>> that is to include the time bucket in your key when you
>> persistentAggregate. E.g. your key could be "apple-2014-01-02-12:40"
>> for minutely buckets. Then when serving the data you would query all keys
>> across the time range. Certain databases such as Cassandra can make this
>> query very fast.
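>>
>> A rough sketch of that idea (MinuteBucket, the field names, and
>> stateFactory are made up for illustration; stateFactory is whatever state
>> plugin you're using):
>>
>>     import java.text.SimpleDateFormat;
>>     import java.util.Date;
>>     import backtype.storm.tuple.Fields;
>>     import backtype.storm.tuple.Values;
>>     import storm.trident.operation.BaseFunction;
>>     import storm.trident.operation.TridentCollector;
>>     import storm.trident.operation.builtin.Count;
>>     import storm.trident.tuple.TridentTuple;
>>
>>     // appends a minutely time bucket to each tuple so it becomes part of the key
>>     public class MinuteBucket extends BaseFunction {
>>         private final SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd-HH:mm");
>>
>>         @Override
>>         public void execute(TridentTuple tuple, TridentCollector collector) {
>>             collector.emit(new Values(fmt.format(new Date())));
>>         }
>>     }
>>
>>     // group on (word, bucket) so each stored value covers one key for one minute
>>     stream.each(new Fields("word"), new MinuteBucket(), new Fields("bucket"))
>>           .groupBy(new Fields("word", "bucket"))
>>           .persistentAggregate(stateFactory, new Count(), new Fields("count"));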
>>
>> (2) You'll need to implement your own IBackingMap persistent store plugin
>> and pass it to persistentAggregate. Look at other examples such as
>> trident-memcache for how to implement these. For your custom persistent
>> store plugin you could use a combination of an in-memory map and the DB:
>> 4 out of 5 batches would just commit their state updates to the in-memory
>> map, and the 5th batch would commit to the in-memory map and then flush
>> that map to the database. You could even launch a separate thread to do
>> the flushing, in case it takes a while. This design, however, is not going
>> to give you exactly-once semantics: if you lose the in-memory map because
>> your worker died, for example, then when it comes back online it will
>> still resume from the last successful batch (not the last flushed batch).
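>>
>> Something like this sketch of the buffering idea (FlushingBackingMap,
>> dbMap and flushEvery are made-up names; dbMap would be your real
>> DB-backed IBackingMap, and as said above you give up exactly-once
>> semantics):
>>
>>     import java.util.ArrayList;
>>     import java.util.HashMap;
>>     import java.util.List;
>>     import java.util.Map;
>>     import storm.trident.state.map.IBackingMap;
>>
>>     public class FlushingBackingMap<T> implements IBackingMap<T> {
>>         private final IBackingMap<T> dbMap;   // the real persistent store
>>         private final int flushEvery;         // e.g. 5 -> flush on every 5th batch
>>         private final Map<List<Object>, T> cache = new HashMap<List<Object>, T>();
>>         private int batchesSinceFlush = 0;
>>
>>         public FlushingBackingMap(IBackingMap<T> dbMap, int flushEvery) {
>>             this.dbMap = dbMap;
>>             this.flushEvery = flushEvery;
>>         }
>>
>>         public List<T> multiGet(List<List<Object>> keys) {
>>             List<T> fromDb = dbMap.multiGet(keys);      // baseline values from the DB
>>             List<T> result = new ArrayList<T>(keys.size());
>>             for (int i = 0; i < keys.size(); i++) {
>>                 T cached = cache.get(keys.get(i));
>>                 result.add(cached != null ? cached : fromDb.get(i));  // prefer unflushed values
>>             }
>>             return result;
>>         }
>>
>>         public void multiPut(List<List<Object>> keys, List<T> vals) {
>>             for (int i = 0; i < keys.size(); i++) {
>>                 cache.put(keys.get(i), vals.get(i));    // always update the in-memory map
>>             }
>>             if (++batchesSinceFlush >= flushEvery) {    // every Nth batch, flush everything
>>                 List<List<Object>> allKeys = new ArrayList<List<Object>>(cache.keySet());
>>                 List<T> allVals = new ArrayList<T>(allKeys.size());
>>                 for (List<Object> k : allKeys) allVals.add(cache.get(k));
>>                 dbMap.multiPut(allKeys, allVals);
>>                 batchesSinceFlush = 0;
>>             }
>>         }
>>     }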
>>
>> To retain exactly-once semantics you could also make your batch sizes
>> much larger; by default they read 1MB from each Kafka partition (see the
>> bufferSize and fetchSize configuration options in the Kafka spout). If you
>> increased the batch size, and you're doing some kind of key-based
>> aggregation, then this would reduce the total number of writes you need
>> to make to your persistent storage.
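>>
>> With the storm-kafka Trident spout that would look roughly like this (if
>> I remember the field names right they're fetchSizeBytes/bufferSizeBytes
>> on KafkaConfig; the broker hosts, topic and sizes are placeholders):
>>
>>     // storm.kafka.trident.TridentKafkaConfig / OpaqueTridentKafkaSpout
>>     TridentKafkaConfig kafkaConf = new TridentKafkaConfig(brokerHosts, "my-topic");
>>     kafkaConf.fetchSizeBytes  = 10 * 1024 * 1024;   // up from the 1MB default
>>     kafkaConf.bufferSizeBytes = 10 * 1024 * 1024;
>>     OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConf);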
>>
>> Trident could definitely be improved here, so your mileage may vary.
>>
>>
>> On Wed, Apr 9, 2014 at 9:15 AM, Raphael Hsieh <[email protected]> wrote:
>>
>>> I have been struggling to figure out how to get Trident to aggregate
>>> data over a certain time period and then flush the data to an external
>>> data store.
>>> The reasoning behind this is to reduce the number of reads and writes
>>> sent to the database.
>>>
>>> I've seen that Storm allows tick tuples to be inserted into the
>>> stream, however I can't figure out how to do this with Trident. I had
>>> thought that this functionality was added in Storm version 0.8.0?
>>> Is this the case?
>>>
>>> One thing I tried was to create a new stream that emitted a tuple
>>> once every X time period, and then merge this stream into my actual
>>> data stream. However, doing this would result in a non-transactional
>>> stream, which would be no good. It also didn't work, as the resulting
>>> stream only consisted of tuples from my clock stream.
>>>
>>> Can anybody help me figure out how to have Trident aggregate data over a
>>> certain time frame, flush it out to an external datastore, then rinse and
>>> repeat?
>>>
>>> There are some blogs out there regarding how to use a sliding window in
>>> Storm, however I just want sequential windows in Trident.
>>>
>>> Thanks
>>>
>>> --
>>> Raphael Hsieh
>>>
>>>
>>>
>>>
>>
>>
>
>
> --
> Raphael Hsieh
> Amazon.com
> Software Development Engineer I
> (978) 764-9014
>
>
>
>
