Also, is there any need to check an acknowledgement variable at all? Any exception while dealing with the database would make the consumer program stop, and hence *consumer.commit()* wouldn't have been called... right?
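For what it's worth, below is roughly how I picture combining your retry loop with my iterator loop (quoted further down). This is only a sketch: saveToDb() is a placeholder for my real database call, stream and consumerIte are the variables from my quoted snippet, consumer is the ConsumerConnector, auto.commit.enable is assumed to be set to false, and I'm assuming commitOffsets() is the right method to call on the Java ConsumerConnector.

    // Sketch only: saveToDb() is a placeholder for the actual persistence code;
    // auto.commit.enable is assumed to be "false" in the consumer config.
    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
    while (consumerIte.hasNext()) {
        String message = new String(consumerIte.next().message());
        boolean acked = false;
        while (!acked) {
            acked = saveToDb(message);   // false => retry the same message
        }
        consumer.commitOffsets();        // reached only after the message is in the database
    }
    // If saveToDb() throws instead of returning false, the exception leaves the loop
    // before commitOffsets(), so the failed message's offset is never committed.

In practice I'd probably give up and log after N failed attempts instead of retrying forever, but that's beside the point here.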
The above question is for a single topic. Now, let's assume there are 5 topics. First, I thought I could start a Java program which would spawn 5 threads, one for each topic, but the issue I see is that if an exception occurs in even a single thread (topic), I need to terminate the Java process explicitly to resolve the issue (which may be due to database-related code), and that brings all the other threads to a halt. Isn't it better to have 5 Java processes, one for each topic? (I've put a rough sketch of the single-process layout I mean at the very end of this mail, below the quoted thread.)

Regards
Anand

On Mon, Aug 4, 2014 at 12:31 PM, anand jain <anandjain1...@gmail.com> wrote:

> Thanks Guozhang!!
>
> Below is the code for iterating over log messages:
> .........................................................
> .........................................................
> for (final KafkaStream stream : streams) {
>     ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
>     *while (consumerIte.hasNext()) {*
>         System.out.println("Message from Topic :: "
>                 + new String(consumerIte.next().message()));
>     }
> }
> ........................................................
> .........................................................
>
> As far as I understand, the statement *while (consumerIte.hasNext())* runs
> in an infinite loop and returns true whenever a message is published.
>
> How should I fit your piece of code (the solution you suggested) here?
>
> Regards
> Anand
>
> On Fri, Aug 1, 2014 at 8:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Anand,
>>
>> You can use the high-level consumer, turn off auto.offset.commit, and do
>> something like:
>>
>> message = consumer.iter.next();
>> bool acked = false
>> while (!acked) {
>>     process(message)
>>     acked = writeToDB();
>> }
>> consumer.commit()
>>
>>
>> Guozhang
>>
>>
>> On Fri, Aug 1, 2014 at 3:30 AM, anand jain <anandjain1...@gmail.com>
>> wrote:
>>
>> > I am very much new to Kafka and we are using Kafka 0.8.1.
>> >
>> > What I need to do is consume a message from a topic. For that, I will
>> > have to write a consumer in Java which consumes a message from the topic
>> > and then saves it to the database. After a message is saved, an
>> > acknowledgement will be sent to the Java consumer. If the
>> > acknowledgement is true, the next message should be consumed from the
>> > topic. If the acknowledgement is false (which means that, due to some
>> > error, the message read from the topic couldn't be saved into the
>> > database), that message should be read again.
>> >
>> > I think I need to use the Simple Consumer, to have control over the
>> > message offset, and have gone through the Simple Consumer example given
>> > in this link:
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>> >
>> > In this example, the offset is evaluated in the run method as
>> > 'readOffset'. Do I need to play with that? For example, I can use
>> > LatestTime() instead of EarliestTime(), and in case of false, I will
>> > reset the offset to the previous one using offset - 1.
>> >
>> > Is this how I should proceed? Or can the same be done using the
>> > high-level API?
>>
>>
>> --
>> -- Guozhang
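PS: for the 5-topic case, here is the single-process, one-thread-per-topic layout I had in mind, just as a rough sketch against the 0.8.1 high-level consumer (the ZooKeeper address, group id and topic names are placeholders):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class MultiTopicConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder address
            props.put("group.id", "db-writer");               // placeholder group id
            props.put("auto.commit.enable", "false");         // commit only after a successful DB write

            final ConsumerConnector consumer =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // one stream per topic; topic names are placeholders
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            for (String topic : Arrays.asList("topic1", "topic2", "topic3", "topic4", "topic5")) {
                topicCountMap.put(topic, 1);
            }
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    consumer.createMessageStreams(topicCountMap);

            ExecutorService pool = Executors.newFixedThreadPool(topicCountMap.size());
            for (Map.Entry<String, List<KafkaStream<byte[], byte[]>>> entry : streams.entrySet()) {
                final String topic = entry.getKey();
                final KafkaStream<byte[], byte[]> stream = entry.getValue().get(0);
                pool.submit(new Runnable() {
                    public void run() {
                        try {
                            ConsumerIterator<byte[], byte[]> it = stream.iterator();
                            while (it.hasNext()) {
                                String message = new String(it.next().message());
                                // the save-to-DB-then-commitOffsets() pattern from the
                                // sketch earlier in this mail would go here
                                System.out.println("Message from Topic :: " + topic + " :: " + message);
                            }
                        } catch (Exception e) {
                            // only this topic's thread stops; the other four keep consuming
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }

One thing I'm unsure about: commitOffsets() on a shared ConsumerConnector commits the offsets of every stream it owns, so if per-topic commit isolation matters, would one ConsumerConnector per topic (still inside one JVM) give the same isolation as 5 separate processes?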