Thanks Jun, yup, that is how I understand it. I just figured out that the error is a red herring for my problem, since it is caught and logged properly in ZookeeperConsumerConnector.commitOffsets.
It seems that the problem I am actually running into is this (any help is appreciated).

One code base registers as consumer = apophis_starscream.site1.medialets.com-1322840963912-9c58256d (from the logs).

When I kill that consumer (on purpose) and start another consumer in its place (new code meant to pick up where the old code left off), it seems to be treated as a different consumer, though I want it to start back where the other one stopped.

new code = apophis_starscream.site1.medialets.com-1322840984781-5d58d658 (from the logs), and it says "stopping fetcher FetchRunnable-0 to host starscream Received 0 messages"

But if I then start back up the code that registers as apophis_starscream.site1.medialets.com-1322840963912-9c58256d, it picks back up where it left off.

I am using the exact same consumer.properties for both apps. The point here is a code rewrite: my new code is not getting messages but my old code is, and I want to pick up where the old code was consuming messages.

So I guess the question is: how do I make my new consumer code "trick" Kafka/ZK into thinking it is the old one, so that it can pick back up where the old one left off? What else have folks done in this regard, or how should/could I handle this?

On Fri, Dec 2, 2011 at 10:39 AM, Jun Rao <jun...@gmail.com> wrote:

> Joe,
>
> It seems that the offset commit thread somehow gets interrupted while
> trying to write the offset to ZK. The consumer should still be able to
> consume in this case though. The offset commit thread runs in the
> background and is decoupled from the consumption logic.
>
> Thanks,
>
> Jun
>
> On Fri, Dec 2, 2011 at 12:01 AM, Joe Stein <crypt...@gmail.com> wrote:
>
> > has anyone else gotten this error? I get it 100% of the time depending on
> > how I run my consumer
> >
> > what is weird is when i use the kafka/bin/kafka-consumer-shell.sh from
> > trunk the error shows up BUT the consumer keeps producing the messages
> >
> > if i take the ConsumerShell code and put it into my own program without any
> > modification the error comes up and everything is halted
> >
> > not sure if this is known or what but let me know, thanks!
> >
> > 2011-12-02 01:59:37,797 WARN Kafka-consumer-autocommit-0 kafka.consumer.ZookeeperConsumerConnector - exception during commitOffsets
> > org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException
> >     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
> >     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
> >     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> >     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:102)
> >     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:237)
> >     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:234)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:660)
> >     at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> >     at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:592)
> >     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:234)
> >     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:232)
> >     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> >     at scala.collection.immutable.List.foreach(List.scala:45)
> >     at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:232)
> >     at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:220)
> >     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:100)
> >     at kafka.utils.Utils$$anon$2.run(Utils.scala:58)
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> >     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >     at java.lang.Thread.run(Thread.java:619)
> > Caused by: java.lang.InterruptedException
> >     at java.lang.Object.wait(Native Method)
> >     at java.lang.Object.wait(Object.java:485)
> >     at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1317)
> >     at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1036)
> >     at org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
> >     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
> >     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >
> > /*
> > Joe Stein
> > http://www.linkedin.com/in/charmalloc
> > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > */

--
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/
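
P.S. For anyone comparing the two consumer IDs from the logs above, here is a minimal sketch that splits them into fields. It assumes the IDs follow a `<group>_<host>-<epoch millis>-<8 hex chars>` layout; that layout is inferred from the two log lines only and is not confirmed anywhere in this thread, and the pattern and the helper name `split_consumer_id` are hypothetical.

```python
import re

# Consumer IDs copied verbatim from the logs quoted in this message.
OLD_ID = "apophis_starscream.site1.medialets.com-1322840963912-9c58256d"
NEW_ID = "apophis_starscream.site1.medialets.com-1322840984781-5d58d658"

# ASSUMPTION: <group>_<host>-<13-digit epoch millis>-<8 hex chars>.
# This layout is guessed from the two IDs above, not taken from Kafka docs.
ID_PATTERN = re.compile(
    r"^(?P<group>[^_]+)_(?P<host>.+)-(?P<millis>\d{13})-(?P<suffix>[0-9a-f]{8})$"
)


def split_consumer_id(consumer_id):
    """Split a consumer ID into its assumed fields, or raise ValueError."""
    match = ID_PATTERN.match(consumer_id)
    if match is None:
        raise ValueError("ID does not fit the assumed layout: " + consumer_id)
    return match.groupdict()


old = split_consumer_id(OLD_ID)
new = split_consumer_id(NEW_ID)

# Fields that are identical in both IDs, and fields that differ.
same_fields = sorted(key for key in old if old[key] == new[key])
diff_fields = sorted(key for key in old if old[key] != new[key])

# Gap between the two embedded timestamps, in milliseconds.
gap_millis = int(new["millis"]) - int(old["millis"])

print("same:", same_fields)
print("different:", diff_fields)
print("gap between timestamps (ms):", gap_millis)
```

Under that assumption the two IDs share the group and host parts and differ only in the timestamp and the trailing suffix, which look like values generated at each start. If offsets are tracked per group rather than per consumer ID (worth verifying against the ZooKeeper paths written by the Kafka version in use), a fresh ID on its own would not explain why the new code receives 0 messages.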