The broadcast hint does not work as expected in this case. Could you also show the logical plan by calling 'explain(true)'?
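For example, a minimal sketch that reuses the join2 DataFrame from your code below (it only prints the plans, it does not run the job):

// extended = true prints the parsed, analyzed and optimized logical plans
// in addition to the physical plan
join2.explain(true)

// the same plans are also available programmatically, if that is easier to paste
println(join2.queryExecution.optimizedPlan)
println(join2.queryExecution.executedPlan)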
On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <java8...@hotmail.com> wrote:
>
> So I am testing this code to understand the "broadcast" feature of DataFrames on Spark 1.6.1.
> This time I did not disable "tungsten". Everything is at its default value, except the memory and cores I set for my job on 1.6.1.
>
> I am testing the join2 case:
>
> val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === historyRaw("visid_high") && trialRaw("visid_low") === historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> Here is the DAG visualization from the runtime of my test job:
>
> [DAG visualization screenshot]
>
> So now I don't understand how "broadcast" works on a DataFrame in Spark. I originally thought it would behave the same as a "mapjoin" in Hive, but can someone explain the DAG above to me?
>
> I have one day of data, about 1.5G of compressed parquet, filtered by "instr(loadRaw("event_list"), "202") > 0", which outputs only about 1494 rows (very small); that is the "trialRaw" DF in my example.
> Stage 3 has a filter, which I thought was for the trialRaw data, but the stage statistics don't match the data. I don't know why the input is only 78M and the shuffle write is about 97.6KB.
>
> [Stage 3 statistics screenshot]
>
> The historyRaw is about 90 days of history data, which should be about 100G, so it looks like stage 4 is scanning it.
>
> [Stage 4 statistics screenshot]
>
> Now, my original thought was that the small data would be broadcast to all the nodes, and most of the history data would be filtered out by the join keys; at least that is what a "mapjoin" in Hive would do. But from the DAG above, I don't see it working this way.
> It looks more like Spark uses a SortMerge join to shuffle both data sets across the network, then filters on the "reducer" side by the join keys to get the final output. But that is not what a "broadcast" join is supposed to do, correct?
> The last stage is very slow until it reaches and processes all the history data, shown below as "shuffle read" reaching 720G, before it finishes.
>
> [Last stage shuffle read screenshot]
>
> One thing I noticed is that when tungsten is enabled, the shuffle write volume on stage 4 is larger (720G) than when tungsten is disabled (506G) in my original run, for exactly the same input. It is an interesting point; does anyone have an idea about this?
>
> Overall, for my test case, a "broadcast" join is exactly the most optimized approach I should use; but somehow I cannot make it behave the same way as Hive's "mapjoin", even in Spark 1.6.1.
>
> As I said, this is just a test case. We have some business cases where it makes sense to use a "broadcast" join, but until I understand exactly how to make it work as I expect in Spark, I don't know what to do.
>
> Yong
>
> ________________________________
> From: java8...@hotmail.com
> To: user@spark.apache.org
> Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in the last step
> Date: Tue, 22 Mar 2016 13:08:31 -0400
>
> Please help me understand how the "broadcast" join will work on a DF in Spark 1.5.2.
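> (As background, a minimal sketch of the two knobs involved; the val name and the 100MB value below are only illustrations, not settings from my job:)
>
> import org.apache.spark.sql.functions.broadcast
>
> // broadcast(trialRaw) only attaches a hint to the logical plan; the planner
> // still chooses the physical join strategy when the query is planned
> val hintedTrial = broadcast(trialRaw)
>
> // relations whose estimated size is below this threshold are broadcast even
> // without the hint; the default is 10MB, and 100MB here is just an example
> sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)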
> Below are the 2 joins I tested and the physical plans I dumped:
>
> val join1 = historyRaw.join(trialRaw, trialRaw("visid_high") === historyRaw("visid_high") && trialRaw("visid_low") === historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === historyRaw("visid_high") && trialRaw("visid_low") === historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> join1.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  SortMergeJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L]
>   ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false
>    Exchange hashpartitioning(visid_high#948L,visid_low#949L)
>     Scan ParquetRelation[hdfs://xxxxxxxx]
>   ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false
>    Exchange hashpartitioning(visid_high#460L,visid_low#461L)
>     Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>      Filter (instr(event_list#105,202) > 0)
>       Scan ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>
> join2.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  BroadcastHashJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L], BuildRight
>   Scan ParquetRelation[hdfs://xxxxxxxx]
>   Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>    Filter (instr(event_list#105,202) > 0)
>     Scan ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>
> Obviously, the explain plans are different, but the performance and the job execution steps are almost exactly the same, as shown in the original picture in the email below.
> Keep in mind that I have to run with "--conf spark.sql.tungsten.enabled=false"; otherwise the execution plan will do the tungsten sort.
>
> Now what confuses me is the following:
> When using the broadcast join, the job still generates 3 stages, the same as SortMergeJoin, and I am not sure that makes sense.
> Ideally, with "broadcast", the first stage scans the "trialRaw" data using the filter (instr(event_list#105,202) > 0), which BTW filters out 99% of the data, and then broadcasts the remaining data to all the nodes. It then scans "historyRaw", filtering by the join with the broadcasted data. In the end, there could be one more stage to save the data into the default "200" partitions. So 2 stages should be enough for this case. Why are there still 3 stages, just the same as with "SortMergeJoin"? It looks like "broadcast" is not taking effect at all, yet the physical plan clearly shows the broadcast join?
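> (For reference, a minimal sketch of checking the plan programmatically for the broadcast operator; the class name assumed here is the Spark 1.5/1.6 physical operator org.apache.spark.sql.execution.joins.BroadcastHashJoin:)
>
> import org.apache.spark.sql.execution.joins.BroadcastHashJoin
>
> // collect any BroadcastHashJoin operators from the executed physical plan;
> // an empty result would mean the hint did not survive planning
> val broadcastNodes = join2.queryExecution.executedPlan.collect {
>   case b: BroadcastHashJoin => b
> }
> println(s"BroadcastHashJoin operators in the plan: ${broadcastNodes.size}")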
> Thanks
>
> Yong
>
> ________________________________
> From: java8...@hotmail.com
> To: user@spark.apache.org
> Subject: Spark 1.5.2, why the broadcast join shuffle so much data in the last step
> Date: Fri, 18 Mar 2016 16:54:16 -0400
>
> Hi, Sparkers:
>
> I have some questions related to generating the parquet output in Spark 1.5.2.
>
> I have 2 data sets to join, and I know one is much smaller than the other one, so I have the following test code:
>
> val loadRaw = sqlContext.read.parquet("one day of data in parquet format")
> val historyRaw = sqlContext.read.parquet("90 days of history data in parquet format")
>
> // trialRaw will be very small, normally only thousands of rows out of the 20M in one day's data
> val trialRaw = loadRaw.filter(instr(loadRaw("event_list"), "202") > 0).selectExpr("e1 as account_id", "visid_high", "visid_low", "ip")
>
> trialRaw.count
> res0: Long = 1494
>
> // so the trialRaw data is small
>
> val join = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === historyRaw("visid_high") && trialRaw("visid_low") === historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> val col_1 = trialRaw("visid_high")
> val col_2 = trialRaw("visid_low")
> val col_3 = trialRaw("date_time")
> val col_4 = trialRaw("ip")
>
> // drop the duplicate columns after the join
> val output = join.drop(col_1).drop(col_2).drop(col_3).drop(col_4)
> output.write.parquet("hdfs location")
>
> First problem: I think I am facing SPARK-10309.
>
> Caused by: java.io.IOException: Unable to acquire 67108864 bytes of memory
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
>
> So I have to disable tungsten (spark.sql.tungsten.enabled=false).
>
> Now the problem is that Spark finishes this job very slowly, even worse than the same logic done in Hive.
> The explain shows the broadcast join is used:
> join.explain(true)
>
> .....
> == Physical Plan ==
> Filter (date_time#25L > date_time#519L)
>  BroadcastHashJoin [visid_high#954L,visid_low#955L], [visid_high#460L,visid_low#461L], BuildRight
>   ConvertToUnsafe
>    Scan ParquetRelation[hdfs://xxxxxx][400+ columns shown up here]
>   ConvertToUnsafe
>    Project [soid_e1#30 AS account_id#488,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>     Filter (instr(event_list#105,202) > 0)
>      Scan ParquetRelation[hdfs:xxx/data/event_parquet/2016/03/17][visid_high#460L,ip#127,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
> Code Generation: true
>
> I don't understand the statistics shown in the GUI below:
>
> [Stage statistics screenshot]
>
> It looks like the last task will shuffle read all 506.6G of data, but this DOESN'T make any sense. The final output of 200 files is shown below:
>
> hadoop fs -ls hdfs://finalPath | sort -u -k5n
> Found 203 items
> -rw-r--r-- 3 biginetl biginetl 44237 2016-03-18 16:47 finalPath/_common_metadata
> -rw-r--r-- 3 biginetl biginetl 105534 2016-03-18 15:45 finalPath/part-r-00069-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> -rw-r--r-- 3 biginetl biginetl 107098 2016-03-18 16:24 finalPath/part-r-00177-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> .............
> -rw-r--r-- 3 biginetl biginetl 1031400 2016-03-18 16:35 finalPath/part-r-00187-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> -rw-r--r-- 3 biginetl biginetl 1173678 2016-03-18 16:21 finalPath/part-r-00120-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> -rw-r--r-- 3 biginetl biginetl 12257423 2016-03-18 16:47 finalPath/_metadata
>
> As we can see, the largest file is only 1.1M, so the total output is just about 150M across all 200 files.
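> (A side note on where the 200 comes from: a minimal sketch, assuming the default spark.sql.shuffle.partitions of 200; the coalesce(20) value below is only an illustration:)
>
> // the 200 part files correspond to the number of post-shuffle partitions
> println(sqlContext.getConf("spark.sql.shuffle.partitions", "200"))
>
> // fewer, larger files could be written by reducing the partitions before the write
> output.coalesce(20).write.parquet("hdfs location")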
> I really don't understand why stage 5 is so slow, and why the shuffle read is so BIG.
> Understanding the "broadcast" join in Spark 1.5 is very important for our use case. Please tell me what the reasons behind this could be.
>
> Thanks
>
> Yong

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org