I'm not sure about the partitions, but I had a similar problem with a simple spout (not the Kafka one), and the solution was switching to 1 spout. I hope this works for you. Good luck!
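Irek asks in the quoted reply below whether all 4 Kafka partitions would still be consumed with a single spout. In the storm-kafka spout, partitions are spread over the spout's tasks round-robin by index, so one task ends up owning every partition. The sketch below models that assignment in plain Java. It assumes that modulo scheme; storm-kafka-plus 0.4 may assign partitions differently. `PartitionAssignment` and `partitionsForTask` are hypothetical names, not part of any Storm or Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignment {

    // Hypothetical helper: returns the Kafka partitions owned by one spout task,
    // ASSUMING round-robin assignment (partition p goes to task p % numTasks).
    public static List<Integer> partitionsForTask(int numPartitions, int numTasks, int taskIndex) {
        if (numTasks <= 0 || taskIndex < 0 || taskIndex >= numTasks) {
            throw new IllegalArgumentException("invalid task index or task count");
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += numTasks) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 4 partitions, 4 spout tasks: each task reads exactly one partition.
        for (int t = 0; t < 4; t++) {
            System.out.println("4 tasks, task " + t + " -> " + partitionsForTask(4, 4, t));
        }
        // 4 partitions, 1 spout task: the single task reads all of them.
        System.out.println("1 task, task 0 -> " + partitionsForTask(4, 1, 0));
    }
}
```

Under this assumption, dropping to one spout task reduces parallelism but does not leave any partition unread.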
2014-05-26 16:15 GMT+01:00 Irek Khasyanov <[email protected]>:

> > Did you try setting the number of spouts to 1 instead of 4?
>
> Not yet. I'll watch the current topology version for a day, and then I'll
> try changing the number of spouts. I'm worried: will all messages from the
> 4 Kafka partitions still be consumed?
>
> On 26 May 2014 19:10, Bilal Al Fartakh <[email protected]> wrote:
>
>> Did you try setting the number of spouts to 1 instead of 4?
>>
>> 2014-05-26 16:04 GMT+01:00 Irek Khasyanov <[email protected]>:
>>
>>> After hours of investigation, here is what I found:
>>>
>>> 1. The problem happens when some tuple isn't acked properly, even though
>>> all acks happen in the same thread, right after the tuple is received.
>>> 2. The time at which the duplicate record is inserted into the database
>>> depends on TOPOLOGY_MESSAGE_TIMEOUT_SECS.
>>> 3. storm-kafka-plus doesn't report failed tuples (or maybe this is a
>>> Storm problem), even with Config.setDebug(true), so it's not possible to
>>> investigate where the magic happens.
>>> 4. Removing anchoring in the emitting bolt helps in most situations. I
>>> still see duplicate records sometimes, but across the total mass of
>>> events it's not critical.
>>>
>>> I'm still not sure why failing tuples exist :)
>>>
>>> On 25 May 2014 20:20, Irek Khasyanov <[email protected]> wrote:
>>>
>>>> Hi everyone!
>>>>
>>>> I'm stuck with duplicate tuples.
>>>>
>>>> I have a Kafka spout with 4 workers for 4 partitions. Some tuples are
>>>> duplicated in the database, twice or three times. This doesn't happen
>>>> immediately but after a few seconds, about 30 seconds. Sometimes it
>>>> happens after a few minutes. I can't figure out what is going on;
>>>> everything looks OK in the workers' log files.
>>>>
>>>> My topology config:
>>>>
>>>>     TopologyBuilder builder = new TopologyBuilder();
>>>>     builder.setSpout("event", kafkaSpout, 4).setNumTasks(4);
>>>>     builder.setBolt("events", new EventsBatchBolt(topologyConfig), 1)
>>>>             .shuffleGrouping("event")
>>>>             .setNumTasks(1);
>>>>
>>>>     builder.setBolt("properties", new EventPropertiesBolt(topologyConfig), 1)
>>>>             .fieldsGrouping("vertica", new Fields("event_id",
>>>>                     "fieldname", "repr_type", "repr_value"))
>>>>             .setNumTasks(1);
>>>>
>>>> Storm config:
>>>>
>>>>     Config config = new Config();
>>>>     config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
>>>>     // Note: config.setNumAckers(2) below sets this same key, so 2 overrides 20.
>>>>     config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 20);
>>>>     config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
>>>>
>>>>     config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>     config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
>>>>     config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 131072);
>>>>     config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 131072);
>>>>     config.setNumAckers(2);
>>>>     config.setDebug(false);
>>>>
>>>> My guess is that I'm doing something wrong in the topology config, or
>>>> something is wrong with storm-kafka-plus 0.4.
>>>>
>>>> Any ideas why I have duplicates?
>>>>
>>>> Thanks
>>>>
>>>> --
>>>> With best regards, Irek Khasyanov
>>>
>>> --
>>> With best regards, Irek Khasyanov.
>>
>> --
>> *Al Fartakh Bilal*
>
> --
> With best regards, Irek Khasyanov.

--
*Al Fartakh Bilal*
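Irek's second observation, that duplicates show up on a schedule tied to TOPOLOGY_MESSAGE_TIMEOUT_SECS, fits Storm's at-least-once guarantee. If a tuple tree is not fully acked within the timeout, the spout's `fail` is called, and a Kafka spout typically re-emits that message, so downstream bolts see it again. Whatever the root cause of the missing acks, a common mitigation is to make the database write idempotent.

The plain-Java sketch below is an in-memory stand-in for that idea. It assumes `event_id` (one of the fields in the quoted `fieldsGrouping`) uniquely identifies an event. `IdempotentSink` is a hypothetical name. In a real deployment the check would live in the database itself, for example as a unique constraint or upsert, because an in-memory set is neither shared across bolt tasks nor kept across restarts.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentSink {

    // Stand-in for the database table. ASSUMPTION: event_id is unique per event;
    // in practice a UNIQUE constraint or upsert on event_id would play this role.
    private final Set<String> seenEventIds = new HashSet<>();
    private final List<String> storedRows = new ArrayList<>();

    // Returns true if the row was written, false if it was a replayed duplicate.
    public boolean insert(String eventId, String payload) {
        if (!seenEventIds.add(eventId)) {
            return false; // already stored: a replay after a timeout becomes a no-op
        }
        storedRows.add(eventId + ":" + payload);
        return true;
    }

    public int rowCount() {
        return storedRows.size();
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.insert("e1", "click");
        sink.insert("e2", "view");
        // Simulate the spout replaying e1 after the message timeout expired.
        boolean written = sink.insert("e1", "click");
        System.out.println("replay written? " + written + ", rows = " + sink.rowCount());
    }
}
```

Running `main` prints `replay written? false, rows = 2`. The replayed event is absorbed instead of producing a second row. This hides duplicates rather than explaining why tuples fail, so the missing acks are still worth tracking down.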
