Tousif, how did you deploy the topology? Is this a distributed Storm
cluster or a LocalCluster? LocalCluster is a simulated Storm cluster
that should only be used for development, while you are writing your
topology and want to test it out. LocalCluster runs an in-process
ZooKeeper. When you shut the cluster down, it deletes the contents of
that ZooKeeper, so your Kafka spout's offsets won't be available the
next time you start it. Again, LocalCluster should only be used for
testing your topology.

If you are using a distributed Storm cluster, submit the topology with
spoutConfig.forceFromStart=true the first time if you want to read
from the beginning of the queue. On subsequent redeploys, make sure
you set spoutConfig.forceFromStart=false so that your topology picks
up the Kafka offset from ZooKeeper and starts where it left off.
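
To make the resume behaviour concrete, here is a minimal sketch in
plain Java. It is a toy model, not storm-kafka's actual code: the
in-memory map stands in for ZooKeeper, and the class name, method
names, path layout (zkRoot/id/partition) and offset numbers are all
assumptions made for illustration. It also points at one more thing to
check: as far as I understand it, the spout stores its offsets under
the id passed to SpoutConfig, so an id generated with
UUID.randomUUID() on every submit would never find the offsets written
by the previous run.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Toy model of "resume from the committed offset". Hypothetical names throughout.
class OffsetResumeSketch {
    // Stand-in for ZooKeeper: node path -> last committed Kafka offset.
    private final Map<String, Long> zk = new HashMap<>();

    // Assumed layout: <zkRoot>/<spout id>/partition_<n>.
    static String offsetPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    // Records the offset the spout has processed up to.
    void commit(String zkRoot, String id, int partition, long offset) {
        zk.put(offsetPath(zkRoot, id, partition), offset);
    }

    // configuredStart is the offset that the configured start position
    // (earliest or latest) resolves to in this model.
    long startingOffset(String zkRoot, String id, int partition,
                        boolean forceFromStart, long configuredStart) {
        Long committed = zk.get(offsetPath(zkRoot, id, partition));
        if (forceFromStart || committed == null) {
            // Stored state is ignored, or none exists under this id.
            return configuredStart;
        }
        return committed;
    }

    public static void main(String[] args) {
        final long earliest = 0L;   // assumed start of the partition
        final long latest = 100L;   // assumed end of the partition
        OffsetResumeSketch s = new OffsetResumeSketch();

        // First deploy: stable id, read from the beginning.
        System.out.println(s.startingOffset("/topic", "spout-1", 0, true, earliest));
        s.commit("/topic", "spout-1", 0, 42L);

        // Redeploy with the same id and forceFromStart=false: resumes.
        System.out.println(s.startingOffset("/topic", "spout-1", 0, false, latest));

        // Redeploy with a fresh random id: nothing is stored under it.
        String randomId = UUID.randomUUID().toString();
        System.out.println(s.startingOffset("/topic", randomId, 0, false, latest));
    }
}
```

In this model only the redeploy that keeps the same id and sets
forceFromStart=false resumes from the committed offset. Both the
forced start and the fresh random id begin at whatever the configured
start position resolves to, which for LatestTime() means the messages
written while the topology was offline are skipped.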

-Harsha

On Mon, Mar 9, 2015, at 02:59 AM, Tousif wrote:
> Since LocalCluster has an in-process ZooKeeper, I tried it in a
> distributed cluster, but I was still not able to get those messages.
>
> On Mon, Mar 9, 2015 at 3:27 PM, Tousif <[email protected]> wrote:
>> Hello,
>>
>> I'm trying to read messages from Kafka that were not processed while
>> the topology was offline, after restarting it a while later.
>>
>> I tried the following config:
>>
>> SpoutConfig spoutConfig = new SpoutConfig(hosts,
>>     PropertyManager.getProperty("kafka.spout.topic").toString(),
>>     "/" + PropertyManager.getProperty("kafka.spout.topic").toString(),
>>     UUID.randomUUID().toString());
>> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>> spoutConfig.forceFromStart = true;
>> spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
>>
>>
>> and
>>
>> spoutConfig.forceFromStart = false;
>> spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
>>
>>
>> Neither of them gives me the messages that were not read while the
>> topology was offline.
>>
>> Any help?
>>
>> --
>> Regards khazi
> --
> Regards Tousif Khazi
