Hi team,

I discovered an issue: when a high-level consumer using the roundrobin assignment strategy consumes a topic that has not yet been created on the broker, a NullPointerException is thrown during the partition rebalancing phase. I am using Kafka 0.8.2.1.
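The consumer setup described above can be sketched roughly as follows. This is an illustrative sketch, not the code from the report: the class name, the ZooKeeper address, the group id, and the topic name are all placeholder assumptions. The Kafka client calls are shown only in comments so the snippet compiles without the Kafka jar.

```java
import java.util.Properties;

public class RoundRobinReproConfig {
    // Builds settings for the old ZooKeeper-based high-level consumer.
    // zookeeperConnect and groupId are placeholder values (assumptions).
    public static Properties consumerProps(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        props.put("group.id", groupId);
        // Select round-robin assignment, the strategy named in the report.
        props.put("partition.assignment.strategy", "roundrobin");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:2181", "test-group");
        // With the Kafka 0.8.2.1 client on the classpath, the remaining steps
        // would look roughly like this (topic name is hypothetical):
        //   ConsumerConnector connector = kafka.consumer.Consumer
        //       .createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(props));
        //   connector.createMessageStreamsByFilter(
        //       new kafka.consumer.Whitelist("topic-not-yet-created"), 1);
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

The filter-based call is the one that appears in the stack trace, so a whitelist matching a topic that does not exist yet is presumably enough to reach the failing rebalance.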
Here are the steps to reproduce:

1. Create a high-level consumer with the roundrobin assignment strategy.
2. Use connector.createMessageStreamsByFilter to create a message stream in the consumer for a topic that has yet to be created on the broker.

Below is the exception.

2015-04-15 14:16:46 INFO kafka.utils.Logging$class:68 - [test12345667fffff_localhost], Committing all offsets after clearing the fetcher queues
2015-04-15 14:16:46 INFO kafka.utils.Logging$class:68 - [test12345667fffff_localhost], Releasing partition ownership
2015-04-15 14:16:46 INFO kafka.utils.Logging$class:76 - [test12345667fffff_localhost], exception during rebalance
java.lang.NullPointerException
    at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:210)
    at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:202)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.IterableLike$class.head(IterableLike.scala:91)
    at scala.collection.AbstractIterable.head(Iterable.scala:54)
    at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:75)
    at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:69)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:660)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:608)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:905)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:939)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:160)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105)
    at com.ebay.kafka.demo.Consumer.main(Consumer.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

--
Regards,
Tao