Github user steveloughran commented on the issue:
https://github.com/apache/spark/pull/19848
Thought some more on this.
Here's a possible workflow for failures which can arise from job attempt
recycling
1. Stage 1, Job ID 0, attempt 1, kicks off task 0 attempt 1, task attempt
ID = 0_1_0_1
1. task attempt ID = 0_1_0_1 drops off the network, pauses for a long time
for GC, etc
1. The job gives up on task attempt 0_1_0_1 and kicks off a second attempt,
task attempt ID 0_1_0_2
1. task attempt ID 0_1_0_2 finishes, asks for commit permission, commits.
1. Stage 1 completes, stage 2 kicks off
1. Stage 2, Job ID 0, attempt 1, kicks off task 0 attempt 1, task attempt
ID = 0_1_0_1
1. Original task attempt 0_1_0_1 finishes, asks driver for permission to
commit.
1. `OutputCommitCoordinator.handleAskPermissionToCommit()` gets that
request.
1. Which it (fortunately) rejects: even ignoring the task attempt ID, the
stage number is different
1. But it will send back false to the task attempt
1. Which will then clean up
1. including deleting `_temporary/job_0_1/task_0_1_0_1`
1. Which the stage 2 task attempt (task attempt ID = 0_1_0_1) was using to
write its output
1. As that no longer exists, `needsTaskCommit()` will return false
1. so that task attempt will silently lose its output.
It's a convoluted failure mode, as it depends on the same destination
directory being used and on the timing of events, but it does imply that by
recycling job IDs, a partitioned task attempt from one stage can contaminate
the next.
That's across stages. Between jobs, you could have the same workflow.
Indeed, if the entire Spark cluster went away, the partitioned task attempt
would see a failure and react by deleting its attempt data.
Conclusion: you really need job attempts which are unique across
executions, even sequential ones, unless you can guarantee that all task
attempts from the previous executions are terminated.
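For illustration only (not a patch proposal): one way to get that uniqueness is to fold something globally unique, such as a timestamp plus a UUID, into the job attempt identifier, so no two executions can ever share a `_temporary` working directory:
```scala
import java.util.UUID

// Illustrative sketch: build a job attempt identifier that is unique across
// executions, even sequential reruns against the same destination directory,
// instead of a small recycled counter.
def uniqueJobAttemptId(jobId: Int): String =
  s"${jobId}_${System.currentTimeMillis()}_${UUID.randomUUID().toString.take(8)}"

// e.g. "0_1512468000123_9f3c2a1b": a straggler from a previous execution can
// never resolve to (and so never delete) a live attempt's working directory.
println(uniqueJobAttemptId(0))
```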