Hello,

This is my first post on this mailing list, pleased to meet you!

I'm new to Storm.

I've written topologies with some bolts that index raw data into
Elasticsearch.
For better performance, it is preferable to use bulk indexing in ES.

In my first attempts, I stored the records on the fly in a buffer
(HashMap) and acked the corresponding tuple immediately.
When the map size exceeds a limit, or when I receive a tick tuple, I flush
the map to ES.
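
To make that first approach concrete, here is a minimal sketch of the
buffer logic on its own, without any Storm or ES classes. FlushingBuffer,
bulkSink and onTick are hypothetical names used only for illustration, and
flushing at ">= limit" is an assumption about what "exceeds a limit" means
in practice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the bolt's buffer; not a Storm or ES API.
class FlushingBuffer<K, V> {
    private final Map<K, V> buffer = new HashMap<>();
    private final int limit;                  // assumed flush threshold
    private final Consumer<List<V>> bulkSink; // stands in for the bulk save to ES

    FlushingBuffer(int limit, Consumer<List<V>> bulkSink) {
        this.limit = limit;
        this.bulkSink = bulkSink;
    }

    // Called for every incoming record.
    void add(K key, V value) {
        buffer.put(key, value);
        if (buffer.size() >= limit) {
            flush();
        }
    }

    // Called when a tick tuple arrives, so small batches do not wait forever.
    void onTick() {
        flush();
    }

    // Sends the buffered values as one batch, then empties the buffer.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        bulkSink.accept(new ArrayList<>(buffer.values()));
        buffer.clear();
    }

    int size() {
        return buffer.size();
    }
}
```

In the bolt, add() would be called from execute() for normal tuples and
onTick() for tick tuples.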

Now I would like this ES indexing to be reliable.
Currently, if the Storm cluster is killed, every record in the buffer is
lost: the tuples have already been acked, so they won't be replayed.
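
The problem can be shown with a toy model. This is only a simplified
sketch, not Storm code: AckTimingDemo and its "pending" set are
hypothetical, and they merely mimic the idea that only tuples which have
not been acked yet can be replayed after a crash.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: "pending" mimics the tuples that have not been acked yet.
class AckTimingDemo {

    // Returns the ids that could still be replayed if the worker dies
    // after buffering the records but before flushing them.
    static Set<Integer> replayableAfterCrash(List<Integer> emitted, boolean ackBeforeFlush) {
        Set<Integer> pending = new LinkedHashSet<>(emitted);
        List<Integer> buffer = new ArrayList<>();
        for (Integer id : emitted) {
            buffer.add(id);
            if (ackBeforeFlush) {
                pending.remove(id); // acked: it will never be replayed
            }
        }
        buffer.clear(); // crash: the in-memory buffer is gone, nothing was indexed
        return pending;
    }
}
```

With ackBeforeFlush set to true the returned set is empty, so the buffered
records are simply lost; with false, every buffered id is still pending.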

At the moment, I'm experimenting with buffering the tuples themselves.

The buffer becomes:

Map<Tuple, RawData> buffer;

For each tuple, I fill the buffer and don't ack the tuple:

buffer.put(tuple, rawData);

When the bulk indexing is triggered, I ack all the buffered tuples at once:

try {
    logger.info("indexing  "+buffer.size()+" raw datas");
    rawDataRepository.save(buffer.values());
    logger.info("rawdatas indexing took "+stopwatch.elapsed(TimeUnit.MILLISECONDS)+" ms");
    // The bulk save succeeded: ack every buffered tuple.
    Iterator<Tuple> it = buffer.keySet().iterator();
    while (it.hasNext()) {
        Tuple tuple = it.next();
        this.collector.ack(tuple);
        it.remove();
    }
} catch (Exception e) {
    logger.error("error while bulk indexing rawdatas. failing "+buffer.size()+" tuples",e);
    // Fail whatever is still buffered so that it can be replayed.
    for (Tuple tuple : buffer.keySet()) {
        this.collector.fail(tuple);
    }
}
buffer.clear();

logger.info("rawdatas indexing exit at "+stopwatch.elapsed(TimeUnit.MILLISECONDS)+" ms");


In some tests it seems to work, but I remain suspicious because I
don't understand the implications of retaining the tuples like that.
Moreover, the bulk tuple acking seems abnormally slow:

18:20:51.826 [Thread-24-rawdata-bolt] INFO  c.s.streamer.bolts.RawDataBolt - rawdatas indexing took 680 ms
18:21:23.029 [Thread-24-rawdata-bolt] INFO  c.s.streamer.bolts.RawDataBolt - rawdatas indexing exit at 31883 ms

Is there another pattern to achieve what I'm trying to do?

Thanks
Franck
