Hello,

This is a somewhat complex topic, and you have to decide whether you want to trade performance for the persistence primitives (amongst other things) that Trident provides. On one hand you have Trident, which can enforce correct chronological ordering (state updates are applied batch by batch, in order) and provides persistence primitives such as partitionPersist. On the other hand you have regular Storm, which is faster but gives no such guarantees, so you have to implement them yourself.

If you don't care about enforcing the chronological tuple order (say a node has failed and will send its results some time later), then in Trident you can use opaque spouts, which are more flexible and are described in the docs.

One last but important thing to note: even with Trident you have to use a reliable persistent store if you never want to lose your results. Local caching and local storage do not meet this requirement, because if that particular host dies, so does the state held on it. Don't get me wrong, the failed batches will be replayed, but the local state already accumulated will be lost, even with Trident.
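If you stay on regular Storm and do it yourself, the idea could be sketched roughly as below. This is plain Java, not Storm API: AckingBuffer, BulkSink and Acker are hypothetical names I made up for illustration, where Acker stands in for the ack/fail calls on your OutputCollector and BulkSink for your rawDataRepository.save(...) call. Treat it as a sketch under those assumptions rather than a finished bolt.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: buffers records with their un-acked handles and acks only after a successful bulk write. */
class AckingBuffer<K, V> {

    /** Hypothetical stand-in for the bulk write to the store (e.g. an ES bulk request). */
    interface BulkSink<R> {
        void saveAll(List<R> records) throws Exception;
    }

    /** Hypothetical stand-in for the ack/fail half of the bolt's collector. */
    interface Acker<H> {
        void ack(H handle);
        void fail(H handle);
    }

    private final Map<K, V> pending = new LinkedHashMap<>();
    private final int maxSize;
    private final BulkSink<V> sink;
    private final Acker<K> acker;

    AckingBuffer(int maxSize, BulkSink<V> sink, Acker<K> acker) {
        this.maxSize = maxSize;
        this.sink = sink;
        this.acker = acker;
    }

    /** Call once per incoming tuple; flushes as soon as the buffer is full. */
    void add(K handle, V record) {
        pending.put(handle, record);
        if (pending.size() >= maxSize) {
            flush();
        }
    }

    /** Call on a tick tuple too, so a slow stream never holds tuples for long. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // Snapshot and clear first, so the buffer is empty whatever happens next.
        List<K> handles = new ArrayList<>(pending.keySet());
        List<V> records = new ArrayList<>(pending.values());
        pending.clear();

        boolean saved;
        try {
            sink.saveAll(records);
            saved = true;
        } catch (Exception e) {
            saved = false;
        }
        // Ack only what was really written; fail the rest so it gets replayed.
        for (K handle : handles) {
            if (saved) {
                acker.ack(handle);
            } else {
                acker.fail(handle);
            }
        }
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Acker<Integer> acker = new Acker<Integer>() {
            @Override public void ack(Integer h) { log.add("ack " + h); }
            @Override public void fail(Integer h) { log.add("fail " + h); }
        };
        AckingBuffer<Integer, String> buffer = new AckingBuffer<Integer, String>(
                2, records -> log.add("saved " + records.size()), acker);
        buffer.add(1, "a");
        buffer.add(2, "b"); // buffer is full here, so this triggers a flush
        buffer.add(3, "c");
        buffer.flush();     // what a tick tuple would trigger
        System.out.println(log);
    }
}
```

Whichever way you drive the buffer, every buffered tuple has to be acked before topology.message.timeout.secs expires, otherwise Storm fails and replays it, which is why the tick-tuple flush matters. Likewise, if topology.max.spout.pending is smaller than the buffer limit, the size-based flush will never fire and only the tick will empty the buffer.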
Hope this helped.

On Fri, May 20, 2016 at 2:10 AM, Rajasekhar <[email protected]> wrote:

> Hi,
>
> Are you using core storm ? then, please use Trident version of Storm.
> Trident works on Micro batch principle rather than each tuple. You can
> tune the batch size and flush the entire batch once and ack it back.
> This way you will achieve bulk indexing in ES.
>
> On Thu, May 19, 2016 at 3:29 PM, Franck Lefebure <
> [email protected]> wrote:
>
>> Hello,
>>
>> This is my first post on this ML, plz to meet you!
>>
>> I'm new to Storm.
>>
>> I've written topologies with some bolts that are doing indexation of raw
>> datas in ElasticSearch.
>> To enhance performances, It's better to do bulk indexing in ES.
>>
>> In my first tries, I was storing on the fly the Records in a buffer
>> (HashMap) and ack immediately the corrsponding tuple.
>> When the map size overflow a limit, or when I receive a tick tuple, I
>> flush the Map to ES
>>
>> Now I would like that this ES indexing to be reliable.
>> Actually, if the Storm cluster is killed, every records in the buffer are
>> lost, because tuples have been acked, so won't be replayed,
>>
>> Actually, I'm experimenting a bufferization of the tuple.
>>
>> The buffer becomes so :
>>
>>     Map<Tuple, RawData> buffer;
>>
>> For each tuple, I fill the buffer and don't ack the tuple :
>>
>>     buffer.put(tuple, rawData);
>>
>> When the bulkIndexing is triggered, I massively ack the bufferized Tuple :
>>
>>     try {
>>         logger.info("indexing "+buffer.size()+" raw datas");
>>         rawDataRepository.save(buffer.values());
>>         logger.info("rawdatas indexing took "+stopwatch.elapsed(TimeUnit.MILLISECONDS)+" ms");
>>         Iterator<Tuple> it = buffer.keySet().iterator();
>>         while (it.hasNext()) {
>>             Tuple tuple = it.next();
>>             this.collector.ack(tuple);
>>             it.remove();
>>         }
>>     } catch (Exception e) {
>>         logger.error("error while bulk indexing rawdatas. failing "+buffer.size()+" tuples",e);
>>         for (Tuple tuple : buffer.keySet()) {
>>             this.collector.fail(tuple);
>>         }
>>
>>     }
>>     buffer.clear();
>>
>>     logger.info("rawdatas indexing exit at "+stopwatch.elapsed(TimeUnit.MILLISECONDS)+" ms");
>>
>>
>> On some tests, It seems to be functional, But I keep suspicious because I
>> don't see the implications to retain the tuples like that.
>> More over the bulk tuple acking seems to be anormaly slow
>>
>> 18:20:51.826 [Thread-24-rawdata-bolt] INFO c.s.streamer.bolts.RawDataBolt - rawdatas indexing took 680 ms
>> 18:21:23.029 [Thread-24-rawdata-bolt] INFO c.s.streamer.bolts.RawDataBolt - rawdatas indexing exit at 31883 ms
>>
>> Does exist another pattern to achieve what I'm trying ?
>>
>> Thanks
>> Franck
>>
>
> --
> Thanks & Regards
> Rajasekhar
