Thanks Guozhang!!

Below is the code for iterating over log messages:

.........................................................
.........................................................
for (final KafkaStream stream : streams) {
    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
    while (consumerIte.hasNext()) {
        System.out.println("Message from Topic :: " + new String(consumerIte.next().message()));
    }
}
........................................................
.........................................................

As far as I understand, the statement *while (consumerIte.hasNext())* runs in an infinite loop and returns true whenever a message is published. How should I fit the code you suggested in here?

Regards
Anand

On Fri, Aug 1, 2014 at 8:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Anand,
>
> You can use the high-level consumer and turn off auto.offset.commit, and do
> something like:
>
> message = consumer.iter.next();
> bool acked = false
> while (!acked) {
>   process(message)
>   acked = writeToDB();
> }
> consumer.commit()
>
> Guozhang
>
> On Fri, Aug 1, 2014 at 3:30 AM, anand jain <anandjain1...@gmail.com> wrote:
>
> > I am very new to Kafka and we are using Kafka 0.8.1.
> >
> > What I need to do is consume a message from a topic. For that, I will have
> > to write a consumer in Java which will consume a message from the topic and
> > then save that message to a database. After a message is saved, an
> > acknowledgement will be sent to the Java consumer. If the acknowledgement is
> > true, then the next message should be consumed from the topic. If the
> > acknowledgement is false (which means that, due to some error, the message
> > read from the topic couldn't be saved into the database), then that message
> > should be read again.
> >
> > I think I need to use the Simple Consumer to have control over the message
> > offset, and I have gone through the Simple Consumer example given at this
> > link:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> >
> > In this example, the offset is evaluated in the run method as 'readOffset'.
> > Do I need to play with that? For example, I can use LatestTime() instead of
> > EarliestTime() and, in case of false, reset the offset to the previous one
> > using offset - 1.
> >
> > Is this how I should proceed? Or can the same be done using the High Level
> > API?
>
> --
> -- Guozhang
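
For reference, here is one way the retry loop suggested above might sit inside the while (consumerIte.hasNext()) loop from the top of this message. It is only a sketch under stated assumptions, not tested Kafka code: the Kafka objects are replaced by plain Java stand-ins so that it runs on its own, and the names AckedConsumeLoop, consume, writeToDb and commit are made up for illustration. With the 0.8 high-level consumer, messages would be the ConsumerIterator (read via next().message()), commit would call commitOffsets() on the ConsumerConnector, and automatic commits would be disabled with auto.commit.enable=false.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

class AckedConsumeLoop {

    /**
     * Reads messages one at a time, retries each one until the sink
     * acknowledges it, and commits only after the acknowledgement.
     * Returns the number of commits performed.
     */
    static int consume(Iterator<byte[]> messages,
                       Predicate<String> writeToDb,
                       Runnable commit) {
        int commits = 0;
        // With Kafka's ConsumerIterator, hasNext() blocks until a message arrives.
        while (messages.hasNext()) {
            String message = new String(messages.next(), StandardCharsets.UTF_8);
            boolean acked = false;
            while (!acked) {
                // false means "not saved"; the same message is tried again.
                acked = writeToDb.test(message);
            }
            // Assumption: with Kafka 0.8 this would be connector.commitOffsets().
            commit.run();
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<byte[]> input = Arrays.asList(
                "first".getBytes(StandardCharsets.UTF_8),
                "second".getBytes(StandardCharsets.UTF_8));
        int[] attempts = {0};
        // Stand-in for the database: every odd-numbered attempt fails.
        Predicate<String> flakyDb = m -> ++attempts[0] % 2 == 0;
        int commits = consume(input.iterator(), flakyDb,
                () -> System.out.println("offset committed"));
        System.out.println(commits + " messages stored after " + attempts[0] + " attempts");
    }
}
```

The inner loop retries forever while the database keeps failing, so a real consumer would probably want a retry limit or a back-off. Also, as far as I know, commitOffsets() commits for every stream of the connector, so this pattern is simplest with a single stream per connector.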