Ashish K Singh created KAFKA-2413:
-------------------------------------

             Summary: New consumer's subscribe(Topic...) api fails if called 
more than once
                 Key: KAFKA-2413
                 URL: https://issues.apache.org/jira/browse/KAFKA-2413
             Project: Kafka
          Issue Type: Bug
          Components: consumer
            Reporter: Ashish K Singh
            Assignee: Ashish K Singh


I believe new consumer is supposed to allow adding to existing topic 
subscriptions. If it is then the issue is that on trying to subscribe to a 
topic when consumer is already subscribed to a topic, below exception is thrown.

{code}
[2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null 
(kafka.server.KafkaApis:103)
java.util.NoSuchElementException: key not found: topic
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:58)
        at 
kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
        at 
kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
        at 
kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
        at 
kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
        at 
kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
        at 
kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
        at 
kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
        at 
kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
        at 
kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
        at 
kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
        at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
        at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
        at 
kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
        at 
kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
        at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:745)

Unexpected error in join group response: The server experienced an unexpected 
error when processing the request
org.apache.kafka.common.KafkaException: Unexpected error in join group 
response: The server experienced an unexpected error when processing the request
        at 
org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
        at 
org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
        at 
org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
        at 
org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
        at 
org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:197)
        at 
org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:172)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:764)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:725)
        at 
kafka.api.ConsumerTest$$anonfun$testRepetitiveTopicSubscription$2.apply$mcZ$sp(ConsumerTest.scala:80)
        at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:616)
        at 
kafka.api.ConsumerTest.testRepetitiveTopicSubscription(ConsumerTest.scala:79)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at junit.framework.TestCase.runTest(TestCase.java:168)
        at junit.framework.TestCase.runBare(TestCase.java:134)
        at junit.framework.TestResult$1.protect(TestResult.java:110)
        at junit.framework.TestResult.runProtected(TestResult.java:128)
        at junit.framework.TestResult.run(TestResult.java:113)
        at junit.framework.TestCase.run(TestCase.java:124)
        at junit.framework.TestSuite.runTest(TestSuite.java:232)
        at junit.framework.TestSuite.run(TestSuite.java:227)
        at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
        at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
        at 
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
        at 
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
        at 
org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
        at 
org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
        at 
org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
        at 
org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
        at org.scalatest.tools.Runner$.run(Runner.scala:883)
        at org.scalatest.tools.Runner.run(Runner.scala)
        at 
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
        at 
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to