Hello, I have a use case where one large table has to be joined with several smaller tables. I've added a broadcast hint to each of the small tables in the joins.
val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")

val metaActionDF = sqlContext.read.format("json")
val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
val metaLocationDF = sqlContext.read.format("json")
  .option("samplingRatio", "0.1")
  .load(metaLocationFile)
  .join(broadcast(metaActionDF), "campaign_id")
  .join(broadcast(cidOrgDF), List("organization_id"), "left_outer")

val metaCreativeDF = sqlContext.read.format("json")
val metaExchangeDF = sqlContext.read.format("json")
val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")

val joinedBidderDF = largeTableDF.as("BID")
  .join(broadcast(metaLocationDF), "strategy_id")
  .join(broadcast(metaCreativeDF), "creative_id")
  .join(broadcast(metaExchangeDF), $"exchange_id" === $"id", "left_outer")
  .join(broadcast(techKeyDF).as("TK"), $"BID.tech_id" === $"TK.tech_key", "left_outer")
  .join(broadcast(localizationDF).as("BL"), $"BID.language" === $"BL.id", "left_outer")

When I look at the execution plan, all of the joins are marked as broadcast joins. But when I look at the Spark job UI, the DAG visualization shows that some joins were executed as sort-merge joins, with a shuffle involved. The ones that I've highlighted in yellow were shuffled. The DAG can be viewed here - https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0

Why is the actual execution, as seen in the DAG, different from the physical plan pasted below? I'm trying not to shuffle my large table. Any idea what is causing this?

== Physical Plan ==
BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
:- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
:  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L], LeftOuter, None
:  :  :- Project [...]
:  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#131L], BuildRight
:  :  :     :- Project [...]
:  :  :     :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#127L], BuildRight
:  :  :     :     :- ConvertToUnsafe
:  :  :     :     :  +- Scan CsvRelation(<function0>,Some(file:///shared/data/bidder/*.lzo),false,
:  :  :     :     +- Project [...]
:  :  :     :        +- BroadcastHashOuterJoin [organization_id#90L], [cast(organization_id#102 as bigint)], LeftOuter, None
:  :  :     :           :- Project [...]
:  :  :     :           :  +- BroadcastHashJoin [campaign_id#105L], [campaign_id#75L], BuildRight
:  :  :     :           :     :- Project [...]
:  :  :     :           :     :  +- Scan JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths: file:/shared/data/t1_meta/t1_meta_strategy.jsonl
:  :  :     :           :     +- Scan JSONRelation[] InputPaths: file:/shared/data/t1_meta/t1_meta_campaign.jsonl
:  :  :     :           +- ConvertToUnsafe
:  :  :     :              +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
:  :  :     +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
:  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl
:  +- ConvertToUnsafe
:     +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/technology_key.txt),false,
+- ConvertToUnsafe
   +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/browser_languages.osv),false

Srikanth
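An aside for anyone reproducing this: besides the explicit `broadcast()` hint, Spark also auto-broadcasts any table whose estimated size is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), so the threshold can matter when a hint is missing on one side of a join. A minimal sketch of inspecting and raising it with the Spark 1.x SQLContext API used above; the 50 MB value is purely illustrative:

```scala
// Spark 1.x SQLContext API. Tables estimated to be below this threshold are
// broadcast automatically; an explicit broadcast() hint is applied regardless.
// Default value is 10485760 (10 MB).
println(sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold"))

// Illustrative only: raise the threshold to 50 MB.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50L * 1024 * 1024).toString)
```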