Hi Anand,

You can use the high-level consumer, turn off automatic offset commits (auto.commit.enable=false in 0.8), and do something like:

    message = consumer.iter.next();
    bool acked = false;
    while (!acked) {
        process(message);
        acked = writeToDB();
    }
    consumer.commit();   // commitOffsets() on the 0.8 ConsumerConnector

Guozhang

On Fri, Aug 1, 2014 at 3:30 AM, anand jain <anandjain1...@gmail.com> wrote:

> I am very much new to Kafka and we are using Kafka 0.8.1.
>
> What I need to do is to consume a message from topic. For that, I will have
> to write one consumer in Java which will consume a message from topic and
> then save that message to database. After a message is saved, some
> acknowledgement will be sent to Java consumer. If acknowledgement is true,
> then next message should be consumed from the topic. If acknowledgement is
> false (which means due to some error message, read from the topic, couldn't
> be saved into the database), then again that message should be read.
>
> I think I need to use Simple Consumer, to have control over message offset
> and have gone through the Simple Consumer example as given in this link
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> .
>
> In this example, offset is evaluated in run method as 'readOffset'. Do I
> need to play with that? For e.g. I can use LatestTime() instead of
> EarliestTime() and in case of false, I will reset the offset to the one
> before using offset - 1.
>
> Is this how I should proceed? Or can the same be done using High Level API?
>

--
-- Guozhang
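
For reference, the retry-then-commit loop from the reply above could be sketched in Java roughly as follows. This is only an illustration of the control flow, not a working Kafka client: the message iterator, the writeToDb predicate, and the commit callback are hypothetical stand-ins (plain JDK types) so that it runs without a broker. With the 0.8 high-level consumer, the iterator would come from a KafkaStream and the commit callback would call ConsumerConnector.commitOffsets(), with auto.commit.enable set to false.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Sketch only: the Kafka pieces are replaced by plain Java stand-ins.
class AckedConsumeLoop {

    /**
     * Takes messages one at a time. Each message is retried until the
     * (hypothetical) writeToDb predicate returns true; only then is the
     * (hypothetical) commit callback run and the next message taken.
     * Returns the total number of write attempts.
     */
    static int consumeAll(Iterator<String> messages,
                          Predicate<String> writeToDb,
                          Runnable commit) {
        int attempts = 0;
        while (messages.hasNext()) {
            String message = messages.next();
            boolean acked = false;
            while (!acked) {
                attempts++;
                acked = writeToDb.test(message); // false means "not saved, try again"
            }
            // Stand-in for ConsumerConnector.commitOffsets() in Kafka 0.8.
            commit.run();
        }
        return attempts;
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();
        int[] calls = {0};
        int[] commits = {0};

        // Fake database write that rejects every third call.
        Predicate<String> flakyWrite = m -> {
            calls[0]++;
            if (calls[0] % 3 == 0) {
                return false;
            }
            stored.add(m);
            return true;
        };

        int attempts = consumeAll(
                Arrays.asList("a", "b", "c").iterator(),
                flakyWrite,
                () -> commits[0]++);

        System.out.println("attempts=" + attempts
                + " stored=" + stored
                + " commits=" + commits[0]);
    }
}
```

Note that the inner loop retries forever while the write keeps failing; a real consumer would probably want a retry limit or a back-off between attempts. Also, if the process dies after the database write but before the commit, the message will be delivered again on restart, so the write should tolerate duplicates.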