What I can immediately think of is this: since you are doing an IN in the WHERE clause for a series of timestamps, consider breaking the query up and running it once per epoch timestamp. You can load each result into an intermediate staging table and then do a final aggregate from that table, keeping the GROUP BY the same. Because the aggregate is a SUM, it can safely be done in two steps.
HTH
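PS: a rough sketch of the two-step idea in Spark 1.6. Every identifier here (agg_stage, source_table, epoch_ts, d1/d2, m1/m2, and the epoch values) is a made-up stand-in, since I don't know your real schema:

  // assuming hiveContext is the job's existing HiveContext
  // Step 1: aggregate each epoch on its own into a staging table.
  val epochs = Seq(1567641600L, 1567645200L) // example values only
  epochs.foreach { ts =>
    hiveContext.sql(s"""
      INSERT INTO TABLE agg_stage
      SELECT d1, d2, SUM(m1) AS m1, SUM(m2) AS m2
      FROM source_table
      WHERE epoch_ts = $ts
      GROUP BY d1, d2""")
  }
  // Step 2: final roll-up with the same GROUP BY. This is correct because
  // SUM is decomposable: summing the per-epoch partial sums gives the same
  // total as one big SUM over all epochs at once.
  val result = hiveContext.sql("""
    SELECT d1, d2, SUM(m1) AS m1, SUM(m2) AS m2
    FROM agg_stage
    GROUP BY d1, d2""")

Each per-epoch query shuffles much less data at a time, so the non-DFS usage should stay lower, at the cost of scanning the source table several times.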
On Thursday, September 5, 2019, 5:10 AM, Himali Patel wrote:
Hello all,
We have one use case where we are aggregating billions of rows, and it does a huge shuffle.
Example :
As per the ‘Job’ tab on the YARN UI, when the input size is around 350 GB, the shuffle size is >3 TB. This pushes non-DFS usage beyond the warning limit and thus affects the entire cluster.
It seems we need to tune our query / resources. Any suggestions?
1.
Our data is high in cardinality:
# input rows: ~15 billion
# output rows: ~13 billion
2.
Spark version is 1.6 and Hive is 1.1 (it's CDH).
We query using a HiveContext in the Spark job (YARN is the resource manager).
The HiveContext has these configs:
.setConf("hive.exec.dynamic.partition.mode","nonstrict")
.setConf("hive.exec.dynamic.partition","true")
.setConf("hive.exec.stagingdir","/tmp/hive/")
3.
Our aggregation is done using a single query, as below (angle-bracket placeholders stand in for our real column and table names):
SELECT
<dimension columns>,
SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
<combine udf>(c1, 'HEX', 'UNION') AS c1,
<combine udf>(c2, 'HEX', 'UNION') AS c2,
<combine udf>(c3, 'HEX', 'UNION') AS c3,
<combine udf>(c4, 'HEX', 'UNION') AS c4,
<combine udf>(c5, 'HEX', 'UNION') AS c5,
<expr> AS <alias>, <expr> AS <alias>
FROM <table>
WHERE <epoch column> IN (<ts1>, <ts2>, <ts3>, <ts4>, <ts5>, <ts6>, <ts7>, <ts8>)
GROUP BY <dimension columns>
4.
Configs are:
spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=True
spark.eventLog.dir=<>
spark.eventLog.overwrite=True
spark.sql.shuffle.partitions=1000
How to tune this job?
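(For reference, the same knobs expressed programmatically on a SparkConf; subset shown, values copied from the list above:)

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setMaster("yarn-client")
    .set("spark.executor.instances", "52")
    .set("spark.executor.cores", "4")
    .set("spark.executor.memory", "30g")
    .set("spark.yarn.executor.memoryOverhead", "9500")
    .set("spark.sql.shuffle.partitions", "1000")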