Github user mccheah commented on a diff in the pull request:
https://github.com/apache/spark/pull/4155#discussion_r23509086
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val conf = SparkEnv.get.conf
+      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID,
+        splitID, attemptID)
+      if (canCommit) {
--- End diff ---
But another follow-up question: how did we get away with this case before?
When committer.needsTaskCommit returns false, SparkHadoopWriter simply skips
the commit without throwing an error. So if some other attempt, for which
committer.needsTaskCommit had returned true, then failed to commit the output,
aren't we in the same situation @vanzin described, just via a slightly
different code path?
Or to be more explicit, take the 5 steps but adjust them slightly, and
assume the code I added in this patch doesn't exist (see the sketch after
this list):
1. Task 1 starts.
2. Task 1 asks committer.needsTaskCommit(), which returns true.
3. Task 1 tries to commit its output but fails.
4. Task 2 starts.
5. Task 2 asks committer.needsTaskCommit(), but that returns false.
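
To make that concrete, here is a minimal sketch of the pre-patch commit path
(the CommitScenario object and runAttempt method are hypothetical names used
only for illustration; needsTaskCommit, commitTask, and abortTask are the real
Hadoop OutputCommitter API):

```scala
import java.io.IOException
import org.apache.hadoop.mapred.{OutputCommitter, TaskAttemptContext}

// Hypothetical sketch of the pre-patch commit path; not the actual
// SparkHadoopWriter code.
object CommitScenario {
  def runAttempt(cmtr: OutputCommitter, taCtxt: TaskAttemptContext): Unit = {
    if (cmtr.needsTaskCommit(taCtxt)) {
      try {
        cmtr.commitTask(taCtxt) // step 3: task 1 throws here
      } catch {
        case e: IOException =>
          cmtr.abortTask(taCtxt)
          throw e // task 1 is marked failed
      }
    }
    // Step 5: task 2 reaches this point when needsTaskCommit() returns
    // false. Nothing was committed and no error is raised, so task 2
    // reports success even though the partition's output never landed.
  }
}
```

If that reading is right, the output for that partition is silently lost
without any task reporting a failure.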