Tousif,
How did you deploy the topology? Is this a distributed Storm cluster or a LocalCluster? LocalCluster is a simulated Storm cluster that should only be used for development, while you are writing your topology and want to test it. LocalCluster runs an in-process ZooKeeper. When you shut the cluster down, it deletes the ZooKeeper contents, so your Kafka offsets won't be available the next time you restart. Again, it should only be used for testing your topology.
If you are using a distributed Storm cluster, submit the topology with spoutConfig.forceFromStart=true the first time if you want to read from the beginning of the queue. On subsequent redeployments, make sure you set spoutConfig.forceFromStart=false so that your topology picks up the Kafka offset from ZooKeeper and starts where it left off.

-Harsha

On Mon, Mar 9, 2015, at 02:59 AM, Tousif wrote:
> Since local cluster has an in-process ZooKeeper, I tried it in a
> distributed cluster, but was not able to get those messages.
>
> On Mon, Mar 9, 2015 at 3:27 PM, Tousif <[email protected]> wrote:
>> Hello,
>>
>> I'm trying to read messages from Kafka which were not processed when
>> the topology was offline and restarted after a while.
>>
>> I tried the following config
>>
>> SpoutConfig spoutConfig = new SpoutConfig(hosts,
>>     PropertyManager.getProperty("kafka.spout.topic").toString(),
>>     "/" + PropertyManager.getProperty("kafka.spout.topic").toString(),
>>     UUID.randomUUID().toString());
>> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>> spoutConfig.forceFromStart = true;
>> spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
>>
>> and
>>
>> spoutConfig.forceFromStart = false;
>> spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
>>
>> None of them give me messages which were not read while the topology
>> was offline.
>>
>> Any help?
>>
>> --
>> Regards
>> khazi
>
> --
> Regards
> Tousif Khazi
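
A possible reason why neither quoted configuration replays the missed messages, sketched as a simplified model. This is not storm-kafka code: the class, method, and constant names are hypothetical, it covers a single partition, and the fallback to the latest offset when nothing is committed is an assumption based on how storm-kafka 0.9.x appears to behave. The point it illustrates is that the committed offset is looked up under zkRoot + "/" + spout id, so an id generated with UUID.randomUUID() on every deploy never finds the offset stored by the previous run, and forceFromStart=true combined with LatestTime() skips straight to the end of the log.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of how a Kafka spout picks its first offset.
// Not storm-kafka code; one partition only.
final class StartOffsetSketch {
    // Same sentinel values as kafka.api.OffsetRequest.EarliestTime() / LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    // Stand-in for ZooKeeper: committed offset keyed by zkRoot + "/" + spoutId.
    static final Map<String, Long> zk = new HashMap<>();

    // Offset of the oldest retained message and of the next message to be written.
    static final long LOG_START = 0L;
    static final long LOG_END = 100L;

    // Translate a start-time sentinel into a concrete offset in the model log.
    static long offsetForTime(long time) {
        return time == EARLIEST_TIME ? LOG_START : LOG_END;
    }

    // Decide where a freshly deployed topology starts reading.
    static long resolve(String zkRoot, String spoutId,
                        boolean forceFromStart, long startOffsetTime) {
        Long committed = zk.get(zkRoot + "/" + spoutId);
        if (forceFromStart) {
            // Any committed offset is ignored; startOffsetTime decides.
            return offsetForTime(startOffsetTime);
        }
        if (committed != null) {
            // Resume where the previous run stopped.
            return committed;
        }
        // Assumption: with nothing committed under this id, start at the latest offset.
        return LOG_END;
    }

    public static void main(String[] args) {
        zk.put("/events/event-spout", 40L); // offset committed by an earlier run

        // Stable id, forceFromStart=false: resumes from the committed offset.
        System.out.println(resolve("/events", "event-spout", false, LATEST_TIME));
        // Fresh random id, forceFromStart=false: nothing committed under that path.
        System.out.println(resolve("/events", "a-new-random-uuid", false, LATEST_TIME));
        // forceFromStart=true with LatestTime: jumps to the end of the log.
        System.out.println(resolve("/events", "event-spout", true, LATEST_TIME));
        // forceFromStart=true with EarliestTime: rereads from the beginning.
        System.out.println(resolve("/events", "event-spout", true, EARLIEST_TIME));
    }
}
```

Under this model, only the first case picks up the messages that arrived while the topology was offline. Applied to the quoted SpoutConfig, that would mean passing a fixed string as the last constructor argument instead of UUID.randomUUID().toString(), keeping forceFromStart=false on redeploys, and using kafka.api.OffsetRequest.EarliestTime() for the one-time read from the beginning.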
