Hi,
I'm running Storm Trident workload, fetching message from Kafka brokers. Storm
version is 0.9.3.
I send just 64 records to Kafka, however, the trident will process these
records multiple times.
Some code are given in the end, thanks for your reading and sincerely wait for
your help.
BrokerHosts brokerHosts = new ZkHosts(zkHost);
TridentKafkaConfig tridentKafkaConfig = new
TridentKafkaConfig(brokerHosts,topic,consumerGroup);
tridentKafkaConfig.fetchSizeBytes = 10*1024;
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new
OpaqueTridentKafkaSpout(tridentKafkaConfig);
topology
.newStream("bg0", spout)
.each(spout.getOutputFields(), new Identity(), new Fields("tuple"));
public static class Identity extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector){
collector.emit(new Values(tuple.getValues()));
}
}
Regards
Qian, Shilei