Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/4155#discussion_r23575011
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
- try {
- cmtr.commitTask(taCtxt)
- logInfo (taID + ": Committed")
- } catch {
- case e: IOException => {
- logError("Error committing the output of task: " + taID.value, e)
- cmtr.abortTask(taCtxt)
- throw e
+ val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+ val conf = SparkEnv.get.conf
+ val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID,
+ splitID, attemptID)
+ if (canCommit) {
--- End diff --
Ah, I missed this discussion since it got collapsed under a changed diff.
I'll have to revisit the docs / usage to confirm this, but I think that Hadoop's
`needsTaskCommit` is a necessary but not sufficient condition for committing:
`needsTaskCommit` should always return `true` for a task whose output has not
yet been committed (in fact, I think it would be valid for it to always return
`true`, even for already-committed output).
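To make the "necessary but not sufficient" point concrete, here is a minimal sketch under stated assumptions: `ToyCommitter` and `ToyCoordinator` are hypothetical single-JVM stand-ins, not Hadoop's `OutputCommitter` or the coordinator added in this PR. Because `needsTaskCommit` is `true` for any uncommitted partition, two concurrent attempts can both pass it, so something else (here, a hypothetical first-asker-wins arbiter) has to pick the single attempt that actually commits.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Hadoop-style committer: it only tracks which
// partitions have committed output (single-threaded toy, no real I/O).
final class ToyCommitter {
  private val committed = mutable.Set.empty[Int]

  // True for any partition whose output is not yet committed, so two
  // concurrent attempts of the same partition can both see `true`.
  def needsTaskCommit(partition: Int): Boolean = !committed.contains(partition)

  def commitTask(partition: Int): Unit = committed += partition
}

// Hypothetical arbiter: the first attempt to ask about a partition wins, and
// only that attempt is ever authorized for it afterwards.
final class ToyCoordinator {
  private val winners = mutable.Map.empty[Int, Int]

  def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
    winners.getOrElseUpdate(partition, attempt) == attempt
  }
}

object NecessaryNotSufficient {
  def main(args: Array[String]): Unit = {
    val committer = new ToyCommitter
    val coordinator = new ToyCoordinator
    // Both attempts of partition 0 pass the necessary check...
    val needs = Seq(1, 2).map(_ => committer.needsTaskCommit(0))
    // ...but only one of them is authorized to actually commit.
    val allowed = Seq(1, 2).map(attempt => coordinator.canCommit(0, attempt))
    println(s"needsTaskCommit: $needs, canCommit: $allowed")
  }
}
```

First-asker-wins is just one simple policy chosen for illustration; the point is only that the decision is made in one place rather than by each attempt independently.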
I agree that task 2 should throw an error, since doing otherwise could also
lead to missing output: as soon as the scheduler sees one successful completion
for a task, it won't re-run that task, so we need "successful task completion"
to imply "output committed."
To run with your example: given the above, I don't think task 2 would see
`needsTaskCommit = false` just because the output hasn't been committed yet, but
even if it did, there's still a problem that can lead to missing output.
Imagine that we had this interleaving:
1. Tasks 1 and 2 start.
2. Task 1 calls `committer.needsTaskCommit()`, which returns `true`.
3. Task 2 calls `committer.needsTaskCommit()`, which returns `false`. It exits
without throwing an exception and reports success.
4. Task 1 fails to commit.
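The interleaving above can be replayed in a toy model, assuming (hypothetically) a scheduler that marks a partition finished on the first reported success and only re-runs partitions that never succeeded. `MissingOutputDemo` and `replay` are illustrative names, not Spark code. With `deniedAttemptThrows = false`, task 2 reports success and the partition ends up finished with no committed output; with `true`, both attempts fail, the partition is re-run, and its output gets committed.

```scala
import scala.collection.mutable

// Toy model only (hypothetical, not Spark's DAGScheduler): once any attempt of a
// partition reports success, the partition is marked finished and never re-run.
object MissingOutputDemo {
  /** Replays the four-step interleaving for a single partition and returns
    * (partitions marked finished, partitions whose output was committed).
    *
    * @param deniedAttemptThrows if true, task 2 fails instead of reporting
    *                            success when it does not commit
    */
  def replay(deniedAttemptThrows: Boolean): (Set[Int], Set[Int]) = {
    val partition = 0
    val finished = mutable.Set.empty[Int]
    val committed = mutable.Set.empty[Int]

    def report(succeeded: Boolean): Unit =
      if (succeeded) finished += partition

    // Steps 1-2: both attempts start; task 1 is told it needs to commit.
    // Step 3: task 2 is told it does not need to commit, and does not commit.
    report(succeeded = !deniedAttemptThrows)
    // Step 4: task 1's commit fails, so nothing is written and it reports failure.
    report(succeeded = false)

    // Re-run anything that never reported success; assume the re-run commits.
    if (!finished.contains(partition)) {
      committed += partition
      finished += partition
    }
    (finished.toSet, committed.toSet)
  }

  def main(args: Array[String]): Unit = {
    // Finished but nothing committed: the missing-output case.
    println(replay(deniedAttemptThrows = false))
    // Task 2 throws, so the partition is re-run and its output is committed.
    println(replay(deniedAttemptThrows = true))
  }
}
```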
In this case, one attempt of the task has reported success even though its
output was never committed, so that partition's output will be missing: the
scheduler won't schedule another attempt to commit it.
So, I agree with @vanzin: if the DAGScheduler did not authorize the commit,
then we should throw an exception. Hopefully this exception will be rare in
practice, since `needsTaskCommit` should ideally return `false` for tasks whose
output is definitely committed.
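Roughly, the commit path I have in mind looks like the sketch below. The `TaskCommitter` and `CommitAuthorizer` traits and `CommitDeniedException` are hypothetical simplifications for illustration, not the actual Hadoop or Spark signatures; the key part is that an unauthorized attempt throws instead of falling through and reporting success.

```scala
import java.io.IOException

// Hypothetical interfaces for illustration only; these are NOT the Hadoop
// OutputCommitter or Spark OutputCommitCoordinator signatures.
trait TaskCommitter {
  def needsTaskCommit(): Boolean
  def commitTask(): Unit
  def abortTask(): Unit
}

trait CommitAuthorizer {
  def canCommit(jobId: Int, splitId: Int, attemptId: Int): Boolean
}

// Hypothetical exception type signalling that the driver denied the commit.
final class CommitDeniedException(msg: String) extends Exception(msg)

object CommitPath {
  def commit(cmtr: TaskCommitter, auth: CommitAuthorizer,
             jobId: Int, splitId: Int, attemptId: Int): Unit = {
    if (cmtr.needsTaskCommit()) {
      if (auth.canCommit(jobId, splitId, attemptId)) {
        try {
          cmtr.commitTask()
        } catch {
          case e: IOException =>
            // Same as the existing code: abort, then surface the failure.
            cmtr.abortTask()
            throw e
        }
      } else {
        // Not authorized: fail this attempt rather than report success
        // without having committed any output.
        throw new CommitDeniedException(
          s"attempt $attemptId of split $splitId in job $jobId was not authorized to commit")
      }
    }
    // needsTaskCommit == false: assumed to mean the output is already committed.
  }
}
```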