Hi team,

I discovered an issue that when a high level consumer with roundrobin
assignment strategy consumes a topic that hasn't been created on broker a
NPE exception is thrown during partition rebalancing phase. I use Kafka
0.8.2.1

Here is the step to reproduce:

1. create a high level consumer with roundrobin
2. use connector.createMessageStreamsByFilter to create a message stream in
the consumer to a topic that yet to be created on broker

Below is the exception.

2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:68 -
[test12345667fffff_localhost], Committing all offsets after clearing the
fetcher queues
2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:68 -
[test12345667fffff_localhost], Releasing partition ownership
2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:76 -
[test12345667fffff_localhost], exception during rebalance
java.lang.NullPointerException
at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:210)
at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:202)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.IterableLike$class.head(IterableLike.scala:91)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:75)
at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:69)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:660)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:608)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
at
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:905)
at
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:939)
at
kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:160)
at
kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
at
kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105)
at com.ebay.kafka.demo.Consumer.main(Consumer.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

-- 
Regards,
Tao

Reply via email to