[ https://issues.apache.org/jira/browse/KAFKA-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-2284.
------------------------------
    Resolution: Auto Closed

The Scala consumers have been deprecated and no further work is planned.
Please upgrade to the Java consumer whenever possible.

> ConsumerRebalanceListener receives wrong type in partitionOwnership values
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-2284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2284
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.2.0
>            Reporter: E. Sammer
>            Assignee: Neha Narkhede
>            Priority: Major
>
> The ConsumerRebalanceListener's beforeReleasingPartitions() method is
> supposed to receive an argument of type Map<String, Set<Integer>> (topic ->
> Set(partitions)). Even though the map's value type is specified as
> java.util.Set, a scala.collection.convert.Wrappers$JSetWrapper is passed
> instead. That class does not implement Set<T>, so a ClassCastException is
> thrown as soon as one attempts to access any value of the map. It looks as
> if this method was never tested against the actual types specified by the
> interface.
> Here's what happens if you call {{Set<T> foo = partitionOwnership.get(topic)}}:
> {code}
> 2015-06-18 07:28:43,776 (search-consumer_esammer-mbp.local-1434637723383-12126c1b_watcher_executor) [WARN - com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:246)] Exception while rebalancing!
> java.lang.ClassCastException: scala.collection.convert.Wrappers$JSetWrapper cannot be cast to java.util.Set
>       at com.rocana.search.consumer.IndexConsumerWorker.onRebalance(IndexConsumerWorker.java:80)
>       at com.rocana.search.consumer.ConsumerService$1.beforeReleasingPartitions(ConsumerService.java:244)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:675)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:625)
>       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:619)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:615)
>       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:568)
> {code}
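
For readers unfamiliar with how a Map<String, Set<Integer>> can hold a value
that is not a Set, the following is a minimal, stdlib-only sketch of the
general mechanism (generic type erasure). It does not use Kafka or Scala.
NotASet, PartitionOwnershipDemo, and the topic name "events" are hypothetical
names invented for illustration; NotASet only stands in for a wrapper type
that does not implement java.util.Set. Whether a defensive read like
safeRead() would work against the actual Scala wrapper is not verified here.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in: holds integers but does NOT implement java.util.Set.
final class NotASet implements Iterable<Integer> {
    private final Set<Integer> underlying;

    NotASet(Set<Integer> underlying) {
        this.underlying = underlying;
    }

    @Override
    public Iterator<Integer> iterator() {
        return underlying.iterator();
    }
}

class PartitionOwnershipDemo {
    // Builds a map declared as Map<String, Set<Integer>> whose value is really a NotASet.
    // The unchecked cast succeeds at runtime because generic types are erased.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, Set<Integer>> pollutedOwnership() {
        Map raw = new HashMap();
        raw.put("events", new NotASet(new LinkedHashSet<>(List.of(0, 1, 2))));
        return (Map<String, Set<Integer>>) raw;
    }

    // Returns true if reading the value through the declared type throws ClassCastException.
    static boolean typedReadFails(Map<String, Set<Integer>> ownership, String topic) {
        try {
            // The compiler inserts a cast to Set here; that cast is what fails.
            Set<Integer> partitions = ownership.get(topic);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Reads the value as a plain Object and copies any Integer elements into a real Set.
    static Set<Integer> safeRead(Map<String, ?> ownership, String topic) {
        Object value = ownership.get(topic);
        Set<Integer> copy = new LinkedHashSet<>();
        if (value instanceof Iterable<?>) {
            for (Object element : (Iterable<?>) value) {
                if (element instanceof Integer) {
                    copy.add((Integer) element);
                }
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> ownership = pollutedOwnership();
        System.out.println("typed read fails: " + typedReadFails(ownership, "events"));
        System.out.println("defensive copy:   " + safeRead(ownership, "events"));
    }
}
```

The point of the sketch is only that the failure surfaces at the read site,
not where the map is built, which matches the stack trace above pointing at
the caller's own code.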



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
