Thanks Jun, yup, that is how I understand it.  I just figured out the error
is a red herring to my problem since that is caught properly when logged in
ZookeeperConsumerConnector.commitOffsets.

it seems that the problem I am running into is this (any help is
appreciated)

one code base consumer =
apophis_starscream.site1.medialets.com-1322840963912-9c58256d (from the
logs)

so when i kill that consumer (on purpose) and start another consumer (new
code to pickup where the old code left off) in its place it seems to be a
different consumer (though I want it to start back where the other one
stopped)

new code = apophis_starscream.site1.medialets.com-1322840984781-5d58d658
(from the logs) says "stopping fetcher FetchRunnable-0 to host
starscream Received 0 messages"

but ... if I then start back up the code that registeres
as apophis_starscream.site1.medialets.com-1322840963912-9c58256d it keeps
back up where it left off

I am using the exact same consumer.properties for both apps (the point here
is code re-write where my new code is not getting messages but my old code
is and I want to pickup where the old code was consuming messages).

so I guess the question is how to make my new consumer code "trick"
kafka/zk into thinking it is the old one so it can pick back up where the
old one left off or what else have folks done in this regard or how
should/could I handle this????

On Fri, Dec 2, 2011 at 10:39 AM, Jun Rao <jun...@gmail.com> wrote:

> Joe,
>
> It seems that the offset commit thread somehow gets interrupted while
> trying to write the offset to ZK. The consumer should still be able to
> consume in this case though. The offset commit thread runs in the
> background and is decoupled from the consumption logic.
>
> Thanks,
>
> Jun
>
> On Fri, Dec 2, 2011 at 12:01 AM, Joe Stein <crypt...@gmail.com> wrote:
>
> > has anyone else gotten this error? I get it 100% of the time depending on
> > how I run my consumer
> >
> > what is weird is when i use the kafka/bin/kafka-consumer-shell.sh from
> > trunk the error shows up BUT the consumer keeps producing the messages
> >
> > if i take the ConsumerShell code and put it into my own program without
> any
> > modification the error comes up and everything is halted
> >
> > not sure if this is known or what but let me know, thanks!
> >
> > 2011-12-02 01:59:37,797 WARN Kafka-consumer-autocommit-0
> > kafka.consumer.ZookeeperConsumerConnector - exception during
> commitOffsets
> > org.I0Itec.zkclient.exception.ZkInterruptedException:
> > java.lang.InterruptedException
> > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
> > at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
> > at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> > at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:102)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:237)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:234)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:660)
> > at
> >
> >
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> > at
> >
> >
> scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:592)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:234)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:232)
> > at
> >
> >
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> > at scala.collection.immutable.List.foreach(List.scala:45)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:232)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:220)
> > at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:100)
> > at kafka.utils.Utils$$anon$2.run(Utils.scala:58)
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> > at
> >
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> > at
> >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> > at
> >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
> > at
> >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > at java.lang.Thread.run(Thread.java:619)
> > Caused by: java.lang.InterruptedException
> > at java.lang.Object.wait(Native Method)
> > at java.lang.Object.wait(Object.java:485)
> > at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1317)
> > at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1036)
> > at org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
> > at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
> > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >
> >
> > /*
> > Joe Stein
> > http://www.linkedin.com/in/charmalloc
> > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > */
> >
>



-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/

Reply via email to