Github user steveloughran commented on the issue:
https://github.com/apache/spark/pull/21286
    > After the job is committed, `skip_dir/tab1/_temporary` will be deleted.
    > Then, when other jobs attempt to commit, an error will be reported.
    I see. Yes, that's
`org.apache.hadoop.mapreduce.OutputCommitter.cleanupJob()` doing the work. It
does this because it wants to clean up all attempts, including those of
predecessors which have failed, and it expects only one job to be writing to
the destination at a time.
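    For anyone following along, here's a much-simplified sketch of what that
cleanup effectively does. This is not the Hadoop source, just an illustration
of the behaviour:

    ```java
    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;

    /**
     * Simplified illustration of what the stock cleanup does: recursively
     * delete the whole $dest/_temporary tree, which takes out the pending
     * output of *every* job writing to that destination, not only the job
     * which is committing.
     */
    final class CleanupSketch {
      static void cleanupJob(Path outputPath, JobContext context) throws IOException {
        Path pendingJobAttempts = new Path(outputPath, "_temporary");
        FileSystem fs = pendingJobAttempts.getFileSystem(context.getConfiguration());
        fs.delete(pendingJobAttempts, true); // recursive: all job attempts go
      }
    }
    ```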
    Like I said, this proposed patch breaks all the blobstore-specific
committer work, causes problems at scale with HDFS alone, and adds a new
problem: how do you clean up after failed jobs writing to the same destination?
It causes these problems because it adds another layer of temporary directory
and then a rename.
    Assuming you only want to work with
`org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter` and subclasses
thereof (like the Parquet one), why not:
    1. Pick up my SPARK-23977 patch and Hadoop 3.1. There are some problems
with Hive versioning there, but that is a WIP of mine.
    1. Make your own subclass of `FileOutputCommitter` whose `cleanupJob()`
method doesn't do the full `$dest/_temporary` cleanup, just deletes the
current job ID's subdirectory (see the sketch after this list).
    1. Configure the job's (new) committer factory underneath
`FileOutputFormat` to return your committer; do the same for Parquet via the
`BindingParquetOutputCommitter`.
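    To illustrate option 2: a rough, untested sketch of what such a subclass
could look like (the class name is mine, not anything which exists today):

    ```java
    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

    /**
     * Hypothetical committer: same as FileOutputCommitter except that cleanup
     * only deletes this job's attempt directory under $dest/_temporary,
     * leaving other jobs' pending data alone.
     */
    public class PerJobCleanupCommitter extends FileOutputCommitter {

      public PerJobCleanupCommitter(Path outputPath, TaskAttemptContext context)
          throws IOException {
        super(outputPath, context);
      }

      @Override
      public void cleanupJob(JobContext context) throws IOException {
        // getJobAttemptPath() resolves to $dest/_temporary/$appAttemptId; this
        // is only genuinely per-job once each job gets a distinct ID, which is
        // the job ID problem discussed below.
        Path jobAttemptPath = getJobAttemptPath(context);
        FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
        fs.delete(jobAttemptPath, true);
      }
    }
    ```

    Option 3 is then just configuration: have the committer factory hand back
this class, and bind Parquet to it the same way.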
    That way, you get to choose the cleanup policy, don't create conflicts,
and don't need to rename anything.
    There's also the option of providing a MAPREDUCE- patch to add a switch
that changes cleanup to only purge that job's data...you'd need to make sure
all attempts of that job get cleaned up, as MR can make multiple attempts.
There's a general fear of going near that class as it's such a critical piece
of code, but cleanup is not the bit everyone is scared of. Get a change in
there and all the file output committer subclasses pick it up. That'd be for
Hadoop 3.2 & 2.10; no need to change anything in Spark other than the job ID
problem.
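    Sketched very roughly, that switch might look like the fragment below
inside `cleanupJob()`; the property name is invented here for illustration and
is not an existing Hadoop option:

    ```java
    // Hypothetical fragment for FileOutputCommitter.cleanupJob(); the config
    // key below does not exist today, it only shows the shape of the switch.
    boolean jobAttemptOnly = context.getConfiguration().getBoolean(
        "mapreduce.fileoutputcommitter.cleanup.job.attempt.only", false);
    Path jobAttemptPath = getJobAttemptPath(context); // $dest/_temporary/$appAttemptId
    Path toDelete = jobAttemptOnly
        ? jobAttemptPath             // this attempt's subdir; a real patch would
                                     // need to cover every attempt of the job
        : jobAttemptPath.getParent(); // whole $dest/_temporary (today's behaviour)
    FileSystem fs = toDelete.getFileSystem(context.getConfiguration());
    fs.delete(toDelete, true);
    ```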
    > Meanwhile, because all applications share the same app attempt ID, they
    > write temporary data to the same temporary dir `skip_dir/tab1/_temporary/0`.
    > Data committed by the successful application is also corrupted.
    That's the issue we've been discussing around job IDs. If each Spark
driver comes up with a unique job ID, that conflict will go away.