After hours of investigation, I have this:

1. The problem happens when some tuple isn't acked properly, even though all
acks happen in the same thread, right after the tuple is received.
2. The time at which the duplicate record shows up in the database depends on
TOPOLOGY_MESSAGE_TIMEOUT_SECS.
3. storm-kafka-plus does not report failed tuples (or maybe this is a Storm
problem), even with Config.setDebug(true), so it's not possible to
investigate where the magic happens.
4. Removing anchoring in the emitting bolt helps in most situations; I still
see the occasional duplicate record, but against the total volume of events
it's not critical (see the sketch after this list).
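
To make point 4 concrete, here is a minimal sketch of the difference between
an anchored and an unanchored emit in a bolt, with the ack done in the same
thread right after the tuple arrives (as in point 1). The class and field
names are made up for illustration, this is not the actual EventsBatchBolt:

    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    // Hypothetical forwarding bolt, for illustration only.
    public class ForwardingBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            // Anchored emit: the emitted tuple joins the input's tuple tree, so a
            // downstream failure or timeout makes the spout replay the original
            // message -- one way the same record can hit the database twice.
            collector.emit(input, new Values(input.getValue(0)));

            // Unanchored alternative (what "removing anchoring" means): no link to
            // the tuple tree, so downstream failures no longer cause a replay from
            // the spout for this hop.
            // collector.emit(new Values(input.getValue(0)));

            // Ack in the same thread, right after the tuple is received.
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("value"));
        }
    }

With the unanchored version the trade-off is at-most-once delivery for that
hop: a tuple lost after the ack is simply gone instead of being replayed.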

I'm still not sure why there are failing tuples at all :)
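
About point 3: a possible way to at least see the replays would be to
subclass the Kafka spout and log inside fail(). This is only a sketch and
assumes storm-kafka-plus 0.4 exposes the same storm.kafka.KafkaSpout /
SpoutConfig names as storm-kafka and that fail() can be overridden;
LoggingKafkaSpout is a made-up name:

    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;

    // Hypothetical wrapper spout; the parent class and constructor are
    // assumed to match storm-kafka's KafkaSpout(SpoutConfig).
    public class LoggingKafkaSpout extends KafkaSpout {

        public LoggingKafkaSpout(SpoutConfig spoutConfig) {
            super(spoutConfig);
        }

        @Override
        public void fail(Object msgId) {
            // Storm calls fail() when a tuple tree fails or hits
            // TOPOLOGY_MESSAGE_TIMEOUT_SECS; the message is then re-emitted,
            // which is typically where the duplicates come from.
            System.err.println("Kafka tuple failed, will be replayed: " + msgId);
            super.fail(msgId);
        }
    }

Then the topology would use new LoggingKafkaSpout(spoutConfig) in
builder.setSpout("event", ...) instead of the plain spout.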



On 25 May 2014 20:20, Irek Khasyanov <[email protected]> wrote:

> Hi everyone!
>
> I'm stuck with duplicate tuples.
>
> I have a kafka-spout with 4 workers for 4 partitions. Some tuples are
> duplicated, twice or even three times, as I can see in the database. This
> doesn't happen right away, but after a few seconds, around 30 seconds;
> sometimes it happens only after a few minutes. I can't figure out what's
> going on, and everything looks fine in the worker log files.
>
> My topology config:
>
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("event", kafkaSpout, 4).setNumTasks(4);
>
> builder.setBolt("events", new EventsBatchBolt(topologyConfig), 1)
>         .shuffleGrouping("event")
>         .setNumTasks(1);
>
> builder.setBolt("properties", new EventPropertiesBolt(topologyConfig), 1)
>         .fieldsGrouping("vertica",
>                 new Fields("event_id", "fieldname", "repr_type", "repr_value"))
>         .setNumTasks(1);
>
> storm config:
>
> Config config = new Config();
> config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 20);
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
>
> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE,             8);
> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,            32);
> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 131072);
> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,    131072);
>
> config.setNumAckers(2);
> config.setDebug(false);
>
> My guess is that I'm doing something wrong in the topology config, or that
> something is wrong with storm-kafka-plus 0.4.
>
> Any ideas why I have duplicates?
>
> Thanks
>
> — With best regards, Irek Khasyanov
>



-- 
With best regards, Irek Khasyanov.
