Re: [PySpark] How to write HFiles as an 'append' to the same directory?

2020-03-16 Thread Zhang Victor
Maybe set spark.hadoop.validateOutputSpecs=false?
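
For example, a minimal sketch (untested) that sets the flag when building the session, which should skip the output-spec check that raises this error; the app name below is just a placeholder:

from pyspark.sql import SparkSession

# Disable Spark's output-spec validation so an existing output directory no
# longer raises FileAlreadyExistsException. Be aware that files from earlier
# batches that happen to share a name may then be silently overwritten.
spark = (
    SparkSession.builder
    .appName("hfile_batch_write")  # placeholder app name
    .config("spark.hadoop.validateOutputSpecs", "false")
    .getOrCreate()
)

The same flag can also be passed to spark-submit with --conf spark.hadoop.validateOutputSpecs=false.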


From: Gautham Acharya
Sent: March 15, 2020 3:23
To: user@spark.apache.org
Subject: [PySpark] How to write HFiles as an 'append' to the same directory?


I have an Apache Spark process that writes HFiles to S3 in batches. I want the resulting HFiles to land in the same directory, since they belong to the same column family. However, I’m getting a ‘directory already exists’ error when I run this on AWS EMR. How can I write HFiles via Spark as an ‘append’, the way I can with a CSV?



The batch writing function looks like this:

for col_group in split_cols:
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(joined_dataframe, col_group,
                                                                 pandas_udf_func, group_by_args)

    hfile_writer.write_hfiles(processed_chunk, output_path,
                              zookeeper_ip, table_name,
                              constants.DEFAULT_COL_FAMILY)



The actual call that writes the HFiles is this:

rdd.saveAsNewAPIHadoopFile(output_path,
                           constants.OUTPUT_FORMAT_CLASS,
                           keyClass=constants.KEY_CLASS,
                           valueClass=constants.VALUE_CLASS,
                           keyConverter=constants.KEY_CONVERTER,
                           valueConverter=constants.VALUE_CONVERTER,
                           conf=conf)



The exception I’m getting:



Called with arguments: Namespace(job_args=['matrix_path=/tmp/matrix.csv', 'metadata_path=/tmp/metadata.csv', 'output_path=s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles', 'group_by_args=cluster_id', 'zookeeper_ip=ip-172-30-5-36.ec2.internal', 'table_name=test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a'], job_name='matrix_transformations')

job_args_tuples: [['matrix_path', '/tmp/matrix.csv'], ['metadata_path', '/tmp/metadata.csv'], ['output_path', 's3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles'], ['group_by_args', 'cluster_id'], ['zookeeper_ip', 'ip-172-30-5-36.ec2.internal'], ['table_name', 'test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a']]

Traceback (most recent call last):
  File "/mnt/var/lib/hadoop/steps/s-2ZIOR335HH9TR/main.py", line 56, in <module>
    job_module.transform(spark, **job_args)
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 93, in transform
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 73, in write_split_columnwise_transform
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/output_handler/hfile_writer.py", line 44, in write_hfiles
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, in saveAsNewAPIHadoopFile
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles/median already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:393)
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1000)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)

SparkAppHandle can not stop application in yarn client mode

2020-02-05 Thread Zhang Victor
Hi all,

When using SparkLauncher to start an app in yarn-client mode, sparkAppHandle#stop() cannot stop the application.


SparkLauncher launcher = new SparkLauncher()
    .setAppName("My Launcher")
    .setJavaHome("/usr/bin/hadoop/software/java")
    .setSparkHome("/usr/bin/hadoop/software/sparkonyarn")
    .setConf("spark.executor.instances", "1")
    .setConf("spark.executor.memory", "1G")
    .setConf(SparkLauncher.EXECUTOR_CORES, "1")
    .setAppResource(jarPath)
    .setMainClass(mainClass);


SparkAppHandle sparkAppHandle =
    launcher.startApplication(new SparkAppHandle.Listener() {...});

After the app has started, the launcher process receives a signal from outside and tries to stop the Spark app:


sparkAppHandle.stop();

But the Spark app is still running.

In yarn-cluster mode, sparkAppHandle#stop() does stop the Spark app.

I found this code in org/apache/spark/deploy/yarn/Client.scala:


private val launcherBackend = new LauncherBackend() {
  override protected def conf: SparkConf = sparkConf

  override def onStopRequest(): Unit = {
    if (isClusterMode && appId != null) {
      yarnClient.killApplication(appId)
    } else {
      setState(SparkAppHandle.State.KILLED)
      stop()
    }
  }
}

def stop(): Unit = {
  launcherBackend.close()
  yarnClient.stop()
}

So in client mode it seems to only stop the YARN client and set the handle state to KILLED, without actually stopping the Spark app.
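
A possible workaround (untested sketch, not a confirmed fix) would be to fall back to SparkAppHandle#kill() when stop() does not take effect; the 5-second grace period below is an arbitrary choice:

sparkAppHandle.stop();                       // ask the app to stop cleanly
try {
    Thread.sleep(5000);                      // give it a moment to comply
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
if (!sparkAppHandle.getState().isFinal()) {  // still not in a terminal state?
    sparkAppHandle.kill();                   // force-terminate the underlying application
}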

My Spark version is 2.3.1; I also tried 3.0.0-preview, with the same result.

Can anyone help me?

Thanks.