Joe,

It seems that you are using the ZK-based consumer, which uses SimpleConsumer under the covers. The exception in SimpleConsumer seems to be caused by a broker failure. The ZK exception you saw is typically caused by ZK session expiration. Do you see session expirations in your consumer log? If the ZK session only expires occasionally, it is not a problem as long as rebalance completes in the end (you should see "Consumer ... selected partitions"). If there are constant ZK session expirations, then rebalance may not complete. ZK session expiration is typically caused by long GC pauses in the consumer JVM.
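One way to confirm this, if it is not obvious from the consumer log, is to subscribe a zkclient state listener from inside the same JVM as the consumer; because it shares the consumer's GC pauses, expirations on this diagnostic session strongly suggest the consumer's own session is expiring too. A rough sketch (not code from the Kafka codebase; the connection string and 6000 ms timeouts are placeholders):

    import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
    import org.apache.zookeeper.Watcher.Event.KeeperState

    object SessionExpirationLogger {
      def install(zkConnect: String) {
        // Separate diagnostic client in the consumer JVM; zkConnect and timeouts are placeholders.
        val zk = new ZkClient(zkConnect, 6000, 6000)
        zk.subscribeStateChanges(new IZkStateListener {
          def handleStateChanged(state: KeeperState) {
            if (state == KeeperState.Expired)
              println("ZK session expired at " + System.currentTimeMillis)
          }
          def handleNewSession() {
            // zkclient calls this after it has re-established a session following an expiration.
            println("new ZK session established at " + System.currentTimeMillis)
          }
        })
      }
    }

If expirations show up constantly, the usual remedies are raising the consumer's ZK session timeout (in the 0.7-era consumer config the property is, if memory serves, zk.sessiontimeout.ms) and tuning GC to shorten pauses.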
Also, which version of Kafka are you using? Which patch did you apply?

Thanks,

Jun

On Tue, Jan 3, 2012 at 2:25 PM, Joe Stein <crypt...@gmail.com> wrote:

> So my SimpleConsumer woes seem to continue :( I am thinking of not using a
> SimpleConsumer implementation anymore and rolling my own, unless there is
> another option or a fix here I can implement?
>
> So, after applying my patch (to see this error), this is the error I am
> getting after some time goes by of consuming my stream:
>
> 2012-01-03 15:04:28,871 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> kafka.consumer.ZookeeperConsumerConnector - Consumer test-consumer-group_doozer
> selected partitions : controserverlogs:0-0: fetched offset = 137564195795:
> consumed offset = 137564195795
> 2012-01-03 15:04:28,871 INFO FetchRunnable-0 kafka.consumer.SimpleConsumer
> - multifetch reconnect due to
> java.nio.channels.ClosedByInterruptException
>   at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:263)
>   at kafka.utils.Utils$.read(Utils.scala:497)
>   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
>   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:179)
>   at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:119)
>   at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:117)
>   at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:61)
>
> It does this frequently, and if I leave it running it then eventually does
> this (after starting back up and rebalancing):
>
> 2012-01-03 15:06:18,823 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> kafka.consumer.ZookeeperConsumerConnector - begin rebalancing consumer
> test-consumer-group_doozer try #0
> 2012-01-03 15:06:18,825 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> kafka.consumer.ZookeeperConsumerConnector - exception during rebalance
> org.I0Itec.zkclient.exception.ZkNoNodeException:
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer
>   at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
>   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
>   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
>   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
>   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
>   at kafka.utils.ZkUtils$.readData(ZkUtils.scala:161)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$getTopicCount(ZookeeperConsumerConnector.scala:414)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:453)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:430)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:75)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:426)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.handleChildChange(ZookeeperConsumerConnector.scala:369)
>   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer
>   at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
>   at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
>   at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
>   at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
>   at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
>   at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
>   at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>
> Granted, this could all be about my implementation; it is based largely on
> the "SimpleConsumerShell", as I was not sure of any other starting point
> short of a lot of my own heavy lifting and doing it all myself (which seems
> to involve a lot, which is fine; then something like
> https://issues.apache.org/jira/browse/KAFKA-232 will be important for me
> too, something I looked at today and need to dig into more).
>
> Thoughts? Thanks. What are other folks doing? Is anyone using the
> SimpleConsumer successfully?
>
> --
>
> /*
> Joe Stein
> http://www.linkedin.com/in/charmalloc
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> */
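For reference, here is a bare-bones fetch loop in the spirit of SimpleConsumerShell, written against the 0.7-era SimpleConsumer API as I understand it. The broker host, port, topic, partition, and fetch sizes are placeholders, and signatures may differ slightly between 0.7.x releases, so treat it as a sketch rather than a drop-in implementation:

    import kafka.api.{FetchRequest, OffsetRequest}
    import kafka.consumer.SimpleConsumer

    object FetchLoopSketch {
      def main(args: Array[String]) {
        // Placeholder broker, topic, and partition; adjust for your cluster.
        val consumer = new SimpleConsumer("broker1", 9092, 10000, 1024 * 1024)
        val topic = "controserverlogs"
        val partition = 0

        // Start from the earliest offset the broker still retains for this partition.
        var offset = consumer.getOffsetsBefore(topic, partition, OffsetRequest.EarliestTime, 1).head

        while (true) {
          val messages = consumer.fetch(new FetchRequest(topic, partition, offset, 1024 * 1024))
          for (msgAndOffset <- messages) {
            val payload = msgAndOffset.message.payload
            val bytes = new Array[Byte](payload.remaining)
            payload.get(bytes)
            println(new String(bytes, "UTF-8"))
            // In 0.7 offsets are byte offsets; msgAndOffset.offset points at the next message.
            offset = msgAndOffset.offset
          }
        }
      }
    }

The ZK-based consumer runs essentially this kind of loop inside its FetcherRunnable threads (as the stack traces above show) and layers group membership, offset commits, and rebalancing on top via ZooKeeper, so resolving the session expirations is likely more productive than rewriting the fetch path.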