Hi,

Are you using core Storm? If so, consider using Trident instead. Trident works on a micro-batch principle rather than processing each tuple individually. You can tune the batch size, flush the entire batch to ES in one call, and ack it as a unit. This way you get bulk indexing in ES.
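To make the micro-batch idea concrete, here is a minimal sketch in plain Java. It does not use the Storm or Trident APIs; `MicroBatcher` and its constructor parameters are hypothetical names made up for illustration. The `bulkWriter` callback stands in for something like `rawDataRepository.save(...)`, and the `ack` and `fail` callbacks stand in for `collector.ack(tuple)` and `collector.fail(tuple)`. The point it shows is only the ordering: nothing is acked until the bulk write has succeeded, and a failed write fails every item in the batch so it can be replayed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Storm): buffers records together with a
 * handle for each one, and acks the handles only after a successful bulk write.
 */
final class MicroBatcher<H, R> {
    private final int maxSize;
    private final Consumer<List<R>> bulkWriter; // assumed stand-in for the ES bulk save
    private final Consumer<H> ack;              // assumed stand-in for collector.ack(tuple)
    private final Consumer<H> fail;             // assumed stand-in for collector.fail(tuple)

    // Parallel lists: handles.get(i) belongs to records.get(i).
    private final List<H> handles = new ArrayList<>();
    private final List<R> records = new ArrayList<>();

    MicroBatcher(int maxSize, Consumer<List<R>> bulkWriter,
                 Consumer<H> ack, Consumer<H> fail) {
        this.maxSize = maxSize;
        this.bulkWriter = bulkWriter;
        this.ack = ack;
        this.fail = fail;
    }

    /** Buffers one record without acking it; flushes when the batch is full. */
    void add(H handle, R record) {
        handles.add(handle);
        records.add(record);
        if (records.size() >= maxSize) {
            flush();
        }
    }

    /** Writes the whole batch once, then acks or fails every buffered handle. */
    void flush() {
        if (records.isEmpty()) {
            return;
        }
        boolean written = true;
        try {
            bulkWriter.accept(new ArrayList<>(records));
        } catch (RuntimeException e) {
            written = false; // the whole batch is treated as failed
        }
        Consumer<H> outcome = written ? ack : fail;
        for (H handle : handles) {
            outcome.accept(handle);
        }
        handles.clear();
        records.clear();
    }

    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        MicroBatcher<String, Integer> batcher = new MicroBatcher<>(
                2,
                batch -> System.out.println("bulk write of " + batch.size() + " records"),
                acked::add,
                handle -> System.out.println("failed " + handle));
        batcher.add("t1", 1);
        batcher.add("t2", 2); // reaches maxSize, triggers a flush
        batcher.add("t3", 3);
        batcher.flush();      // e.g. what a tick tuple would trigger
        System.out.println("acked: " + acked);
    }
}
```

The sketch keeps the handles in a list rather than using them as map keys, so it never hashes them. Whether that has any bearing on the slow acking reported below is not established here.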
On Thu, May 19, 2016 at 3:29 PM, Franck Lefebure <[email protected]> wrote:

> Hello,
>
> This is my first post on this ML, pleased to meet you!
>
> I'm new to Storm.
>
> I've written topologies with some bolts that index raw data in
> ElasticSearch.
> To improve performance, it's better to do bulk indexing in ES.
>
> In my first attempts, I stored the records on the fly in a buffer
> (HashMap) and acked the corresponding tuple immediately.
> When the map size exceeds a limit, or when I receive a tick tuple, I
> flush the map to ES.
>
> Now I would like this ES indexing to be reliable.
> Currently, if the Storm cluster is killed, every record in the buffer is
> lost, because the tuples have been acked and so won't be replayed.
>
> Currently, I'm experimenting with buffering the tuples.
>
> The buffer becomes:
>
>     Map<Tuple, RawData> buffer;
>
> For each tuple, I fill the buffer and don't ack the tuple:
>
>     buffer.put(tuple, rawData);
>
> When the bulk indexing is triggered, I ack all the buffered tuples at once:
>
>     try {
>         logger.info("indexing "+buffer.size()+" raw datas");
>         rawDataRepository.save(buffer.values());
>         logger.info("rawdatas indexing took "+stopwatch.elapsed(TimeUnit.MILLISECONDS)+" ms");
>         Iterator<Tuple> it = buffer.keySet().iterator();
>         while (it.hasNext()) {
>             Tuple tuple = it.next();
>             this.collector.ack(tuple);
>             it.remove();
>         }
>     } catch (Exception e) {
>         logger.error("error while bulk indexing rawdatas. failing "+buffer.size()+" tuples",e);
>         for (Tuple tuple : buffer.keySet()) {
>             this.collector.fail(tuple);
>         }
>     }
>     buffer.clear();
>
>     logger.info("rawdatas indexing exit at "+stopwatch.elapsed(TimeUnit.MILLISECONDS)+" ms");
>
> In some tests it seems to work, but I remain suspicious because I
> don't understand the implications of retaining the tuples like that.
> Moreover, the bulk tuple acking seems to be abnormally slow:
>
> 18:20:51.826 [Thread-24-rawdata-bolt] INFO c.s.streamer.bolts.RawDataBolt - rawdatas indexing took 680 ms
> 18:21:23.029 [Thread-24-rawdata-bolt] INFO c.s.streamer.bolts.RawDataBolt - rawdatas indexing exit at 31883 ms
>
> Is there another pattern to achieve what I'm trying to do?
>
> Thanks
> Franck

--
Thanks & Regards
Rajasekhar
