Thanks Rajasekhar & Andrew,

Thanks for the pointer to Trident; I plan to study it.
For the moment I prefer to stick with Storm core, because I need to
understand the core concepts well before adding another abstraction layer.
Furthermore, chronological order and duplicate replays are not an issue for
me (but performance is).

I've googled a lot since yesterday, and I now think that batch acking of
tuples is a valid pattern (it seems to be the basis of Trident too).
E.g.
https://github.com/fhussonnois/storm-trident-elasticsearch/blob/master/src/main/java/com/github/fhuss/storm/elasticsearch/bolt/IndexBatchBolt.java
is a (better) variation of my bolt, with a Trident state.

    protected void ackAll(List<Tuple> inputs) {
        for(Tuple t : inputs)
            outputCollector.ack(t);
    }
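
To make the pattern concrete for myself, here is a minimal plain-Java sketch of
buffer-then-ack. It is not Storm code: BatchAckBuffer and BulkSink are
hypothetical names of mine, T stands in for Storm's Tuple, and the ack/fail
callbacks stand in for collector.ack(tuple) / collector.fail(tuple).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of the buffer-then-ack pattern (hypothetical, not Storm API). */
class BatchAckBuffer<T> {
    /** Hypothetical stand-in for the bulk write, e.g. one bulk request to ES. */
    public interface BulkSink<T> {
        void save(List<T> batch) throws Exception;
    }

    private final int maxSize;
    private final BulkSink<T> sink;
    private final Consumer<T> ack;   // in a bolt this would call collector.ack(tuple)
    private final Consumer<T> fail;  // in a bolt this would call collector.fail(tuple)
    private final List<T> pending = new ArrayList<>();

    BatchAckBuffer(int maxSize, BulkSink<T> sink, Consumer<T> ack, Consumer<T> fail) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
        this.sink = sink;
        this.ack = ack;
        this.fail = fail;
    }

    /** Buffer one item without acking it; flush once the size limit is reached. */
    void add(T item) {
        pending.add(item);
        if (pending.size() >= maxSize) {
            flush();
        }
    }

    /** Write the whole batch, then ack every item on success or fail every item on error. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        try {
            sink.save(batch);
            batch.forEach(ack);
        } catch (Exception e) {
            batch.forEach(fail);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In a real bolt, flush() would also be called on a tick tuple. One thing to keep
in mind, as far as I understand: tuples held unacked for longer than
topology.message.timeout.secs are treated as failed by Storm, so the flush
interval has to stay well under that timeout.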

Now I need to understand why my acks are so costly.
I tried tuning the number of ackers but I don't see any difference.
I think I don't have enough resources; I'm currently digging in that direction.

I'm also going to try transferring the raw data in batches to limit the number
of acks (I got this idea here:
https://www.loggly.com/blog/what-we-learned-about-scaling-with-apache-storm/
).
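
A rough sketch of what I mean by batches, assuming the emitting side can see
several records at once: split them into chunks and emit one tuple per chunk,
so there is one ack per chunk instead of one per record. RecordBatcher and
chunk() are hypothetical helpers of mine, not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: groups records so that one tuple can carry many of them. */
final class RecordBatcher {
    private RecordBatcher() {
    }

    /** Split records into consecutive chunks of at most chunkSize elements. */
    static <R> List<List<R>> chunk(List<R> records, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<R>> chunks = new ArrayList<>();
        for (int i = 0; i < records.size(); i += chunkSize) {
            int end = Math.min(i + chunkSize, records.size());
            // Copy the sub-list so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(records.subList(i, end)));
        }
        return chunks;
    }
}
```

The trade-off is that a failure replays the whole chunk, which is fine for me
since duplicate replays are not a problem in my case.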

Franck








2016-05-19 21:24 GMT-04:00 Andrew Xor <[email protected]>:

> Hello,
>
>  This is a bit of a complex topic and you have to decide whether you want to
> trade performance for persistence primitives that are provided from Trident
> (amongst other things). On one hand you have Trident which can enforce
> correct chronological ordering of the tuples as well as provide persistence
> primitives (such as partitionPersist etc.) and on the other hand you have
> regular Storm which is faster but cannot give such guarantees and you have
> to manually do it yourself. If you don't care about enforcing the
> chronological tuple order, say a node has failed and will send its results
> sometime later; then you can use (in Trident) Opaque spouts that are more
> flexible which are described in the docs. One last but important thing to
> note is that even with Trident you have to use a reliable persistent store
> if you want to never lose your results, local caching and storage methods
> will fail this requirement as if that particular host dies -- so does the
> existing state; don't get me wrong, the failed batches will be replayed,
> but the already existing local state gained will be lost -- even with
> Trident.
>
> Hope this helped.
>
>
> On Fri, May 20, 2016 at 2:10 AM, Rajasekhar <[email protected]>
> wrote:
>
>> Hi,
>>
>> Are you using core Storm? If so, please use the Trident version of Storm.
>> Trident works on a micro-batch principle rather than on each tuple. You can
>> tune the batch size, flush the entire batch at once, and ack it back.
>> This way you will achieve bulk indexing in ES.
>>
>> On Thu, May 19, 2016 at 3:29 PM, Franck Lefebure <
>> [email protected]> wrote:
>>
>>> Hello,
>>>
>>> This is my first post on this ML, pleased to meet you!
>>>
>>> I'm new to Storm.
>>>
>>> I've written topologies with some bolts that index raw
>>> data in Elasticsearch.
>>> To improve performance, it's better to do bulk indexing in ES.
>>>
>>> In my first attempts, I stored the records on the fly in a buffer
>>> (HashMap) and immediately acked the corresponding tuple.
>>> When the map size exceeds a limit, or when I receive a tick tuple, I
>>> flush the map to ES.
>>>
>>> Now I would like this ES indexing to be reliable.
>>> Currently, if the Storm cluster is killed, every record in the buffer
>>> is lost, because the tuples have been acked and so won't be replayed.
>>>
>>> I'm currently experimenting with buffering the tuples themselves.
>>>
>>> The buffer becomes:
>>>
>>> Map<Tuple, RawData> buffer;
>>>
>>> For each tuple, I fill the buffer and don't ack the tuple:
>>>
>>> buffer.put(tuple, rawData);
>>>
>>> When the bulk indexing is triggered, I ack all the buffered tuples at once:
>>>
>>> try {
>>>     logger.info("indexing  "+buffer.size()+" raw datas");
>>>     rawDataRepository.save(buffer.values());
>>>     logger.info("rawdatas indexing took "+stopwatch.elapsed(TimeUnit.MILLISECONDS)+" ms");
>>>     Iterator<Tuple> it = buffer.keySet().iterator();
>>>     while (it.hasNext()) {
>>>         Tuple tuple = it.next();
>>>         this.collector.ack(tuple);
>>>         it.remove();
>>>     }
>>> } catch (Exception e) {
>>>     logger.error("error while bulk indexing rawdatas. failing "+buffer.size()+" tuples",e);
>>>     for (Tuple tuple : buffer.keySet()) {
>>>         this.collector.fail(tuple);
>>>     }
>>>
>>> }
>>> buffer.clear();
>>>
>>> logger.info("rawdatas indexing exit at "+stopwatch.elapsed(TimeUnit.MILLISECONDS)+" ms");
>>>
>>>
>>> In some tests it seems to work, but I remain suspicious because
>>> I don't understand the implications of retaining the tuples like that.
>>> Moreover, the bulk tuple acking seems to be abnormally slow:
>>>
>>> 18:20:51.826 [Thread-24-rawdata-bolt] INFO
>>>  c.s.streamer.bolts.RawDataBolt - rawdatas indexing took 680 ms
>>> 18:21:23.029 [Thread-24-rawdata-bolt] INFO
>>>  c.s.streamer.bolts.RawDataBolt - rawdatas indexing exit at 31883 ms
>>>
>>> Is there another pattern to achieve what I'm trying to do?
>>>
>>> Thanks
>>> Franck
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Thanks & Regards
>> Rajasekhar
>>
>
>
