So my SimpleConsumer woes seem to continue :( I am thinking of not using a SimpleConsumer implementation anymore and rolling my own unless there is another option or a fix here I can implement?
So, after applying my patch (to see this error) this is the error I am getting after some time goes by of consuming my stream 2012-01-03 15:04:28,871 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181 kafka.consumer.ZookeeperConsumerConnector - Consumer test-consumer-group_doozer selected partitions : controserverlogs:0-0: fetched offset = 137564195795: consumed offset = 137564195795 2012-01-03 15:04:28,871 INFO FetchRunnable-0 kafka.consumer.SimpleConsumer - multifetch reconnect due to java.nio.channels.ClosedByInterruptException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:263) at kafka.utils.Utils$.read(Utils.scala:497) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67) at kafka.network.Receive$class.readCompletely(Transmission.scala:55) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:179) at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:119) at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:117) at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:61) its does this frequently and if I leave it running if then eventually does this (after starting back up and rebalancing) 2012-01-03 15:06:18,823 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181 kafka.consumer.ZookeeperConsumerConnector - begin rebalancing consumer test-consumer-group_doozer try #0 2012-01-03 15:06:18,825 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181 kafka.consumer.ZookeeperConsumerConnector - exception during rebalance org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) at kafka.utils.ZkUtils$.readData(ZkUtils.scala:161) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$getTopicCount(ZookeeperConsumerConnector.scala:414) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:453) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:430) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:75) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:426) at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.handleChildChange(ZookeeperConsumerConnector.scala:369) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer at org.apache.zookeeper.KeeperException.create(KeeperException.java:102) at org.apache.zookeeper.KeeperException.create(KeeperException.java:42) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950) at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103) at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770) at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) granted this could be all about my implementation it is based largely on the "SimpleConsumerShell" as I was not sure of any other starting point without a lot of my own heavy lifting and just doing it myself (which seems to have a lot involved which is fine then something like https://issues.apache.org/jira/browse/KAFKA-232 will be important too for me something I looked at today need to dig more) thoughts? thanks? what are other folks doing? is anyone using the SimpleConsumer successfully? -- /* Joe Stein http://www.linkedin.com/in/charmalloc Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop> */