That may be a feasible alternative approach. You can call ConsumerConnector.shutdown() to close the consumer cleanly.
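A minimal sketch of the shutdown pattern described above, with no Kafka dependency. `StandInConsumer` is a hypothetical class invented for illustration; only its `shutdown()` method mirrors the role of `ConsumerConnector.shutdown()`. The sketch assumes that the shutdown call makes the blocked read loop finish, so the output (a `StringWriter` here, an HDFS stream in the real setup) can be closed afterwards.

```java
import java.io.StringWriter;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class GracefulShutdownSketch {

    /** Hypothetical stand-in for the high-level consumer; this is not the Kafka API. */
    public static final class StandInConsumer {
        // Unique marker object; compared by identity, never by content.
        private static final String POISON = new String("shutdown");
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        public void publish(String message) {
            queue.add(message);
        }

        /** Plays the role of ConsumerConnector.shutdown(): unblocks the reader. */
        public void shutdown() {
            queue.add(POISON);
        }

        /** Blocks until a message arrives; returns null once shutdown() was called. */
        public String next() throws InterruptedException {
            String message = queue.take();
            return message == POISON ? null : message;
        }
    }

    /** Copies messages to the writer until the consumer stops, then closes the writer. */
    public static int drain(StandInConsumer consumer, Writer out) throws Exception {
        int count = 0;
        try (Writer w = out) { // closed even if the loop throws
            String message;
            while ((message = consumer.next()) != null) {
                w.write(message);
                w.write('\n');
                count++;
            }
        }
        return count;
    }

    /** Runs the read loop on a worker thread and stops it from the calling thread. */
    public static String demo() throws Exception {
        StandInConsumer consumer = new StandInConsumer();
        StringWriter sink = new StringWriter();
        Thread worker = new Thread(() -> {
            try {
                drain(consumer, sink);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        worker.start();
        consumer.publish("a");
        consumer.publish("b");
        consumer.shutdown(); // ask the read loop to stop
        worker.join();       // wait until the writer has been closed
        return sink.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.print(demo());
    }
}
```

In a real consumer the shutdown call would typically come from a timer, a signal handler, or a JVM shutdown hook rather than straight after publishing; that choice is left open here.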
Thanks,
Jun

On Thu, Jan 17, 2013 at 6:20 AM, navneet sharma <navneetsharma0...@gmail.com> wrote:

> That makes sense.
>
> I tried an alternate approach: I am using the high-level consumer and
> going through the Hadoop HDFS APIs to push data into HDFS.
>
> I am not creating any jobs for that.
>
> The only problem I am seeing here is that the consumer is designed to run
> forever, which means I need to find out how to close the HDFS file and
> kill the consumer.
>
> Is there any way to kill or close the high-level consumer gracefully?
>
> I am running v0.7.0. I don't mind upgrading to a higher version if that
> allows me this kind of consumer handling.
>
> Thanks,
> Navneet
>
> On Thu, Jan 17, 2013 at 10:41 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > I think the main reason for using SimpleConsumer is to manage offsets
> > explicitly. For example, this is useful when Hadoop retries failed
> > tasks. Another reason is that Hadoop already does load balancing, so
> > there is not much need to balance the load again using the high-level
> > consumer.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Jan 16, 2013 at 4:40 PM, navneet sharma <navneetsharma0...@gmail.com> wrote:
> >
> > > Thanks Felix.
> > >
> > > One question still remains. Why SimpleConsumer? Why not the
> > > high-level consumer? If I change the code to use the high-level
> > > consumer, will it create any challenges?
> > >
> > > Navneet
> > >
> > > On Tue, Jan 15, 2013 at 11:46 PM, Felix GV <fe...@mate1inc.com> wrote:
> > >
> > > > Please read the Kafka design paper
> > > > <http://kafka.apache.org/design.html>.
> > > >
> > > > It may look a little long, but it's as short as it can be. Kafka
> > > > differs from other messaging systems in a couple of ways, and it's
> > > > important to understand the fundamental design choices that were
> > > > made in order to understand the way Kafka works.
> > > >
> > > > I believe my previous email already answers both your offset
> > > > tracking and retention questions, but if my explanations are not
> > > > clear enough, then the next best thing is probably to read the
> > > > design paper :)
> > > >
> > > > --
> > > > Felix
> > > >
> > > > On Tue, Jan 15, 2013 at 12:01 PM, navneet sharma <navneetsharma0...@gmail.com> wrote:
> > > >
> > > > > Thanks Felix for sharing your work. Contrib hadoop-consumer
> > > > > looks like the same way.
> > > > >
> > > > > I think I need to really understand this offset stuff. So far I
> > > > > have used only the high-level consumer. When the consumer was
> > > > > done reading all the messages, I used to kill the process
> > > > > (because it won't stop on its own).
> > > > >
> > > > > Then I used the Producer to pump more messages and the Consumer
> > > > > to read the new messages (in a new process, as I had killed the
> > > > > last consumer).
> > > > >
> > > > > But I never saw messages getting duplicated.
> > > > >
> > > > > Now it's not very clear to me how offsets are tracked,
> > > > > specifically when I am re-launching the consumer. And why is the
> > > > > retention policy not working when used with SimpleConsumer? For
> > > > > my experiment I set it to 4 hours.
> > > > >
> > > > > Please help me understand.
> > > > >
> > > > > Thanks,
> > > > > Navneet
> > > > >
> > > > > On Tue, Jan 15, 2013 at 4:12 AM, Felix GV <fe...@mate1inc.com> wrote:
> > > > >
> > > > > > I think you may be misunderstanding the way Kafka works.
> > > > > >
> > > > > > A Kafka broker is never supposed to clear messages just
> > > > > > because a consumer read them.
> > > > > >
> > > > > > The Kafka broker will instead clear messages after their
> > > > > > retention period ends, though it will not delete the messages
> > > > > > at the exact time when they expire. Instead, a background
> > > > > > process will periodically delete a batch of expired messages.
> > > > > > The retention policies guarantee a minimum retention time,
> > > > > > not an exact retention time.
> > > > > >
> > > > > > It is the responsibility of each consumer to keep track of
> > > > > > which messages it has consumed already (by recording an
> > > > > > offset for each consumed partition). The high-level consumer
> > > > > > stores these offsets in ZK. The simple consumer has no
> > > > > > built-in capability to store and manage offsets, so it is the
> > > > > > developer's responsibility to do so. In the case of the
> > > > > > hadoop consumer in the contrib package, these offsets are
> > > > > > stored in offset files within HDFS.
> > > > > >
> > > > > > I wrote a blog post a while ago that explains how to use the
> > > > > > offset files generated by the contrib consumer to do
> > > > > > incremental consumption (so that you don't get duplicated
> > > > > > messages by re-consuming everything in subsequent runs).
> > > > > >
> > > > > > http://felixgv.com/post/69/automating-incremental-imports-with-the-kafka-hadoop-consumer/
> > > > > >
> > > > > > I'm not sure how up to date this is, regarding the current
> > > > > > Kafka versions, but it may still give you some useful
> > > > > > pointers...
> > > > > >
> > > > > > --
> > > > > > Felix
> > > > > >
> > > > > > On Mon, Jan 14, 2013 at 1:34 PM, navneet sharma <navneetsharma0...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I am trying to use the code supplied in the hadoop-consumer
> > > > > > > package. I am running into the following issues:
> > > > > > >
> > > > > > > 1) This code uses SimpleConsumer, which contacts the Kafka
> > > > > > > broker directly, without Zookeeper. Because of this,
> > > > > > > messages are not getting cleared from the broker, and I am
> > > > > > > getting duplicate messages in each run.
> > > > > > >
> > > > > > > 2) The retention policy specified as log.retention.hours in
> > > > > > > server.properties is not working. Not sure if it's due to
> > > > > > > SimpleConsumer.
> > > > > > >
> > > > > > > Is this expected behaviour? Is there any code using the
> > > > > > > high-level consumer for the same work?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Navneet Sharma