Joe,

A couple of questions:

1. What is the difference between "old code" and "new code"? Have you changed any code in the latter case?
2. What {topic, consumer threads} map are you passing to each of your consumers?

>> but ... if I then start back up the code that registers as
>> apophis_starscream.site1.medialets.com-1322840963912-9c58256d it picks
>> back up where it left off

When you start a new consumer, it is automatically assigned a unique id. That does not stop it from picking up where the previous consumer thread left off: if both of your consumer thread ids "own" the same partitions, the second one should always start from the offset committed by the previous consumer thread.

Thanks,
Neha

On Fri, Dec 2, 2011 at 8:06 AM, Joe Stein <crypt...@gmail.com> wrote:

> Thanks Jun, yup, that is how I understand it. I just figured out that the
> error is a red herring for my problem, since it is caught and logged
> properly in ZookeeperConsumerConnector.commitOffsets.
>
> It seems that the problem I am running into is this (any help is
> appreciated):
>
> One code base consumer =
> apophis_starscream.site1.medialets.com-1322840963912-9c58256d (from the
> logs)
>
> So when I kill that consumer (on purpose) and start another consumer (new
> code to pick up where the old code left off) in its place, it seems to be a
> different consumer (though I want it to start back where the other one
> stopped).
>
> New code = apophis_starscream.site1.medialets.com-1322840984781-5d58d658
> (from the logs) says "stopping fetcher FetchRunnable-0 to host
> starscream Received 0 messages"
>
> But ... if I then start back up the code that registers
> as apophis_starscream.site1.medialets.com-1322840963912-9c58256d, it picks
> back up where it left off.
>
> I am using the exact same consumer.properties for both apps (the point here
> is a code rewrite: my new code is not getting messages but my old code
> is, and I want to pick up where the old code was consuming messages).
>
> So I guess the question is: how do I make my new consumer code "trick"
> Kafka/ZK into thinking it is the old one, so it can pick back up where the
> old one left off? Or what else have folks done in this regard, and how
> should/could I handle this?
>
> On Fri, Dec 2, 2011 at 10:39 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > Joe,
> >
> > It seems that the offset commit thread somehow gets interrupted while
> > trying to write the offset to ZK. The consumer should still be able to
> > consume in this case, though. The offset commit thread runs in the
> > background and is decoupled from the consumption logic.
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Dec 2, 2011 at 12:01 AM, Joe Stein <crypt...@gmail.com> wrote:
> >
> > > Has anyone else gotten this error? I get it 100% of the time, depending
> > > on how I run my consumer.
> > >
> > > What is weird is that when I use kafka/bin/kafka-consumer-shell.sh from
> > > trunk, the error shows up BUT the consumer keeps producing the messages.
> > >
> > > If I take the ConsumerShell code and put it into my own program without
> > > any modification, the error comes up and everything is halted.
> > >
> > > Not sure if this is known or what, but let me know, thanks!
> > >
> > > 2011-12-02 01:59:37,797 WARN Kafka-consumer-autocommit-0 kafka.consumer.ZookeeperConsumerConnector - exception during commitOffsets
> > > org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException
> > >     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
> > >     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
> > >     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> > >     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:102)
> > >     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:237)
> > >     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:234)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:660)
> > >     at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
> > >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> > >     at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:592)
> > >     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:234)
> > >     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:232)
> > >     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> > >     at scala.collection.immutable.List.foreach(List.scala:45)
> > >     at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:232)
> > >     at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:220)
> > >     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:100)
> > >     at kafka.utils.Utils$$anon$2.run(Utils.scala:58)
> > >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> > >     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> > >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > >     at java.lang.Thread.run(Thread.java:619)
> > > Caused by: java.lang.InterruptedException
> > >     at java.lang.Object.wait(Native Method)
> > >     at java.lang.Object.wait(Object.java:485)
> > >     at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1317)
> > >     at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1036)
> > >     at org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
> > >     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
> > >     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> > >
> > > /*
> > > Joe Stein
> > > http://www.linkedin.com/in/charmalloc
> > > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > */
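Neha's point, that a freshly assigned consumer id does not by itself lose the consumer's position, can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not Kafka's API: it assumes the 0.7-era ZooKeeper layout, where committed offsets sit under /consumers/<group>/offsets/<topic>/<brokerId-partitionId> and the random consumer id is not part of that path, and it assumes a range-style split of sorted partitions across sorted consumer thread ids. The class OffsetModel, its methods, the topic "events", and the offset values are hypothetical, and a HashMap stands in for ZooKeeper. Reading "apophis" as the group name assumes the id format <group>_<host>-<timestamp>-<uuid>.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of consumer offset storage; a HashMap stands in for ZooKeeper.
public class OffsetModel {
    // ZK-style path -> last committed offset.
    private final Map<String, Long> store = new HashMap<>();

    // Assumed 0.7-style layout: keyed by group, topic and broker-partition.
    // The per-process consumer id is deliberately NOT part of the key.
    public static String offsetPath(String group, String topic, String brokerPartition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + brokerPartition;
    }

    // Stands in for what a background auto-commit would write for one partition.
    public void commit(String group, String topic, String brokerPartition, long offset) {
        store.put(offsetPath(group, topic, brokerPartition), offset);
    }

    // Offset that whichever thread now owns the partition would resume from; -1 if none committed.
    public long resumeFrom(String group, String topic, String brokerPartition) {
        return store.getOrDefault(offsetPath(group, topic, brokerPartition), -1L);
    }

    // Range-style split: sort partitions and thread ids, give each thread a contiguous
    // slice, and give the first (partitions % threads) threads one extra partition.
    public static Map<String, List<String>> rangeAssign(List<String> partitions, List<String> threadIds) {
        List<String> parts = new ArrayList<>(partitions);
        List<String> threads = new ArrayList<>(threadIds);
        parts.sort(null);   // natural (lexicographic) order
        threads.sort(null);
        int perThread = parts.size() / threads.size();
        int withExtra = parts.size() % threads.size();
        Map<String, List<String>> owned = new HashMap<>();
        for (int i = 0; i < threads.size(); i++) {
            int start = perThread * i + Math.min(i, withExtra);
            int count = perThread + (i < withExtra ? 1 : 0);
            owned.put(threads.get(i), new ArrayList<>(parts.subList(start, start + count)));
        }
        return owned;
    }

    public static void main(String[] args) {
        OffsetModel zk = new OffsetModel();
        // Illustrative values only: group "apophis", hypothetical topic "events", partition "0-0".
        zk.commit("apophis", "events", "0-0", 4200L);
        // A restarted consumer gets a fresh id, but the id is not in the key,
        // so the thread in the same group that owns 0-0 resumes from the committed offset.
        System.out.println(zk.resumeFrom("apophis", "events", "0-0"));     // prints 4200
        // A different group finds nothing committed for this partition.
        System.out.println(zk.resumeFrom("apophis-new", "events", "0-0")); // prints -1
        // Three partitions split across two consumer threads (hypothetical thread ids).
        Map<String, List<String>> owned =
            rangeAssign(List.of("0-0", "0-1", "0-2"), List.of("consumerB-1", "consumerB-0"));
        System.out.println(owned.get("consumerB-0")); // prints [0-0, 0-1]
        System.out.println(owned.get("consumerB-1")); // prints [0-2]
    }
}
```

In this model, changing the {topic, consumer threads} map changes which thread owns which partition, but not the offset a partition resumes from; only a consumer in a different group would start without a committed offset.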