Maybe set spark.hadoop.validateOutputSpecs=false?
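That setting disables Spark's output-spec validation, which is what raises the
FileAlreadyExistsException before saveAsNewAPIHadoopFile writes anything. Note
that it also removes the guard against colliding task files silently
overwriting earlier output. A minimal sketch of where to set it, assuming a
SparkSession-based driver (the app name is a placeholder):

from pyspark.sql import SparkSession

# spark.hadoop.validateOutputSpecs=false turns off the "output directory
# already exists" check performed by saveAsHadoopFile / saveAsNewAPIHadoopFile.
# "hfile-batch-write" is a placeholder app name.
spark = (SparkSession.builder
         .appName("hfile-batch-write")
         .config("spark.hadoop.validateOutputSpecs", "false")
         .getOrCreate())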
From: Gautham Acharya
Sent: March 15, 2020 3:23
To: user@spark.apache.org
Subject: [PySpark] How to write HFiles as an 'append' to the same directory?
I have a process in Apache Spark that writes HFiles to S3 in batches. I want
the resulting HFiles in the same directory, since they belong to the same
column family. However, I’m getting a ‘directory already exists’ error when I
run this on AWS EMR. How can I write HFiles via Spark as an ‘append’, the way
I can with a CSV?
The batch writing function looks like this:
for col_group in split_cols:
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(
        joined_dataframe, col_group, pandas_udf_func, group_by_args)
    hfile_writer.write_hfiles(processed_chunk, output_path,
                              zookeeper_ip, table_name,
                              constants.DEFAULT_COL_FAMILY)
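One way to sidestep the collision without touching Spark settings is to give
each batch its own subdirectory under output_path, since the check only fails
when the exact target directory already exists. A hedged sketch; the per-batch
naming scheme below is an assumption, not part of the original job:

for batch_index, col_group in enumerate(split_cols):
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(
        joined_dataframe, col_group, pandas_udf_func, group_by_args)
    # Assumed layout: one directory per batch, so checkOutputSpecs never
    # sees a pre-existing path; HBase bulk load can ingest each directory
    # in turn afterwards.
    batch_output_path = "{}/batch_{:04d}".format(output_path, batch_index)
    hfile_writer.write_hfiles(processed_chunk, batch_output_path,
                              zookeeper_ip, table_name,
                              constants.DEFAULT_COL_FAMILY)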
The actual function that writes the HFiles is this:
rdd.saveAsNewAPIHadoopFile(output_path,
                           constants.OUTPUT_FORMAT_CLASS,
                           keyClass=constants.KEY_CLASS,
                           valueClass=constants.VALUE_CLASS,
                           keyConverter=constants.KEY_CONVERTER,
                           valueConverter=constants.VALUE_CONVERTER,
                           conf=conf)
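Before this call runs, FileOutputFormat.checkOutputSpecs rejects any
pre-existing target directory. If a stale directory from an earlier run is the
culprit, it can be probed (or cleared) from PySpark through the JVM Hadoop
FileSystem API. A hedged sketch that relies on the private _jsc/_jvm gateways
and assumes a SparkContext named sc is in scope; the recursive delete is
destructive and only sensible when the previous output is disposable:

# Reach the Hadoop FileSystem behind the S3 path via the py4j gateway.
hadoop_conf = sc._jsc.hadoopConfiguration()
target = sc._jvm.org.apache.hadoop.fs.Path(output_path)
fs = target.getFileSystem(hadoop_conf)
if fs.exists(target):
    # Destructive: recursively removes the previous run's HFiles.
    fs.delete(target, True)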
The exception I’m getting:
Called with arguments: Namespace(job_args=['matrix_path=/tmp/matrix.csv', 'metadata_path=/tmp/metadata.csv', 'output_path=s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles', 'group_by_args=cluster_id', 'zookeeper_ip=ip-172-30-5-36.ec2.internal', 'table_name=test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a'], job_name='matrix_transformations')
job_args_tuples: [['matrix_path', '/tmp/matrix.csv'], ['metadata_path', '/tmp/metadata.csv'], ['output_path', 's3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles'], ['group_by_args', 'cluster_id'], ['zookeeper_ip', 'ip-172-30-5-36.ec2.internal'], ['table_name', 'test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a']]
Traceback (most recent call last):
  File "/mnt/var/lib/hadoop/steps/s-2ZIOR335HH9TR/main.py", line 56, in
    job_module.transform(spark, **job_args)
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 93, in transform
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 73, in write_split_columnwise_transform
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/output_handler/hfile_writer.py", line 44, in write_hfiles
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, in saveAsNewAPIHadoopFile
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles/median already exists
        at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
        at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:393)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1000)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)