thanks, let me hack on that a bit

On Wed, Jan 4, 2012 at 12:58 PM, Jun Rao <jun...@gmail.com> wrote:
> Joe,
>
> If you enable GC logging in the consumer, you can see the GC time. This
> will confirm whether GC is causing the ZK session timeout. If that's
> indeed the issue, then a quick fix could be increasing the ZK session
> timeout. However, the right fix is to get rid of the GC pauses by
> investigating the potential memory leak in the app.
>
> Thanks,
>
> Jun
>
> On Wed, Jan 4, 2012 at 9:35 AM, Joe Stein <crypt...@gmail.com> wrote:
>
> > On Tue, Jan 3, 2012 at 8:32 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Joe,
> > >
> > > It seems that you are using the ZK-based consumer, which uses
> > > SimpleConsumer under the cover. The exception in SimpleConsumer seems
> > > to be caused by a broker failure. The ZK exception you saw is
> > > typically caused by ZK session expiration. Do you see that?
> >
> > yes
> >
> > > If the ZK session only expires occasionally, it is not a problem as
> > > long as rebalance completes in the end (you should see
> > > "Consumer ... selected partitions"). If there are constant ZK session
> > > expirations, then rebalance may not complete.
> >
> > yup, they are happening a lot, consistently
> >
> > > ZK session expiration is typically caused by GC.
> >
> > any way to stop or prevent my issue? =8^)
> >
> > > Also, which version of Kafka are you using? Which patch did you apply?
> >
> > I am running off of trunk. I applied
> > https://issues.apache.org/jira/browse/KAFKA-229 (now on trunk also)
> > just so I could see the error in multifetch.
> >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Jan 3, 2012 at 2:25 PM, Joe Stein <crypt...@gmail.com> wrote:
> > >
> > > > So my SimpleConsumer woes seem to continue :( I am thinking of not
> > > > using a SimpleConsumer implementation anymore and rolling my own,
> > > > unless there is another option or a fix here I can implement?
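[Editor's note: Jun's GC-logging suggestion maps onto standard pre-Java-9 HotSpot flags. A minimal sketch follows; the flag names are stock HotSpot options, not Kafka-specific, and the variable name and log path are made up for illustration.]

```shell
# Hedged sketch: enable GC logging on a pre-Java-9 HotSpot JVM.
# KAFKA_CONSUMER_JVM_OPTS and /tmp/consumer-gc.log are illustrative
# names, not anything the Kafka scripts define.
KAFKA_CONSUMER_JVM_OPTS="-verbose:gc \
 -XX:+PrintGCDetails \
 -XX:+PrintGCTimeStamps \
 -XX:+PrintGCApplicationStoppedTime \
 -Xloggc:/tmp/consumer-gc.log"
export KAFKA_CONSUMER_JVM_OPTS
echo "$KAFKA_CONSUMER_JVM_OPTS"
```

`Total time for which application threads were stopped` entries in that log approaching the ZK session timeout would confirm the GC theory; the quick mitigation Jun mentions would then be raising the consumer's session timeout property (named `zk.sessiontimeout.ms` in the 0.7-era ConsumerConfig, default 6000 ms — verify against the trunk build in use) while the leak is tracked down.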
> > > > So, after applying my patch (to see this error), this is the error
> > > > I am getting after some time goes by of consuming my stream:
> > > >
> > > > 2012-01-03 15:04:28,871 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181 kafka.consumer.ZookeeperConsumerConnector - Consumer test-consumer-group_doozer selected partitions : controserverlogs:0-0: fetched offset = 137564195795: consumed offset = 137564195795
> > > > 2012-01-03 15:04:28,871 INFO FetchRunnable-0 kafka.consumer.SimpleConsumer - multifetch reconnect due to
> > > > java.nio.channels.ClosedByInterruptException
> > > >   at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> > > >   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:263)
> > > >   at kafka.utils.Utils$.read(Utils.scala:497)
> > > >   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> > > >   at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> > > >   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > >   at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:179)
> > > >   at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:119)
> > > >   at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:117)
> > > >   at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:61)
> > > >
> > > > It does this frequently, and if I leave it running it then
> > > > eventually does this (after starting back up and rebalancing):
> > > >
> > > > 2012-01-03 15:06:18,823 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181 kafka.consumer.ZookeeperConsumerConnector - begin rebalancing consumer test-consumer-group_doozer try #0
> > > > 2012-01-03 15:06:18,825 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181 kafka.consumer.ZookeeperConsumerConnector - exception during rebalance
> > > > org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer
> > > >   at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> > > >   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> > > >   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> > > >   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> > > >   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
> > > >   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
> > > >   at kafka.utils.ZkUtils$.readData(ZkUtils.scala:161)
> > > >   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$getTopicCount(ZookeeperConsumerConnector.scala:414)
> > > >   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:453)
> > > >   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:430)
> > > >   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:75)
> > > >   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:426)
> > > >   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.handleChildChange(ZookeeperConsumerConnector.scala:369)
> > > >   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
> > > >   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > > > Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer
> > > >   at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> > > >   at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> > > >   at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
> > > >   at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
> > > >   at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> > > >   at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> > > >   at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> > > >   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> > > >
> > > > Granted, this could all be about my implementation: it is based
> > > > largely on the "SimpleConsumerShell", as I was not sure of any other
> > > > starting point without a lot of my own heavy lifting and just doing
> > > > it myself (which seems to have a lot involved, which is fine; then
> > > > something like https://issues.apache.org/jira/browse/KAFKA-232 will
> > > > be important for me too, something I looked at today and need to dig
> > > > into more).
> > > >
> > > > thoughts? thanks? what are other folks doing? is anyone using the
> > > > SimpleConsumer successfully?
> > > >
> > > > --
> > > >
> > > > /*
> > > > Joe Stein
> > > > http://www.linkedin.com/in/charmalloc
> > > > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > */
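[Editor's note: a lightweight way to confirm stop-the-world pauses, without waiting on GC logs, is a watchdog thread that sleeps a fixed interval and measures how much longer the wall clock actually advanced. The sketch below is a generic JVM diagnostic under that assumption, not Kafka code; `PauseDetector` and its thresholds are made-up names for illustration.]

```java
// Generic stop-the-world pause detector (illustrative, not Kafka code).
// If a loop iteration observes a wall-clock gap far longer than the
// requested sleep, the JVM was paused -- e.g. by a full GC -- for
// roughly that long.
public class PauseDetector {

    /** Returns the largest pause (ms) observed beyond the requested sleep. */
    public static long maxDriftMs(int iterations, long intervalMs)
            throws InterruptedException {
        long max = 0;
        long last = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            Thread.sleep(intervalMs);
            long now = System.nanoTime();
            // Extra wall-clock time beyond what we asked to sleep.
            long driftMs = (now - last) / 1_000_000L - intervalMs;
            if (driftMs > max) {
                max = driftMs;
            }
            last = now;
        }
        return max;
    }

    public static void main(String[] args) throws InterruptedException {
        long drift = maxDriftMs(20, 50);
        System.out.println("max observed pause beyond sleep: " + drift + " ms");
        // Pauses approaching the ZK session timeout (6 s by default in
        // this era) would explain constant session expirations.
        if (drift > 1000) {
            System.out.println("long pause detected -- check the GC log");
        }
    }
}
```

Drift reported here that approaches the ZK session timeout would line up with the constant session expirations in the logs above, and would point at GC (or swapping) rather than a Kafka bug.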