Harsha, Its Distributed. what is the significance of *kafka.api.OffsetRequest.LatestTime() *wrt reading from where storm left last time.
On Mon, Mar 9, 2015 at 7:42 PM, Harsha <[email protected]> wrote: > Tousif, > How did you deployed the topology . Is this a distributed storm > cluster or LocalCluster. LocalCluster is a simulated storm cluster which > should only be used for dev purposes when you are writing your topology and > you want test it out. > LocalCluster has in-process zookeeper when you shutdown the this cluster > it will delete the contents of the zookeeper hence your kafka topology > offset won't be available when next time you restart. As I said above this > should only used for testing your topology. > > If you are using storm distributed cluster . Submit topology with > spoutConfig.froceFromStart=true for the first time if you want to read from > the beginning of the queue. For the subsequent times when you redeploy the > topology make sure you set spoutConfig.forceFromStart=false so that your > topology picks up the kafka offset from zookeeper and starts where its left > off. > > -Harsha > On Mon, Mar 9, 2015, at 02:59 AM, Tousif wrote: > > Since local cluster has in process zoookeper so I tried it in a > distributed cluster. But was not able to get those messages > > On Mon, Mar 9, 2015 at 3:27 PM, Tousif <[email protected]> wrote: > > Hello, > > I'm trying to read message from kafka which were not processed when > topology was offline and restarted after a while. > > I tried following config > > SpoutConfig spoutConfig = new SpoutConfig(hosts, > PropertyManager.getProperty("kafka.spout.topic").toString(), "/" + > PropertyManager.getProperty("kafka.spout.topic").toString(), > UUID.randomUUID().toString()); > spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); > *spoutConfig.forceFromStart = true; * > *spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();* > > > and > > *spoutConfig.forceFromStart = false; * > *spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();* > > > None of them give me messages which were not read while topology was > offline. > > Any help? > > > > -- > > > > Regards > khazi > > > > > > > > -- > > > Regards > Tousif Khazi > > > > -- Regards Tousif Khazi
