Currently the Kafka spout always stores its offsets in ZooKeeper; we have not 
yet updated the code to use the feature you are referring to (storing offsets 
in Kafka).

Thanks
Parth

From: Srikanth <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Friday, April 10, 2015 at 2:31 PM
To: "[email protected]" <[email protected]>, 
"[email protected]" <[email protected]>
Subject: Re: At-least-once processing with KafkaSpout

I was able to verify this on my setup.

I'm still not able to figure out whether KafkaSpout can write offsets to Kafka 
instead of ZooKeeper. Has anyone tried this?
Is there any way I can pass through consumer configs such as "offsets.storage=kafka"?
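For reference, "offsets.storage" is a setting of the Kafka 0.8.2 high-level consumer; as Parth notes, KafkaSpout does not read it. A minimal Java sketch of what those consumer properties look like, assuming the 0.8.2 high-level consumer (the ZooKeeper address and group name are placeholders, and the class name is hypothetical):

```java
import java.util.Properties;

public class OffsetStorageConfig {
    // Consumer properties for the Kafka 0.8.2 high-level consumer, NOT KafkaSpout.
    // zkConnect and groupId are placeholder values supplied by the caller.
    static Properties highLevelConsumerProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        // Commit offsets to Kafka's internal offsets topic instead of ZooKeeper.
        props.setProperty("offsets.storage", "kafka");
        // Also commit to ZooKeeper while migrating, so both stores stay in sync.
        props.setProperty("dual.commit.enabled", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties p = highLevelConsumerProps("zk1:2181", "my-group");
        System.out.println(p.getProperty("offsets.storage"));
    }
}
```

With dual.commit.enabled set, offsets are written to both stores, which is the usual path for migrating a consumer group from ZooKeeper-based to Kafka-based offset storage.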

I found a few references that explicitly say that KafkaSpout writes to 
ZooKeeper:
http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.1/bk_user-guide/content/ch_storm-using-ingest-connectors.html
"The Kafka spout stores its offsets in the same instance of Zookeeper used by 
Apache Storm."

We have a SolrCloud cluster that uses ZooKeeper, and I'm now planning to use the 
same ZooKeeper for Kafka+Storm. Our clusters will be fairly small, but I'd like 
to keep ZooKeeper usage as low as possible.
Has anyone here run three applications against the same ZooKeeper ensemble? We 
will probably start with 4-5 SolrCloud nodes, 3-4 Kafka nodes and ~10 Storm 
nodes.
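One common way to share a single ZooKeeper ensemble between applications is to give each one its own chroot path. Kafka's zookeeper.connect and SolrCloud's zkHost both accept a connection string with a chroot suffix, while Storm keeps its own state under storm.zookeeper.root (default "/storm"). A small Java sketch that builds such connection strings; the host names, chroot paths and the connectString() helper are all hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SharedZkChroots {
    // Joins host:port pairs and appends one chroot path at the end, which is the
    // "host1:port,host2:port/chroot" form ZooKeeper clients accept.
    static String connectString(List<String> hosts, int port, String chroot) {
        String ensemble = hosts.stream()
                .map(h -> h + ":" + port)
                .collect(Collectors.joining(","));
        return ensemble + chroot;
    }

    public static void main(String[] args) {
        List<String> zk = Arrays.asList("zk1", "zk2", "zk3"); // hypothetical hosts
        System.out.println(connectString(zk, 2181, "/solr"));  // for SolrCloud zkHost
        System.out.println(connectString(zk, 2181, "/kafka")); // for Kafka zookeeper.connect
    }
}
```

Each application then sees only its own subtree, so the ensemble is shared without the znodes of Solr, Kafka and Storm colliding.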

Thanks,
Srikanth

On Thu, Apr 9, 2015 at 7:58 PM, Erik Weathers <[email protected]> wrote:
Srikanth,

Yes, the KafkaSpout ensures "at least once" processing.

As for whether a topology restart resumes from the last committed offset, that 
is configurable. For example, with the kafka-0.8 spout available in 
storm-0.9.3+, you can set startOffsetTime to force a behavior other than 
"last committed offset":

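  // Start from the oldest offset Kafka still retains:
  spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
  // ...or start from the newest offset, skipping any backlog:
  spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();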
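EarliestTime() and LatestTime() return sentinel values (-2 and -1) that Kafka 0.8's offset API resolves against a partition's retained log. A simplified Java sketch of that resolution, with the constants mirrored so it compiles without the Kafka jar; resolve() is a hypothetical helper, not part of storm-kafka. As far as I know, storm-kafka 0.9.x consults startOffsetTime in combination with its forceFromStart flag; treat that as an assumption and check the spout source for your version.

```java
public class StartOffsetSketch {
    // Mirrors kafka.api.OffsetRequest.EarliestTime() and LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    // Hypothetical helper: maps a startOffsetTime sentinel onto a partition whose
    // retained log spans [logStartOffset, logEndOffset).
    static long resolve(long startOffsetTime, long logStartOffset, long logEndOffset) {
        if (startOffsetTime == EARLIEST_TIME) {
            return logStartOffset; // replay everything Kafka still retains
        }
        if (startOffsetTime == LATEST_TIME) {
            return logEndOffset; // skip the backlog; read only new messages
        }
        throw new IllegalArgumentException("timestamp lookups are not modeled in this sketch");
    }

    public static void main(String[] args) {
        System.out.println(resolve(EARLIEST_TIME, 1000, 5000));
        System.out.println(resolve(LATEST_TIME, 1000, 5000));
    }
}
```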

- Erik

On Thu, Apr 9, 2015 at 4:12 PM, Srikanth <[email protected]> wrote:
Hello,

We have a use case for Storm+Kafka where we need at-least-once processing.

If I understand correctly, KafkaSpout:
  i) Adds each offset ID to a pending set when it emits a tuple.
  ii) Removes an offset ID from the pending set only when processing succeeds 
downstream (ack()).
  iii) Does a commit() to ZooKeeper every stateUpdateIntervalMs.
  iv) Writes the lowest pending offset to ZooKeeper on each commit.
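Steps i) to iv) can be modeled as a sorted pending set whose smallest element is the commit point. A simplified Java sketch under that assumption; PendingOffsetTracker is a hypothetical class, not storm-kafka's actual implementation, and the periodic stateUpdateIntervalMs timer is left out:

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical, simplified model of steps i) to iv); not the storm-kafka source.
public class PendingOffsetTracker {
    private final SortedSet<Long> pending = new TreeSet<>();
    private long nextOffsetToEmit;

    PendingOffsetTracker(long startOffset) {
        this.nextOffsetToEmit = startOffset;
    }

    // i) emitting a tuple adds its offset to the pending set
    long emit() {
        long offset = nextOffsetToEmit++;
        pending.add(offset);
        return offset;
    }

    // ii) an ack removes the offset; in this model an offset never acked stays pending
    void ack(long offset) {
        pending.remove(offset);
    }

    // iii)/iv) the value a periodic commit would write: the lowest pending offset,
    // or the next offset to emit when nothing is pending
    long offsetToCommit() {
        return pending.isEmpty() ? nextOffsetToEmit : pending.first();
    }

    public static void main(String[] args) {
        PendingOffsetTracker t = new PendingOffsetTracker(0);
        for (int i = 0; i < 5; i++) t.emit();   // offsets 0..4 in flight
        t.ack(0); t.ack(1); t.ack(3);            // 2 and 4 still pending
        System.out.println(t.offsetToCommit());  // 2
    }
}
```

Committing the minimum rather than the highest acked offset is what keeps offset 2 from being skipped even though offset 3 has already been acked.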

So when a topology restarts, it resumes from the offset written in step iv, 
which guarantees at-least-once processing. Is my understanding correct?
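The at-least-once argument follows because a restart resumes from the committed offset, so everything at or after it is emitted again, including tuples that were already acked. A minimal Java sketch of that replay, assuming no new messages arrive and the last commit happened after the acks; replayAfterRestart() is a hypothetical helper:

```java
import java.util.ArrayList;
import java.util.List;

public class RestartReplaySketch {
    // Hypothetical model: after a restart the spout re-emits every offset from the
    // committed one up to the log end, so acked tuples can repeat but none are skipped.
    static List<Long> replayAfterRestart(long committedOffset, long logEndOffset) {
        List<Long> replayed = new ArrayList<>();
        for (long o = committedOffset; o < logEndOffset; o++) {
            replayed.add(o);
        }
        return replayed;
    }

    public static void main(String[] args) {
        // Offsets 0..4 were emitted; 0, 1 and 3 were acked, so the last commit wrote 2.
        List<Long> replayed = replayAfterRestart(2, 5);
        System.out.println(replayed); // [2, 3, 4]; offset 3 is processed a second time
    }
}
```

If the spout dies before the next commit, the committed offset is older and more tuples are replayed; the outcome is duplicates rather than lost messages, so downstream bolts should tolerate reprocessing.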

Thanks,
Srikanth

