Hello Tao,

Do you think the solution to KAFKA-2056 will resolve this issue? It will be
included in 0.8.3 release.

Guozhang

On Wed, Apr 15, 2015 at 2:21 PM, tao xiao <xiaotao...@gmail.com> wrote:

> Hi team,
>
> I discovered an issue that when a high level consumer with roundrobin
> assignment strategy consumes a topic that hasn't been created on broker a
> NPE exception is thrown during partition rebalancing phase. I use Kafka
> 0.8.2.1
>
> Here is the step to reproduce:
>
> 1. create a high level consumer with roundrobin
> 2. use connector.createMessageStreamsByFilter to create a message stream in
> the consumer to a topic that yet to be created on broker
>
> Below is the exception.
>
> 2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:68 -
> [test12345667fffff_localhost], Committing all offsets after clearing the
> fetcher queues
> 2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:68 -
> [test12345667fffff_localhost], Releasing partition ownership
> 2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:76 -
> [test12345667fffff_localhost], exception during rebalance
> java.lang.NullPointerException
> at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:210)
> at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:202)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.IterableLike$class.head(IterableLike.scala:91)
> at scala.collection.AbstractIterable.head(Iterable.scala:54)
> at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:75)
> at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:69)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:660)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:608)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
> at
>
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:905)
> at
>
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:939)
> at
>
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:160)
> at
>
> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
> at
>
> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105)
> at com.ebay.kafka.demo.Consumer.main(Consumer.java:61)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
>
> --
> Regards,
> Tao
>



-- 
-- Guozhang

Reply via email to