I guess any update will have to wait till the upcoming consumer API changes are stable.
Srikanth

On Fri, Apr 10, 2015 at 5:48 PM, Parth Brahmbhatt wrote:
> Currently the Kafka spout always stores the offset in ZooKeeper, and we
> have not yet updated the code to leverage the feature you are referring to.
>
> Thanks
> Parth
>
> From: Srikanth
> Date: Friday, April 10, 2015 at 2:31 PM
> Subject: Re: Atleast once processing with KakfaSpout
>
> I was able to verify this on my setup.
>
> I'm still not able to figure out whether KafkaSpout can write offsets to Kafka
> instead of ZooKeeper. Has anyone tried this?
> Is there any way I can pass through consumer configs such as "offsets.storage = kafka"?
>
> I found a few references that explicitly say that KafkaSpout writes
> to ZooKeeper:
>
> http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.1/bk_user-guide/content/ch_storm-using-ingest-connectors.html
> "The Kafka spout stores its offsets in the same instance of Zookeeper
> used by Apache Storm."
>
> We have a SolrCloud cluster using ZooKeeper, and now I'm planning to use it for
> Kafka+Storm as well. Our clusters will be rather small, but I'd like to reduce
> ZooKeeper usage as much as possible.
> Has anyone here tried running three applications against the same ZooKeeper? We would
> probably have 4-5 SolrCloud nodes, 3-4 Kafka nodes, and ~10 Storm nodes to
> begin with.
>
> Thanks,
> Srikanth
>
> On Thu, Apr 9, 2015 at 7:58 PM, Erik Weathers wrote:
>
>> Srikanth,
>>
>> Yes, the KafkaSpout ensures "at least once" processing.
>>
>> As for whether a topology restart resumes reading from the last
>> committed offset, that is configurable. For example, with the kafka-0.8 spout
>> available in storm-0.9.3+, you can set startOffsetTime to force a
>> different behavior than "last committed offset":
>>
>> spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
>> // or
>> spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
>>
>> - Erik
>>
>> On Thu, Apr 9, 2015 at 4:12 PM, Srikanth wrote:
>>
>>> Hello,
>>>
>>> We have a use case for Storm+Kafka where we need at-least-once
>>> processing.
>>>
>>> If I understand correctly, KafkaSpout
>>> i) adds each offset ID to a pending set when emitting a tuple;
>>> ii) removes the offset ID from the pending set only when processing
>>> succeeds downstream (ack());
>>> iii) does a commit() to ZooKeeper every stateUpdateIntervalMs;
>>> iv) writes the lowest pending offset to ZooKeeper on each commit.
>>>
>>> So when a topology restarts, it will resume from the offset last written in
>>> step iv), hence ensuring at-least-once processing. Is my understanding correct?
>>>
>>> Thanks,
>>> Srikanth
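The bookkeeping in Srikanth's steps i) to iv) can be modeled in a few lines. This is a minimal sketch, not the storm-kafka source: the class `PendingOffsetTracker` and its methods are hypothetical, and the `committed` field stands in for the offset the spout writes to ZooKeeper. It shows why committing the lowest pending offset gives at-least-once (not exactly-once) semantics: an acked offset above a still-pending one is re-read after a restart.

```java
import java.util.TreeSet;

// Hypothetical, simplified model of steps i) to iv); not the storm-kafka source.
// The "committed" field stands in for the offset the spout writes to ZooKeeper.
class PendingOffsetTracker {
    // Offsets emitted as tuples but not yet acked downstream (step i).
    private final TreeSet<Long> pending = new TreeSet<>();
    // Next offset to emit.
    private long nextOffset;
    // Last committed offset; a restart would resume reading from here.
    private long committed;

    PendingOffsetTracker(long startOffset) {
        this.nextOffset = startOffset;
        this.committed = startOffset;
    }

    // Step i: emit the next offset and record it as pending.
    long emit() {
        long offset = nextOffset++;
        pending.add(offset);
        return offset;
    }

    // Step ii: only a successful downstream ack removes the offset.
    // A failed tuple is simply left pending here (a real spout would also replay it).
    void ack(long offset) {
        pending.remove(offset);
    }

    // Steps iii/iv: invoked periodically (every stateUpdateIntervalMs in the thread).
    // Commits the lowest pending offset, or the next offset to emit if nothing is pending.
    long commit() {
        committed = pending.isEmpty() ? nextOffset : pending.first();
        return committed;
    }

    long committedOffset() {
        return committed;
    }

    public static void main(String[] args) {
        PendingOffsetTracker tracker = new PendingOffsetTracker(100);
        long a = tracker.emit(); // offset 100
        long b = tracker.emit(); // offset 101
        long c = tracker.emit(); // offset 102
        tracker.ack(a);
        tracker.ack(c);
        // 101 is still pending, so it is the commit point even though 102 was acked.
        System.out.println(tracker.commit()); // prints 101
        tracker.ack(b);
        System.out.println(tracker.commit()); // prints 103
    }
}
```

If the topology restarted right after the first commit, reading would resume at 101, so offset 102 would be processed a second time. That duplicate is the price of never skipping an unacked message.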

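Erik's point that the restart position is configurable can be sketched as a small decision function. This is a hypothetical model, not the storm-kafka implementation: `StartOffsetChooser`, `StartPolicy`, and especially the `ignoreCommitted` flag are illustrative names, and the sketch assumes the committed offset is only bypassed when such an explicit override is set. How a given storm-kafka version actually exposes this on `SpoutConfig` should be checked against its documentation.

```java
// Hypothetical, simplified model of how a spout could choose its start offset on
// (re)start. Not the storm-kafka implementation; all names here are illustrative.
class StartOffsetChooser {
    // Stand-ins for the two choices in the thread
    // (kafka.api.OffsetRequest.EarliestTime() / LatestTime()).
    enum StartPolicy { EARLIEST, LATEST }

    /**
     * @param committedOffset   offset read from the offset store, or null if none exists
     * @param ignoreCommitted   hypothetical override flag: ignore the committed offset
     * @param policy            where to start when the committed offset is not used
     * @param earliestAvailable oldest offset still retained by the broker
     * @param latestAvailable   offset the next produced message will receive
     */
    static long chooseStartOffset(Long committedOffset, boolean ignoreCommitted,
                                  StartPolicy policy,
                                  long earliestAvailable, long latestAvailable) {
        if (committedOffset != null && !ignoreCommitted) {
            // Default behavior: resume from the last committed offset.
            return committedOffset;
        }
        return policy == StartPolicy.EARLIEST ? earliestAvailable : latestAvailable;
    }

    public static void main(String[] args) {
        System.out.println(chooseStartOffset(500L, false, StartPolicy.EARLIEST, 0, 900)); // 500
        System.out.println(chooseStartOffset(500L, true, StartPolicy.EARLIEST, 0, 900));  // 0
        System.out.println(chooseStartOffset(null, false, StartPolicy.LATEST, 0, 900));   // 900
    }
}
```

Note that choosing LATEST on restart gives up at-least-once for anything published while the topology was down, whereas EARLIEST replays everything the broker still retains.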