I am using commit 9e894aa0173b14d64a900bcf780d6b7809368384 from trunk.

On Wed, 10 Jun 2015 at 01:09 Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> Which version of MM are you running?
>
> On 6/9/15, 4:49 AM, "tao xiao" <xiaotao...@gmail.com> wrote:
>
> >Hi,
> >
> >I have two mirror makers, A and B, both subscribing to the same whitelist.
> >During topic rebalancing, mirror maker A encountered a
> >ZkNoNodeException and then stopped all connections. However, mirror maker B
> >didn't pick up the topics that had been consumed by A, which left some of
> >the topics unassigned. I think this is because A did not release ownership
> >of those topics. My question is: why didn't A release ownership upon
> >receiving the error?
> >
> >Here is the stack trace:
> >
> >[2015-06-08 15:55:46,379] INFO [test_some-ip-1433797157389-247c1fc4], exception during rebalance (kafka.consumer.ZookeeperConsumerConnector)
> >org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test/ids/test_some-ip-1433797157389-247c1fc4
> >    at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> >    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> >    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> >    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> >    at kafka.utils.ZkUtils$.readData(ZkUtils.scala:473)
> >    at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> >    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:657)
> >    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:629)
> >    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> >    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:620)
> >    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:620)
> >    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:620)
> >    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:619)
> >    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:572)
> >Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test/ids/test_some-ip-1433797157389-247c1fc4
> >    at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
> >    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
> >    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1155)
> >    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1184)
> >    at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> >    at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> >    at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> >    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >    ... 13 more
> >
> >Here is the last part of the log:
> >
> >[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399] Stopping leader finder thread (kafka.consumer.ConsumerFetcherManager)
> >[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399] Stopping all fetchers (kafka.consumer.ConsumerFetcherManager)
> >[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399] All connections stopped (kafka.consumer.ConsumerFetcherManager)
> >[2015-06-08 15:55:49,848] INFO [test_some-ip-1433797157389-247c1fc4], Cleared all relevant queues for this fetcher (kafka.consumer.ZookeeperConsumerConnector)
> >[2015-06-08 15:55:49,849] INFO [test_some-ip-1433797157389-247c1fc4], Cleared the data chunks in all the consumer message iterators (kafka.consumer.ZookeeperConsumerConnector)
> >[2015-06-08 15:55:49,849] INFO [test_some-ip-1433797157389-247c1fc4], Invoking rebalance listener before relasing partition ownerships. (kafka.consumer.ZookeeperConsumerConnector)
> >
> >As the log shows, mirror maker A didn't release ownership, and it didn't
> >attempt to trigger another round of rebalancing either. I checked
> >ZooKeeper: the node that was reported missing actually existed, and it
> >was created at the same time the error was thrown.
> >
> >I am using the latest trunk code.