Hi,

You set startOffsetTime to "earliest", so the spout will always read from the
beginning of the Kafka topic. You may try "latest" instead.
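
The suggestion above can be sketched roughly as follows (a sketch only, assuming the same storm-kafka classes used in your snippet below):

```java
// Sketch: same setup as in the original snippet, with the start offset changed.
BrokerHosts hosts = new ZkHosts("localhost:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topicName);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

// Start from the end of the topic instead of replaying it from the beginning:
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

// Also consider leaving forceFromStart at its default (false) so that, after a
// redeploy, the spout resumes from the offsets stored in ZooKeeper rather than
// jumping back to startOffsetTime.
```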

Thanks,
Manu Zhang

pramod niralakeri <pramod.niralak...@gmail.com> wrote on Mon, Sep 19, 2016 at 1:07 PM:

> Hi Storm Team, I have deployed a Storm topology in a cluster and everything
> works fine; the only problem is that it is reading repeated messages.
>
> The following is my Storm configuration; hoping for a solution. Thanks in advance.
>
> Config conf = new Config();
>
> BrokerHosts hosts = new ZkHosts("localhost:2181");
> TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topicName);
> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> kafkaConfig.forceFromStart = true;
> kafkaConfig.fetchSizeBytes = 1024 * 1024;
> kafkaConfig.bufferSizeBytes = 1024 * 1024;
> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
> OpaqueTridentKafkaSpout spout =  new OpaqueTridentKafkaSpout(kafkaConfig);
> StormSubmitter.submitTopology(name, conf, buildTopology(spout));
> .............
> .............
> .............
> .............
>
>
> /****STREAM****/
> Stream stream = topology.newStream("spout", spout)
> .each(new Fields("str"), new TupleParser(), new Fields("parse"))
> .each(new Fields("parse"), new TupleFilter())
> .each(new Fields("parse"), new TupleTransformation(), new Fields("trans"))
> .each(new Fields("trans"), new DimensionUpdate(), new Fields(finalColumnList));
>
>
>
> Regards
> pramod
>
