> did you try to set the number of spouts to 1 instead of 4?

Not yet. I will watch the current topology version for a day, and then I will try to change the spout number. My worry is: with a single spout, will all messages from the 4 Kafka partitions still be consumed?
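Something like this is what I have in mind for that test (an untested sketch that reuses the same kafkaSpout, EventsBatchBolt and topologyConfig objects from the topology quoted below):

    TopologyBuilder builder = new TopologyBuilder();
    // Single spout executor/task instead of 4; the bolt side stays the same.
    builder.setSpout("event", kafkaSpout, 1).setNumTasks(1);
    builder.setBolt("events", new EventsBatchBolt(topologyConfig), 1)
            .shuffleGrouping("event")
            .setNumTasks(1);

As far as I understand the kafka spout, a single spout task gets assigned all 4 partitions of the topic, so no messages should be skipped; they would just be read with less parallelism.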
On 26 May 2014 19:10, Bilal Al Fartakh <[email protected]> wrote:

> did you try to set the number of spouts to 1 instead of 4?
>
>
> 2014-05-26 16:04 GMT+01:00 Irek Khasyanov <[email protected]>:
>
>> After hours of investigation, I have this:
>>
>> 1. The problem happens when some tuple is not acked properly, even though
>> all acks happen in the same thread, right after the tuple is received.
>> 2. The time at which the duplicate record is inserted into the database
>> depends on TOPOLOGY_MESSAGE_TIMEOUT_SECS.
>> 3. storm-kafka-plus doesn't report anything about failed tuples (or maybe
>> this is a storm problem), even with Config.setDebug(true); it's not
>> possible to investigate where the magic happens.
>> 4. Removing anchoring in the emitting bolt helps in most situations; I
>> still see duplicate records sometimes, but in the total mass of events
>> it's not critical.
>>
>> I'm still not sure why failed tuples exist :)
>>
>>
>>
>> On 25 May 2014 20:20, Irek Khasyanov <[email protected]> wrote:
>>
>>> Hi everyone!
>>>
>>> I'm stuck with duplicated tuples.
>>>
>>> I have a kafka-spout with 4 workers for 4 partitions. As I can see in
>>> the database, some tuples are duplicated, twice or even three times.
>>> This doesn't happen immediately, but after a delay of about 30 seconds;
>>> sometimes it happens after a few minutes. I can't figure out what's
>>> going on, and everything looks ok in the worker log files.
>>>
>>> My topology config:
>>>
>>> TopologyBuilder builder = new TopologyBuilder();
>>> builder.setSpout("event", kafkaSpout, 4).setNumTasks(4);
>>> builder.setBolt("events", new EventsBatchBolt(topologyConfig), 1)
>>>         .shuffleGrouping("event")
>>>         .setNumTasks(1);
>>>
>>> builder.setBolt("properties", new EventPropertiesBolt(topologyConfig), 1)
>>>         .fieldsGrouping("vertica", new Fields("event_id", "fieldname",
>>>                 "repr_type", "repr_value"))
>>>         .setNumTasks(1);
>>>
>>> storm config:
>>>
>>> Config config = new Config();
>>> config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 20);
>>> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
>>>
>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 131072);
>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 131072);
>>> config.setNumAckers(2);
>>> config.setDebug(false);
>>>
>>> My guess is that I'm doing something wrong in the topology config, or
>>> something is wrong with storm-kafka-plus 0.4.
>>>
>>> Any ideas why I have duplicates?
>>>
>>> Thanks
>>>
>>> --
>>> With best regards, Irek Khasyanov
>>
>>
>>
>> --
>> With best regards, Irek Khasyanov.
>
>
>
> --
> *Al Fartakh Bilal*

--
With best regards, Irek Khasyanov.
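P.S. To make point 4 above concrete, this is roughly what "removing anchoring in the emitting bolt" means. It's a simplified sketch, not the real EventsBatchBolt:

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    import java.util.Map;

    public class UnanchoredEmitBoltSketch extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            // Anchored emit (the original behaviour): the emitted tuple is tied to the
            // input tuple's tree, so a failure or timeout downstream makes the spout
            // replay the kafka message, which seems to be where my duplicates come from.
            // collector.emit(input, new Values(input.getValue(0)));

            // Unanchored emit: downstream failures no longer trigger a replay from the spout.
            collector.emit(new Values(input.getValue(0)));

            // Ack the input immediately so it never hits TOPOLOGY_MESSAGE_TIMEOUT_SECS.
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("event"));
        }
    }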
