Re: Tune hive query launched thru spark-yarn job.

2019-09-05 Thread Himali Patel
Hi Sathi,

Thanks for the quick reply. This list of epoch times in the IN clause is already part of a 30-day aggregation. Given our input-to-output aggregation ratio, the cardinality is very high, so what we need is query tuning: we can't assign additional resources to this job.


From: Sathi Chowdhury 
Date: Thursday, 5 September 2019 at 8:10 PM
To: Himali Patel, "user@spark.apache.org"
Subject: Re: Tune hive query launched thru spark-yarn job.

What I can immediately think of is: since you are using IN in the WHERE clause for a series of timestamps, consider breaking them up and, for each epoch timestamp, loading the results into an intermediate staging table. Then do a final aggregate from that table, keeping the GROUP BY the same. Since the measures are SUMs, the aggregation can be done in two steps (see the sketch below).
hth
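
For illustration, a minimal Spark 1.6 (Scala) sketch of that two-step approach; the table, column, and epoch values are placeholders rather than the actual schema, and only the SUM measures are shown:

// Assumes an existing HiveContext named hc, a source table "src", and a
// staging table "stage" already created with the aggregated schema.
// Column names and epoch values below are placeholders.
val epochs = Seq(1567641600L, 1567645200L, 1567648800L)

// Step 1: aggregate each epoch separately into the staging table.
epochs.foreach { ts =>
  hc.sql(s"""
    INSERT INTO TABLE stage
    SELECT dim1, dim2, SUM(m1) AS m1, SUM(m2) AS m2
    FROM src
    WHERE ts_col = $ts
    GROUP BY dim1, dim2
  """)
}

// Step 2: final aggregate over the much smaller staging table,
// keeping the same GROUP BY.
val result = hc.sql("""
  SELECT dim1, dim2, SUM(m1) AS m1, SUM(m2) AS m2
  FROM stage
  GROUP BY dim1, dim2
""")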






On Thursday, September 5, 2019, 5:10 AM, Himali Patel wrote:

Hello all,



We have a use case where we are aggregating billions of rows, and it produces a huge shuffle.

Example:

As per the 'Jobs' tab in the YARN UI, when the input size is around 350 GB, the shuffle size exceeds 3 TB. This pushes non-DFS usage beyond the warning limit and affects the entire cluster.



It seems we need to tune our query and/or resources. Any suggestions?



1. Our data has high cardinality:

# input rows: ~15 billion

# output rows: ~13 billion



2. Spark version is 1.6, Hive is 1.1; it's CDH.

We query using a HiveContext inside the Spark job (YARN is the resource manager).

The HiveContext has these configs:

.setConf("hive.exec.dynamic.partition.mode","nonstrict")

.setConf("hive.exec.dynamic.partition","true")

.setConf("hive.exec.stagingdir","/tmp/hive/")



3. Our aggregation is done with a single query, as below:

SELECT
  <dimension columns>,
  SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
  <udf>(c1, 'HEX', 'UNION') AS c1,
  <udf>(c2, 'HEX', 'UNION') AS c2,
  <udf>(c3, 'HEX', 'UNION') AS c3,
  <udf>(c4, 'HEX', 'UNION') AS c4,
  <udf>(c5, 'HEX', 'UNION') AS c5,
  <expr> AS <alias>, <expr> AS <alias>
FROM <table>
WHERE <timestamp column> IN (<epoch ts 1>, <epoch ts 2>, ... , <epoch ts 8>)
GROUP BY <dimension columns>



4. Configs are:

spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=True
spark.eventLog.dir=<>
spark.eventLog.overwrite=True
spark.sql.shuffle.partitions=1000
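
For context, a rough Spark 1.6 (Scala) sketch of applying the executor-side settings above programmatically; in practice most of them are passed as --conf flags or in spark-defaults, and the driver memory and driver memoryOverhead values are omitted here because they only take effect when supplied at launch time:

import org.apache.spark.{SparkConf, SparkContext}

// Values copied from the list above; spark.eventLog.dir is left out
// because its value is not shown in the original message.
val conf = new SparkConf()
  .setMaster("yarn-client")
  .set("spark.yarn.queue", "default")
  .set("spark.executor.instances", "52")
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "30g")
  .set("spark.memory.fraction", "0.8")
  .set("spark.memory.storageFraction", "0.1")
  .set("spark.yarn.executor.memoryOverhead", "9500")
  .set("spark.core.connection.ack.wait.timeout", "1000")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.overwrite", "true")
  .set("spark.sql.shuffle.partitions", "1000")

val sc = new SparkContext(conf)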





How can we tune this job?