>did you try to set the number of spouts to 1 instead of 4?

Not yet. I will watch the current topology version for a day and then try
changing the spout count. My worry: with one spout, will all messages from the
4 Kafka partitions still be consumed?
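
If I understand the suggestion right, the change would be roughly this (just a
sketch against my topology below; I assume the Kafka spout spreads all 4
partitions over however many tasks it has, so a single task would end up
reading all of them):

// Sketch: run the Kafka spout with one executor/task instead of four.
// Assumption: the spout assigns all 4 Kafka partitions to that single task,
// so every partition is still consumed, just by one thread.
builder.setSpout("event", kafkaSpout, 1).setNumTasks(1);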


On 26 May 2014 19:10, Bilal Al Fartakh <[email protected]> wrote:

> did you try to set the number of spouts to 1 instead of 4?
>
>
> 2014-05-26 16:04 GMT+01:00 Irek Khasyanov <[email protected]>:
>
>> After hours of investigation, I have this:
>>
>> 1. The problem happens when some tuple isn't acked properly, even though
>> all acks happen in the same thread, right after the tuple is received.
>> 2. The time when the duplicate record is inserted into the database follows
>> TOPOLOGY_MESSAGE_TIMEOUT_SECS: a tuple that isn't acked within that window
>> gets replayed by the spout, and the replay produces another insert.
>> 3. storm-kafka-plus doesn't report anything about failed tuples (or maybe
>> this is a Storm problem), even with Config.setDebug(true), so it's not
>> possible to see where the magic happens.
>> 4. Removing anchoring in the emitting bolt helps in most situations (see
>> the sketch below); I still see the occasional duplicate record, but in the
>> total mass of events it's not critical.
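>>
>> The change from point 4 looks roughly like this (just a sketch; the real
>> EventsBatchBolt emits different fields, and collector here means the
>> OutputCollector saved in prepare()):
>>
>> @Override
>> public void execute(Tuple input) {
>>     // placeholder value; the real bolt builds its own output fields
>>     Values out = new Values(input.getValue(0));
>>
>>     // anchored emit (what I had before): ties out to input, so a downstream
>>     // failure or timeout makes the spout replay the Kafka message
>>     // collector.emit(input, out);
>>
>>     // unanchored emit (what point 4 describes): no link back to the spout,
>>     // so a downstream problem no longer triggers a replay
>>     collector.emit(out);
>>     collector.ack(input);
>> }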
>>
>> I'm still not sure why failing tuples exist :)
>>
>>
>>
>> On 25 May 2014 20:20, Irek Khasyanov <[email protected]> wrote:
>>
>>> Hi everyone!
>>>
>>> I'm stuck on duplicated tuples.
>>>
>>> I have a kafka-spout with 4 workers for 4 partitions. As I see in the
>>> database, some tuples get duplicated, twice or even three times. This does
>>> not happen immediately, but after a few seconds (around 30 seconds), and
>>> sometimes only after a few minutes. I can't figure out what is happening;
>>> everything looks fine in the worker log files.
>>>
>>> My topology config:
>>>
>>> TopologyBuilder builder = new TopologyBuilder();
>>> builder.setSpout("event", kafkaSpout, 4).setNumTasks(4);
>>> builder.setBolt("events", new EventsBatchBolt(topologyConfig), 1)
>>>         .shuffleGrouping("event")
>>>         .setNumTasks(1);
>>>
>>> builder.setBolt("properties", new EventPropertiesBolt(topologyConfig), 1)
>>>         .fieldsGrouping("vertica",
>>>                 new Fields("event_id", "fieldname", "repr_type", "repr_value"))
>>>         .setNumTasks(1);
>>>
>>> storm config:
>>>
>>> Config config = new Config();
>>>         config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
>>>         config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 20);
>>>         config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
>>>
>>>         config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE,             8);
>>>         config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,            32);
>>>         config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 131072);
>>>
>>>         config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,    131072);
>>>         config.setNumAckers(2);
>>>         config.setDebug(false);
>>>
>>> My guess is that I'm doing something wrong in the topology config, or
>>> something is wrong with storm-kafka-plus 0.4.
>>>
>>> Any ideas why I have duplicates?
>>>
>>> Thanks
>>>
>>> — With best regards, Irek Khasyanov
>>>
>>
>>
>>
>> --
>> With best regards, Irek Khasyanov.
>>
>
>
>
> --
> *Al Fartakh Bilal*
>



-- 
With best regards, Irek Khasyanov.
