Hi,

My job writes ~10 partitions with insertInto. With the same input /
output data, the total duration of the job varies a lot depending on
how many partitions the target table has.

Target table with 10 partitions:
1 min 30 s

Target table with ~10000 partitions:
13 min 0 s

It seems that Spark always fetches the full list of partitions in the
target table. While this happens, the cluster is basically idle and the
driver is busy listing partitions.

Here's a thread dump of the driver from such an idle period:
https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
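
To get a rough idea of how long the bare listing takes, I suppose a plain
SHOW PARTITIONS could be timed separately (just a quick check for
illustration, not part of my job):

import time

t0 = time.time()
num_partitions = spark.sql('SHOW PARTITIONS ' + target_table_name).count()
print('{} partitions listed in {:.1f} s'.format(num_partitions, time.time() - t0))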

Is there any way to optimize this currently? Is this a known issue? Are
there any plans to improve it?

My code is essentially:

spark = SparkSession.builder \
    .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
    .config('hive.exec.dynamic.partition', 'true') \
    .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
    .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
    .enableHiveSupport() \
    .getOrCreate()

out_df.write \
    .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
    .insertInto(target_table_name, overwrite=True)

The table was originally created from Spark with saveAsTable.

Does Spark need to know anything about the existing partitions, though? As a
manual workaround I could write the files directly to the partition
locations, first deleting any existing files in those partitions, and then
ask the metastore to ALTER TABLE ... ADD IF NOT EXISTS PARTITION. That
doesn't require any prior knowledge of the existing partitions.
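
Roughly like this (just a sketch with made-up names -- a single partition
column 'dt' and a hypothetical base_path -- not my actual code):

from pyspark.sql import functions as F

# Hypothetical table location; in reality this would come from the metastore.
base_path = 's3://bucket/warehouse/my_table'

# The job only produces ~10 partition values, so collecting them is cheap.
partition_values = [r['dt'] for r in out_df.select('dt').distinct().collect()]

for dt in partition_values:
    part_path = '{}/dt={}'.format(base_path, dt)

    # mode('overwrite') on a plain path write replaces whatever files
    # were already in this partition directory.
    out_df.filter(F.col('dt') == dt) \
        .drop('dt') \
        .write \
        .mode('overwrite') \
        .parquet(part_path)

    # Register the partition in the metastore without listing the others.
    spark.sql(
        "ALTER TABLE {} ADD IF NOT EXISTS PARTITION (dt='{}') "
        "LOCATION '{}'".format(target_table_name, dt, part_path)
    )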

Thanks.
