Hi, I have been trying to distribute Kafka topics among different instances of same consumer group. I am using KafkaDirectStream API for creating DStreams. After the second consumer group comes up, Kafka does partition rebalance and then Spark driver of the first consumer dies with the following exception:
java.lang.IllegalStateException: No current assignment for partition myTopic_5-0 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264) at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336) at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) I see that there is Spark ticket opened with the same issue(https://issues.apache.org/jira/browse/SPARK-19547) but it has been marked as INVALID. Can someone explain why this ticket is marked INVALID. Thanks, Pankaj