Did you try setting the number of spouts to 1 instead of 4?
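
For context on the timing: Storm replays any tuple that is not fully acked within TOPOLOGY_MESSAGE_TIMEOUT_SECS, which would fit duplicates whose delay depends on that setting. Here is a rough toy model of that replay behaviour in plain Java. It is not Storm code; the class ReplaySketch and all of its methods are hypothetical names made up for illustration, and a real spout tracks pending tuples differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT Storm code. All names here are hypothetical.
class ReplaySketch {
    // Stands in for TOPOLOGY_MESSAGE_TIMEOUT_SECS.
    private final long timeoutSecs;
    // Tuple id -> time (in seconds) of its most recent emit.
    private final Map<String, Long> pending = new LinkedHashMap<>();
    // Everything a downstream "database" would have received, in order.
    final List<String> delivered = new ArrayList<>();

    ReplaySketch(long timeoutSecs) {
        this.timeoutSecs = timeoutSecs;
    }

    // Emit a tuple at time `now` and remember it until it is acked.
    void emit(String id, long now) {
        pending.put(id, now);
        delivered.add(id);
    }

    // A fully acked tuple leaves the pending set and is never replayed.
    void ack(String id) {
        pending.remove(id);
    }

    // At time `now`, re-emit every tuple that has been pending for at least the timeout.
    void tick(long now) {
        for (Map.Entry<String, Long> e : new ArrayList<>(pending.entrySet())) {
            if (now - e.getValue() >= timeoutSecs) {
                emit(e.getKey(), now);
            }
        }
    }

    public static void main(String[] args) {
        ReplaySketch s = new ReplaySketch(60);
        s.emit("a", 0);
        s.ack("a");      // acked in time: delivered once
        s.emit("b", 0);  // never acked
        s.tick(30);      // before the timeout: nothing is replayed
        s.tick(60);      // timeout reached: "b" is delivered a second time
        System.out.println(s.delivered);
    }
}
```

In this model the un-acked tuple "b" reaches the database twice, while "a" arrives once. If the real duplicates follow the same pattern, the thing to look for is a tuple tree that never completes, regardless of how many spout executors there are.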

2014-05-26 16:04 GMT+01:00 Irek Khasyanov <[email protected]>:

> After hours of investigation, I have this:
>
> 1. The problem happens when some tuple didn't ack properly, but all acks
> happen in the same thread and right after the tuple is received.
> 2. The time at which the duplicate record is inserted into the database
> depends on TOPOLOGY_MESSAGE_TIMEOUT_SECS.
> 3. storm-kafka-plus doesn't alert about failed tuples (or maybe this is a
> Storm problem), even with Config.setDebug(true). It's not possible to
> investigate where the magic happens.
> 4. Removing anchoring in the emitting bolt helps in most situations. Sometimes
> I still see duplicate records, but in the total mass of events it's not critical.
>
> I'm still not sure why failing tuples exist :)
>
> On 25 May 2014 20:20, Irek Khasyanov <[email protected]> wrote:
>
>> Hi everyone!
>>
>> I'm stuck with duplicate tuples.
>>
>> I have a kafka-spout with 4 workers for 4 partitions. Some tuples are
>> duplicated, as I see in the database, two or three times. This doesn't happen
>> immediately, but after a few seconds, about 30 seconds. Sometimes it happens
>> after a few minutes. I can't figure out what is happening; everything is OK in
>> the workers' log files.
>>
>> My topology config:
>>
>>     TopologyBuilder builder = new TopologyBuilder();
>>     builder.setSpout("event", kafkaSpout, 4).setNumTasks(4);
>>     builder.setBolt("events", new EventsBatchBolt(topologyConfig), 1)
>>             .shuffleGrouping("event")
>>             .setNumTasks(1);
>>
>>     builder.setBolt("properties", new EventPropertiesBolt(topologyConfig), 1)
>>             .fieldsGrouping("vertica", new Fields("event_id",
>>                     "fieldname", "repr_type", "repr_value"))
>>             .setNumTasks(1);
>>
>> Storm config:
>>
>>     Config config = new Config();
>>     config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
>>     config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 20);
>>     config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
>>
>>     config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>     config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
>>     config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 131072);
>>
>>     config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 131072);
>>     config.setNumAckers(2);
>>     config.setDebug(false);
>>
>> My guess is that I'm doing something wrong with the topology config, or
>> something is wrong with storm-kafka-plus 0.4.
>>
>> Any ideas why I have duplicates?
>>
>> Thanks
>>
>> --
>> With best regards, Irek Khasyanov
>
> --
> With best regards, Irek Khasyanov.

--
*Al Fartakh Bilal*
