Hi,

You set startOffsetTime to the earliest offset, so the spout will always read from the beginning of the Kafka topic. You could try the latest offset (kafka.api.OffsetRequest.LatestTime()) instead.
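
For illustration, here is a minimal sketch of that change. It assumes the pre-1.0 storm-kafka package names (storm.kafka.*), which match the forceFromStart field in your snippet. The class name SpoutFactory and the method buildSpout are placeholders, not part of your code. Setting forceFromStart to false is also an assumption on my side: as far as I know, that flag makes the spout ignore previously stored offsets, so it is worth checking against your storm-kafka version.

```java
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;

// Hypothetical helper class; only the two offset-related settings differ
// from the configuration in the original message.
public class SpoutFactory {

    static OpaqueTridentKafkaSpout buildSpout(String topicName) {
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topicName);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConfig.fetchSizeBytes = 1024 * 1024;
        kafkaConfig.bufferSizeBytes = 1024 * 1024;

        // Assumption: with forceFromStart = true the spout restarts from
        // startOffsetTime on each deployment instead of resuming.
        kafkaConfig.forceFromStart = false;

        // Start from the newest offset rather than the beginning of the topic.
        kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

        return new OpaqueTridentKafkaSpout(kafkaConfig);
    }
}
```

The returned spout can be passed to buildTopology(spout) exactly as before.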

Thanks,
Manu Zhang

On Mon, Sep 19, 2016 at 1:07 PM, pramod niralakeri <pramod.niralak...@gmail.com> wrote:

> Hi Storm Team,
>
> I have deployed a Storm topology in a cluster and it is all working fine.
> The only problem is that it reads repeated messages.
>
> The following is my Storm configuration. Hoping for a solution, thanks in advance.
>
> Config conf = new Config();
>
> BrokerHosts hosts = new ZkHosts("localhost:2181");
> TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topicName);
> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> kafkaConfig.forceFromStart = true;
> kafkaConfig.fetchSizeBytes = 1024 * 1024;
> kafkaConfig.bufferSizeBytes = 1024 * 1024;
> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
> OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);
> StormSubmitter.submitTopology(name, conf, buildTopology(spout));
> .............
> .............
> .............
> .............
>
> /****STREAM****/
> Stream stream = topology.newStream("spout", spout)
>     .each(new Fields("str"), new TupleParser(), new Fields("parse"))
>     .each(new Fields("parse"), new TupleFilter())
>     .each(new Fields("parse"), new TupleTransformation(), new Fields("trans"))
>     .each(new Fields("trans"), new DimensionUpdate(), new Fields(finalColumnList));
>
> Regards
> pramod