The patch works as I expected.
The DAG now shows the broadcast join happening in stage 4, eliminating the following stages,
and the output data is generated right after it (there is no shuffle write in this stage any
more). It is much faster than before.
If you would like this patch back-ported to 1.5 and 1.6, please vote for
JIRA SPARK-13383.
Thanks
Yong
From: [email protected]
To: [email protected]
CC: [email protected]
Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in the
last step
Date: Wed, 23 Mar 2016 20:30:42 -0400
Sounds good.
I will manually merge this patch into 1.6.1, test my case again tomorrow in
my environment, and post an update later.
Thanks
Yong
> Date: Wed, 23 Mar 2016 16:20:23 -0700
> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in the
> last step
> From: [email protected]
> To: [email protected]
> CC: [email protected]
>
> On Wed, Mar 23, 2016 at 10:35 AM, Yong Zhang <[email protected]> wrote:
> > Here is the output:
> >
> > == Parsed Logical Plan ==
> > Project [400+ columns]
> > +- Project [400+ columns]
> >    +- Project [400+ columns]
> >       +- Project [400+ columns]
> >          +- Join Inner, Some((((visid_high#460L = visid_high#948L) && (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> >             :- Relation[400+ columns] ParquetRelation
> >             +- BroadcastHint
> >                +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >                   +- Filter (instr(event_list#105,202) > 0)
> >                      +- Relation[400+ columns] ParquetRelation
> >
> > == Analyzed Logical Plan ==
> > 400+ columns
> > Project [400+ columns]
> > +- Project [400+ columns]
> >    +- Project [400+ columns]
> >       +- Project [400+ columns]
> >          +- Join Inner, Some((((visid_high#460L = visid_high#948L) && (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> >             :- Relation[400+ columns] ParquetRelation
> >             +- BroadcastHint
> >                +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >                   +- Filter (instr(event_list#105,202) > 0)
> >                      +- Relation[400+ columns] ParquetRelation
> >
> > == Optimized Logical Plan ==
> > Project [400+ columns]
> > +- Join Inner, Some((((visid_high#460L = visid_high#948L) && (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> >    :- Relation[400+ columns] ParquetRelation
> >    +- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
> >       +- BroadcastHint
> >          +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >             +- Filter (instr(event_list#105,202) > 0)
> >                +- Relation[400+ columns] ParquetRelation
>
> There is a Project on top of BroadcastHint, inserted by the
> column pruning rule, which prevents SparkStrategies from recognizing
> the BroadcastHint anymore. This was fixed recently in master [1]:
>
> [1] https://github.com/apache/spark/pull/11260
>
> Your join should run as expected in master.
>
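To make the explanation above concrete, here is a toy model in plain Scala. The node classes (`Relation`, `Filter`, `Project`, `BroadcastHint`, `Join`) and the `ToyPlanner` object are hypothetical stand-ins, not Catalyst's real API. The `naive` rule only sees the hint when it is the join's direct child, so the Project added by column pruning hides it. The `propagating` rule looks through unary nodes instead. My understanding is that the actual fix carries a "broadcastable" flag up through the plan statistics, but treat that as an assumption and check the PR itself.

```scala
// Toy logical-plan nodes (hypothetical; NOT Spark's Catalyst classes).
sealed trait Plan
case class Relation(name: String) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Project(columns: Seq[String], child: Plan) extends Plan
case class BroadcastHint(child: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan

object ToyPlanner {
  // Naive rule: the hint is honoured only if it is the join's direct child.
  def naive(join: Join): String = join.right match {
    case BroadcastHint(_) => "BroadcastHashJoin"
    case _                => "SortMergeJoin"
  }

  // Look through unary nodes, so a Project added by column pruning
  // above the hint does not hide it.
  def isBroadcastable(plan: Plan): Boolean = plan match {
    case BroadcastHint(_)  => true
    case Project(_, child) => isBroadcastable(child)
    case Filter(_, child)  => isBroadcastable(child)
    case _                 => false
  }

  def propagating(join: Join): String =
    if (isBroadcastable(join.right)) "BroadcastHashJoin" else "SortMergeJoin"
}

val hinted = BroadcastHint(
  Project(Seq("account_id", "visid_high", "visid_low", "date_time", "ip"),
    Filter("instr(event_list, 202) > 0", Relation("loadRaw"))))

// Analyzed plan: the hint sits directly under the join.
val analyzed = Join(Relation("historyRaw"), hinted)
// Optimized plan: column pruning put a Project on top of the hint.
val optimized = Join(Relation("historyRaw"),
  Project(Seq("date_time", "visid_low", "visid_high", "account_id"), hinted))

println(ToyPlanner.naive(analyzed))        // BroadcastHashJoin
println(ToyPlanner.naive(optimized))       // SortMergeJoin
println(ToyPlanner.propagating(optimized)) // BroadcastHashJoin
```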
> > == Physical Plan ==
> > Project [400+ columns]
> > +- Filter (date_time#25L > date_time#513L)
> >    +- SortMergeJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L]
> >       :- Sort [visid_high#948L ASC,visid_low#949L ASC], false, 0
> >       :  +- TungstenExchange hashpartitioning(visid_high#948L,visid_low#949L,200), None
> >       :     +- Scan ParquetRelation[400+ columns] InputPaths:
> > hdfs://xxx/2015/12/17, hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19,
> > hdfs://xxx/2015/12/20, hdfs://xxx/2015/12/21, hdfs://xxx/2015/12/22,
> > hdfs://xxx/2015/12/23, hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25,
> > hdfs://xxx/2015/12/26, hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28,
> > hdfs://xxx/2015/12/29, hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31,
> > hdfs://xxx/2016/01/01, hdfs://xxx/2016/01/02, hdfs://xxx/2016/01/03,
> > hdfs://xxx/2016/01/04, hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06,
> > hdfs://xxx/2016/01/07, hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09,
> > hdfs://xxx/2016/01/10, hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12,
> > hdfs://xxx/2016/01/13, hdfs://xxx/2016/01/14, hdfs://xxx/2016/01/15,
> > hdfs://xxx/2016/01/16, hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18,
> > hdfs://xxx/2016/01/19, hdfs://xxx/2016/01/20, hdfs://xxx/2016/01/21,
> > hdfs://xxx/2016/01/22, hdfs://xxx/2016/01/23, hdfs://xxx/2016/01/24,
> > hdfs://xxx/2016/01/25, hdfs://xxx/2016/01/26, hdfs://xxx/2016/01/27,
> > hdfs://xxx/2016/01/28, hdfs://xxx/2016/01/29, hdfs://xxx/2016/01/30,
> > hdfs://xxx/2016/01/31, hdfs://xxx/2016/02/01, hdfs://xxx/2016/02/02,
> > hdfs://xxx/2016/02/03, hdfs://xxx/2016/02/04, hdfs://xxx/2016/02/05,
> > hdfs://xxx/2016/02/06, hdfs://xxx/2016/02/07, hdfs://xxx/2016/02/08,
> > hdfs://xxx/2016/02/09, hdfs://xxx/2016/02/10, hdfs://xxx/2016/02/11,
> > hdfs://xxx/2016/02/12, hdfs://xxx/2016/02/13, hdfs://xxx/2016/02/14,
> > hdfs://xxx/2016/02/15, hdfs://xxx/2016/02/16, hdfs://xxx/2016/02/17,
> > hdfs://xxx/2016/02/18, hdfs://xxx/2016/02/19, hdfs://xxx/2016/02/20,
> > hdfs://xxx/2016/02/21, hdfs://xxx/2016/02/22, hdfs://xxx/2016/02/23,
> > hdfs://xxx/2016/02/24, hdfs://xxx/2016/02/25, hdfs://xxx/2016/02/26,
> > hdfs://xxx/2016/02/27, hdfs://xxx/2016/02/28, hdfs://xxx/2016/02/29,
> > hdfs://xxx/2016/03/01, hdfs://xxx/2016/03/02, hdfs://xxx/2016/03/03,
> > hdfs://xxx/2016/03/04, hdfs://xxx/2016/03/05, hdfs://xxx/2016/03/06,
> > hdfs://xxx/2016/03/07, hdfs://xxx/2016/03/08, hdfs://xxx/2016/03/09,
> > hdfs://xxx/2016/03/10, hdfs://xxx/2016/03/11, hdfs://xxx/2016/03/12,
> > hdfs://xxx/2016/03/13, hdfs://xxx/2016/03/14, hdfs://xxx/2016/03/15,
> > hdfs://xxx/2016/03/16, hdfs://xxx/2016/03/17
> >       +- Sort [visid_high#460L ASC,visid_low#461L ASC], false, 0
> >          +- TungstenExchange hashpartitioning(visid_high#460L,visid_low#461L,200), None
> >             +- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
> >                +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >                   +- Filter (instr(event_list#105,202) > 0)
> >                      +- Scan ParquetRelation[visid_low#461L,ip#127,soid_e1#30,event_list#105,visid_high#460L,date_time#25L] InputPaths: hdfs://xxx/2016/03/17
> >
> > This dataset has more than 480 columns in the parquet files, so I replaced
> > them with "400+ columns" to avoid blowing up the email; I don't think this
> > has anything to do with the "broadcast" problem.
> >
> > Thanks
> >
> > Yong
> >
> >
> >> Date: Wed, 23 Mar 2016 10:14:19 -0700
> >> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in
> >> the last step
> >> From: [email protected]
> >> To: [email protected]
> >> CC: [email protected]
> >
> >>
> >> The broadcast hint does not work as expected in this case. Could you
> >> also show the logical plan via 'explain(true)'?
> >>
> >> On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <[email protected]> wrote:
> >> >
> >> > So I am testing this code to understand the "broadcast" feature of
> >> > DataFrames on Spark 1.6.1.
> >> > This time I did not disable "tungsten". Everything uses default values,
> >> > except the memory and core settings of my job on 1.6.1.
> >> >
> >> > I am testing the join2 case:
> >> >
> >> > val join2 = historyRaw.join(broadcast(trialRaw),
> >> >   trialRaw("visid_high") === historyRaw("visid_high") &&
> >> >   trialRaw("visid_low") === historyRaw("visid_low") &&
> >> >   trialRaw("date_time") > historyRaw("date_time"))
> >> >
> >> > and here is the DAG visualization from the run of my test job:
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > So now I don't understand how "broadcast" works on a DataFrame in
> >> > Spark. I originally thought it would be the same as "mapjoin" in Hive;
> >> > can someone explain the DAG above to me?
> >> >
> >> > I have one day of data, about 1.5G of compressed parquet files, filtered
> >> > by "instr(loadRaw("event_list"), "202") > 0", which outputs only about
> >> > 1494 rows (very small); this is the "trialRaw" DF in my example.
> >> > Stage 3 has a filter, which I thought is for the trialRaw data, but the
> >> > stage statistics don't match the data. I don't know why the input is
> >> > only 78M, and the shuffle write is about 97.6KB.
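A small sketch of what the filter predicate computes, assuming Spark SQL's documented `instr` semantics (1-based position of the first match, 0 when absent, null when either input is null). `InstrSketch` is a hypothetical helper and the sample `event_list` values are made up. If `event_list` is a comma-separated list of event IDs (an assumption about this dataset), a substring test also matches longer IDs such as `2021`.

```scala
object InstrSketch {
  // Models Spark SQL's instr(str, substr): 1-based index of the first match,
  // 0 if substr does not occur, null (here None) if either argument is null.
  def instr(str: String, substr: String): Option[Int] =
    if (str == null || substr == null) None
    else Some(str.indexOf(substr) + 1)

  // Same shape as filter(instr(event_list, "202") > 0): a null result drops the row.
  def keep(eventList: String): Boolean =
    instr(eventList, "202").exists(_ > 0)
}

// Made-up event_list values, for illustration only.
val events = Seq("100,202,305", "100,305", "2021,7", null)
val kept = events.filter(InstrSketch.keep)
println(kept) // List(100,202,305, 2021,7): "2021" also contains "202"
```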
> >> >
> >> >
> >> >
> >> >
> >> > The historyRaw is about 90 days of history data, which should be about
> >> > 100G, so it looks like stage 4 is scanning it.
> >> >
> >> >
> >> >
> >> >
> >> > Now, my original thought was that the small data would be broadcast to
> >> > all the nodes, and most of the history data would be filtered out by the
> >> > join keys; at least, that is what "mapjoin" in Hive does. But from the
> >> > DAG above, I don't see it working this way.
> >> > It looks more like Spark uses the SortMerge join to shuffle both datasets
> >> > across the network and filters by the join keys on the "reducer" side to
> >> > get the final output. But that is not what a "broadcast" join is supposed
> >> > to do, correct?
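The "mapjoin" behaviour expected here is a broadcast hash join. Below is a minimal plain-Scala sketch of it, assuming hypothetical `History` and `Trial` row types and made-up values. The small side becomes a hash map keyed by `(visid_high, visid_low)`, and each partition of the large side probes it locally, so the large side is never shuffled. The `date_time` comparison is not an equi-join key, so it is applied after the probe, mirroring the `Filter (date_time#25L > date_time#513L)` above the join in the plans.

```scala
// Hypothetical row types; field names follow the thread, values are made up.
case class History(visidHigh: Long, visidLow: Long, dateTime: Long, payload: String)
case class Trial(accountId: String, visidHigh: Long, visidLow: Long, dateTime: Long)

object BroadcastHashJoinSketch {
  // Build side: the small, filtered trial rows, grouped by the equi-join keys.
  // In Spark this map would be built once and shipped to every executor.
  def buildSide(trial: Seq[Trial]): Map[(Long, Long), Seq[Trial]] =
    trial.groupBy(t => (t.visidHigh, t.visidLow))

  // Stream side: each history partition probes the map locally, with no shuffle.
  // The non-equi condition (trial.date_time > history.date_time) runs after the probe.
  def probe(partition: Iterator[History],
            table: Map[(Long, Long), Seq[Trial]]): Iterator[(History, Trial)] =
    partition.flatMap { h =>
      table.getOrElse((h.visidHigh, h.visidLow), Seq.empty[Trial])
        .filter(t => t.dateTime > h.dateTime)
        .map(t => (h, t))
    }
}

val trial = Seq(Trial("a1", 1L, 10L, 500L), Trial("a2", 2L, 20L, 100L))
// Two "partitions" of the large side; they are never moved.
val historyPartitions = Seq(
  Seq(History(1L, 10L, 400L, "x"), History(1L, 10L, 600L, "y")),
  Seq(History(2L, 20L, 50L, "z"), History(3L, 30L, 10L, "w")))

val table = BroadcastHashJoinSketch.buildSide(trial)
val joined = historyPartitions.flatMap(p => BroadcastHashJoinSketch.probe(p.iterator, table))
println(joined.map { case (h, t) => (h.payload, t.accountId) }) // List((x,a1), (z,a2))
```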
> >> > The last stage is very slow and only finishes after it has read and
> >> > processed all the history data, shown below as "shuffle read" reaching 720G.
> >> >
> >> >
> >> >
> >> >
> >> > One thing I noticed: with tungsten enabled, the shuffle write volume
> >> > of stage 4 is larger (720G) than with tungsten disabled (506G) in my
> >> > original run, for exactly the same input. That is interesting; does
> >> > anyone have an idea why?
> >> >
> >> >
> >> > Overall, for my test case, a "broadcast" join is exactly the most
> >> > optimized approach I should use, but somehow I cannot make it behave
> >> > like Hive's "mapjoin", even in Spark 1.6.1.
> >> >
> >> > As I said, this is just a test case. We have some business cases where a
> >> > "broadcast" join makes sense, but until I understand exactly how to
> >> > make it work as I expect in Spark, I don't know what to do.
> >> >
> >> > Yong
> >> >
> >> > ________________________________
> >> > From: [email protected]
> >> > To: [email protected]
> >> > Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in
> >> > the last step
> >> > Date: Tue, 22 Mar 2016 13:08:31 -0400
> >> >
> >> >
> >> > Please help me understand how "broadcast" works on DataFrames in Spark
> >> > 1.5.2.
> >> >
> >> > Below are the 2 joins I tested and the physical plans I dumped:
> >> >
> >> > val join1 = historyRaw.join(trialRaw,
> >> >   trialRaw("visid_high") === historyRaw("visid_high") &&
> >> >   trialRaw("visid_low") === historyRaw("visid_low") &&
> >> >   trialRaw("date_time") > historyRaw("date_time"))
> >> > val join2 = historyRaw.join(broadcast(trialRaw),
> >> >   trialRaw("visid_high") === historyRaw("visid_high") &&
> >> >   trialRaw("visid_low") === historyRaw("visid_low") &&
> >> >   trialRaw("date_time") > historyRaw("date_time"))
> >> >
> >> > join1.explain(true)
> >> > == Physical Plan ==
> >> > Filter (date_time#25L > date_time#513L)
> >> >  SortMergeJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L]
> >> >   ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false
> >> >    Exchange hashpartitioning(visid_high#948L,visid_low#949L)
> >> >     Scan ParquetRelation[hdfs://xxxxxxxx]
> >> >   ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false
> >> >    Exchange hashpartitioning(visid_high#460L,visid_low#461L)
> >> >     Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
> >> >      Filter (instr(event_list#105,202) > 0)
> >> >       Scan ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
> >> >
> >> > join2.explain(true)
> >> > == Physical Plan ==
> >> > Filter (date_time#25L > date_time#513L)
> >> >  BroadcastHashJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L], BuildRight
> >> >   Scan ParquetRelation[hdfs://xxxxxxxx]
> >> >   Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
> >> >    Filter (instr(event_list#105,202) > 0)
> >> >     Scan ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
> >> >
> >> > Obviously the explain plans are different, but the performance and the
> >> > job execution steps are almost exactly the same, as shown in the original
> >> > picture in the email below.
> >> > Keep in mind that I have to run with "--conf spark.sql.tungsten.enabled=false";
> >> > otherwise, the execution plan will use the tungsten sort.
> >> >
> >> > Now, what confuses me is the following:
> >> > When using the broadcast join, the job still generates 3 stages, the same
> >> > as SortMergeJoin, and I am not sure this makes sense.
> >> > Ideally, with "broadcast", the first stage scans the "trialRaw" data using
> >> > the filter (instr(event_list#105,202) > 0), which, BTW, filters out 99% of
> >> > the data, and then "broadcasts" the remaining data to all the nodes. The
> >> > next stage scans "historyRaw", filtering it by joining with the broadcast
> >> > data. In the end, there may be one more stage to save the data in the
> >> > default "200" partitions. So ONLY 2 stages should be enough for this case.
> >> > Why are there still 3 stages, just the same as "SortMergeJoin"? It looks
> >> > like "broadcast" is not taking effect at all, yet the physical plan
> >> > clearly shows the "Broadcast" hint.
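For contrast, here is a plain-Scala sketch of the shuffle-based sort-merge join that the DAG appears to show. `ShuffleJoinSketch` and the tuple rows are hypothetical and the values are made up: both inputs are hash-partitioned by the join keys (like `Exchange hashpartitioning`), each partition is sorted and merged, and the `date_time` predicate is applied afterwards. The returned counter shows that every row of both inputs is moved, which would be consistent with the very large shuffle read in the last stage when the large side is 90 days of history.

```scala
object ShuffleJoinSketch {
  type Key = (Long, Long) // (visid_high, visid_low)
  private val ord = implicitly[Ordering[Key]]

  // Hash-partition rows by join key: every row goes to the partition owning its key.
  def shuffle[A](rows: Seq[A], key: A => Key, n: Int): Vector[Seq[A]] = {
    val byPart = rows.groupBy(r => Math.floorMod(key(r).hashCode, n))
    Vector.tabulate(n)(i => byPart.getOrElse(i, Seq.empty[A]))
  }

  // Within one partition: sort both sides by key, then merge runs of equal keys.
  def sortMerge[A, B](left: Seq[A], lk: A => Key, right: Seq[B], rk: B => Key): Seq[(A, B)] = {
    val l = left.sortBy(lk)(ord).toVector
    val r = right.sortBy(rk)(ord).toVector
    val out = Vector.newBuilder[(A, B)]
    var i = 0
    var j = 0
    while (i < l.length && j < r.length) {
      val c = ord.compare(lk(l(i)), rk(r(j)))
      if (c < 0) i += 1
      else if (c > 0) j += 1
      else {
        val k = lk(l(i))
        // End of the run of equal keys on each side; emit their cross product.
        val iEnd = l.indexWhere(a => lk(a) != k, i) match { case -1 => l.length; case e => e }
        val jEnd = r.indexWhere(b => rk(b) != k, j) match { case -1 => r.length; case e => e }
        for (a <- l.slice(i, iEnd); b <- r.slice(j, jEnd)) out += ((a, b))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }

  // Returns the joined pairs and how many rows were written to the shuffle.
  def join[A, B](left: Seq[A], lk: A => Key, right: Seq[B], rk: B => Key, n: Int): (Seq[(A, B)], Int) = {
    val lp = shuffle(left, lk, n)
    val rp = shuffle(right, rk, n)
    val shuffledRows = left.size + right.size
    ((0 until n).flatMap(p => sortMerge(lp(p), lk, rp(p), rk)), shuffledRows)
  }
}

// (visid_high, visid_low, date_time, tag); made-up values.
val history = Seq((1L, 10L, 400L, "x"), (1L, 10L, 600L, "y"), (2L, 20L, 50L, "z"), (3L, 30L, 10L, "w"))
val trial = Seq((1L, 10L, 500L, "a1"), (2L, 20L, 100L, "a2"))
val key = (r: (Long, Long, Long, String)) => (r._1, r._2)

val (pairs, moved) = ShuffleJoinSketch.join(history, key, trial, key, 4)
// The non-equi predicate runs after the join, like Filter (date_time > date_time) in the plan.
val result = pairs.filter { case (h, t) => t._3 > h._3 }.map { case (h, t) => (h._4, t._4) }
println(result.sorted.toList) // List((x,a1), (z,a2))
println(moved)                // 6: all history rows were shuffled, not just the matching ones
```

With a broadcast hash join, only the small side (2 rows here, about 1494 rows in the thread) would be shipped to the executors, and the history rows would stay where they were read.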
> >> >
> >> > Thanks
> >> >
> >> > Yong
> >> >
> >> >
> >> > ________________________________
> >> > From: [email protected]
> >> > To: [email protected]
> >> > Subject: Spark 1.5.2, why the broadcast join shuffle so much data in the
> >> > last step
> >> > Date: Fri, 18 Mar 2016 16:54:16 -0400
> >> >
> >> > Hi, Sparkers:
> >> >
> >> > I have some questions related to generating parquet output in Spark
> >> > 1.5.2.
> >> >
> >> > I have 2 data sets to join, and I know one is much smaller than the
> >> > other one, so I have the following test code:
> >> >
> >> > import org.apache.spark.sql.functions.{broadcast, instr}
> >> >
> >> > val loadRaw = sqlContext.read.parquet("one day of data in parquet format")
> >> > val historyRaw = sqlContext.read.parquet("90 days of history data in parquet format")
> >> >
> >> > // trialRaw will be very small: normally only thousands of rows out of
> >> > // 20M in one day's data
> >> > val trialRaw = loadRaw.filter(instr(loadRaw("event_list"), "202") > 0)
> >> >   .selectExpr("soid_e1 as account_id", "visid_high", "visid_low", "date_time", "ip")
> >> >
> >> > trialRaw.count
> >> > res0: Long = 1494
> >> >
> >> > // so the trialRaw data is small
> >> >
> >> > val join = historyRaw.join(broadcast(trialRaw),
> >> >   trialRaw("visid_high") === historyRaw("visid_high") &&
> >> >   trialRaw("visid_low") === historyRaw("visid_low") &&
> >> >   trialRaw("date_time") > historyRaw("date_time"))
> >> >
> >> > val col_1 = trialRaw("visid_high")
> >> > val col_2 = trialRaw("visid_low")
> >> > val col_3 = trialRaw("date_time")
> >> > val col_4 = trialRaw("ip")
> >> >
> >> > // drop the duplicate columns after the join
> >> > val output = join.drop(col_1).drop(col_2).drop(col_3).drop(col_4)
> >> > output.write.parquet("hdfs location")
> >> >
> >> > The first problem: I think I am hitting SPARK-10309:
> >> >
> >> > Caused by: java.io.IOException: Unable to acquire 67108864 bytes of memory
> >> >   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
> >> >   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
> >> >   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
> >> >
> >> >
> >> > so I had to disable tungsten (spark.sql.tungsten.enabled=false).
> >> >
> >> > Now the problem is that Spark finishes this job very slowly, even slower
> >> > than the same logic in Hive.
> >> > The explain output shows that the broadcast join is used:
> >> > join.explain(true)
> >> >
> >> > .....
> >> > == Physical Plan ==
> >> > Filter (date_time#25L > date_time#519L)
> >> >  BroadcastHashJoin [visid_high#954L,visid_low#955L], [visid_high#460L,visid_low#461L], BuildRight
> >> >   ConvertToUnsafe
> >> >    Scan ParquetRelation[hdfs://xxxxxx][400+ columns shown up here]
> >> >   ConvertToUnsafe
> >> >    Project [soid_e1#30 AS account_id#488,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >> >     Filter (instr(event_list#105,202) > 0)
> >> >      Scan ParquetRelation[hdfs:xxx/data/event_parquet/2016/03/17][visid_high#460L,ip#127,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
> >> > Code Generation: true
> >> >
> >> > I don't understand the statistics shown in the GUI below:
> >> >
> >> >
> >> >
> >> > It looks like the last step shuffle-reads all 506.6G of data, but this
> >> > DOESN'T make any sense. The final output of 200 files is shown below:
> >> >
> >> > hadoop fs -ls hdfs://finalPath | sort -u -k5n
> >> > Found 203 items
> >> > -rw-r--r-- 3 biginetl biginetl 44237 2016-03-18 16:47
> >> > finalPath/_common_metadata
> >> > -rw-r--r-- 3 biginetl biginetl 105534 2016-03-18 15:45
> >> > finalPath/part-r-00069-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> >> > -rw-r--r-- 3 biginetl biginetl 107098 2016-03-18 16:24
> >> > finalPath/part-r-00177-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> >> > .............
> >> > -rw-r--r-- 3 biginetl biginetl 1031400 2016-03-18 16:35
> >> > finalPath/part-r-00187-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> >> > -rw-r--r-- 3 biginetl biginetl 1173678 2016-03-18 16:21
> >> > finalPath/part-r-00120-c534cd56-64b5-4ec8-8ba9-1514791f05ab.snappy.parquet
> >> > -rw-r--r-- 3 biginetl biginetl 12257423 2016-03-18 16:47
> >> > finalPath/_metadata
> >> >
> >> > As we can see, the largest file is only 1.1M, so the total output is
> >> > just about 150M for all 200 files.
> >> > I really don't understand why stage 5 is so slow, and why the shuffle
> >> > read is so BIG.
> >> > Understanding the "broadcast" join in Spark 1.5 is very important for
> >> > our use case. Please tell me what the reasons behind this could be.
> >> >
> >> > Thanks
> >> >
> >> > Yong
> >> >
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: [email protected]
> >> For additional commands, e-mail: [email protected]
> >>
>
>