Optimising multiple hive table join and query in spark

2020-03-14 Thread Manjunath Shetty H
Hi All,

We have 10 tables in our data warehouse (HDFS/Hive) written in ORC format. We
currently serve a use case on top of them by joining 4-5 tables using Hive, but
it is not as fast as we want it to be, so we are considering Spark for this use
case.

Any suggestions on this? Is it a good idea to use Spark for this use case?
Can we get better performance by using Spark?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (MMdd) stored as an integer.
  *   The query fetches data for the last 7 days from some tables while joining
with other tables.

The approach we have thought of so far:

  *   Create a DataFrame for each table and repartition them all by the same
column (let's say `Country` as the partition column)
  *   Register all of them as temporary tables
  *   Run the SQL query with the joins

But the problem we are seeing with this approach is that, even though we have
already repartitioned by `Country`, Spark still does hash partitioning plus a
shuffle during the join (a minimal sketch of what we do is included below). All
of the joins contain the `Country` column plus some extra columns depending on
the table.

Is there any way to avoid these shuffles and improve performance?
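
For reference, a minimal PySpark sketch of the approach above; the table names,
columns, and the date filter value are placeholders rather than our real schema:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Placeholder tables and 7-day filter.
t1 = spark.table("warehouse.table_a").filter("date_key >= 20200307")
t2 = spark.table("warehouse.table_b")

# Repartition both sides on Country before registering the temp views.
t1.repartition("Country").createOrReplaceTempView("a_tmp")
t2.repartition("Country").createOrReplaceTempView("b_tmp")

result = spark.sql("""
    SELECT a.Country, a.some_col, b.other_col
    FROM a_tmp a
    JOIN b_tmp b
      ON a.Country = b.Country AND a.extra_key = b.extra_key
""")

# Because the join keys are (Country, extra_key) rather than Country alone,
# the physical plan still shows Exchange hashpartitioning on both sides.
result.explain()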


Thanks and regards
Manjunath


Re: FYI: The evolution on `CHAR` type behavior

2020-03-14 Thread Reynold Xin
I don’t understand this change. Wouldn’t this “ban” confuse the hell out of
both new and old users?

For old users, their old code that was working with char(3) would now stop
working.

For new users, it depends on whether char(3) in the underlying metastore is
either supported but different from ANSI SQL (which is not that big of a
deal if we explain it) or not supported at all.

On Sat, Mar 14, 2020 at 3:51 PM Dongjoon Hyun wrote:

> Hi, All.
>
> Apache Spark has suffered from a known consistency issue in `CHAR` type
> behavior across its usages and configurations. However, the evolution has
> been gradually moving toward consistent behavior inside Apache Spark, because
> we don't have `CHAR` officially. The following is a summary.
>
> With 1.6.x ~ 2.3.x, `STORED AS PARQUET` gives the following different result.
> (`spark.sql.hive.convertMetastoreParquet=false` provides a fallback to
> Hive behavior.)
>
> spark-sql> CREATE TABLE t1(a CHAR(3));
> spark-sql> CREATE TABLE t2(a CHAR(3)) STORED AS ORC;
> spark-sql> CREATE TABLE t3(a CHAR(3)) STORED AS PARQUET;
>
> spark-sql> INSERT INTO TABLE t1 SELECT 'a ';
> spark-sql> INSERT INTO TABLE t2 SELECT 'a ';
> spark-sql> INSERT INTO TABLE t3 SELECT 'a ';
>
> spark-sql> SELECT a, length(a) FROM t1;
> a   3
> spark-sql> SELECT a, length(a) FROM t2;
> a   3
> spark-sql> SELECT a, length(a) FROM t3;
> a 2
>
> Since 2.4.0, `STORED AS ORC` became consistent.
> (`spark.sql.hive.convertMetastoreOrc=false` provides a fallback to Hive
> behavior.)
>
> spark-sql> SELECT a, length(a) FROM t1;
> a   3
> spark-sql> SELECT a, length(a) FROM t2;
> a 2
> spark-sql> SELECT a, length(a) FROM t3;
> a 2
>
> Since 3.0.0-preview2, `CREATE TABLE` (without `STORED AS` clause) became
> consistent.
> (`spark.sql.legacy.createHiveTableByDefault.enabled=true` provides a
> fallback to Hive behavior.)
>
> spark-sql> SELECT a, length(a) FROM t1;
> a 2
> spark-sql> SELECT a, length(a) FROM t2;
> a 2
> spark-sql> SELECT a, length(a) FROM t3;
> a 2
>
> In addition, in 3.0.0, SPARK-31147 aims to ban `CHAR/VARCHAR` type in the
> following syntax to be safe.
>
> CREATE TABLE t(a CHAR(3));
> https://github.com/apache/spark/pull/27902
>
> This email is sent out to inform you, based on the new policy we voted on.
> The recommendation is to always use Apache Spark's native `String` type.
>
> Bests,
> Dongjoon.
>
> References:
> 1. "CHAR implementation?", 2017/09/15
>
> https://lists.apache.org/thread.html/96b004331d9762e356053b5c8c97e953e398e489d15e1b49e775702f%40%3Cdev.spark.apache.org%3E
> 2. "FYI: SPARK-30098 Use default datasource as provider for CREATE TABLE
> syntax", 2019/12/06
>
> https://lists.apache.org/thread.html/493f88c10169680191791f9f6962fd16cd0ffa3b06726e92ed04cbe1%40%3Cdev.spark.apache.org%3E
>




FYI: The evolution on `CHAR` type behavior

2020-03-14 Thread Dongjoon Hyun
Hi, All.

Apache Spark has suffered from a known consistency issue in `CHAR` type
behavior across its usages and configurations. However, the evolution has been
gradually moving toward consistent behavior inside Apache Spark, because we
don't have `CHAR` officially. The following is a summary.

With 1.6.x ~ 2.3.x, `STORED AS PARQUET` gives the following different result.
(`spark.sql.hive.convertMetastoreParquet=false` provides a fallback to Hive
behavior.)

spark-sql> CREATE TABLE t1(a CHAR(3));
spark-sql> CREATE TABLE t2(a CHAR(3)) STORED AS ORC;
spark-sql> CREATE TABLE t3(a CHAR(3)) STORED AS PARQUET;

spark-sql> INSERT INTO TABLE t1 SELECT 'a ';
spark-sql> INSERT INTO TABLE t2 SELECT 'a ';
spark-sql> INSERT INTO TABLE t3 SELECT 'a ';

spark-sql> SELECT a, length(a) FROM t1;
a   3
spark-sql> SELECT a, length(a) FROM t2;
a   3
spark-sql> SELECT a, length(a) FROM t3;
a 2

Since 2.4.0, `STORED AS ORC` became consistent.
(`spark.sql.hive.convertMetastoreOrc=false` provides a fallback to Hive
behavior.)

spark-sql> SELECT a, length(a) FROM t1;
a   3
spark-sql> SELECT a, length(a) FROM t2;
a 2
spark-sql> SELECT a, length(a) FROM t3;
a 2

Since 3.0.0-preview2, `CREATE TABLE` (without `STORED AS` clause) became
consistent.
(`spark.sql.legacy.createHiveTableByDefault.enabled=true` provides a
fallback to Hive behavior.)

spark-sql> SELECT a, length(a) FROM t1;
a 2
spark-sql> SELECT a, length(a) FROM t2;
a 2
spark-sql> SELECT a, length(a) FROM t3;
a 2
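
(A sketch of how that fallback might be enabled when building a PySpark session;
this assumes the configuration name quoted above is accepted as a session
configuration.)

from pyspark.sql import SparkSession

# Assumed: passing the legacy flag quoted above at session creation time.
spark = (SparkSession.builder
         .enableHiveSupport()
         .config("spark.sql.legacy.createHiveTableByDefault.enabled", "true")
         .getOrCreate())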

In addition, in 3.0.0, SPARK-31147 aims to ban `CHAR/VARCHAR` type in the
following syntax to be safe.

CREATE TABLE t(a CHAR(3));
https://github.com/apache/spark/pull/27902

This email is sent out to inform you, based on the new policy we voted on.
The recommendation is to always use Apache Spark's native `String` type.
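
As a minimal illustration of that recommendation (using the PySpark API rather
than the spark-sql shell; the table name t4 is hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# With the native STRING type the value is stored as written,
# so length(a) returns 2 regardless of the underlying file format.
spark.sql("CREATE TABLE t4 (a STRING)")
spark.sql("INSERT INTO TABLE t4 SELECT 'a '")
spark.sql("SELECT a, length(a) FROM t4").show()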

Bests,
Dongjoon.

References:
1. "CHAR implementation?", 2017/09/15

https://lists.apache.org/thread.html/96b004331d9762e356053b5c8c97e953e398e489d15e1b49e775702f%40%3Cdev.spark.apache.org%3E
2. "FYI: SPARK-30098 Use default datasource as provider for CREATE TABLE
syntax", 2019/12/06

https://lists.apache.org/thread.html/493f88c10169680191791f9f6962fd16cd0ffa3b06726e92ed04cbe1%40%3Cdev.spark.apache.org%3E


[PySpark] How to write HFiles as an 'append' to the same directory?

2020-03-14 Thread Gautham Acharya
I have a process in Apache Spark that attempts to write HFiles to S3 in batches.
I want the resulting HFiles in the same directory, as they belong to the same
column family. However, I'm getting a 'directory already exists' error when I
try to run this on AWS EMR. How can I write HFiles via Spark as an 'append',
like I can with CSV output?

The batch writing function looks like this:

for col_group in split_cols:
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(
        joined_dataframe, col_group, pandas_udf_func, group_by_args)

    hfile_writer.write_hfiles(processed_chunk, output_path,
                              zookeeper_ip, table_name,
                              constants.DEFAULT_COL_FAMILY)

The actual call that writes the HFiles is this:

rdd.saveAsNewAPIHadoopFile(output_path,
                           constants.OUTPUT_FORMAT_CLASS,
                           keyClass=constants.KEY_CLASS,
                           valueClass=constants.VALUE_CLASS,
                           keyConverter=constants.KEY_CONVERTER,
                           valueConverter=constants.VALUE_CONVERTER,
                           conf=conf)

The exception I'm getting:


Called with arguments: Namespace(job_args=['matrix_path=/tmp/matrix.csv', 'metadata_path=/tmp/metadata.csv', 'output_path=s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles', 'group_by_args=cluster_id', 'zookeeper_ip=ip-172-30-5-36.ec2.internal', 'table_name=test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a'], job_name='matrix_transformations')

job_args_tuples: [['matrix_path', '/tmp/matrix.csv'], ['metadata_path', '/tmp/metadata.csv'], ['output_path', 's3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles'], ['group_by_args', 'cluster_id'], ['zookeeper_ip', 'ip-172-30-5-36.ec2.internal'], ['table_name', 'test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a']]

Traceback (most recent call last):
  File "/mnt/var/lib/hadoop/steps/s-2ZIOR335HH9TR/main.py", line 56, in 
    job_module.transform(spark, **job_args)
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 93, in transform
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 73, in write_split_columnwise_transform
  File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/output_handler/hfile_writer.py", line 44, in write_hfiles
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, in saveAsNewAPIHadoopFile
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles/median already exists
        at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
        at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:393)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1000)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
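
One possible workaround (a sketch only; it assumes the same helpers and
constants as above, and that a true append is not required) is to give each
batch its own subdirectory under output_path, since saveAsNewAPIHadoopFile has
no append mode and Hadoop's FileOutputFormat refuses to write into an existing
output directory:

for i, col_group in enumerate(split_cols):
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(
        joined_dataframe, col_group, pandas_udf_func, group_by_args)

    # Hypothetical naming: one fresh output directory per batch, e.g.
    # .../hfiles/batch_0, .../hfiles/batch_1, ...
    chunk_output_path = "{}/batch_{}".format(output_path, i)
    hfile_writer.write_hfiles(processed_chunk, chunk_output_path,
                              zookeeper_ip, table_name,
                              constants.DEFAULT_COL_FAMILY)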

sample syntax in spark-env.sh for env.

2020-03-14 Thread Zahid Rahman
WARN NativeCodeLoader: Unable to load native-hadoop library for your
platform... using builtin-java classes where applicable



I was chasing this warning when I found misinformation from Spark training
companies such as Edureka, who offer weird and wonderful suggestions:
https://www.edureka.co/community/110/hadoop-unable-native-hadoop-library-your-platform-warning



Meanwhile another Spark training company, Cloudera, claims to resolve the issue
by simply turning the warning off, even though it refers to an API that may be
needed, as the Spark developers themselves warn:
https://community.cloudera.com/t5/Support-Questions/Apache-Spark-Error-Unable-to-load-native-hadoop-library/td-p/160968


Stack Overflow also has misleading information, i.e. change /etc/profile or
~/.bashrc.


Anyway, I found the resolution is to add the bottom two lines below, to include
the Hadoop libraries (not SPARK_LOCAL_IP):
SPARK_LOCAL_IP=192.168.0.38
HADOOP_HOME=/home/kub18/hadoop-3.2.1
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native



The reason for this post is that there was no sample syntax, so this was my
first change:
SPARK_LOCAL_IP=192.168.0.38
export HADOOP_HOME=/home/kub18/hadoop-3.2.1
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native

That was accepted by the Spark code.

My second change was:
SPARK_LOCAL_IP=192.168.0.38
HADOOP_HOME=/home/kub18/hadoop-3.2.1
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native

That was also accepted.


My third change was:
SPARK_LOCAL_IP=192.168.0.38
doogie HADOOP_HOME=/home/kub18/hadoop-3.2.1
doogie LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native

This change produced a 'doogie: command not found' error.


So it is unclear whether having the "export" prefix in spark-env.sh, as
illustrated, changes the behaviour of Spark or not.


My suggestion would be to supply example syntax in spark-env.sh, and also to
state whether prefixing a Spark environment variable with "export" makes a
difference or not.

Best Regards
Z.


Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org