The broadcast hint does not work as expected in this case, could you
also how the logical plan by 'explain(true)'?

On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <java8...@hotmail.com> wrote:
>
> So I am testing this code to understand "broadcast" feature of DF on Spark 
> 1.6.1.
> This time I am not disable "tungsten". Everything is default value, except 
> setting memory and cores of my job on 1.6.1.
>
> I am testing the join2 case
>
> val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> and here is the DAG visualization in the runtime of my testing job:
>
>
>
>
>
> So now, I don't understand how the "broadcast" works on DateFrame in Spark. I 
> originally thought it will be the same as "mapjoin" in the hive, but can 
> someone explain the DAG above me?
>
> I have one day data about 1.5G compressed parquet file, filter by 
> "instr(loadRaw("event_list"), "202") > 0", which will only output about 1494 
> rows (very small), and it is the "trailRaw" DF in my example.
> Stage 3 has a filter, which I thought is for the trailRaw data, but the stage 
> statics doesn't match with the data. I don't know why the input is only 78M, 
> and shuffle write is about 97.6KB
>
>
>
>
> The historyRaw will be about 90 days history data, which should be about 
> 100G, so it looks like stage 4 is scanning it
>
>
>
>
> Now, my original thought is that small data will be broadcasted to all the 
> nodes, and most of history data will be filtered out by the join keys, at 
> least that will be the "mapjoin" in Hive will do, but from the DAG above, I 
> didn't see it working this way.
> It is more like that Spark use the SortMerge join to shuffle both data across 
> network, and filter on the "reducers" side by the join keys, to get the final 
> output. But that is not the "broadcast" join supposed to do, correct?
> In the last stage, it will be very slow, until it reach and process all the 
> history data,  shown below as "shuffle read" reaching 720G, to finish.
>
>
>
>
> One thing I notice that if tungsten is enable, the shuffle write volume on 
> stage 4 is larger (720G) than when tungsten is disable (506G) in my 
> originally run, for the exactly same input. It is an interesting point, does 
> anyone have some idea about this?
>
>
> Overall, for my test case, "broadcast" join is the exactly most optimized way 
> I should use; but somehow, I cannot make it do the same way as "mapjoin" of 
> Hive, even in Spark 1.6.1.
>
> As I said, this is a just test case. We have some business cases making sense 
> to use "broadcast" join, but until I understand exactly how to make it work 
> as I expect in Spark, I don't know what to do.
>
> Yong
>
> ________________________________
> From: java8...@hotmail.com
> To: user@spark.apache.org
> Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in the 
> last step
> Date: Tue, 22 Mar 2016 13:08:31 -0400
>
>
> Please help me understand how the "broadcast" will work on DF in Spark 1.5.2.
>
> Below are the 2 joins I tested and the physical plan I dumped:
>
> val join1 = historyRaw.join(trialRaw, trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> join1.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  SortMergeJoin [visid_high#948L,visid_low#949L], 
> [visid_high#460L,visid_low#461L]
>   ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false
>    Exchange hashpartitioning(visid_high#948L,visid_low#949L)
>     Scan ParquetRelation[hdfs://xxxxxxxx]
>   ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false
>    Exchange hashpartitioning(visid_high#460L,visid_low#461L)
>     Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>      Filter (instr(event_list#105,202) > 0)
>       Scan 
> ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>
> join2.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  BroadcastHashJoin [visid_high#948L,visid_low#949L], 
> [visid_high#460L,visid_low#461L], BuildRight
>     Scan ParquetRelation[hdfs://xxxxxxxx]
> Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>    Filter (instr(event_list#105,202) > 0)
>     Scan 
> ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>
> Obvious, the explain plans are different, but the performance and the job 
> execution steps are almost exactly same, as shown in the original picture in 
> the email below.
> Keep in mind that I have to run with "--conf 
> spark.sql.tungsten.enabled=false", otherwise, the execution plan will do the 
> tungsten sort.
>
> Now what confusing me is following:
> When using the broadcast join, the job still generates 3 stages, same as 
> SortMergeJoin, but I am not sure this makes sense.
> Ideally, in "Broadcast", the first stage scan the "trialRaw" data, using the 
> filter (instr(event_list#105,202) > 0), which BTW will filter out 99% of 
> data, then "broadcasting" remaining data to all the nodes. Then scan 
> "historyRaw", while filtering by join with broadcasted data. In the end, we 
> can say there is one more stage to save the data in the default "200" 
> partitions. So there should be ONLY 2 stages enough for this case. Why there 
> are still 3 stages in this case, just same as "SortMergeJoin", it looks like 
> "broadcast" not taking effect at all? But the physical plan clearly shows 
> that "Broadcast" hint?
>
> Thanks
>
> Yong
>
>
> ________________________________
> From: java8...@hotmail.com
> To: user@spark.apache.org
> Subject: Spark 1.5.2, why the broadcast join shuffle so much data in the last 
> step
> Date: Fri, 18 Mar 2016 16:54:16 -0400
>
> Hi, Sparkers:
>
> I have some questions related to generate the parquet output in Spark 1.5.2.
>
> I have 2 data sets to join, and I know one is much smaller than the other 
> one, so I have the following test code:
>
> val loadRaw = sqlContext.read.parquet("one days of data in parquet format")
> val historyRaw = sqlContext.read.parquet("90 days of history data in parquet 
> format")
>
> // the trailRaw will be very small, normally only thousands of row from 20M 
> of one day's data
> val trialRaw = loadRaw.filter(instr(loadRaw("event_list"), "202") > 
> 0).selectExpr("e1 as account_id", "visid_high", "visid_low", "ip")
>
> trialRaw.count
> res0: Long = 1494
>
> // so the trailRaw data is small
>
> val join = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> val col_1 = trialRaw("visid_high")
> val col_2 = trialRaw("visid_low")
> val col_3 = trialRaw("date_time")
> val col_4 = trialRaw("ip")
>
> // drop the duplicate columns after join
> val output = join.drop(col1).drop(col2).drop(col3).drop(col4)
> output.write.parquet("hdfs location")
>
> First problem, I think I am facing Spark-10309
>
> Caused by: java.io.IOException: Unable to acquire 67108864 bytes of memory
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
>
>
> so I have to disable tungsten (spark.sql.tungsten.enabled=false),
>
> Now the problem is the Spark finishes this job very slow, even worse than 
> same logic done in  Hive.
> The explain shows the broadcast join is used:
> join.explain(true)
>
> .....
> == Physical Plan ==
> Filter (date_time#25L > date_time#519L)
>  BroadcastHashJoin [visid_high#954L,visid_low#955L], 
> [visid_high#460L,visid_low#461L], BuildRight
>   ConvertToUnsafe
>    Scan ParquetRelation[hdfs://xxxxxx][400+ columns shown up here]
>   ConvertToUnsafe
>    Project [soid_e1#30 AS 
> account_id#488,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>     Filter (instr(event_list#105,202) > 0)
>      Scan 
> ParquetRelation[hdfs:xxx/data/event_parquet/2016/03/17][visid_high#460L,ip#127,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
> Code Generation: true
>
>  I don't understand the statistics shown in the GUI below:
>
>
>
> It looks like the last task will shuffle read all 506.6G data, but this 
> DOESN'T make any sense. The final output of 200 files shown below:
>
> hadoop fs -ls hdfs://finalPath | sort -u -k5n
> Found 203 items
> -rw-r--r--   3 biginetl biginetl      44237 2016-03-18 16:47 
> finalPath/_common_metadata
> -rw-r--r--   3 biginetl biginetl     105534 2016-03-18 15:45 
> finalPath/part-r-00069-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> -rw-r--r--   3 biginetl biginetl     107098 2016-03-18 16:24 
> finalPath/part-r-00177-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> .............
> -rw-r--r--   3 biginetl biginetl    1031400 2016-03-18 16:35 
> finalPath/part-r-00187-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> -rw-r--r--   3 biginetl biginetl    1173678 2016-03-18 16:21 
> finalPath/part-r-00120-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> -rw-r--r--   3 biginetl biginetl   12257423 2016-03-18 16:47 
> finalPath/_metadata
>
> As we can see, the largest file is only 1.1M, so the total output is just 
> about 150M for all 200 files.
> I really don't understand why stage 5 is so slow, and why the shuffle read is 
> so BIG.
> Understanding the "broadcast" join in Spark 1.5 is very important for our use 
> case, Please tell me what could the reasons behind this.
>
> Thanks
>
> Yong
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to