Re: Dataframe broadcast join hint not working
If your query plan has a "Project" node in it, a bug in Spark prevents the "broadcast" hint from working in pre-2.0 releases: https://issues.apache.org/jira/browse/SPARK-13383

Unfortunately, the fix has not been backported to 1.x.

Yong
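A minimal sketch of the plan shape SPARK-13383 concerns. The broadcast() hint is applied first, and the hinted DataFrame is then filtered and projected, so Filter and Project nodes end up between the hint and the join. It assumes Spark 2.x or later with spark-sql on the classpath, run in local mode; the object name, column names and data are hypothetical. Size-based auto-broadcast is disabled (-1) so that only the hint can produce a BroadcastHashJoin, and adaptive execution is switched off so the printed plan is the static one.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Hypothetical demo: the hinted side is filtered and projected after broadcast(),
// so Filter/Project nodes sit between the hint and the join.
object HintThroughProject {
  def localSession(): SparkSession =
    SparkSession.builder().master("local[1]").appName("hint-through-project").getOrCreate()

  def physicalPlan(spark: SparkSession): String = {
    import spark.implicits._
    // Turn off size-based auto-broadcast so only the explicit hint can cause it,
    // and turn off adaptive execution so the plan printed is the static one.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    val large = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "payload")
    val small = Seq((1, "x", "keep"), (2, "y", "drop")).toDF("id", "name", "grp")

    // Hint first, then add a Filter and a Project on top of the hinted plan.
    val prunedSmall = broadcast(small).filter($"grp" === "keep").select("id", "name")

    large.join(prunedSmall, "id").queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = localSession()
    // Shows BroadcastHashJoin if the hint survives the Filter/Project above it.
    println(physicalPlan(spark))
    spark.stop()
  }
}
```

On a release that keeps the hint through column pruning, the printed plan should contain BroadcastHashJoin; per the report above, 1.x releases may fall back to a sort-merge join for the same shape.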
Re: Dataframe broadcast join hint not working
Hi guys,

I also ran into a situation where Spark 1.6.2 ignored my hint to do a broadcast join (i.e. broadcast(df)) with a small dataset. However, this happened in only 1 of 3 cases. Setting the "spark.sql.autoBroadcastJoinThreshold" property had no effect either. All 3 cases work fine in Spark 2.0.

Is there any chance that Spark can ignore a manually specified broadcast operation? In other words, is Spark forced to perform a broadcast if one specifies "df1.join(broadcast(df2), ...)"?

Best regards,
Anton
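On whether broadcast() forces a broadcast: the hint is a request to the planner, and a broadcast hash join can only build on the side the join type allows. For a left outer join only the right side can be broadcast, so a hint placed on the left side has nothing to act on (this is separate from the SPARK-13383 bug). The sketch below compares three cases by looking for BroadcastHashJoin in the physical plan. It assumes Spark 2.x or later in local mode, with hypothetical names and data and auto-broadcast disabled so the hint is the only trigger.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Hypothetical demo: whether a broadcast() hint can be honored depends on
// which side it is placed on and on the join type.
object IsHintForced {
  def localSession(): SparkSession =
    SparkSession.builder().master("local[1]").appName("is-hint-forced").getOrCreate()

  /** Physical plan of `left JOIN right`, with the hint on the left or on the right. */
  def planFor(spark: SparkSession, hintOnLeft: Boolean, joinType: String): String = {
    import spark.implicits._
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // hint is the only trigger
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    val left = Seq((1, "a"), (2, "b")).toDF("id", "l")
    val right = Seq((1, "x")).toDF("id", "r")

    val joined =
      if (hintOnLeft) broadcast(left).join(right, Seq("id"), joinType)
      else left.join(broadcast(right), Seq("id"), joinType)
    joined.queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = localSession()
    for ((onLeft, jt) <- Seq((false, "left_outer"), (true, "left_outer"), (true, "inner"))) {
      val usesBroadcast = planFor(spark, onLeft, jt).contains("BroadcastHashJoin")
      val side = if (onLeft) "left" else "right"
      println(s"hint on $side, $jt -> broadcast hash join: $usesBroadcast")
    }
    spark.stop()
  }
}
```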
Re: Dataframe broadcast join hint not working
I think your dataframes are converted from RDDs. Are those RDDs computed, or read directly from files? I guess that might affect how Spark computes the execution plan.

Try this: save the dataframe that will be broadcast to HDFS, read it back into a dataframe, then do the join and check the explain plan.
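A sketch of the save-and-reload suggestion, assuming Spark 2.x or later in local mode. A local temporary directory stands in for HDFS, Parquet is an arbitrary format choice, and the names and data are hypothetical. Both inputs are built from an RDD of Rows, like the "Scan ExistingRDD" inputs in the posted plan. For an RDD-backed relation Spark has no real size statistics and uses spark.sql.defaultSizeInBytes, which by default is larger than the broadcast threshold, whereas a file-based relation is estimated from the files on disk. No broadcast() hint is used here, so any broadcast comes from the size estimate alone.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical sketch of "save the broadcast side, read it back, then join".
// A local temp directory stands in for HDFS; Parquet is an arbitrary choice.
object ReloadFromFiles {
  def localSession(): SparkSession =
    SparkSession.builder().master("local[1]").appName("reload-from-files").getOrCreate()

  // Build a DataFrame from an RDD[Row], like the "Scan ExistingRDD" inputs in the plan.
  private def fromRdd(spark: SparkSession, rows: Seq[Row], valueCol: String): DataFrame = {
    val schema = StructType(Seq(StructField("id", IntegerType), StructField(valueCol, StringType)))
    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  }

  /** Plan of an un-hinted join, optionally after reloading the small side from files. */
  def planFor(spark: SparkSession, reloadSmallSide: Boolean): String = {
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760") // 10 MB
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    val large = fromRdd(spark, Seq(Row(1, "a"), Row(2, "b"), Row(3, "c")), "payload")
    val small = fromRdd(spark, Seq(Row(1, "x"), Row(2, "y")), "name")

    val right =
      if (!reloadSmallSide) small // RDD-backed: Spark falls back to its default size estimate
      else {
        val path = Files.createTempDirectory("broadcast-side").resolve("small").toString
        small.write.mode("overwrite").parquet(path) // 1. save
        spark.read.parquet(path)                    // 2. read back: size estimated from files
      }
    large.join(right, "id").queryExecution.executedPlan.toString // 3. join and inspect
  }

  def main(args: Array[String]): Unit = {
    val spark = localSession()
    println(planFor(spark, reloadSmallSide = false))
    println(planFor(spark, reloadSmallSide = true))
    spark.stop()
  }
}
```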
Re: Dataframe broadcast join hint not working
I am using Spark 1.6.3, and below is the real plan (a, b, c above were just for illustration purposes):

== Physical Plan ==
Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
+- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801], [mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
   :- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
   :  +- TungstenExchange hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200), None
   :     +- Project [_1#3797 AS ltt#3800,_2#3798 AS mr_demo_id#3801,_3#3799 AS mr_demoname#3802]
   :        +- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
   +- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
      +- TungstenExchange hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200), None
         +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
            +- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804 AS mr_demo_id#3811,demoname#3805 AS mr_demo_value#3812,demovalue_etv_map#3806 AS etv_demo_id#3813]
               +- Filter ((map_type#3809 = master_roster_to_etv) && NOT (demogroup#3803 = gender_age_id))
                  +- Scan ExistingRDD[demogroup#3803,demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,demovalue_old_map#3808,map_type#3809]

Thanks
Swapnil
Re: Dataframe broadcast join hint not working
Could you post the result of `c.explain`? If it is a broadcast join, you will see it in the explain output.
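To check the explain output programmatically as well as by eye, the sketch below calls explain() and then classifies the join operator by name. It assumes Spark 2.x or later in local mode with hypothetical data, and disables size-based auto-broadcast so the un-hinted join shows the sort-merge fallback. Matching on the "BroadcastHash" and "SortMerge" prefixes is a heuristic; it also catches outer-join variants such as the SortMergeOuterJoin in the 1.6 plan posted in this thread.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

// Hypothetical helper: classify the join operator by its printed name.
object CheckJoinStrategy {
  def localSession(): SparkSession =
    SparkSession.builder().master("local[1]").appName("check-join-strategy").getOrCreate()

  def joinStrategy(df: DataFrame): String = {
    val plan = df.queryExecution.executedPlan.toString
    if (plan.contains("BroadcastHash")) "broadcast-hash" // e.g. BroadcastHashJoin
    else if (plan.contains("SortMerge")) "sort-merge"    // e.g. SortMergeJoin, SortMergeOuterJoin
    else "other"
  }

  /** Returns (strategy with hint, strategy without hint) for a toy a/b join. */
  def demo(spark: SparkSession): (String, String) = {
    import spark.implicits._
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // no size-based broadcast
    spark.conf.set("spark.sql.adaptive.enabled", "false")
    val a = Seq((1, "a"), (2, "b")).toDF("id", "x")
    val b = Seq((1, "p")).toDF("id", "y")
    val c = a.join(broadcast(b), "id")
    c.explain() // prints the physical plan to stdout
    (joinStrategy(c), joinStrategy(a.join(b, "id")))
  }

  def main(args: Array[String]): Unit = {
    val spark = localSession()
    println(demo(spark))
    spark.stop()
  }
}
```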
Re: Dataframe broadcast join hint not working
Hi,

Which version of Spark are you using? In Spark, a join against a table smaller than 10 MB (the default spark.sql.autoBroadcastJoinThreshold) is automatically converted into a broadcast join.

Thanks,
Selvam R

--
Selvam Raman
"Avoid bribery, hold your head high"
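A sketch of the size-based conversion, assuming Spark 2.x or later in local mode with hypothetical data. No hint is used: the same join of two tiny in-memory DataFrames is planned once with the threshold at 10485760 bytes (10 MB, the default) and once at -1, which disables automatic broadcasting. The comparison is against Spark's size estimate for each side rather than the actual bytes, which is why a DataFrame built from an RDD may not qualify even when it is small.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: no hint; only spark.sql.autoBroadcastJoinThreshold changes.
object AutoBroadcastThreshold {
  def localSession(): SparkSession =
    SparkSession.builder().master("local[1]").appName("auto-broadcast-threshold").getOrCreate()

  /** Physical plan of an un-hinted join of two tiny in-memory DataFrames. */
  def planWithThreshold(spark: SparkSession, thresholdBytes: String): String = {
    import spark.implicits._
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", thresholdBytes)
    spark.conf.set("spark.sql.adaptive.enabled", "false")
    val a = Seq((1, "a"), (2, "b")).toDF("id", "x")
    val b = Seq((1, "p")).toDF("id", "y")
    a.join(b, "id").queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = localSession()
    // 10485760 bytes = 10 MB (the default); -1 disables size-based broadcasting.
    for (t <- Seq("10485760", "-1")) {
      val bhj = planWithThreshold(spark, t).contains("BroadcastHashJoin")
      println(s"threshold=$t -> broadcast hash join: $bhj")
    }
    spark.stop()
  }
}
```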
Dataframe broadcast join hint not working
Hello

I am trying a broadcast join on dataframes, but Spark is still doing a SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold higher, but still no luck.

Related piece of code:
val c = a.join(broadcast(b), "id")

On a side note, SizeEstimator.estimate(b) returns a really high value (460956584 bytes) compared to the data it contains: b has just 85 rows and around 4964 bytes.

Any help is very much appreciated!

Thanks
Swapnil
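A runnable version of the snippet in the question, with the import that broadcast() needs. It assumes Spark 2.x or later in local mode, and `a` and `b` are hypothetical stand-ins. On the side note: SizeEstimator.estimate walks the JVM object graph reachable from the object it is given, and a DataFrame is a driver-side handle that references its query plan and session, so the large figure most likely measures that graph rather than b's rows. Estimating the collected rows, as below, is closer to what was intended, and is only sensible for a small DataFrame like this one.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast // provides broadcast(df)
import org.apache.spark.util.SizeEstimator

object OriginalSnippet {
  def localSession(): SparkSession =
    SparkSession.builder().master("local[1]").appName("original-snippet").getOrCreate()

  /** Hypothetical stand-ins for the poster's `a` and `b`. */
  def frames(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    val a = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "payload")
    val b = Seq((1, "x"), (2, "y")).toDF("id", "name")
    (a, b)
  }

  /** The snippet from the question, with the broadcast typo fixed. */
  def joinWithHint(a: DataFrame, b: DataFrame): DataFrame = a.join(broadcast(b), "id")

  /** Estimated JVM size of b's rows after collecting them to the driver (small b only). */
  def collectedRowsSize(b: DataFrame): Long = SizeEstimator.estimate(b.collect())

  def main(args: Array[String]): Unit = {
    val spark = localSession()
    val (a, b) = frames(spark)
    val c = joinWithHint(a, b)
    c.explain()
    println(s"rows in c: ${c.count()}")
    println(s"estimated size of collected b rows: ${collectedRowsSize(b)} bytes")
    spark.stop()
  }
}
```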