The documentation for Trident aggregations mentions that aggregations are done globally across all batches. https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#aggregation-operations Is this incorrect?
When the documentation says "all batches", it does mean all batches across all worker nodes, right?

On Thu, Apr 10, 2014 at 3:21 PM, Jason Jackson <[email protected]> wrote:
> stream.aggregate(...) computes aggregations per batch.
> stream.groupBy(..).aggregate(..) computes aggregations per key per batch.
>
> stream.persistentAggregate(..) computes partial aggregations per batch; the DB is updated with partial aggregates per batch.
> stream.groupBy(..).persistentAggregate(..) computes partial aggregations per batch per key; the DB is updated with partial aggregates per batch.
>
> You can also think of persistentAggregate as computing DB deltas, and only sending the deltas to the DB.
>
> That increasing the batch size reduces the total number of writes to the persistent store is not strictly true, but in practice it holds. E.g. imagine our stream has 5 unique keys, you do a groupBy.persistentAggregate, and the throughput is 1B tuples/sec. If you have batches of 5B items, then after 10 seconds you've sent ~10 key/val updates to the DB; if you have batches of 0.5B items, then after 10 seconds you've sent ~100 key/vals to the DB.
>
> Some of the tradeoffs here are that if a batch fails you have to do more recomputation, and there's a greater chance for a batch to fail since there are more tuples per batch. This should help, though; definitely give it a shot.
>
> On Thu, Apr 10, 2014 at 1:33 PM, Raphael Hsieh <[email protected]> wrote:
>> Thanks for your reply Jason,
>> So what I'm hearing is that there is no nice way of doing temporal flushes to a database. My main reason for wanting to do this is that I want to use DynamoDB for my external datastore, but it gets expensive. I would like to limit my reads and writes as much as I can so that the cost does not add up.
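Jason's batch-size arithmetic can be sanity-checked with a small standalone simulation (plain Java, no Storm dependency; the 5 keys and 1B-tuples/sec throughput come from his example, while the class and method names are purely illustrative):

```java
// Standalone sketch of the batch-size arithmetic above: with a
// groupBy(...).persistentAggregate(...), each completed batch writes
// roughly one delta per distinct key, so fewer, larger batches mean
// fewer DB writes for the same data volume.
public class BatchWriteEstimate {
    // Estimate DB writes over a window: ~one write per key per completed batch.
    static long estimatedWrites(long throughputPerSec, long batchSize,
                                long windowSecs, long distinctKeys) {
        long tuples = throughputPerSec * windowSecs;
        long batches = tuples / batchSize;   // completed batches in the window
        return batches * distinctKeys;       // ~1 delta per key per batch
    }

    public static void main(String[] args) {
        long oneBillion = 1_000_000_000L;
        // 5B-tuple batches at 1B tuples/sec over 10s -> 2 batches -> ~10 writes
        System.out.println(estimatedWrites(oneBillion, 5 * oneBillion, 10, 5));
        // 0.5B-tuple batches over the same 10s -> 20 batches -> ~100 writes
        System.out.println(estimatedWrites(oneBillion, oneBillion / 2, 10, 5));
    }
}
```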
>> Increasing the batch size seems like the best solution so far; however, from my understanding, doing an aggregation in Storm/Trident does a global aggregation, so do batch sizes really make a difference? Or is my understanding of the aggregation process wrong? I had thought that aggregation is global across all partitions (and Storm nodes).
>>
>> On Thu, Apr 10, 2014 at 1:58 AM, Jason Jackson <[email protected]> wrote:
>>> Trident doesn't expose tick tuples in its API yet, even though they were added to Storm a while ago.
>>>
>>> There are two problems I think you're talking about: (1) windowed aggregations, and (2) reducing DB load.
>>>
>>> For (1): Trident can do aggregations at the batch level, but this doesn't really help you for doing aggregations over a range of timestamps. The way to do that is to include the time bucket in your key when you persistentAggregate. E.g. your key could be "apple-2014-01-02-12:40" for minutely buckets. Then when serving the data you would query all keys across the time range. Certain databases such as Cassandra can make this query very fast.
>>>
>>> For (2): You'll need to implement your own IBackingMap persistent store plugin and pass it to persistentAggregate. Look at other examples such as trident-memcached for how to implement these. In your custom persistent store plugin you could use a combination of an in-memory map and the DB: 4 out of 5 batches would just commit their state updates to the in-memory map, and the 5th batch would commit to the in-memory map and then flush that map to the database. You could even launch a separate thread to do the flushing, in case it takes a while. This design, however, is not going to give you exactly-once semantics: if you lose the in-memory map because your worker died, for example, then when it comes back online it will resume from the last successful batch (not the last flushed batch).
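The flush-every-Nth-batch idea Jason sketches can be illustrated in plain Java. This is only the buffering logic, not the real storm.trident.state.map.IBackingMap interface; the class name, the fake in-memory "DB", and the flush interval are all illustrative assumptions:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the buffering suggestion above: commit every batch's deltas to
// an in-memory map, and only push the accumulated map to the (expensive)
// database on every Nth batch commit. As noted in the thread, this weakens
// exactly-once semantics: if the worker dies, un-flushed deltas are lost
// even though their batches were acked.
public class BufferedStore {
    private final Map<String, Long> buffer = new HashMap<>();
    private final Map<String, Long> fakeDb; // stands in for DynamoDB etc.
    private final int flushEvery;
    private int batchesSinceFlush = 0;

    public BufferedStore(Map<String, Long> fakeDb, int flushEvery) {
        this.fakeDb = fakeDb;
        this.flushEvery = flushEvery;
    }

    // Called once per batch with that batch's per-key deltas.
    public void commitBatch(Map<String, Long> deltas) {
        deltas.forEach((k, v) -> buffer.merge(k, v, Long::sum));
        if (++batchesSinceFlush >= flushEvery) {
            buffer.forEach((k, v) -> fakeDb.merge(k, v, Long::sum));
            buffer.clear();
            batchesSinceFlush = 0;
        }
    }
}
```

With flushEvery = 5 this matches the "4 out of 5 batches stay in memory" behavior described above; a real implementation would also have to persist enough batch metadata to resume correctly after a crash.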
>>> To retain exactly-once semantics you could also make your batch sizes much larger; by default the spout reads 1MB from each Kafka partition (see the bufferSize and fetchSize configuration options in the Kafka spout). If you increase the batch size and you're doing some kind of key-based aggregation, this would reduce the total number of writes you need to do to your persistent storage.
>>>
>>> Trident could definitely be improved here, so your mileage may vary.
>>>
>>> On Wed, Apr 9, 2014 at 9:15 AM, Raphael Hsieh <[email protected]> wrote:
>>>> I have been struggling to figure out how to get Trident to aggregate data over a certain time period and then flush the data to an external data store. The reasoning behind this is to reduce the number of reads and writes sent to the database.
>>>>
>>>> I've seen that Storm allows for tick tuples to be inserted into the stream; however, I can't figure out how to do this with Trident. I had thought that this functionality was added in Storm version 0.8.0? Is that the case?
>>>>
>>>> One thing I tried was to create a new stream that emitted a tuple once every X time period, and then merge this stream into my actual data stream. However, doing this would result in a non-transactional stream, which would be no good. Also, it didn't work, as the resulting stream consisted only of tuples from my clock stream.
>>>>
>>>> Can anybody help me figure out how to have Trident aggregate data over a certain time frame, flush it out to an external datastore, then rinse and repeat?
>>>>
>>>> There are some blogs out there about how to use a sliding window in Storm, but I just want sequential windows in Trident.
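For the sequential-window question, the time-bucket trick Jason mentions (fold the minute into the aggregation key rather than windowing in the topology) can be sketched outside Trident; the key format and helper class below are illustrative choices, not part of the Trident API:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Sketch of key-per-time-bucket aggregation: make the minute part of the
// persistentAggregate key, e.g. "apple-2014-01-02-12:40". Serving a time
// range then means reading all bucket keys in that range from the store,
// so no tick tuples or windowing support are needed in the topology.
public class TimeBucketKey {
    // Note: SimpleDateFormat is not thread-safe; a real bolt would keep
    // one instance per executor thread.
    private static final SimpleDateFormat MINUTE_BUCKET;
    static {
        MINUTE_BUCKET = new SimpleDateFormat("yyyy-MM-dd-HH:mm");
        MINUTE_BUCKET.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    // Build the composite key a groupBy would aggregate under.
    public static String bucketKey(String word, Date eventTime) {
        return word + "-" + MINUTE_BUCKET.format(eventTime);
    }

    public static void main(String[] args) {
        // 2014-01-02 12:40:07 UTC falls in the 12:40 minutely bucket.
        System.out.println(bucketKey("apple", new Date(1388666407000L)));
    }
}
```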
>>>> Thanks
>>>>
>>>> --
>>>> Raphael Hsieh

>> --
>> Raphael Hsieh

--
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014
