GitHub user HyukjinKwon opened a pull request:
https://github.com/apache/spark/pull/16098
[WIP][SPARK-18672][CORE] Close recordwriter in SparkHadoopMapReduceWriter
before committing
## What changes were proposed in this pull request?
It seems some APIs such as `PairRDDFunctions.saveAsHadoopDataset()` do not
close the record writer before issuing the commit for the task.
On Windows, the output file in the temporary directory is still open when the
output committer tries to rename it from the temporary directory to the output
directory after writing finishes, so the move fails. It seems we should close
the writer before committing the task, as other writers such as
`FileFormatWriter` do.
The identified failure was as follows:
```
FAILURE! - in org.apache.spark.JavaAPISuite
writeWithNewAPIHadoopFile(org.apache.spark.JavaAPISuite) Time elapsed: 0.25 sec <<< ERROR!
org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.JavaAPISuite.writeWithNewAPIHadoopFile(JavaAPISuite.java:1231)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.org$apache$spark$internal$io$SparkHadoopMapReduceWriter$$executeTask(SparkHadoopMapReduceWriter.scala:182)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$3.apply(SparkHadoopMapReduceWriter.scala:100)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$3.apply(SparkHadoopMapReduceWriter.scala:99)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not rename file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/_temporary/attempt_20161201005155_0000_r_000000_0 to file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/task_20161201005155_0000_r_000000
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:436)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:415)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:153)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:167)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:156)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
    at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.org$apache$spark$internal$io$SparkHadoopMapReduceWriter$$executeTask(SparkHadoopMapReduceWriter.scala:168)
    ... 8 more
Driver stacktrace:
    at org.apache.spark.JavaAPISuite.writeWithNewAPIHadoopFile(JavaAPISuite.java:1231)
Caused by: org.apache.spark.SparkException: Task failed while writing rows
Caused by: java.io.IOException: Could not rename file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/_temporary/attempt_20161201005155_0000_r_000000_0 to file:/C:/projects/spark/core/target/tmp/1480553515529-0/output/_temporary/0/task_20161201005155_0000_r_000000
```
This PR proposes to close the record writer before committing the task.
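
As a rough illustration of the ordering only (not the actual patch), the sketch below writes records to a task-attempt file, closes the writer, and only then "commits" by renaming the file. It uses plain `java.nio` rather than Hadoop's `RecordWriter` and `OutputCommitter`, and the names `CloseBeforeCommit`, `writeAndCommit`, `attempt_0`, and `task_0` are hypothetical, chosen for this example.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

public class CloseBeforeCommit {

    // Hypothetical helper: write all records, close the writer, then rename.
    static Path writeAndCommit(Path dir, Iterable<String> records) throws IOException {
        Path attempt = dir.resolve("attempt_0");   // stand-in for the task attempt output
        Path committed = dir.resolve("task_0");    // stand-in for the committed task output

        // try-with-resources closes the writer when the block exits,
        // so no handle to the attempt file is held during the rename below.
        try (Writer writer = Files.newBufferedWriter(attempt, StandardCharsets.UTF_8)) {
            for (String record : records) {
                writer.write(record);
                writer.write('\n');
            }
        }

        // "Commit": move the attempt file to its final name only after the close.
        return Files.move(attempt, committed);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("close-before-commit");
        Path out = writeAndCommit(dir, Arrays.asList("a", "b"));
        System.out.println(Files.readAllLines(out, StandardCharsets.UTF_8));
    }
}
```

Placing the rename inside the `try` block instead would reproduce the problematic ordering described above, where the commit runs while the writer is still open.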
## How was this patch tested?
Manually tested.
**Before**
https://ci.appveyor.com/project/spark-test/spark/build/94-scala-tests
**After**
https://ci.appveyor.com/project/spark-test/spark/build/93-scala-tests
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/HyukjinKwon/spark close-wirter-first
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/16098.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #16098
----
commit 7d76472db2770845f813381a4a810759dc745203
Author: hyukjinkwon <[email protected]>
Date: 2016-12-01T09:44:43Z
Close first
----