I am reading a data from Kinesis stream (merging shard values with union stream) to spark streaming. then doing the following code to push the data to DB.
splitCSV.foreachRDD(new VoidFunction2<JavaRDD<String[]>,Time>() { private static final long serialVersionUID = 1L; public void call(JavaRDD<String[]> rdd, Time time) throws Exception { JavaRDD<SFieldBean> varMapRDD = rdd.map(new Function<String[],SFieldBean>() { private static final long serialVersionUID = 1L; public SFieldBean call(String[] values) throws Exception { ..... ); varMapRDD.foreachPartition(new VoidFunction<Iterator<SFieldBean>>( { private static final long serialVersionUID = 1L; MySQLConnectionHelper.getConnection("urlinfo"); @Override public void call(Iterator<SFieldBean> iterValues) throws Exception { .... while(iterValues.hasNext()) { } } Though I am using hasNext but it throws the follwing error Caused by: java.util.NoSuchElementException: next on empty iterator at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) at scala.collection.Iterator$$anon$2.next(Iterator.scala:37) at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64) at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30) at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:319) at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:288) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ... 3 more -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"