Hello,

I have a use case where one large table has to be joined with several
smaller tables.
I've added a broadcast hint for every small table in the joins.

    val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")

    val metaActionDF = sqlContext.read.format("json")
    val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
    val metaLocationDF = sqlContext.read.format("json")
                                   .option("samplingRatio", "0.1")
                                   .load(metaLocationFile)
                                   .join(broadcast(metaActionDF), "campaign_id")
                                   .join(broadcast(cidOrgDF), List("organization_id"), "left_outer")

    val metaCreativeDF = sqlContext.read.format("json")
    val metaExchangeDF = sqlContext.read.format("json")
    val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
    val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")

    val joinedBidderDF = largeTableDF.as("BID")
                            .join(broadcast(metaLocationDF), "strategy_id")
                            .join(broadcast(metaCreativeDF), "creative_id")
                            .join(broadcast(metaExchangeDF), $"exchange_id" === $"id", "left_outer")
                            .join(broadcast(techKeyDF).as("TK"), $"BID.tech_id" === $"TK.tech_key", "left_outer")
                            .join(broadcast(localizationDF).as("BL"), $"BID.language" === $"BL.id", "left_outer")
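
For anyone who wants to reproduce the check on a small scale, here is a minimal sketch of how a broadcast hint and the resulting executed plan can be compared. The tiny tables, the column names `row_id` and `strategy_name`, and the object name `BroadcastPlanCheck` are made up for illustration and are not my real data; it assumes a local Spark build with the `SQLContext` API.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast

object BroadcastPlanCheck {

  // Builds two tiny made-up tables, joins them with a broadcast hint,
  // and returns the executed physical plan as text.
  def broadcastJoinPlan(): String = {
    val conf = new SparkConf().setAppName("broadcast-plan-check").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      // Hypothetical stand-ins for the large table and one small lookup table.
      val largeDF = sc.parallelize(1 to 1000)
        .map(i => (i, i % 10))
        .toDF("row_id", "strategy_id")
      val smallDF = sc.parallelize(0 until 10)
        .map(i => (i, s"strategy_$i"))
        .toDF("strategy_id", "strategy_name")

      // Same pattern as above: hint the small side, join on a shared column.
      val joinedDF = largeDF.join(broadcast(smallDF), "strategy_id")
      joinedDF.queryExecution.executedPlan.toString
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val plan = broadcastJoinPlan()
    println(plan)
    println("Contains BroadcastHashJoin: " + plan.contains("BroadcastHashJoin"))
  }
}
```

This only inspects the plan text for one join; it does not show what the job UI draws for the same query, which is the part I can't explain.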

When I look at the execution plan, all the joins are marked as
broadcast joins.
But when I look at the Spark job UI, the DAG visualization shows that some
joins are sort-merge joins with a shuffle involved.
The ones that I've highlighted in yellow were shuffled.
The DAG can be viewed here -
https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0

Why is the actual execution as seen in the DAG different from the physical
plan pasted below?
I'm trying not to shuffle my largeTable. Any idea what is causing this?

== Physical Plan ==
BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
:- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
:  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L], LeftOuter, None
:  :  :- Project [...]
:  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#131L], BuildRight
:  :  :     :- Project [...]
:  :  :     :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#127L], BuildRight
:  :  :     :     :- ConvertToUnsafe
:  :  :     :     :  +- Scan CsvRelation(<function0>,Some(file:///shared/data/bidder/*.lzo),false,
:  :  :     :     +- Project [...]
:  :  :     :        +- BroadcastHashOuterJoin [organization_id#90L], [cast(organization_id#102 as bigint)], LeftOuter, None
:  :  :     :           :- Project [...]
:  :  :     :           :  +- BroadcastHashJoin [campaign_id#105L], [campaign_id#75L], BuildRight
:  :  :     :           :     :- Project [...]
:  :  :     :           :     :  +- Scan JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths: file:/shared/data/t1_meta/t1_meta_strategy.jsonl
:  :  :     :           :     +- Scan JSONRelation[] InputPaths: file:/shared/data/t1_meta/t1_meta_campaign.jsonl
:  :  :     :           +- ConvertToUnsafe
:  :  :     :              +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
:  :  :     +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
:  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl
:  +- ConvertToUnsafe
:     +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/technology_key.txt),false,
+- ConvertToUnsafe
   +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/browser_languages.osv),false



Srikanth
