I encountered a similar problem when trying to:

    ds.write().save("s3a://some-bucket/some/path/table");
which writes the content as a bunch of Parquet files in the "folder" named "table". I am using a Flintrock cluster with the Spark 3.0 preview, FWIW. Anyway, I just used the AWS SDK to remove it (and any "subdirectories") before kicking off the Spark machinery. I can show you how to do this in Java, but I think the Python SDK may be significantly different.

Steve C

On 15 Mar 2020, at 6:23 am, Gautham Acharya <gauth...@alleninstitute.org> wrote:

I have a process in Apache Spark that attempts to write HFiles to S3 in a batched process. I want the resulting HFiles in the same directory, as they are in the same column family. However, I'm getting a 'directory already exists' error when I try to run this on AWS EMR. How can I write HFiles via Spark as an 'append', like I can do via a CSV?

The batch writing function looks like this:

    for col_group in split_cols:
        processed_chunk = batch_write_pandas_udf_for_col_aggregation(joined_dataframe, col_group, pandas_udf_func, group_by_args)
        hfile_writer.write_hfiles(processed_chunk, output_path, zookeeper_ip, table_name, constants.DEFAULT_COL_FAMILY)

The actual function to write the HFiles is this:

    rdd.saveAsNewAPIHadoopFile(output_path,
                               constants.OUTPUT_FORMAT_CLASS,
                               keyClass=constants.KEY_CLASS,
                               valueClass=constants.VALUE_CLASS,
                               keyConverter=constants.KEY_CONVERTER,
                               valueConverter=constants.VALUE_CONVERTER,
                               conf=conf)

The exception I'm getting:

    Called with arguments: Namespace(job_args=['matrix_path=/tmp/matrix.csv', 'metadata_path=/tmp/metadata.csv', 'output_path=s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles', 'group_by_args=cluster_id', 'zookeeper_ip=ip-172-30-5-36.ec2.internal', 'table_name=test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a'], job_name='matrix_transformations')
    job_args_tuples: [['matrix_path', '/tmp/matrix.csv'], ['metadata_path', '/tmp/metadata.csv'], ['output_path',
    's3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles'], ['group_by_args', 'cluster_id'], ['zookeeper_ip', 'ip-172-30-5-36.ec2.internal'], ['table_name', 'test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a']]
    Traceback (most recent call last):
      File "/mnt/var/lib/hadoop/steps/s-2ZIOR335HH9TR/main.py", line 56, in <module>
        job_module.transform(spark, **job_args)
      File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 93, in transform
      File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 73, in write_split_columnwise_transform
      File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/output_handler/hfile_writer.py", line 44, in write_hfiles
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, in saveAsNewAPIHadoopFile
      File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
      File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
    : org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles/median already exists
        at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
        at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:393)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1000)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:991)
        at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:584)
        at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
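For the Python side, the "remove the prefix with the AWS SDK before kicking off Spark" approach Steve describes could be sketched with boto3 roughly as below. This is a sketch, not the Java code he refers to: the function name `delete_s3_prefix` is made up for illustration, the client is passed in so it can be swapped or mocked, and the bucket/prefix in the usage comment are just the ones from the stack trace above. The `list_objects_v2` paginator and `delete_objects` are standard boto3 S3 client calls; `delete_objects` takes at most 1000 keys per request, which matches the paginator's default page size.

```python
def delete_s3_prefix(s3_client, bucket, prefix):
    """Delete every object stored under the given key prefix ("directory");
    return how many objects were removed."""
    deleted = 0
    # list_objects_v2 returns at most 1000 keys per page; the paginator
    # walks all pages for us.
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        contents = page.get("Contents", [])
        if not contents:
            continue  # empty page, e.g. the prefix does not exist
        # Batch-delete the keys listed on this page (<= 1000, the
        # per-request limit of delete_objects).
        s3_client.delete_objects(
            Bucket=bucket,
            Delete={"Objects": [{"Key": obj["Key"]} for obj in contents]},
        )
        deleted += len(contents)
    return deleted

# Usage before the Spark write (assumes boto3 and AWS credentials, e.g. the
# EMR instance role):
#   import boto3
#   delete_s3_prefix(boto3.client("s3"),
#                    "gauthama-etl-pipelines-test-working-bucket",
#                    "8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles/median")
```

Note this only sidesteps `FileAlreadyExistsException` by making the destination empty first; it does not give you a true append, so only use it if regenerating the whole prefix is acceptable.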