Hello Tao,

Do you think the solution to KAFKA-2056 will resolve this issue? It will be included in the 0.8.3 release.
Guozhang

On Wed, Apr 15, 2015 at 2:21 PM, tao xiao <xiaotao...@gmail.com> wrote:

> Hi team,
>
> I discovered an issue that when a high level consumer with roundrobin
> assignment strategy consumes a topic that hasn't been created on broker a
> NPE exception is thrown during partition rebalancing phase. I use Kafka
> 0.8.2.1
>
> Here is the step to reproduce:
>
> 1. create a high level consumer with roundrobin
> 2. use connector.createMessageStreamsByFilter to create a message stream in
>    the consumer to a topic that yet to be created on broker
>
> Below is the exception.
>
> 2015-04-15 14:16:46 INFO kafka.utils.Logging$class:68 -
> [test12345667fffff_localhost], Committing all offsets after clearing the
> fetcher queues
> 2015-04-15 14:16:46 INFO kafka.utils.Logging$class:68 -
> [test12345667fffff_localhost], Releasing partition ownership
> 2015-04-15 14:16:46 INFO kafka.utils.Logging$class:76 -
> [test12345667fffff_localhost], exception during rebalance
> java.lang.NullPointerException
>     at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:210)
>     at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:202)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.IterableLike$class.head(IterableLike.scala:91)
>     at scala.collection.AbstractIterable.head(Iterable.scala:54)
>     at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:75)
>     at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:69)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:660)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:608)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
>     at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:905)
>     at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:939)
>     at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:160)
>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105)
>     at com.ebay.kafka.demo.Consumer.main(Consumer.java:61)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
>
> --
> Regards,
> Tao
>

--
-- Guozhang
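
For illustration only: the trace above shows the exception surfacing from a `head` call inside `RoundRobinAssignor.assign`, which suggests the assignor was reading the first element of an empty collection when the topic did not exist yet. That reading is an assumption drawn from the trace, not a confirmed diagnosis. The sketch below is not Kafka's implementation and not the KAFKA-2056 patch; `RoundRobinSketch`, `assign`, and the thread and partition names are hypothetical. It only shows the general idea of round-robin assignment with a guard for the "nothing to assign" case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {

    /**
     * Hypothetical helper: deals partitions out to consumer threads in turn.
     * Returns an empty assignment when there is nothing to assign.
     */
    static Map<String, List<String>> assign(List<String> consumerThreads,
                                            List<String> partitions) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();

        // Guard: with no threads or no partitions (for example, the topic
        // has not been created yet) there is no first element to read.
        if (consumerThreads.isEmpty() || partitions.isEmpty()) {
            return assignment;
        }

        // Give every thread an entry, even if it ends up with no partitions.
        for (String thread : consumerThreads) {
            assignment.put(thread, new ArrayList<>());
        }

        // Partition i goes to thread (i mod number of threads).
        for (int i = 0; i < partitions.size(); i++) {
            String owner = consumerThreads.get(i % consumerThreads.size());
            assignment.get(owner).add(partitions.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> threads = Arrays.asList("consumer-0", "consumer-1");

        // Topic not created yet: no partitions, so the result is empty.
        System.out.println(assign(threads, Collections.<String>emptyList()));

        // Three partitions spread across two threads.
        System.out.println(assign(threads, Arrays.asList("t-0", "t-1", "t-2")));
    }
}
```

With the guard in place, the missing-topic case returns an empty assignment instead of failing, and the consumer can pick up partitions on a later rebalance once the topic exists.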