Maybe I could batch the messages before committing, e.g. committing every 10 seconds. This is what auto commit does anyway, and I could live with duplicate data. What do you think?
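The batching idea above can be sketched as a simple commit-interval check. The class below is a hypothetical simulation of the timing logic only; the `connector.commitOffsets()` named in the comment is the 0.8 high-level consumer call, but everything else (class and method names) is made up for illustration, since the real loop needs a running broker:

```java
// Sketch of time-based offset batching: commit at most once per interval
// instead of once per message. In a real 0.8 high-level consumer the commit
// would be ConsumerConnector.commitOffsets(); here we only model the timing.
public class BatchedCommitter {
    private final long intervalMs;
    private long lastCommitMs;
    private int commits = 0;

    public BatchedCommitter(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastCommitMs = startMs;
    }

    /** Call after each processed message; commits only when the interval has elapsed. */
    public boolean maybeCommit(long nowMs) {
        if (nowMs - lastCommitMs >= intervalMs) {
            // Real consumer would call: connector.commitOffsets();
            lastCommitMs = nowMs;
            commits++;
            return true;
        }
        return false;
    }

    public int commitCount() { return commits; }

    public static void main(String[] args) {
        BatchedCommitter c = new BatchedCommitter(10_000, 0);
        // Simulate 35 seconds of messages arriving once per second.
        for (long t = 1_000; t <= 35_000; t += 1_000) {
            c.maybeCommit(t);
        }
        // Commits fire at t=10s, 20s, 30s: three commits for 35 messages.
        System.out.println(c.commitCount()); // 3
    }
}
```

On a crash, anything processed since the last interval commit is re-consumed, which matches the "I could live with duplicate data" trade-off above.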
I would then also seem to need a monitoring daemon to check the lag and restart the consumer after machine crashes.

On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> Thanks, Guozhang.
> So if I switch to SimpleConsumer, will these problems be taken care of
> already? I would assume that I will need to manage all the offsets by
> myself, including the error recovery logic, right?
> Chen
>
> On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> Hello Chen,
>>
>> 1. Manually committing offsets does have the risk of duplicates; consider
>> the following pattern:
>>
>> message = consumer.next();
>> process(message);
>> consumer.commit();
>>
>> A rebalance can happen between lines 2 and 3, where the message has been
>> processed but the offset has not yet been committed; if another consumer
>> picks up this partition after the rebalance, it may re-consume this
>> message. With auto.commit turned on, offsets will always be committed
>> before the consumers release ownership of partitions during rebalances.
>>
>> In the 0.9 consumer design, we have fixed this issue by introducing the
>> onPartitionDeassigned callback; you can take a look at its current API
>> here:
>>
>> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
>>
>> 2. Committing offsets too often does have an overhead, since each commit
>> goes to ZooKeeper, and ZK is not write-scalable. We are also fixing that
>> issue by moving offset management from ZK to the Kafka brokers. This is
>> already checked in to trunk and will be included in the 0.8.2 release.
>>
>> Guozhang
>>
>> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.s...@gmail.com>
>> wrote:
>>
>>> Guozhang,
>>> Just to make it clear:
>>> Suppose I have 10 threads with the same consumer group id reading topic
>>> T, auto commit is turned off, and commitOffset is called only when a
>>> message is processed successfully.
>>> If thread 1 dies while processing a message from partition P1, and the
>>> last offset is Offset1, will Kafka ensure that one of the other 9
>>> running threads automatically picks up messages on partition P1 from
>>> Offset1? Will that thread risk reading the same message more than once?
>>>
>>> Also, I would assume committing the offset for each message is a bit
>>> heavy. What do you guys usually do for error handling when reading from
>>> Kafka?
>>> Thanks much!
>>> Chen
>>>
>>> On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> Yes, in that case you can turn off auto commit and call commitOffsets()
>>>> manually after processing is finished. commitOffsets() will only write
>>>> the offsets of the partitions that the consumer is currently fetching,
>>>> so there is no need to coordinate this operation.
>>>>
>>>> On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.s...@gmail.com>
>>>> wrote:
>>>>
>>>>> But with auto commit turned on, I am risking losing the failed
>>>>> message, right? Should I turn off auto commit and only commit the
>>>>> offset when the message is processed successfully? But that would
>>>>> require coordination between threads in order to know the right
>>>>> timing to commit the offset.
>>>>>
>>>>> On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Chen,
>>>>>>
>>>>>> With the high-level consumer, partition re-assignment is automatic
>>>>>> upon consumer failures.
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.s...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Folks,
>>>>>>> I have a process that starts at a specific time and reads from a
>>>>>>> specific topic.
>>>>>>> I am currently using the high-level API (consumer group) to read
>>>>>>> from Kafka (and will stop once there is nothing left in the topic,
>>>>>>> by specifying a timeout). I am most concerned about error recovery
>>>>>>> in a multi-threaded context. If one thread dies, will the other
>>>>>>> running bolt threads pick up the failed message? Or do I have to
>>>>>>> start another thread in order to pick it up? What would be good
>>>>>>> practice to ensure each message is processed at least once?
>>>>>>>
>>>>>>> Note that all threads are using the same group id.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Chen
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>
>>>> --
>>>> -- Guozhang
>>
>> --
>> -- Guozhang
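The process-then-commit window Guozhang describes earlier in the thread can be simulated without a broker. The class below is a toy model, not the Kafka API: it shows that committing after processing gives at-least-once delivery, since a message in flight during a rebalance may be re-processed by the next partition owner but is never lost:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the rebalance window: a consumer processes a message and then
// commits. If it loses the partition between those two steps, the next owner
// restarts from the last committed offset and re-processes that message.
public class RebalanceWindowDemo {

    /** Consume messages, "crashing" after processing the message at offset
     *  crashAfter but before committing it. Returns everything processed. */
    static List<String> runWithCrash(String[] log, int crashAfter) {
        List<String> processed = new ArrayList<>();
        int committed = 0;                     // last committed offset
        for (int offset = 0; offset < log.length; offset++) {
            processed.add(log[offset]);        // process(message)
            if (offset == crashAfter) break;   // rebalance before commit
            committed = offset + 1;            // consumer.commit()
        }
        // The new owner of the partition resumes from the committed offset.
        for (int offset = committed; offset < log.length; offset++) {
            processed.add(log[offset]);
            committed = offset + 1;
        }
        return processed;
    }

    public static void main(String[] args) {
        String[] log = {"m0", "m1", "m2"};
        // Crash after processing m1 but before committing it:
        // m1 is processed twice, nothing is lost.
        System.out.println(runWithCrash(log, 1)); // [m0, m1, m1, m2]
    }
}
```

Swapping the commit to before `process(message)` would invert the trade-off: no duplicates, but the in-flight message would be lost on a crash (at-most-once).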
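For the lag-monitoring daemon Chen mentions at the top of the thread, the core check is just log-end offset minus last committed offset, per partition. Below is a hedged sketch of that decision logic only; the offset values and threshold are invented, and in 0.8 the real numbers would come from ZooKeeper or tooling such as kafka.tools.ConsumerOffsetChecker:

```java
import java.util.HashMap;
import java.util.Map;

// Core check for a lag-monitoring daemon: compare each partition's log-end
// offset with the group's committed offset, and flag the consumer as stuck
// (a restart candidate) when lag exceeds a threshold. All numbers here are
// simulated; a real daemon would fetch them from ZK / broker tooling.
public class LagMonitor {
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    /** Returns the partitions whose lag exceeds maxLag, with their lag. */
    static Map<Integer, Long> laggingPartitions(Map<Integer, long[]> offsets, long maxLag) {
        Map<Integer, Long> lagging = new HashMap<>();
        for (Map.Entry<Integer, long[]> e : offsets.entrySet()) {
            long partitionLag = lag(e.getValue()[0], e.getValue()[1]); // [logEnd, committed]
            if (partitionLag > maxLag) lagging.put(e.getKey(), partitionLag);
        }
        return lagging;
    }

    public static void main(String[] args) {
        Map<Integer, long[]> offsets = new HashMap<>();
        offsets.put(0, new long[]{1_000, 990});   // lag 10: healthy
        offsets.put(1, new long[]{5_000, 3_000}); // lag 2000: restart candidate
        System.out.println(laggingPartitions(offsets, 100)); // {1=2000}
    }
}
```

A daemon would run this check on a timer and restart (or alert on) the consumer group whenever any partition stays over the threshold.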