The documentation for Trident aggregations mentions that aggregations are
done globally across all batches.
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#aggregation-operations
Is this incorrect?

When the documentation says "all batches", it does mean all batches across
all worker nodes, right?


On Thu, Apr 10, 2014 at 3:21 PM, Jason Jackson <[email protected]> wrote:

>
>
>
> stream.aggregate(...) computes aggregations per batch
> stream.groupBy(..).aggregate(..) computes aggregations per key per batch.
>
> stream.persistentAggregate(..) computes partial aggregations per batch, DB
> is updated with partial aggregates per batch
> stream.groupBy(..).persistentAggregate(..) computes partial aggregations
> per batch per key, DB is updated with partial aggregates per batch.
>
> you can also think of persistent aggregate as computing DB deltas, and
> only sending the deltas to the DB.
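A minimal plain-Java sketch of that per-batch delta idea (the class and method names here are illustrative, not the actual Trident API): each batch is first reduced to per-key partial counts, and only those deltas are applied to the store, so the number of writes per batch is bounded by the number of distinct keys in the batch, not the number of tuples.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch: per-batch partial aggregation ("deltas"), with only
// the deltas applied to the persistent store.
public class BatchDeltas {

    // Partial aggregation for one batch: key -> count within this batch only.
    public static Map<String, Long> partialCounts(List<String> batch) {
        Map<String, Long> deltas = new HashMap<>();
        for (String key : batch) {
            deltas.merge(key, 1L, Long::sum);
        }
        return deltas;
    }

    // Apply one batch's deltas to the (simulated) persistent store:
    // at most one write per distinct key per batch.
    public static void applyDeltas(Map<String, Long> db, Map<String, Long> deltas) {
        for (Map.Entry<String, Long> e : deltas.entrySet()) {
            db.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> db = new HashMap<>();
        applyDeltas(db, partialCounts(List.of("apple", "apple", "pear")));
        applyDeltas(db, partialCounts(List.of("apple", "pear", "pear")));
        System.out.println(db); // running totals: apple=3, pear=3
    }
}
```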
>
> Whether increasing the batch size reduces the total number of writes to
> the persistent store is not strictly guaranteed, but in practice it does.
> E.g. imagine our stream has 5 unique keys, you do a
> groupBy.persistentAggregate, and the throughput is 1B tuples/sec. If you
> have batches of 5B items, then after 10 seconds you've sent ~10 key/val
> updates to the DB (2 batches); if you have batches of 0.5B items, then
> after 10 seconds you've sent ~100 key/vals (20 batches).
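The arithmetic behind that example can be checked with a back-of-the-envelope helper (a sketch with made-up names, not anything from Storm): with k unique keys, each batch writes at most k key/vals, so writes over a fixed interval scale with the number of batches in that interval.

```java
// Rough estimate of DB writes over an interval, assuming one write per
// unique key per batch (the best case for a groupBy.persistentAggregate).
public class WriteEstimate {

    // batches in the interval = intervalSec / (batchSize / throughputPerSec)
    public static long writes(long uniqueKeys, double throughputPerSec,
                              double batchSize, double intervalSec) {
        double batchesPerInterval = intervalSec / (batchSize / throughputPerSec);
        return (long) (uniqueKeys * batchesPerInterval);
    }

    public static void main(String[] args) {
        // 5 unique keys, 1B tuples/sec, 10-second window:
        System.out.println(writes(5, 1e9, 5e9, 10));   // 5B-item batches   -> 10
        System.out.println(writes(5, 1e9, 0.5e9, 10)); // 0.5B-item batches -> 100
    }
}
```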
>
> Some of the tradeoffs here are that if a batch fails you have to do more
> recomputation, and there's a greater chance of a batch failing since there
> are more tuples per batch. This should help though, definitely give it a
> shot.
>
>
> On Thu, Apr 10, 2014 at 1:33 PM, Raphael Hsieh <[email protected]> wrote:
>
>> Thanks for your reply Jason,
>> So what I'm hearing is that there is no nice way of doing temporal
>> flushes to a database. My main reason for wanting to do this is because I
>> want to use DynamoDB for my external datastore, but it gets expensive. I
>> would like to limit my reads and writes as much as I can so that the cost
>> does not add up.
>>
>> Increasing the batch size seems like the best solution so far. However,
>> from my understanding, doing an aggregation in storm/trident does a global
>> aggregation, so do batch sizes really make a difference? Or is my
>> understanding of the aggregation process wrong? I had thought that
>> aggregating is global among all partitions (and storm nodes).
>>
>>
>> On Thu, Apr 10, 2014 at 1:58 AM, Jason Jackson <[email protected]> wrote:
>>
>>> trident doesn't expose tick tuples in its API yet, even though they were
>>> added to storm a while ago.
>>>
>>> There are two problems I think you're talking about: (1) windowed
>>> aggregations, and (2) reducing DB load.
>>>
>>> For (1)
>>> Trident can do aggregations at the batch level, but this doesn't really
>>> help you for doing aggregations over a range of timestamps. The way to do
>>> that is to include the time bucket in your key when calling
>>> persistentAggregate. E.g. your key could be "apple-2014-01-02-12:40"
>>> for minutely buckets. Then when serving the data you would query all keys
>>> across the time range. Certain databases such as Cassandra can make this
>>> query very fast.
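A small sketch of that time-bucketing scheme in plain Java (the helper names are illustrative, not part of Trident): the minute bucket is folded into the key at write time, and a time-range read expands into one key per minute in the range.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of minutely time-bucketed keys for a key/value store.
public class TimeBuckets {

    private static final DateTimeFormatter MINUTE =
        DateTimeFormatter.ofPattern("yyyy-MM-dd-HH:mm").withZone(ZoneOffset.UTC);

    // e.g. bucketKey("apple", t) -> "apple-2014-01-02-12:40"
    public static String bucketKey(String key, Instant ts) {
        return key + "-" + MINUTE.format(ts);
    }

    // All minutely keys covering [from, to) -- what a serving-side
    // time-range query would fetch.
    public static List<String> keysInRange(String key, Instant from, Instant to) {
        List<String> keys = new ArrayList<>();
        for (Instant t = from; t.isBefore(to); t = t.plusSeconds(60)) {
            keys.add(bucketKey(key, t));
        }
        return keys;
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2014-01-02T12:40:00Z");
        System.out.println(bucketKey("apple", t)); // apple-2014-01-02-12:40
        System.out.println(keysInRange("apple", t, t.plusSeconds(180))); // 3 keys
    }
}
```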
>>>
>>> (2) You'll need to implement your own IBackingMap persistent store
>>> plugin and pass it to persistentAggregate. Look at other examples such as
>>> trident-memcache for how to implement these. For your custom persistent
>>> store plugin you could use a combination of an in-memory map and the DB.
>>> 4 out of 5 batches would just commit their state updates to the in-memory
>>> map; the 5th batch would commit to the in-memory map and then flush that
>>> map to the database. You could even launch a separate thread to do the
>>> flushing, in case it takes a while. This design, however, is not going to
>>> give you exactly-once semantics: if you lose the in-memory map because
>>> your worker died, for example, then when it comes back online it will
>>> still resume from the last successful batch (not the last flushed batch).
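The caching logic such a plugin would wrap can be sketched in plain Java (to be clear, this is NOT the real IBackingMap interface, just the flush-every-Nth-batch idea): most batches update only the in-memory map, and every Nth batch also pushes the accumulated state to the database in one bulk write.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch: commit per-batch deltas to an in-memory map and
// flush to the DB only every Nth batch, trading write volume for the
// exactly-once caveat described above.
public class FlushingCache {

    private final Map<String, Long> cache = new HashMap<>();
    private final Map<String, Long> db;   // stands in for the real datastore
    private final int flushEvery;
    private int batchesSinceFlush = 0;

    public FlushingCache(Map<String, Long> db, int flushEvery) {
        this.db = db;
        this.flushEvery = flushEvery;
    }

    // Called once per batch with that batch's per-key deltas.
    public void commitBatch(Map<String, Long> deltas) {
        deltas.forEach((k, v) -> cache.merge(k, v, Long::sum));
        if (++batchesSinceFlush >= flushEvery) {
            db.putAll(cache);   // one bulk write instead of flushEvery writes
            batchesSinceFlush = 0;
        }
    }
}
```

Note the caveat from the mail above: if the worker dies between flushes, replay resumes from the last successful batch, not the last flushed one, so the un-flushed batches are lost from the DB's point of view.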
>>>
>>> To retain exactly-once semantics you could also make your batch sizes
>>> much larger; by default they read 1MB from each kafka partition (see the
>>> bufferSize and fetchSize configuration options in the Kafka Spout). If
>>> you increased the batch size, and you're doing some kind of key-based
>>> aggregation, then this would reduce the total number of writes you would
>>> need to do to your persistent storage.
>>>
>>> Trident could definitely be improved here, so your mileage may vary.
>>>
>>>
>>> On Wed, Apr 9, 2014 at 9:15 AM, Raphael Hsieh <[email protected]> wrote:
>>>
>>>> I have been struggling to figure out how to get trident to aggregate
>>>> data over a certain time period then flush the data to an external data
>>>> store.
>>>> The reasoning behind this is to reduce the number of reads and writes
>>>> sent to the database.
>>>>
>>>> I've seen that Storm allows for tick tuples to be inserted into the
>>>> stream, however I can't figure out how to do this with trident. I had
>>>> thought that this functionality was added with Storm version 0.8.0?
>>>> Is this the case?
>>>>
>>>> One thing I had tried was to create a new stream that emitted a tuple
>>>> once every X time period, then I tried to merge this stream into my actual
>>>> data stream. However, doing this would result in a non transactional stream
>>>> which would be no good. Also it didn't work, as the resulting stream only
>>>> consisted of tuples from my clock stream.
>>>>
>>>> Can anybody help me figure out how to have Trident aggregate data over
>>>> a certain time frame, flush it out to an external datastore, then rinse
>>>> and repeat?
>>>>
>>>> There are some blogs out there regarding how to use a sliding window in
>>>> storm; however, I just want sequential windows in Trident.
>>>>
>>>> Thanks
>>>>
>>>> --
>>>> Raphael Hsieh
>>>>
>>>
>>>
>>
>>
>> --
>> Raphael Hsieh
>> Amazon.com
>> Software Development Engineer I
>> (978) 764-9014
>>
>>
>>
>>
>
>


-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014
