[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-07 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14662548#comment-14662548 ]

ASF GitHub Bot commented on KAFKA-2413:
---

GitHub user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/122


 New consumer's subscribe(Topic...) api fails if called more than once
 -

 Key: KAFKA-2413
 URL: https://issues.apache.org/jira/browse/KAFKA-2413
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Onur Karaman
 Fix For: 0.8.3


 I believe the new consumer is supposed to allow adding to existing topic 
 subscriptions. If so, the issue is that subscribing to a topic when the 
 consumer is already subscribed to a topic throws the exception below.
 {code}
 [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103)
 java.util.NoSuchElementException: key not found: topic
   at scala.collection.MapLike$class.default(MapLike.scala:228)
   at scala.collection.AbstractMap.default(Map.scala:58)
   at scala.collection.MapLike$class.apply(MapLike.scala:141)
   at scala.collection.AbstractMap.apply(Map.scala:58)
   at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
   at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
   at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
   at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
   at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
   at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
   at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
   at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
   at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
   at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
   at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
   at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
   at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
   at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
   at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
   at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
   at java.lang.Thread.run(Thread.java:745)
 Unexpected error in join group response: The server experienced an unexpected error when processing the request
 org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
   at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
   at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
   at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
   at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677)
   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
 

[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Onur Karaman (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661246#comment-14661246 ]

Onur Karaman commented on KAFKA-2413:
-

I'll have the patch ready later today.

 New consumer's subscribe(Topic...) api fails if called more than once
 -

 Key: KAFKA-2413
 URL: https://issues.apache.org/jira/browse/KAFKA-2413
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Ashish K Singh (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661294#comment-14661294 ]

Ashish K Singh commented on KAFKA-2413:
---

Hi [~onurkaraman], sorry for not explicitly saying this, but I am working on a 
patch already. Thanks for your help!

 New consumer's subscribe(Topic...) api fails if called more than once
 -

 Key: KAFKA-2413
 URL: https://issues.apache.org/jira/browse/KAFKA-2413
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661317#comment-14661317 ]

ASF GitHub Bot commented on KAFKA-2413:
---

GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/122

KAFKA-2413: fix ConsumerCoordinator updateConsumer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-2413

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/122.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #122


commit 073dc4b716594880de4fb58c8832f02dd3792683
Author: Onur Karaman okara...@linkedin.com
Date:   2015-08-07T04:49:53Z

fix ConsumerCoordinator updateConsumer




 New consumer's subscribe(Topic...) api fails if called more than once
 -

 Key: KAFKA-2413
 URL: https://issues.apache.org/jira/browse/KAFKA-2413
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Onur Karaman (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661319#comment-14661319 ]

Onur Karaman commented on KAFKA-2413:
-

Aand I just saw this. My bad.

 New consumer's subscribe(Topic...) api fails if called more than once
 -

 Key: KAFKA-2413
 URL: https://issues.apache.org/jira/browse/KAFKA-2413
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Ashish K Singh (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661329#comment-14661329 ]

Ashish K Singh commented on KAFKA-2413:
---

I am not sure that posting patches to issues another person is working on is a 
good idea. It is discouraging; that is all I can see. There has to be a reason 
why the Assignee field is present in JIRA. I guess it is a moot point to discuss 
it any further, as the fix is already posted. Feel free to assign yourself to 
the JIRA. I will review the patch you posted, as I have already spent some 
time finding the issue and trying to fix it. I just hope it does not happen 
again.

 New consumer's subscribe(Topic...) api fails if called more than once
 -

 Key: KAFKA-2413
 URL: https://issues.apache.org/jira/browse/KAFKA-2413
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Onur Karaman (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661227#comment-14661227 ]

Onur Karaman commented on KAFKA-2413:
-

Woops! I haven't tried it out yet, but I think the fix is:
{code}
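private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
  // Topics in the new subscription that the group does not already cover.
  val topicsToBind = topics -- group.topics
  // Remove the consumer before computing the unbind set, presumably so that
  // its old topics no longer count toward group.topics.
  group.remove(consumer.consumerId)
  // Old topics needed by neither the remaining members nor the new subscription.
  val topicsToUnbind = consumer.topics -- (group.topics ++ topics)
  group.add(consumer.consumerId, consumer)
  consumer.topics = topics
  coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
}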
{code}
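
To see what the set arithmetic in the snippet above works out to, here is a minimal self-contained sketch. `Group` and `Consumer` are hypothetical stand-ins, not Kafka's `ConsumerGroupMetadata` and `ConsumerMetadata`, and the sketch assumes that `group.topics` is the union of the topics of the current members. Instead of calling `coordinatorMetadata.bindAndUnbindGroupFromTopics`, it returns the two sets so they can be inspected.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a group member; not Kafka's ConsumerMetadata.
final class Consumer(val consumerId: String, var topics: Set[String])

// Hypothetical stand-in for the group; not Kafka's ConsumerGroupMetadata.
final class Group(val groupId: String) {
  private val consumers = mutable.Map.empty[String, Consumer]

  def add(id: String, c: Consumer): Unit = consumers(id) = c
  def remove(id: String): Unit = consumers -= id

  // Assumption: the group's topics are the union of its members' topics.
  def topics: Set[String] = consumers.values.flatMap(_.topics).toSet
}

object UpdateConsumerSketch {
  // Same order of operations as the snippet in the thread, but returns
  // (topicsToBind, topicsToUnbind) instead of calling the coordinator.
  def updateConsumer(group: Group, consumer: Consumer,
                     topics: Set[String]): (Set[String], Set[String]) = {
    val topicsToBind = topics -- group.topics
    group.remove(consumer.consumerId)
    val topicsToUnbind = consumer.topics -- (group.topics ++ topics)
    group.add(consumer.consumerId, consumer)
    consumer.topics = topics
    (topicsToBind, topicsToUnbind)
  }

  def main(args: Array[String]): Unit = {
    val group = new Group("g")
    val c = new Consumer("c1", Set("topic1"))
    group.add(c.consumerId, c)

    // Subscribing again with one extra topic: only the new topic is bound.
    println(updateConsumer(group, c, Set("topic1", "topic2")))
    // Narrowing the subscription: the dropped topic is unbound.
    println(updateConsumer(group, c, Set("topic2")))
  }
}
```

Under these assumptions, a single-member group that widens its subscription binds only the new topic, and one that narrows it unbinds only the dropped topic.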

 New consumer's subscribe(Topic...) api fails if called more than once
 -

 Key: KAFKA-2413
 URL: https://issues.apache.org/jira/browse/KAFKA-2413
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661242#comment-14661242
 ] 

Onur Karaman commented on KAFKA-2413:
-

Okay I just tested it out. It seems to have fixed the bug.

 New consumer's subscribe(Topic...) api fails if called more than once
 -

 Key: KAFKA-2413
 URL: https://issues.apache.org/jira/browse/KAFKA-2413
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh

 I believe new consumer is supposed to allow adding to existing topic 
 subscriptions. If it is then the issue is that on trying to subscribe to a 
 topic when consumer is already subscribed to a topic, below exception is 
 thrown.
 {code}
 [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null 
 (kafka.server.KafkaApis:103)
 java.util.NoSuchElementException: key not found: topic
   at scala.collection.MapLike$class.default(MapLike.scala:228)
   at scala.collection.AbstractMap.default(Map.scala:58)
   at scala.collection.MapLike$class.apply(MapLike.scala:141)
   at scala.collection.AbstractMap.apply(Map.scala:58)
   at 
 kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
   at 
 kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
   at 
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
   at 
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at 
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
   at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
   at 
 kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
   at 
 kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
   at 
 kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
   at 
 kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
   at 
 kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
   at 
 kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
   at 
 kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
   at 
 kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
   at 
 kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
   at 
 kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
   at 
 kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
   at 
 kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
   at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
   at java.lang.Thread.run(Thread.java:745)
 Unexpected error in join group response: The server experienced an unexpected 
 error when processing the request
 org.apache.kafka.common.KafkaException: Unexpected error in join group 
 response: The server experienced an unexpected error when processing the 
 request
   at 
 org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
   at 
 org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
   at 
 org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
   at 
 org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677)
   at 
 org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
   at 
 org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
   at 
 org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
   at 
 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
   at 
 

[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661094#comment-14661094
 ] 

Ashish K Singh commented on KAFKA-2413:
---

[~hachikuji] thoughts?


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661157#comment-14661157
 ] 

Ashish K Singh commented on KAFKA-2413:
---

Thanks, guys, for the quick response!

Below is a test case that can reproduce the issue.

{code}
  def testRepetitiveTopicSubscription() {
    val numRecords = 1
    sendRecords(numRecords)

    this.consumers(0).subscribe(topic)

    TestUtils.waitUntilTrue(() => {
      this.consumers(0).poll(50)
      this.consumers(0).subscriptions.size == 2
    },
      "Could not find expected number of subscriptions")

    TestUtils.createTopic(this.zkClient, "tblablac", 2, serverCount, this.servers)
    sendRecords(1000, new TopicPartition("tblablac", 0))
    sendRecords(1000, new TopicPartition("tblablac", 1))

    // Subscribe to a second topic on the same consumer.
    this.consumers(0).subscribe("tblablac")

    TestUtils.waitUntilTrue(() => {
      this.consumers(0).poll(50)
      this.consumers(0).subscriptions.size == 4
    },
      "Could not find expected number of subscriptions")
  }
{code}

I guess this is another interesting problem to solve.


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661177#comment-14661177
 ] 

Ashish K Singh commented on KAFKA-2413:
---

[~hachikuji] I was planning to play with it to get more accustomed to the new 
consumer's intricacies. If you have not already worked out a patch, is it 
OK if I try to fix it by tonight?


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661135#comment-14661135
 ] 

Jason Gustafson commented on KAFKA-2413:


[~onurkaraman] I was able to reproduce this error on trunk by subscribing to a 
second topic while in the consumer's poll loop. It looks like the error might 
be related to how the server manages group topics.
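
As a side note on the stack trace: the `key not found: topic` message is what Scala's `Map.apply` produces when a key is absent, which would be consistent with the assignor looking up a topic that the coordinator no longer tracks. A minimal sketch of that behavior is below; `MissingKeySketch` and `partitionsPerTopic` are hypothetical names used only for illustration, not the broker's actual data structures.

```scala
import scala.util.Try

object MissingKeySketch {
  // Hypothetical stand-in for per-topic metadata held on the server side.
  // Note that "topic" is deliberately absent from the map.
  val partitionsPerTopic: Map[String, Int] = Map("tblablac" -> 2)

  // Map.apply throws java.util.NoSuchElementException for a missing key;
  // Try captures the exception so the caller can inspect it.
  def strictLookup(topic: String): Try[Int] = Try(partitionsPerTopic(topic))

  // Map.get returns an Option instead of throwing.
  def safeLookup(topic: String): Option[Int] = partitionsPerTopic.get(topic)
}
```

Here `strictLookup("topic")` fails with the same exception type and message seen in the broker log, while `safeLookup("topic")` simply returns `None`.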


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661160#comment-14661160
 ] 

Jason Gustafson commented on KAFKA-2413:


I think the issue is in here:

{code}
  private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
    val topicsToBind = topics -- group.topics
    group.remove(consumer.consumerId)
    val topicsToUnbind = consumer.topics -- group.topics
    group.add(consumer.consumerId, consumer)
    consumer.topics = topics
    coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
  }
{code}

In particular, it looks like topicsToUnbind is taking the difference in the 
wrong order. But I'm not sure that just reversing that is correct either, since 
we'd only want to unbind the topic if no other consumers in the group are 
subscribed.
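
One possible reading of the snippet, sketched under assumptions: if `group.topics` is the union of the members' subscriptions, then after `group.remove(...)` a single-consumer group has no topics, so `consumer.topics -- group.topics` marks the consumer's existing topic for unbinding even though the new subscription may still include it. The sketch below instead derives the bind and unbind sets from the group-wide topic set before and after the update, so a topic is unbound only when no member subscribes to it anymore. The `Group` alias and `updateSubscription` are hypothetical simplifications, not the coordinator's API and not necessarily the fix that was committed.

```scala
object TopicBindingSketch {
  // Hypothetical, simplified group state: consumer id -> subscribed topics.
  type Group = Map[String, Set[String]]

  // Topics subscribed to by at least one consumer in the group.
  def groupTopics(group: Group): Set[String] = group.values.flatten.toSet

  // Applies a consumer's new subscription and returns
  // (updated group, topics to bind, topics to unbind).
  def updateSubscription(
      group: Group,
      consumerId: String,
      newTopics: Set[String]): (Group, Set[String], Set[String]) = {
    val before = groupTopics(group)
    val updated = group.updated(consumerId, newTopics)
    val after = groupTopics(updated)
    // Bind topics that are new to the group as a whole; unbind topics
    // that no member subscribes to after the update.
    (updated, after -- before, before -- after)
  }
}
```

With a single consumer on `topic` that changes its subscription to `topic` and `tblablac`, this yields `tblablac` to bind and nothing to unbind. When two consumers share `topic` and one of them switches to another topic, `topic` stays bound because the other consumer still subscribes to it.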


[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661110#comment-14661110
 ] 

Onur Karaman commented on KAFKA-2413:
-

A couple of questions to start things off:
1. What is the git hash of Kafka you're running on the brokers and consumers?
2. How can I reproduce this? What does your consumer code look like? More 
specifically, can you go into more detail on the "called more than once" part?

[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once

2015-08-06 Thread Jason Gustafson (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661167#comment-14661167 ]

Jason Gustafson commented on KAFKA-2413:


[~singhashish] I can attempt a patch if you want.

 New consumer's subscribe(Topic...) api fails if called more than once
 -

 Key: KAFKA-2413
 URL: https://issues.apache.org/jira/browse/KAFKA-2413
 Project: Kafka
 Issue Type: Bug
 Components: consumer
 Reporter: Ashish K Singh
 Assignee: Ashish K Singh

 I believe the new consumer is supposed to allow adding to existing topic 
 subscriptions. If so, the issue is that subscribing to a topic when the 
 consumer is already subscribed to another topic throws the exception below.
 {code}
 [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103)
 java.util.NoSuchElementException: key not found: topic
   at scala.collection.MapLike$class.default(MapLike.scala:228)
   at scala.collection.AbstractMap.default(Map.scala:58)
   at scala.collection.MapLike$class.apply(MapLike.scala:141)
   at scala.collection.AbstractMap.apply(Map.scala:58)
   at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
   at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
   at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
   at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
   at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
   at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
   at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
   at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
   at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
   at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
   at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
   at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
   at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
   at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
   at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
   at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
   at java.lang.Thread.run(Thread.java:745)
 Unexpected error in join group response: The server experienced an unexpected error when processing the request
 org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
   at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
   at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
   at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
   at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677)
   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
   at