This might happen if the batches are failing and replaying over and over again.
On Tuesday, March 10, 2015 2:36 AM, "Qian, Shilei" <[email protected]>
wrote:
<!--#yiv8465419964 _filtered #yiv8465419964 {font-family:SimSun;panose-1:2 1
6 0 3 1 1 1 1 1;} _filtered #yiv8465419964 {font-family:SimSun;panose-1:2 1 6 0
3 1 1 1 1 1;} _filtered #yiv8465419964 {font-family:Calibri;panose-1:2 15 5 2 2
2 4 3 2 4;} _filtered #yiv8465419964 {panose-1:2 1 6 0 3 1 1 1 1
1;}#yiv8465419964 #yiv8465419964 p.yiv8465419964MsoNormal, #yiv8465419964
li.yiv8465419964MsoNormal, #yiv8465419964 div.yiv8465419964MsoNormal
{margin:0in;margin-bottom:.0001pt;font-size:11.0pt;font-family:"Calibri",
"sans-serif";}#yiv8465419964 a:link, #yiv8465419964
span.yiv8465419964MsoHyperlink
{color:blue;text-decoration:underline;}#yiv8465419964 a:visited, #yiv8465419964
span.yiv8465419964MsoHyperlinkFollowed
{color:purple;text-decoration:underline;}#yiv8465419964
span.yiv8465419964EmailStyle17 {font-family:"Calibri",
"sans-serif";color:windowtext;}#yiv8465419964 .yiv8465419964MsoChpDefault
{font-family:"Calibri", "sans-serif";} _filtered #yiv8465419964 {margin:1.0in
1.0in 1.0in 1.0in;}#yiv8465419964 div.yiv8465419964WordSection1 {}-->Hi, I’m
running Storm Trident workload, fetching message from Kafka brokers. Storm
version is 0.9.3. I send just 64 records to Kafka, however, the trident will
process these records multiple times. Some code are given in the end, thanks
for your reading and sincerely wait for your help. BrokerHosts
brokerHosts = new ZkHosts(zkHost); TridentKafkaConfig tridentKafkaConfig =
new TridentKafkaConfig(brokerHosts,topic,consumerGroup);
tridentKafkaConfig.fetchSizeBytes = 10*1024; tridentKafkaConfig.scheme =
new SchemeAsMultiScheme(new StringScheme()); OpaqueTridentKafkaSpout spout
= new OpaqueTridentKafkaSpout(tridentKafkaConfig); topology
.newStream("bg0", spout) .each(spout.getOutputFields(), new Identity(),
new Fields("tuple")); public static class Identity extends BaseFunction
{ @Override public void execute(TridentTuple tuple,
TridentCollector collector){ collector.emit(new
Values(tuple.getValues())); } } Regards Qian, Shilei