Re: Broadcast join on multiple dataframes
Hello,

Any pointers on what is causing the optimizer to convert a broadcast join to a shuffle join? This join is with a file that is just 4 KB in size.

Complete plan --> https://www.dropbox.com/s/apuomw1dg0t1jtc/plan_with_select.txt?dl=0
DAG from UI --> https://www.dropbox.com/s/4xc9d0rdkx2fun8/DAG_with_select.PNG?dl=0

== Optimized Logical Plan ==
Project [...]
+- Join LeftOuter, Some((start_ip#48L = start_ip_num#144L))
:- Project [...]
: +- Join Inner, Some((cast(creative_id#9 as bigint) = creative_id#130L))
: :- Project [...]
: : +- Join Inner, Some((cast(strategy_id#10 as bigint) = strategy_id#126L))
: : :- Project [...]
: : : +- Join LeftOuter, Some((cast(exchange_id#13 as bigint) = id#142L))
: : : :- Project [...]
: : : : +- Join LeftOuter, Some((browser_id#59 = technology_key#169))
: : : : :- Project [...]
: : : : : +- Join LeftOuter, Some((primary_browser_language#61 = id#166))
: : : : : :- Project [...]
: : : : : : +- Filter ((NOT (campaign_id#12 = 0) && (mm_int_cost#36 < 100.0)) && ((cost_sum#41 < 100.0) && (total_spend#42 < 100.0)))
: : : : : : +- Relation[...)
: : : : : +- Project [id#166,two_letter_code#167]
: : : : : +- BroadcastHint
: : : : : +- Relation[...
: : : : +- BroadcastHint
: : : : +- Relation[...
: : : +- Project [description#141,id#142L]
: : : +- BroadcastHint
: : : +- Relation[description#141,id#142L,name#143] JSONRelation

== Physical Plan ==
Project [...]
+- SortMergeOuterJoin [start_ip#48L], [start_ip_num#144L], LeftOuter, None
:- Sort [start_ip#48L ASC], false, 0
: +- TungstenExchange hashpartitioning(start_ip#48L,480), None
: +- Project [...]
: +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#130L], BuildRight
: :- Project [...]
: : +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#126L], BuildRight
: : :- Project [...]
: : : +- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#142L], LeftOuter, None
: : : :- Project [...]
: : : : +- BroadcastHashOuterJoin [browser_id#59], [technology_key#169], LeftOuter, None
: : : : :- Project [...]
: : : : : +- SortMergeOuterJoin [primary_browser_language#61], [id#166], LeftOuter, None
: : : : : :- Sort [primary_browser_language#61 ASC], false, 0
: : : : : : +- TungstenExchange hashpartitioning(primary_browser_language#61,480), None
: : : : : : +- Project [...]
: : : : : : +- Filter (((NOT (campaign_id#12 = 0) && (mm_int_cost#36 < 100.0)) && (cost_sum#41 < 100.0)) && (total_spend#42 < 100.0))
: : : : : : +- Scan CsvRelation(,Some(s3://
: : : : : +- Sort [id#166 ASC], false, 0
: : : : : +- TungstenExchange hashpartitioning(id#166,480), None
: : : : : +- Project [id#166,two_letter_code#167]
: : : : : +- Scan CsvRelation(,Some(s3
: : : : +- ConvertToUnsafe
: : : : +- Scan CsvRelation(,Some(s3://
: : : +- Project [description#141,id#142L]
: : : +- Scan JSONRelation[description#141,id#142L,name#143] InputPaths: s3://
: : +- Project
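When a physical plan is this long, it can help to tally which join operators were actually chosen. The following is a minimal sketch in plain Scala (no Spark dependency) that scans a plan string for the operator names seen in this thread. The object name `JoinStrategySummary`, its methods, and the abridged plan excerpt in `main` are illustrative assumptions, not part of Spark.

```scala
object JoinStrategySummary {
  // Operator names as they appear in the physical plans quoted in this thread.
  private val JoinOp = """(BroadcastHash(?:Outer)?Join|SortMerge(?:Outer)?Join)""".r

  /** Join operators found in a physical-plan string, in order of appearance. */
  def joinOperators(plan: String): List[String] =
    JoinOp.findAllIn(plan).toList

  /** Number of joins per operator name. */
  def summarize(plan: String): Map[String, Int] =
    joinOperators(plan).groupBy(identity).map { case (op, hits) => op -> hits.size }

  def main(args: Array[String]): Unit = {
    // Abridged excerpt of the physical plan above (Project/Scan lines omitted).
    val plan =
      """SortMergeOuterJoin [start_ip#48L], [start_ip_num#144L], LeftOuter, None
        |: +- TungstenExchange hashpartitioning(start_ip#48L,480), None
        |: +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#130L], BuildRight
        |: +- BroadcastHashOuterJoin [browser_id#59], [technology_key#169], LeftOuter, None
        |: +- SortMergeOuterJoin [primary_browser_language#61], [id#166], LeftOuter, None""".stripMargin

    // Print one line per operator name with its count.
    summarize(plan).toSeq.sortBy(_._1).foreach { case (op, n) => println(s"$op: $n") }
  }
}
```

Any SortMerge* entry in the tally points to a join whose inputs go through a TungstenExchange, i.e. a shuffle, which is what the question is about.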
Re: Broadcast join on multiple dataframes
Michael,

The output of DF.queryExecution is saved to https://www.dropbox.com/s/1vizuwpswza1e3x/plan.txt?dl=0
I don't see anything in it to suggest a switch in strategy. Hopefully you find this helpful.

Srikanth

On Thu, Jan 28, 2016 at 4:43 PM, Michael Armbrust wrote:

> Can you provide the analyzed and optimized plans (explain(true))
>
> On Thu, Jan 28, 2016 at 12:26 PM, Srikanth wrote:
>
>> Hello,
>>
>> I have a use case where one large table has to be joined with several smaller tables.
>> I've added broadcast hint for all small tables in the joins.
>>
>> val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")
>>
>> val metaActionDF = sqlContext.read.format("json")
>> val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
>> val metaLocationDF = sqlContext.read.format("json").option("samplingRatio","0.1").load(metaLocationFile)
>>     .join(broadcast(metaActionDF), "campaign_id")
>>     .join(broadcast(cidOrgDF), List("organization_id"), "left_outer")
>>
>> val metaCreativeDF = sqlContext.read.format("json")
>> val metaExchangeDF = sqlContext.read.format("json")
>> val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
>> val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")
>>
>> val joinedBidderDF = largeTableDF.as("BID")
>>     .join(broadcast(metaLocationDF), "strategy_id")
>>     .join(broadcast(metaCreativeDF), "creative_id")
>>     .join(broadcast(metaExchangeDF), $"exchange_id" === $"id" , "left_outer")
>>     .join(broadcast(techKeyDF).as("TK"), $"BID.tech_id" === $"TK.tech_key" , "left_outer")
>>     .join(broadcast(localizationDF).as("BL"), $"BID.language" === $"BL.id" , "left_outer")
>>
>> When I look at the execution plan, all the joins are marked as broadcastjoin.
>> But when I look at the spark job UI, the DAG visualization shows that some joins are sortmerged with shuffle involved.
>> The ones that I've highlighted in yellow were shuffled.
>> DAG can be viewed here - https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0
>>
>> Why is the actual execution as seen in the DAG different from the physical plan pasted below.
>> I'm trying not to shuffle my largeTable. Any idea what is causing this?
>>
>> == Physical Plan ==
>> BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
>> :- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
>> : :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L], LeftOuter, None
>> : : :- Project [...]
>> : : : +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#131L], BuildRight
>> : : : :- Project [...]
>> : : : : +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#127L], BuildRight
>> : : : : :- ConvertToUnsafe
>> : : : : : +- Scan CsvRelation(,Some(file:///shared/data/bidder/*.lzo),false,
>> : : : : +- Project [...]
>> : : : : +- BroadcastHashOuterJoin [organization_id#90L], [cast(organization_id#102 as bigint)], LeftOuter, None
>> : : : : :- Project [...]
>> : : : : : +- BroadcastHashJoin [campaign_id#105L], [campaign_id#75L], BuildRight
>> : : : : : :- Project [...]
>> : : : : : : +- Scan JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths: file:/shared/data/t1_meta/t1_meta_strategy.jsonl
>> : : : : : +- Scan JSONRelation[] InputPaths: file:/shared/data/t1_meta/t1_meta_campaign.jsonl
>> : : : : +- ConvertToUnsafe
>> : : : : +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
>> : : : +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
>> : : +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl
>> : +- ConvertToUnsafe
>> : +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false,
>> +- ConvertToUnsafe
>> +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false
>>
>> Srikanth
Re: Broadcast join on multiple dataframes
Can you provide the analyzed and optimized plans (explain(true))?

On Thu, Jan 28, 2016 at 12:26 PM, Srikanth wrote:

> Hello,
>
> I have a use case where one large table has to be joined with several smaller tables.
> I've added broadcast hint for all small tables in the joins.
>
> val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")
>
> val metaActionDF = sqlContext.read.format("json")
> val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
> val metaLocationDF = sqlContext.read.format("json").option("samplingRatio","0.1").load(metaLocationFile)
>     .join(broadcast(metaActionDF), "campaign_id")
>     .join(broadcast(cidOrgDF), List("organization_id"), "left_outer")
>
> val metaCreativeDF = sqlContext.read.format("json")
> val metaExchangeDF = sqlContext.read.format("json")
> val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
> val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")
>
> val joinedBidderDF = largeTableDF.as("BID")
>     .join(broadcast(metaLocationDF), "strategy_id")
>     .join(broadcast(metaCreativeDF), "creative_id")
>     .join(broadcast(metaExchangeDF), $"exchange_id" === $"id" , "left_outer")
>     .join(broadcast(techKeyDF).as("TK"), $"BID.tech_id" === $"TK.tech_key" , "left_outer")
>     .join(broadcast(localizationDF).as("BL"), $"BID.language" === $"BL.id" , "left_outer")
>
> When I look at the execution plan, all the joins are marked as broadcastjoin.
> But when I look at the spark job UI, the DAG visualization shows that some joins are sortmerged with shuffle involved.
> The ones that I've highlighted in yellow were shuffled.
> DAG can be viewed here - https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0
>
> Why is the actual execution as seen in the DAG different from the physical plan pasted below.
> I'm trying not to shuffle my largeTable. Any idea what is causing this?
>
> == Physical Plan ==
> BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
> :- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
> : :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L], LeftOuter, None
> : : :- Project [...]
> : : : +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#131L], BuildRight
> : : : :- Project [...]
> : : : : +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#127L], BuildRight
> : : : : :- ConvertToUnsafe
> : : : : : +- Scan CsvRelation(,Some(file:///shared/data/bidder/*.lzo),false,
> : : : : +- Project [...]
> : : : : +- BroadcastHashOuterJoin [organization_id#90L], [cast(organization_id#102 as bigint)], LeftOuter, None
> : : : : :- Project [...]
> : : : : : +- BroadcastHashJoin [campaign_id#105L], [campaign_id#75L], BuildRight
> : : : : : :- Project [...]
> : : : : : : +- Scan JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths: file:/shared/data/t1_meta/t1_meta_strategy.jsonl
> : : : : : +- Scan JSONRelation[] InputPaths: file:/shared/data/t1_meta/t1_meta_campaign.jsonl
> : : : : +- ConvertToUnsafe
> : : : : +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
> : : : +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
> : : +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl
> : +- ConvertToUnsafe
> : +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false,
> +- ConvertToUnsafe
> +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false
>
> Srikanth
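For readers following along, the extended plans requested in this message can be printed with `explain(true)`, which shows the logical plans together with the physical plan. Below is a minimal sketch, assuming a Spark 1.6-era `SQLContext` on the classpath. The local master, the toy data, the column names `row_id`/`lang_id`, and the object name `ExplainBroadcastJoin` are illustrative assumptions and are not taken from the poster's job.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast

object ExplainBroadcastJoin {
  def main(args: Array[String]): Unit = {
    // Local master and toy data are assumptions made purely for illustration.
    val conf = new SparkConf().setAppName("explain-broadcast-join").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    try {
      // Stand-ins for the large table and one small lookup table.
      val largeDF = sc.parallelize(1 to 1000).map(i => (i, i % 10)).toDF("row_id", "lang_id")
      val smallDF = sc.parallelize(0 until 10).map(i => (i, s"lang_$i")).toDF("id", "two_letter_code")

      // Same join shape as in the thread: hinted small side, left outer join.
      val joined = largeDF.as("BID")
        .join(broadcast(smallDF).as("BL"), $"BID.lang_id" === $"BL.id", "left_outer")

      // Prints the parsed, analyzed and optimized logical plans plus the physical plan.
      joined.explain(true)

      // The physical plan alone, as planned for execution.
      println(joined.queryExecution.executedPlan)
    } finally {
      sc.stop()
    }
  }
}
```

Comparing the optimized logical plan with the physical plan join by join shows which BroadcastHint nodes ended up as BroadcastHash* operators and which did not.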