Thanks for your reply Jason,

So what I'm hearing is that there is no clean way to do time-based flushes
to a database. My main reason for wanting this is that I plan to use
DynamoDB as my external datastore, and it gets expensive, so I'd like to
limit my reads and writes as much as possible to keep the cost down.

Increasing the batch size seems like the best solution so far. However,
from my understanding an aggregation in Storm/Trident is a global
aggregation, so do batch sizes really make a difference? Or is my
understanding of the aggregation process wrong? I had thought that
aggregation is global across all partitions (and Storm nodes).
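
In case it helps to be concrete, this is how I was planning to bump the
batch size, going by the bufferSize / fetchSize options you mentioned. I
believe the fields are fetchSizeBytes and bufferSizeBytes on the storm-kafka
Trident spout config, but please correct me if those are the wrong knobs.
This is just a sketch, with a placeholder ZooKeeper string and topic:

    import storm.kafka.BrokerHosts;
    import storm.kafka.ZkHosts;
    import storm.kafka.trident.OpaqueTridentKafkaSpout;
    import storm.kafka.trident.TridentKafkaConfig;

    public class BigBatchKafkaSpout {
        // Build the Trident Kafka spout with larger batches, so each Trident
        // batch covers more tuples and persistentAggregate does fewer writes
        // per key.
        public static OpaqueTridentKafkaSpout build(String zkConnect, String topic) {
            BrokerHosts zk = new ZkHosts(zkConnect);
            TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
            conf.fetchSizeBytes  = 8 * 1024 * 1024;  // default is 1 MB per partition per batch
            conf.bufferSizeBytes = 8 * 1024 * 1024;
            return new OpaqueTridentKafkaSpout(conf);
        }
    }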

On Thu, Apr 10, 2014 at 1:58 AM, Jason Jackson <[email protected]> wrote:

> Trident doesn't expose tick tuples in its API yet, even though they were
> added to Storm a while ago.
>
> There are two problems I think you're talking about: (1) windowed
> aggregations and (2) reducing DB load.
>
> For (1):
> Trident can do aggregations at the batch level, but this doesn't really
> help you with aggregations over a range of timestamps. The way to do that
> is to include the time bucket in your key when you persistentAggregate.
> E.g. your key could be "apple-2014-01-02-12:40" for minutely buckets.
> Then when serving the data you would query all keys across the time
> range. Certain databases such as Cassandra can make this query very fast.
>
> For (2):
> You'll need to implement your own IBackingMap persistent store plugin and
> pass it to persistentAggregate. Look at other examples such as
> trident-memcached for how to implement these. In your custom persistent
> store plugin you could use a combination of an in-memory map and the DB:
> 4 out of 5 batches would just commit their state updates to the in-memory
> map, and the 5th batch would commit to the in-memory map and then flush
> that map to the database. You could even launch a separate thread to do
> the flushing, in case it takes a while. This design, however, is not
> going to give you exactly-once semantics: if you lose the in-memory map
> because your worker died, for example, then when it comes back online it
> will resume from the last successful batch (not the last flushed batch).
>
> To retain exactly-once semantics you could also make your batch sizes
> much larger. By default they read 1MB from each Kafka partition (see the
> bufferSize and fetchSize configuration options in the Kafka spout). If
> you increase the batch size and you're doing some kind of key-based
> aggregation, this would reduce the total number of writes you need to do
> to your persistent storage.
>
> Trident could definitely be improved here, so your mileage may vary.
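
So you can tell me if I've misread you, here is roughly how I pictured (1)
and (2). Both are only sketches: the class, method, and field names are
mine, and the DynamoDB-backed pieces are placeholders I haven't written yet.

For (1), tagging each tuple with a minutely bucket and making the bucket
part of the grouping key:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import storm.kafka.trident.OpaqueTridentKafkaSpout;
    import storm.trident.TridentTopology;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.operation.builtin.Count;
    import storm.trident.state.StateFactory;
    import storm.trident.tuple.TridentTuple;

    public class MinutelyCounts {

        // Formats a millisecond timestamp into a minutely bucket, e.g. "2014-01-02-12:40".
        public static class MinuteBucket extends BaseFunction {
            private final SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd-HH:mm");

            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                collector.emit(new Values(fmt.format(new Date(tuple.getLong(0)))));
            }
        }

        // "spout" is the Kafka spout; "stateFactory" stands in for whatever
        // state implementation ends up in front of DynamoDB.
        public static TridentTopology build(OpaqueTridentKafkaSpout spout, StateFactory stateFactory) {
            TridentTopology topology = new TridentTopology();
            topology.newStream("events", spout)
                    .each(new Fields("timestamp"), new MinuteBucket(), new Fields("bucket"))
                    .groupBy(new Fields("key", "bucket"))
                    .persistentAggregate(stateFactory, new Count(), new Fields("count"));
            return topology;
        }
    }

At read time I would then query every (key, bucket) pair across the time
range I care about.

For (2), an IBackingMap wrapper that buffers updates in memory and only
flushes to the real store every Nth batch (accepting, as you said, that this
gives up exactly-once if a worker dies between flushes):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import storm.trident.state.map.IBackingMap;

    public class BufferedBackingMap<T> implements IBackingMap<T> {
        private final IBackingMap<T> db;   // the real store, e.g. a DynamoDB-backed map
        private final int flushEvery;      // flush to the real store every N batches
        private final Map<List<Object>, T> cache = new HashMap<List<Object>, T>();
        private int batches = 0;

        public BufferedBackingMap(IBackingMap<T> db, int flushEvery) {
            this.db = db;
            this.flushEvery = flushEvery;
        }

        @Override
        public List<T> multiGet(List<List<Object>> keys) {
            // Serve from the in-memory map where possible; only go to the DB for misses.
            List<List<Object>> misses = new ArrayList<List<Object>>();
            for (List<Object> key : keys) {
                if (!cache.containsKey(key)) {
                    misses.add(key);
                }
            }
            if (!misses.isEmpty()) {
                List<T> fromDb = db.multiGet(misses);
                for (int i = 0; i < misses.size(); i++) {
                    if (fromDb.get(i) != null) {
                        cache.put(misses.get(i), fromDb.get(i));
                    }
                }
            }
            List<T> result = new ArrayList<T>(keys.size());
            for (List<Object> key : keys) {
                result.add(cache.get(key));
            }
            return result;
        }

        @Override
        public void multiPut(List<List<Object>> keys, List<T> vals) {
            // Buffer this batch's updates in memory; only hit the DB on every Nth batch.
            for (int i = 0; i < keys.size(); i++) {
                cache.put(keys.get(i), vals.get(i));
            }
            if (++batches % flushEvery == 0) {
                List<List<Object>> allKeys = new ArrayList<List<Object>>(cache.keySet());
                List<T> allVals = new ArrayList<T>(allKeys.size());
                for (List<Object> key : allKeys) {
                    allVals.add(cache.get(key));
                }
                db.multiPut(allKeys, allVals);
            }
        }
    }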

> On Wed, Apr 9, 2014 at 9:15 AM, Raphael Hsieh <[email protected]> wrote:
>
>> I have been struggling to figure out how to get Trident to aggregate
>> data over a certain time period and then flush the data to an external
>> data store. The reasoning behind this is to reduce the number of reads
>> and writes sent to the database.
>>
>> I've seen that Storm allows tick tuples to be inserted into the stream,
>> but I can't figure out how to do this with Trident. I had thought that
>> this functionality was added in Storm version 0.8.0? Is this the case?
>>
>> One thing I tried was to create a new stream that emitted a tuple once
>> every X time period and then merge this stream into my actual data
>> stream. However, doing this would result in a non-transactional stream,
>> which would be no good. Also, it didn't work, as the resulting stream
>> only consisted of tuples from my clock stream.
>>
>> Can anybody help me figure out how to have Trident aggregate data over
>> a certain time frame, flush it out to an external datastore, then rinse
>> and repeat?
>>
>> There are some blogs out there about how to use a sliding window in
>> Storm, however I just want sequential windows in Trident.
>>
>> Thanks
>>
>> --
>> Raphael Hsieh

--
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014
