GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/16098

    [WIP][SPARK-18672][CORE] Close recordwriter in SparkHadoopMapReduceWriter 
before committing

    ## What changes were proposed in this pull request?
    
    It seems some APIs, such as `PairRDDFunctions.saveAsHadoopDataset()`, do not close the record writer before committing the task.
    
    On Windows, the file in the temp directory is still open when the output committer tries to rename it from the temp directory to the output directory after the write finishes.
    
    So the rename fails and the file cannot be moved. It seems we should close the writer before committing the task, as other writers such as `FileFormatWriter` do.
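
    Roughly, the problematic ordering looks like the following minimal sketch (names and structure are simplified and hypothetical; the actual code lives in `SparkHadoopMapReduceWriter.executeTask`). The task is committed while the `RecordWriter` still holds the output file open:

    ```scala
    import org.apache.hadoop.mapreduce.{OutputCommitter, RecordWriter, TaskAttemptContext}

    object BuggyWriteTaskSketch {
      // Simplified task body: commitTask() runs before close(), so the rename
      // out of _temporary happens while the output file is still open.
      def run[K, V](
          records: Iterator[(K, V)],
          writer: RecordWriter[K, V],
          committer: OutputCommitter,
          ctx: TaskAttemptContext): Unit = {
        records.foreach { case (k, v) => writer.write(k, v) }
        committer.commitTask(ctx) // renames the attempt file; fails on Windows
        writer.close(ctx)         // too late: the file was open during the rename
      }
    }
    ```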
    
    The identified failure was as below:
    
    ```
    FAILURE! - in org.apache.spark.JavaAPISuite
    writeWithNewAPIHadoopFile(org.apache.spark.JavaAPISuite)  Time elapsed: 
0.25 sec  <<< ERROR!
    org.apache.spark.SparkException: Job aborted.
        at 
org.apache.spark.JavaAPISuite.writeWithNewAPIHadoopFile(JavaAPISuite.java:1231)
    Caused by: org.apache.spark.SparkException: 
    Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most 
recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): 
org.apache.spark.SparkException: Task failed while writing rows
        at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.org$apache$spark$internal$io$SparkHadoopMapReduceWriter$$executeTask(SparkHadoopMapReduceWriter.scala:182)
        at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$3.apply(SparkHadoopMapReduceWriter.scala:100)
        at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$3.apply(SparkHadoopMapReduceWriter.scala:99)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.IOException: Could not rename 
file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/_temporary/attempt_20161201005155_0000_r_000000_0
 to 
file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/task_20161201005155_0000_r_000000
        at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:436)
        at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:415)
        at 
org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
        at 
org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
        at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:153)
        at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:167)
        at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:156)
        at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
        at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.org$apache$spark$internal$io$SparkHadoopMapReduceWriter$$executeTask(SparkHadoopMapReduceWriter.scala:168)
        ... 8 more
    Driver stacktrace:
        at 
org.apache.spark.JavaAPISuite.writeWithNewAPIHadoopFile(JavaAPISuite.java:1231)
    Caused by: org.apache.spark.SparkException: Task failed while writing rows
    Caused by: java.io.IOException: Could not rename 
file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/_temporary/attempt_20161201005155_0000_r_000000_0
 to 
file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/task_20161201005155_0000_r_000000
    ```
    
    This PR proposes to close the record writer before committing the task.
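
    A minimal sketch of the proposed ordering (simplified and hypothetical; the real change is in `SparkHadoopMapReduceWriter.executeTask`): close the writer to release the file handle first, then commit, and abort the task on failure:

    ```scala
    import org.apache.hadoop.mapreduce.{OutputCommitter, RecordWriter, TaskAttemptContext}

    object FixedWriteTaskSketch {
      // Simplified task body with the proposed ordering: close the writer
      // before commitTask() so the rename no longer sees an open handle.
      def run[K, V](
          records: Iterator[(K, V)],
          writer: RecordWriter[K, V],
          committer: OutputCommitter,
          ctx: TaskAttemptContext): Unit = {
        try {
          records.foreach { case (k, v) => writer.write(k, v) }
          writer.close(ctx)         // release the file handle first
          committer.commitTask(ctx) // now the rename can succeed on Windows
        } catch {
          case t: Throwable =>
            committer.abortTask(ctx) // discard this attempt's output
            throw t
        }
      }
    }
    ```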
    
    ## How was this patch tested?
    
    Manually tested. 
    
    **Before**
    
    https://ci.appveyor.com/project/spark-test/spark/build/94-scala-tests
    
    **After**
    
    https://ci.appveyor.com/project/spark-test/spark/build/93-scala-tests
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark close-wirter-first

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16098
    
----
commit 7d76472db2770845f813381a4a810759dc745203
Author: hyukjinkwon <gurwls...@gmail.com>
Date:   2016-12-01T09:44:43Z

    Close first

----

