Hi, We see below exception when using Spark Kafka streaming 0.10 on a normal Kafka topic. Not sure why offset missing in zk, but since Spark streaming override the offset reset policy to none in the code. I can not set the reset policy to latest(I don't really care data loss now).
Is there any quick way to fix the missing offset or work around this? Thanks, Martin 1/06/2018 17:11:02: ERROR:the type of error is org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: elasticsearchtopicrealtimereports-97 01/06/2018 17:11:02: ERROR:Undefined offset with no reset policy for partition: elasticsearchtopicrealtimereports-97 org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370) org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:248) org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1601) org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034) org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165) org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:184) org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) scala.Option.orElse(Option.scala:289) org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) scala.collection.immutable.List.foreach(List.scala:381) scala.collection.TraversableLike$class.map(TraversableLike.scala:234) scala.collection.immutable.List.map(List.scala:285) org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) scala.Option.orElse(Option.scala:289) org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) scala.Option.orElse(Option.scala:289) org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) scala.Option.orElse(Option.scala:289) org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) scala.Option.orElse(Option.scala:289) org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37) org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:34