Thanks Rajasekhar & Andrew,

Thanks for the pointer to Trident; I plan to study it.
For the moment I prefer to stick to Storm core, because I need to understand
the core concepts well before adding another abstraction layer.
Furthermore, chronological order and duplicate replays are not an issue for
me (but performance is).

I've googled a lot since yesterday, and I now think that batch acking of
tuples is a valid pattern (it seems to be the basis of Trident too).

E.g.
https://github.com/fhussonnois/storm-trident-elasticsearch/blob/master/src/main/java/com/github/fhuss/storm/elasticsearch/bolt/IndexBatchBolt.java
is a (better) variation of my bolt, with a Trident state:

    protected void ackAll(List<Tuple> inputs) {
        for (Tuple t : inputs)
            outputCollector.ack(t);
    }

I now need to understand why my acks are so costly.
I tried to tune the number of ackers but I don't see any difference.
I think I don't have enough resources; I'm currently digging in that
direction.

I'm also going to try to transfer the raw data in bunches to limit the number
of acks
(I got this idea here:
https://www.loggly.com/blog/what-we-learned-about-scaling-with-apache-storm/ )

Franck

2016-05-19 21:24 GMT-04:00 Andrew Xor <[email protected]>:

> Hello,
>
> This is a bit of a complex topic, and you have to decide whether you want
> to trade performance for the persistence primitives that Trident provides
> (amongst other things). On one hand you have Trident, which can enforce
> correct chronological ordering of the tuples as well as provide
> persistence primitives (such as partitionPersist etc.), and on the other
> hand you have regular Storm, which is faster but cannot give such
> guarantees, so you have to enforce them manually yourself. If you don't
> care about enforcing the chronological tuple order (say a node has failed
> and will send its results sometime later), then you can use (in Trident)
> Opaque spouts, which are more flexible and are described in the docs.
> One last but important thing to note is that even with Trident you have
> to use a reliable persistent store if you want to never lose your
> results. Local caching and storage methods will fail this requirement,
> because if that particular host dies, so does the existing state. Don't
> get me wrong, the failed batches will be replayed, but the local state
> already accumulated will be lost, even with Trident.
>
> Hope this helped.
>
> On Fri, May 20, 2016 at 2:10 AM, Rajasekhar <[email protected]>
> wrote:
>
>> Hi,
>>
>> Are you using core Storm? If so, please use the Trident version of Storm.
>> Trident works on a micro-batch principle rather than on each tuple. You
>> can tune the batch size, flush the entire batch at once and ack it back.
>> This way you will achieve bulk indexing in ES.
>>
>> On Thu, May 19, 2016 at 3:29 PM, Franck Lefebure <
>> [email protected]> wrote:
>>
>>> Hello,
>>>
>>> This is my first post on this ML, pleased to meet you!
>>>
>>> I'm new to Storm.
>>>
>>> I've written topologies with some bolts that index raw data in
>>> ElasticSearch.
>>> To improve performance, it's better to do bulk indexing in ES.
>>>
>>> In my first attempts, I stored the records on the fly in a buffer
>>> (HashMap) and immediately acked the corresponding tuple.
>>> When the map size exceeds a limit, or when I receive a tick tuple, I
>>> flush the map to ES.
>>>
>>> Now I would like this ES indexing to be reliable.
>>> Currently, if the Storm cluster is killed, every record in the buffer
>>> is lost: the tuples have already been acked, so they won't be replayed.
>>>
>>> Currently, I'm experimenting with buffering the tuples themselves.
>>>
>>> The buffer becomes:
>>>
>>>     Map<Tuple, RawData> buffer;
>>>
>>> For each tuple, I fill the buffer and don't ack the tuple:
>>>
>>>     buffer.put(tuple, rawData);
>>>
>>> When the bulk indexing is triggered, I ack all the buffered tuples at
>>> once:
>>>
>>>     try {
>>>         logger.info("indexing "+buffer.size()+" raw datas");
>>>         rawDataRepository.save(buffer.values());
>>>         logger.info("rawdatas indexing took "+stopwatch.elapsed(TimeUnit.MILLISECONDS)+" ms");
>>>         // ack only once the bulk write has succeeded
>>>         Iterator<Tuple> it = buffer.keySet().iterator();
>>>         while (it.hasNext()) {
>>>             Tuple tuple = it.next();
>>>             this.collector.ack(tuple);
>>>             it.remove();
>>>         }
>>>     } catch (Exception e) {
>>>         logger.error("error while bulk indexing rawdatas. failing "+buffer.size()+" tuples",e);
>>>         // fail whatever is still buffered so that it gets replayed
>>>         for (Tuple tuple : buffer.keySet()) {
>>>             this.collector.fail(tuple);
>>>         }
>>>     }
>>>     buffer.clear();
>>>
>>>     logger.info("rawdatas indexing exit at "+stopwatch.elapsed(TimeUnit.MILLISECONDS)+" ms");
>>>
>>> In some tests it seems to work, but I remain suspicious because I don't
>>> understand the implications of retaining the tuples like that.
>>> Moreover, the bulk tuple acking seems to be abnormally slow:
>>>
>>> 18:20:51.826 [Thread-24-rawdata-bolt] INFO
>>> c.s.streamer.bolts.RawDataBolt - rawdatas indexing took 680 ms
>>> 18:21:23.029 [Thread-24-rawdata-bolt] INFO
>>> c.s.streamer.bolts.RawDataBolt - rawdatas indexing exit at 31883 ms
>>>
>>> Is there another pattern to achieve what I'm trying to do?
>>>
>>> Thanks
>>> Franck
>>>
>>
>> --
>> Thanks & Regards
>> Rajasekhar
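
For reference, the buffer-then-ack pattern discussed in this thread can be sketched without any Storm dependency. This is only an illustration under stated assumptions, not the code of the bolt above: `BatchAckBuffer` is a hypothetical helper, and its `bulkWriter`, `ack` and `fail` callbacks merely stand in for `rawDataRepository.save(...)`, `collector.ack(tuple)` and `collector.fail(tuple)`. Handles are acked only after the bulk write returns, and all of them are failed if the write throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not part of Storm): buffers (handle, record) pairs and
// settles the handles only after one bulk write of all buffered records.
class BatchAckBuffer<H, R> {
    private final int maxSize;
    private final Consumer<List<R>> bulkWriter; // stand-in for rawDataRepository.save(...)
    private final Consumer<H> ack;              // stand-in for collector.ack(tuple)
    private final Consumer<H> fail;             // stand-in for collector.fail(tuple)
    private final List<H> handles = new ArrayList<>();
    private final List<R> records = new ArrayList<>();

    BatchAckBuffer(int maxSize, Consumer<List<R>> bulkWriter,
                   Consumer<H> ack, Consumer<H> fail) {
        this.maxSize = maxSize;
        this.bulkWriter = bulkWriter;
        this.ack = ack;
        this.fail = fail;
    }

    // Buffers one record without acking; flushes once the size limit is reached.
    void add(H handle, R record) {
        handles.add(handle);
        records.add(record);
        if (records.size() >= maxSize) {
            flush();
        }
    }

    // Writes the whole buffer in one call, then acks or fails every handle.
    // A caller would also invoke this on a timer (a tick tuple in the thread).
    void flush() {
        if (records.isEmpty()) {
            return;
        }
        // Snapshot and clear first, so the buffer is empty whatever happens next.
        List<H> pendingHandles = new ArrayList<>(handles);
        List<R> pendingRecords = new ArrayList<>(records);
        handles.clear();
        records.clear();
        try {
            bulkWriter.accept(pendingRecords);
        } catch (RuntimeException e) {
            pendingHandles.forEach(fail); // nothing was acked yet, so all can be replayed
            return;
        }
        pendingHandles.forEach(ack);      // ack strictly after the write succeeded
    }
}
```

The acks are issued outside the `try` block on purpose: an exception thrown while acking then cannot cause already-acked handles to be failed as well.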
