Re: Broadcast join on multiple dataframes

2016-02-04 Thread Srikanth
Hello,

Any pointers on what is causing the optimizer to convert the broadcast
join into a shuffle join?
The join is with a file that is just 4 KB in size.

Complete plan -->
https://www.dropbox.com/s/apuomw1dg0t1jtc/plan_with_select.txt?dl=0
DAG from UI -->
https://www.dropbox.com/s/4xc9d0rdkx2fun8/DAG_with_select.PNG?dl=0
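
For context, one knob that commonly decides broadcast vs. shuffle is
spark.sql.autoBroadcastJoinThreshold (10 MB by default). A minimal sketch for
checking and raising it (Spark 1.6-era SQLContext API; the 50 MB value is just
an illustrative choice, not a recommendation):

    // Check the current broadcast threshold, in bytes (default is 10 MB).
    println(sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold"))

    // Raise it, e.g. to 50 MB, so small lookup tables qualify for broadcast.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)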


== Optimized Logical Plan ==
Project [...]
+- Join LeftOuter, Some((start_ip#48L = start_ip_num#144L))
   :- Project [...]
   :  +- Join Inner, Some((cast(creative_id#9 as bigint) = creative_id#130L))
   :     :- Project [...]
   :     :  +- Join Inner, Some((cast(strategy_id#10 as bigint) = strategy_id#126L))
   :     :     :- Project [...]
   :     :     :  +- Join LeftOuter, Some((cast(exchange_id#13 as bigint) = id#142L))
   :     :     :     :- Project [...]
   :     :     :     :  +- Join LeftOuter, Some((browser_id#59 = technology_key#169))
   :     :     :     :     :- Project [...]
   :     :     :     :     :  +- Join LeftOuter, Some((primary_browser_language#61 = id#166))
   :     :     :     :     :     :- Project [...]
   :     :     :     :     :     :  +- Filter ((NOT (campaign_id#12 = 0) && (mm_int_cost#36 < 100.0)) && ((cost_sum#41 < 100.0) && (total_spend#42 < 100.0)))
   :     :     :     :     :     :     +- Relation[...]
   :     :     :     :     :     +- Project [id#166,two_letter_code#167]
   :     :     :     :     :        +- BroadcastHint
   :     :     :     :     :           +- Relation[...]
   :     :     :     :     +- BroadcastHint
   :     :     :     :        +- Relation[...]
   :     :     :     +- Project [description#141,id#142L]
   :     :     :        +- BroadcastHint
   :     :     :           +- Relation[description#141,id#142L,name#143] JSONRelation

== Physical Plan ==
Project [...]
+- SortMergeOuterJoin [start_ip#48L], [start_ip_num#144L], LeftOuter, None
   :- Sort [start_ip#48L ASC], false, 0
   :  +- TungstenExchange hashpartitioning(start_ip#48L,480), None
   :     +- Project [...]
   :        +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#130L], BuildRight
   :           :- Project [...]
   :           :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#126L], BuildRight
   :           :     :- Project [...]
   :           :     :  +- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#142L], LeftOuter, None
   :           :     :     :- Project [...]
   :           :     :     :  +- BroadcastHashOuterJoin [browser_id#59], [technology_key#169], LeftOuter, None
   :           :     :     :     :- Project [...]
   :           :     :     :     :  +- SortMergeOuterJoin [primary_browser_language#61], [id#166], LeftOuter, None
   :           :     :     :     :     :- Sort [primary_browser_language#61 ASC], false, 0
   :           :     :     :     :     :  +- TungstenExchange hashpartitioning(primary_browser_language#61,480), None
   :           :     :     :     :     :     +- Project [...]
   :           :     :     :     :     :        +- Filter (((NOT (campaign_id#12 = 0) && (mm_int_cost#36 < 100.0)) && (cost_sum#41 < 100.0)) && (total_spend#42 < 100.0))
   :           :     :     :     :     :           +- Scan CsvRelation(,Some(s3://
   :           :     :     :     :     +- Sort [id#166 ASC], false, 0
   :           :     :     :     :        +- TungstenExchange hashpartitioning(id#166,480), None
   :           :     :     :     :           +- Project [id#166,two_letter_code#167]
   :           :     :     :     :              +- Scan CsvRelation(,Some(s3
   :           :     :     :     +- ConvertToUnsafe
   :           :     :     :        +- Scan CsvRelation(,Some(s3://
   :           :     :     +- Project [description#141,id#142L]
   :           :     :        +- Scan JSONRelation[description#141,id#142L,name#143] InputPaths: s3://
   :           :     +- Project
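
For what it's worth, a quick way to list just the join operators the planner
actually chose, without reading the whole tree, is to collect node names from
the executed plan. A sketch, assuming the joinedBidderDF from my earlier
message (TreeNode.collect, as in Spark 1.6):

    // Print the physical join operators in tree order; any SortMergeOuterJoin
    // here is a join where the broadcast hint was not honored.
    joinedBidderDF.queryExecution.executedPlan.collect {
      case node if node.nodeName.toLowerCase.contains("join") => node.nodeName
    }.foreach(println)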


Re: Broadcast join on multiple dataframes

2016-01-29 Thread Srikanth
Michael,

The output of DF.queryExecution is saved to
https://www.dropbox.com/s/1vizuwpswza1e3x/plan.txt?dl=0
I don't see anything in it that suggests a switch in join strategy. Hopefully
you find this helpful.
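
For anyone who wants the same dump: DF.queryExecution.toString bundles the
parsed, analyzed, optimized, and physical plans into one string, so it can be
written straight to a file. A minimal sketch (the output path is hypothetical):

    import java.io.PrintWriter

    // queryExecution.toString includes all four plan stages in one dump.
    val out = new PrintWriter("plan.txt")
    try out.write(joinedBidderDF.queryExecution.toString)
    finally out.close()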

Srikanth

On Thu, Jan 28, 2016 at 4:43 PM, Michael Armbrust wrote:

> Can you provide the analyzed and optimized plans (explain(true))?
>
> On Thu, Jan 28, 2016 at 12:26 PM, Srikanth  wrote:
>
>> Hello,
>>
>> I have a use case where one large table has to be joined with several
>> smaller tables.
>> I've added a broadcast hint for all the small tables in the joins.
>>
>> val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")
>>
>> val metaActionDF = sqlContext.read.format("json")
>> val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
>> val metaLocationDF = sqlContext.read.format("json")
>>   .option("samplingRatio","0.1").load(metaLocationFile)
>>   .join(broadcast(metaActionDF), "campaign_id")
>>   .join(broadcast(cidOrgDF), List("organization_id"), "left_outer")
>>
>> val metaCreativeDF = sqlContext.read.format("json")
>> val metaExchangeDF = sqlContext.read.format("json")
>> val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
>> val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")
>>
>> val joinedBidderDF = largeTableDF.as("BID")
>>   .join(broadcast(metaLocationDF), "strategy_id")
>>   .join(broadcast(metaCreativeDF), "creative_id")
>>   .join(broadcast(metaExchangeDF), $"exchange_id" === $"id", "left_outer")
>>   .join(broadcast(techKeyDF).as("TK"), $"BID.tech_id" === $"TK.tech_key", "left_outer")
>>   .join(broadcast(localizationDF).as("BL"), $"BID.language" === $"BL.id", "left_outer")
>>
>> When I look at the execution plan, all the joins are marked as broadcast
>> joins.
>> But when I look at the Spark job UI, the DAG visualization shows that
>> some joins are sort-merged, with a shuffle involved.
>> The ones that I've highlighted in yellow were shuffled.
>> The DAG can be viewed here -
>> https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0
>>
>> Why is the actual execution, as seen in the DAG, different from the
>> physical plan pasted below?
>> I'm trying not to shuffle my large table. Any idea what is causing this?
>>
>> == Physical Plan ==
>>
>> BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
>> :- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
>> :  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L], LeftOuter, None
>> :  :  :- Project [...]
>> :  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#131L], BuildRight
>> :  :  :     :- Project [...]
>> :  :  :     :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#127L], BuildRight
>> :  :  :     :     :- ConvertToUnsafe
>> :  :  :     :     :  +- Scan CsvRelation(,Some(file:///shared/data/bidder/*.lzo),false,
>> :  :  :     :     +- Project [...]
>> :  :  :     :        +- BroadcastHashOuterJoin [organization_id#90L], [cast(organization_id#102 as bigint)], LeftOuter, None
>> :  :  :     :           :- Project [...]
>> :  :  :     :           :  +- BroadcastHashJoin [campaign_id#105L], [campaign_id#75L], BuildRight
>> :  :  :     :           :     :- Project [...]
>> :  :  :     :           :     :  +- Scan JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths: file:/shared/data/t1_meta/t1_meta_strategy.jsonl
>> :  :  :     :           :     +- Scan JSONRelation[] InputPaths: file:/shared/data/t1_meta/t1_meta_campaign.jsonl
>> :  :  :     :           +- ConvertToUnsafe
>> :  :  :     :              +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
>> :  :  :     +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
>> :  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl
>> :  +- ConvertToUnsafe
>> :     +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false,
>> +- ConvertToUnsafe
>>    +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false
>>
>> Srikanth
>>
>
>


Re: Broadcast join on multiple dataframes

2016-01-28 Thread Michael Armbrust
Can you provide the analyzed and optimized plans (explain(true))?
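
For reference, a minimal way to print all of the plans for a DataFrame df
(explain() with no argument prints only the physical plan):

    // Prints the parsed, analyzed, optimized, and physical plans to stdout.
    df.explain(true)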

On Thu, Jan 28, 2016 at 12:26 PM, Srikanth  wrote:

> [Srikanth's original message of 2016-01-28, with the join code and physical
> plan, is quoted in full in the 2016-01-29 reply above.]