Github user liancheng commented on the pull request:
https://github.com/apache/spark/pull/8191#issuecomment-131598385
@marmbrus Had some offline discussions with @yhuai, and we believe that the
real problem we hit in the job behind SPARK-9899 is SPARK-10005 (fixed by
#8228).
Several known facts:
1. The failed job we observed is a CTAS job
1. The first attempt of a task fails because of SPARK-10005
1. Subsequent attempts of that task fail because of the "File already exists" error
1. When using S3, the `FileSystem` implementation first writes files to local disk and uploads them when the writers are closed
1. We are using a direct output committer, so different attempts of a task write to exactly the same output path (see the sketch after this list)
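
To make the last fact concrete, here is a minimal sketch of what "direct output committer" means in this discussion (an illustrative class, not Spark's actual `DirectParquetOutputCommitter`): the work path is the final output path, so every attempt of the same task targets the same file, and there is no per-attempt temporary directory to roll back.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Illustrative only: a committer whose work path *is* the final output path.
class DirectOutputCommitterSketch(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  // No per-attempt _temporary directory: attempt 1 and attempt 2 of the same
  // task both target exactly the same file under `outputPath`.
  override def getWorkPath: Path = outputPath

  // Nothing to rename on commit, since the data was written in place.
  override def commitTask(context: TaskAttemptContext): Unit = ()

  // Nothing safe to clean up on abort either: this attempt's partial output
  // is indistinguishable from another attempt's committed output.
  override def abortTask(context: TaskAttemptContext): Unit = ()
}
```
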
We suspect that things happen in the following order (sketched in code after the list):
1. The CTAS job gets translated into an `InsertIntoHadoopFsRelation` physical plan, and a write job is issued
1. The 1st attempt of a write task, `T1`, is issued; it targets output path `P`, but first opens a local temporary file `F`
1. `T1` tries to read data from existing Parquet files and write rows to `F`
1. `T1` fails due to SPARK-10005 and aborts
1. The output writer used by `T1` is closed in `abortTask`, so `F` gets closed
1. **`F` gets uploaded to S3 as an empty/corrupted file**, and now `P` exists
1. Write task `T2` is issued as the 2nd attempt of `T1`
1. `T2` tries to create the output file, but hits "File already exists" because of the S3 file `P` uploaded by `T1`
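
A self-contained sketch of steps 2 through 6 (the names `OutputWriterSketch` and `runTaskAttempt` are placeholders made up for illustration, not the actual code path in `InsertIntoHadoopFsRelation`):

```scala
// Illustrative placeholders, not Spark internals.
trait OutputWriterSketch {
  def write(row: String): Unit // appends rows to a local temporary file F
  def close(): Unit            // closing the S3 stream uploads F to the final path P
}

def runTaskAttempt(rows: Iterator[String], writer: OutputWriterSketch): Unit = {
  try {
    rows.foreach(writer.write)  // step 3: copy rows from existing Parquet files into F
    writer.close()              // happy path: F is complete by the time it is uploaded as P
  } catch {
    case t: Throwable =>
      // Steps 4-6: the task fails (e.g. with SPARK-10005), but abortTask()
      // still closes the writer, and close() uploads the half-written F to S3
      // as P. The next attempt then finds P already there and hits
      // "File already exists".
      writer.close()
      throw t
  }
}
```

The key point is that with an S3 `FileSystem` that uploads on `close()`, there is no way to close the local file without also materializing `P`.
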
The hard part is that we can't ...
1. ... delete the target file at the beginning of a task (as this PR does), because of the very case you mentioned
1. ... use `FileSystem.create(path, true)` to overwrite the output file, since it's equivalent to 1.
1. ... delete the output file in `abortTask()` when a task fails, because failed speculative tasks may delete the properly committed output file written by other successful attempt(s)
What makes it worse, consider the following case (sketched below):
1. Write task `T1` gets issued, and executes slowly
1. Task `T2` gets issued as a speculative attempt of `T1`
1. `T1` succeeds, and its output file gets uploaded to S3 in `commitTask()`
1. `T2` fails, and its partially written temporary output file gets uploaded to S3 in `abortTask()`
Now correct data is overwritten by corrupted data.
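
A tiny runnable illustration of that race against a last-writer-wins object store (everything here, `FakeS3` included, is made up purely to demonstrate the overwrite; it is not how S3 or Spark are implemented):

```scala
import scala.collection.mutable

// A toy last-writer-wins key/value store standing in for S3 (illustrative only).
class FakeS3 {
  private val store = mutable.Map.empty[String, Array[Byte]]
  def put(key: String, bytes: Array[Byte]): Unit = store(key) = bytes
  def get(key: String): Option[String] = store.get(key).map(b => new String(b, "UTF-8"))
}

object SpeculationRace extends App {
  val s3 = new FakeS3
  val p  = "warehouse/t/part-00000" // the shared output path P used by both attempts

  // T1 (original attempt) finishes first: commitTask() uploads the complete file.
  s3.put(p, "complete data".getBytes("UTF-8"))

  // T2 (speculative attempt of the same task) fails later: abortTask() still
  // closes its writer, which uploads the partially written local file to P.
  s3.put(p, "partial".getBytes("UTF-8"))

  // The committed output has been silently replaced by corrupted data.
  println(s3.get(p)) // prints Some(partial)
}
```
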
The TL;DR is that S3 + direct output committer + speculation is a REALLY bad combination. Currently I don't see a solution that fixes all the cases mentioned above. My suggestion is to deprecate this combination by checking whether speculation is enabled whenever a direct output committer is used.
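
For what it's worth, a guard along those lines could look roughly like the following (a sketch only: `checkCommitterSafety` is a made-up name, and the exact hook point and committer-class config key would need to match whatever committer is actually configured):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Sketch of the suggested check: fail fast when speculation is enabled and a
// direct output committer is configured.
def checkCommitterSafety(sparkConf: SparkConf, hadoopConf: Configuration): Unit = {
  val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
  // Assumes the committer is configured via this key, as for the Parquet data source.
  val committerClass = hadoopConf.get("spark.sql.parquet.output.committer.class", "")

  if (speculationEnabled && committerClass.contains("Direct")) {
    throw new IllegalArgumentException(
      "Speculation cannot be used together with a direct output committer: " +
        "failed or speculative attempts may overwrite committed output.")
  }
}
```
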