Harsha,

Its Distributed.  what is the significance of
*kafka.api.OffsetRequest.LatestTime()
 *wrt reading from where storm left last time.


On Mon, Mar 9, 2015 at 7:42 PM, Harsha <[email protected]> wrote:

>  Tousif,
>        How did you deployed the topology . Is this a distributed storm
> cluster or LocalCluster. LocalCluster is a simulated storm cluster which
> should only be used for dev purposes when you are writing your topology and
> you want test it out.
> LocalCluster has in-process zookeeper when you shutdown the this cluster
> it will delete the contents of the zookeeper hence your kafka topology
> offset won't be available when next time you restart. As I said above this
> should only used for testing your topology.
>
>         If you are using storm distributed cluster . Submit topology with
> spoutConfig.froceFromStart=true for the first time if you want to read from
> the beginning of the queue. For the subsequent times when you redeploy the
> topology make sure you set spoutConfig.forceFromStart=false so that your
> topology picks up the kafka offset from zookeeper and starts where its left
> off.
>
> -Harsha
> On Mon, Mar 9, 2015, at 02:59 AM, Tousif wrote:
>
> Since local cluster has in process zoookeper so I tried it in a
> distributed cluster. But was not able to get those messages
>
> On Mon, Mar 9, 2015 at 3:27 PM, Tousif <[email protected]> wrote:
>
> Hello,
>
> I'm trying to read message from kafka which were not processed when
> topology was offline and restarted after a while.
>
> I tried following config
>
> SpoutConfig spoutConfig = new SpoutConfig(hosts,
> PropertyManager.getProperty("kafka.spout.topic").toString(), "/" +
> PropertyManager.getProperty("kafka.spout.topic").toString(),
> UUID.randomUUID().toString());
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>  *spoutConfig.forceFromStart = true; *
> *spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();*
>
>
> and
>
> *spoutConfig.forceFromStart = false; *
> *spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();*
>
>
> None of them give me messages which were not read while topology was
> offline.
>
> Any help?
>
>
>
> --
>
>
>
> Regards
> khazi
>
>
>
>
>
>
>
> --
>
>
> Regards
> Tousif Khazi
>
>
>
>



-- 


Regards
Tousif Khazi

Reply via email to