[jira] [Commented] (SPARK-18650) race condition in FileScanRDD.scala

2016-12-13 Thread Soumabrata Chakraborty (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744695#comment-15744695 ]

Soumabrata Chakraborty commented on SPARK-18650:


I am facing the same issue while trying to load a CSV. I am using
org.apache.spark:spark-core_2.10:2.0.2 and the following to read the CSV:

Dataset dataset = sparkSession.read().schema(schema).options(options).csv(path);

The options contain "header": "false" and "sep": "|".
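For context, a minimal self-contained Scala sketch of the same kind of read follows. The schema fields, path, and application name are illustrative assumptions, not values from this report:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object PipeCsvReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pipe-csv-read-sketch")   // hypothetical app name
      .master("local[*]")
      .getOrCreate()

    // Hypothetical schema, for illustration only.
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("value", StringType)))

    val dataset = spark.read
      .schema(schema)
      .option("header", "false")   // the file has no header row
      .option("sep", "|")          // pipe-delimited columns
      .csv("/tmp/input.csv")       // hypothetical path

    println(dataset.count())
    spark.stop()
  }
}

The Java snippet above should behave the same way, since both APIs go through the same DataFrameReader.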

> race condition in FileScanRDD.scala
> ---
>
> Key: SPARK-18650
> URL: https://issues.apache.org/jira/browse/SPARK-18650
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
> Environment: scala 2.11
> macos 10.11.6
>Reporter: Jay Goldman
>
> I am attempting to create a Dataset from a single CSV file:
>   val ss: SparkSession = ...
>   val ddr = ss.read.option("path", path)
>   ... (choose between xml vs csv parsing)
>   var df = ddr.option("sep", ",")
>     .option("quote", "\"")
>     .option("escape", "\"") // want to retain backslashes (\) ...
>     .option("delimiter", ",")
>     .option("comment", "#")
>     .option("header", "true")
>     .option("format", "csv")
>   ddr.csv(path)
> df.count() returns 2 times the number of lines in the CSV file - i.e., each
> line of the input file shows up as 2 rows in df.
> Moreover, df.distinct.count returns the correct number of rows.
> There appears to be a problem in FileScanRDD.compute. I am using Spark
> 2.0.1 with Scala 2.11. I am not going to include the entire contents of
> FileScanRDD.scala here.
> In FileScanRDD.compute there is the following:
>  private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
> If I put a breakpoint in either FileScanRDD.compute or
> FileScanRDD.nextIterator, the resulting dataset has the correct number of rows.
> Moreover, the code in FileScanRDD.scala is:
>   private def nextIterator(): Boolean = {
>     updateBytesReadWithFileSize()
>     if (files.hasNext) { // breakpoint here => works
>       currentFile = files.next() // breakpoint here => fails
>       ...
>     } else {
>       ...
>     }
>     ...
>   }
> If I put a breakpoint on the files.hasNext line, all is well; however, if I
> put a breakpoint on the files.next() line, the code fails when I continue
> because the files iterator has become empty (see stack trace below).
> Disabling the breakpoint winds up creating a Dataset with each line of the
> CSV file duplicated.
> So it appears that multiple threads are using the files iterator or the
> underlying split value (an RDDPartition), and timing-wise on my system 2
> workers wind up processing the same file, with the resulting Dataset having 2
> copies of each of the input lines.
> This code is not active when parsing an XML file.
> Here is the stack trace:
> java.util.NoSuchElementException: next on empty iterator
>   at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
>   at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
>   at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:111)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 16/11/30 09:31:07 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
> Lost task 0.0 in stage 0.0 (TID 0, localhost):
> java.util.NoSuchElementException: next on empty iterator
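The hypothesis in the description above is that two threads consume the same files iterator. As a minimal, self-contained illustration of that failure mode (plain Scala collections only, not Spark's FileScanRDD), the sketch below shares an unsynchronized iterator between two threads; depending on the interleaving it can produce duplicate entries or a NoSuchElementException, the two symptoms described in the report:

import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}

// Illustrative only: a plain Scala Iterator shared by two threads without any
// synchronization. hasNext followed by next() is a check-then-act sequence,
// so an unlucky interleaving can hand the same element to both threads or
// throw java.util.NoSuchElementException.
object IteratorRaceSketch {
  def main(args: Array[String]): Unit = {
    val files = Vector("part-00000.csv").iterator   // single element, shared mutable state
    val seen  = new ConcurrentLinkedQueue[String]()
    val start = new CountDownLatch(1)

    val workers = (1 to 2).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          start.await()
          if (files.hasNext) {          // both threads may observe "true" here ...
            seen.add(files.next())      // ... and then race on next()
          }
        }
      })
    }

    workers.foreach(_.start())
    start.countDown()                   // release both threads at once
    workers.foreach(_.join())

    // Depending on the interleaving this prints one entry, two duplicate
    // entries, or one thread dies with an uncaught NoSuchElementException.
    println(seen)
  }
}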

[jira] [Commented] (SPARK-18650) race condition in FileScanRDD.scala

2016-12-01 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712358#comment-15712358 ]

Sean Owen commented on SPARK-18650:
---

I guess I'd be surprised if the partitions are evaluated by multiple threads,
but it's possible I'm just not seeing how that could happen. Some
synchronization might, at the least, not hurt.
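As a generic illustration of the kind of synchronization alluded to here (a sketch of the pattern only, not a patch to FileScanRDD), the check-then-act pair can be made atomic by serializing access to the shared iterator:

// Generic sketch: serialize hasNext/next on a shared iterator so the check
// and the read happen atomically. Not FileScanRDD code.
class SynchronizedFiles[A](underlying: Iterator[A]) {
  private[this] val lock = new Object

  // Returns Some(next element), or None once the iterator is exhausted.
  def nextOption(): Option[A] = lock.synchronized {
    if (underlying.hasNext) Some(underlying.next()) else None
  }
}

Each worker would then loop on nextOption() and stop at None, so no element is handed out twice and no NoSuchElementException escapes.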


[jira] [Commented] (SPARK-18650) race condition in FileScanRDD.scala

2016-11-30 Thread Hyukjin Kwon (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15710358#comment-15710358 ]

Hyukjin Kwon commented on SPARK-18650:
--

Would it be possible to share your data or some sample data? I would like to
reproduce this.
