Github user liancheng commented on the pull request:
https://github.com/apache/spark/pull/6864#issuecomment-113024897
Some background and a summary of offline discussion with @yhuai about this
issue:
In 1.4.0, we added `HadoopFsRelation` to abstract partition support for all
data sources based on the Hadoop `FileSystem` interface. Specifically,
this makes partition discovery, partition pruning, and writing dynamic
partitions much easier for data sources. From the user's perspective, the
write path behaves much like Hive's; internally, however, the two differ
significantly.
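For concreteness, here is roughly what the user-facing write path looks like
(paths and column names below are made up for illustration):
```scala
// Hypothetical example: `df` is assumed to be an existing DataFrame with
// columns `year`, `month`, and `value`.
df.write
  .format("parquet")
  .partitionBy("year", "month")
  .save("hdfs:///tmp/events")

// HadoopFsRelation lays files out in Hive-style partition directories, e.g.
//   /tmp/events/year=2015/month=6/part-r-00001.parquet
// which partition discovery later maps back to the `year`/`month` columns.
```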
When data is inserted into Hive tables via Spark SQL,
`InsertIntoHiveTable` simulates Hive's behavior (sketched after this list):
1. Write data to a temporary location
2. Commit the write job
3. Move data from the temporary location to the final destination
using
- `Hive.loadTable()` for non-partitioned tables
- `Hive.loadPartition()` for static partitions
- `Hive.loadDynamicPartitions()` for dynamic partitions
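In isolation, that staged-write pattern looks roughly like the following
minimal, self-contained sketch using local files (the `Hive.load*` calls above
play the role of the final move; paths here are made up):
```scala
import java.nio.file.{Files, Paths}

// Step 1: tasks write their output under a temporary staging directory.
val staging = Files.createTempDirectory("staging-")
val staged  = Files.write(staging.resolve("part-r-00001.parquet"), Array[Byte](1, 2, 3))

// Step 2: commit the write job (handled by the output committer in Spark).

// Step 3: move staged files into the final destination, as Hive.loadTable()
// and friends do for their respective table types. Note that a plain move
// fails if the target name already exists -- exactly the collision case
// discussed next.
val dest = Files.createDirectories(Paths.get("/tmp/final-table"))
Files.move(staged, dest.resolve(staged.getFileName))
```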
The important part is that, when appending data to existing tables in step
3, `Hive.copyFiles()` is invoked to move the data (the name is somewhat
confusing, since no actual copying occurs; files are only moved and renamed).
If a file in the source directory and a file in the destination directory
happen to have the same name, say `part-r-00001.parquet`, the former is moved
to the destination directory and renamed with a `_copy_N` suffix
(`part-r-00001_copy_1.parquet`). That's how Hive avoids name collisions (see
the sketch below).
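A minimal reconstruction of that renaming scheme (my own sketch for
illustration, not Hive's actual code):
```scala
import java.nio.file.{Files, Path}

// Try `part-r-00001.parquet` first; on collision fall back to
// `part-r-00001_copy_1.parquet`, `_copy_2`, ... until a free name is found.
def collisionFreeName(destDir: Path, fileName: String): Path = {
  val (base, ext) = fileName.lastIndexOf('.') match {
    case -1 => (fileName, "")
    case i  => (fileName.substring(0, i), fileName.substring(i))
  }
  Iterator
    .from(0)
    .map {
      case 0 => destDir.resolve(fileName)
      case n => destDir.resolve(s"${base}_copy_$n$ext")
    }
    .find(p => !Files.exists(p))
    .get // the candidate stream is unbounded, so a free name always exists
}
```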
Some alternative fixes considered:
1. Use an approach similar to Hive's
This approach is not preferred in Spark 1.4.0, mainly because file
metadata operations on S3 tend to be slow, especially for tables with lots of
files and/or partitions. That's why `InsertIntoHadoopFsRelation` writes to the
destination directory directly, and is often used together with
`DirectParquetOutputCommitter` to reduce latency when working with S3 (see the
configuration sketch below). This means we never get a chance to rename files,
and must avoid name collisions from the start.
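For reference, the direct-committer setup mentioned above is typically enabled
with something like the following (from memory of the 1.4-era configuration;
the exact key and class location should be double-checked against your Spark
version):
```scala
import org.apache.spark.SparkConf

// Route Parquet output through DirectParquetOutputCommitter so task output
// lands in the destination directory directly, skipping the rename step.
val conf = new SparkConf()
  .set("spark.sql.parquet.output.committer.class",
       "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
```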
2. Same as 1.3, but move max part number detection back to the driver side
This isn't doable because, unlike 1.3, 1.4 also takes dynamic
partitioning into account. When inserting into dynamic partitions, we don't
know which partition directories will be touched on the driver side before
issuing the write job (see the example below). Checking all partition
directories is simply too expensive for tables with thousands of partitions.
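To illustrate: in a dynamic-partitioned append like the hypothetical one
below, the set of `date=...` directories that get written depends entirely on
the values in the data, so the driver cannot enumerate them up front:
```scala
// `df` is assumed to be an existing DataFrame with a `date` column. Which
// `date=...` directories this append touches is only known once tasks run.
df.write
  .mode("append")
  .partitionBy("date")
  .parquet("hdfs:///warehouse/events")
```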
3. Add an extra component to output file names to avoid name collisions
This seems to be the only reasonable solution for now.
Currently, the ORC data source adds `System.currentTimeMillis` to the
output file name. This is not 100% safe: it fails when two tasks with the
same task ID (which implies they belong to two separate concurrent jobs)
write to the same location within the same millisecond, which is relatively
unlikely. The benefit of using a timestamp here is that record order can be
preserved.
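Sketched in isolation, timestamp-qualified naming looks like this (the exact
pattern used by the ORC data source may differ):
```scala
// Two concurrent jobs only collide if the same task ID writes to the same
// location within the same millisecond.
def timestampedFileName(taskId: Int): String =
  f"part-r-$taskId%05d-${System.currentTimeMillis()}.orc"
```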
Another obvious choice is to add a UUID to the output file name. The
benefit is that this practically rules out name collisions; the drawback is
that record order is no longer preserved. However, we never promised to
preserve record order when writing data, and Hive doesn't promise this either
(the `_copy_N` trick breaks record order).
To sum up, adding a UUID to the output file name seems to be the simplest
and safest way to fix this issue.
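For completeness, a minimal sketch of the UUID variant (the naming pattern is
illustrative only, not the final implementation):
```scala
import java.util.UUID

// One UUID generated per write job on the driver, embedded in every task's
// output file name, makes cross-job collisions practically impossible.
val jobUuid = UUID.randomUUID().toString

def uuidFileName(taskId: Int): String =
  f"part-r-$taskId%05d-$jobUuid.parquet"
```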