What I can immediately think of: since you are using IN in the WHERE clause for a series of timestamps, consider breaking the query up and running it once per epoch timestamp. You can load each run's results into an intermediate staging table and then do a final aggregate from that table, keeping the GROUP BY the same. Since the aggregation is a SUM, it can be done in two steps. HTH.
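Concretely, the two-step approach might be sketched as below in HiveQL. This is illustrative only: the staging table name is an assumption, the `<...>` placeholders are from the original query, and it works because a SUM of per-epoch partial SUMs equals the overall SUM. Whether the custom `UNION` aggregates on c1..c5 can be split the same way depends on whether they are similarly re-aggregatable.

```sql
-- Step 1: run once per epoch timestamp, appending per-epoch
-- partial sums into a staging table (agg_staging is illustrative).
INSERT INTO TABLE agg_staging
SELECT <list of 16 dimension columns>,
       SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3,
       SUM(m4) AS m4, SUM(m5) AS m5
FROM <table-name>
WHERE <partition-column> = <Epochtime1>   -- repeat for each of the 8 epochs
GROUP BY <list of 16 dimension columns>;

-- Step 2: final aggregate over the much smaller staging table,
-- keeping the same GROUP BY.
SELECT <list of 16 dimension columns>,
       SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3,
       SUM(m4) AS m4, SUM(m5) AS m5
FROM agg_staging
GROUP BY <list of 16 dimension columns>;
```

Each per-epoch run shuffles only one partition's worth of rows, so the peak shuffle (and non-DFS usage) should be much smaller than doing all eight epochs in one job.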
On Thursday, September 5, 2019, 5:10 AM, Himali Patel <himaliben.pa...@thalesgroup.com> wrote:

Hello all,

We have one use-case where we aggregate billions of rows, and it does a huge shuffle. Example: per the 'Job' tab on the YARN UI, when the input size is ~350 GB, the shuffle size is >3 TB. This pushes non-DFS usage beyond the warning limit and thus affects the entire cluster. It seems we need to tune our query / resources. Any suggestions?

1. Our data is high in cardinality:
   input rows: ~15 billion
   output rows: ~13 billion

2. Spark version is 1.6, Hive is 1.1 (CDH). We query using a Hive context in a Spark job (YARN is the resource manager). The Hive context is configured as:

   .setConf("hive.exec.dynamic.partition.mode", "nonstrict")
   .setConf("hive.exec.dynamic.partition", "true")
   .setConf("hive.exec.stagingdir", "/tmp/hive/")

3. Our aggregation is done with a single query:

   SELECT <list of 16 dimension columns>,
          SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
          <custom-aggregate-operation>(c1, 'HEX', 'UNION') AS c1,
          <custom-aggregate-operation>(c2, 'HEX', 'UNION') AS c2,
          <custom-aggregate-operation>(c3, 'HEX', 'UNION') AS c3,
          <custom-aggregate-operation>(c4, 'HEX', 'UNION') AS c4,
          <custom-aggregate-operation>(c5, 'HEX', 'UNION') AS c5,
          <Epochtime1> AS <partition-column>,
          <Epochtime1> AS <column2>
   FROM <table-name>
   WHERE <partition-column> IN (<Epochtime1>, <Epochtime2>, <Epochtime3>, <Epochtime4>,
                                <Epochtime5>, <Epochtime6>, <Epochtime7>, <Epochtime8>)
   GROUP BY <list of 16 dimension columns>

4. Configs are:

   spark.master=yarn-client
   spark.yarn.queue=default
   spark.executor.instances=52
   spark.executor.cores=4
   spark.executor.memory=30g
   spark.driver.memory=25g
   spark.memory.fraction=0.8
   spark.memory.storageFraction=0.1
   spark.yarn.executor.memoryOverhead=9500
   spark.yarn.driver.memoryOverhead=5120
   spark.core.connection.ack.wait.timeout=1000
   spark.eventLog.enabled=True
   spark.eventLog.dir=<>
   spark.eventLog.overwrite=True
   spark.sql.shuffle.partitions=1000

How do we tune this job?