What you are looking for is https://issues.apache.org/jira/browse/SPARK-4849
This feature is available in Spark 1.6.0, so the DataFrame can reuse the 
partitioned data in the join.
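I haven't verified this myself, but in 1.6+ I would expect the DataFrame-level
version to look roughly like the sketch below (repartition by columns was added
in 1.6; whether the extra Exchange before the join actually goes away is exactly
what that ticket is about, so treat this as an assumption, not a confirmed recipe):

// Spark 1.6+ sketch (not verified): repartition both DataFrames on the join keys
// and cache them, so the planner can see matching output partitioning.
val part_movies_1 = movies_1.repartition(movies_1("movie"), movies_1("title")).cache()
val part_movies_2 = movies_2.repartition(movies_2("movie"), movies_2("title")).cache()

val joined = part_movies_1.join(part_movies_2,
  part_movies_1("movie") === part_movies_2("movie") &&
  part_movies_1("title") === part_movies_2("title"))

joined.explain(true)  // check whether the Exchange nodes before the SortMergeJoin are gone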
For your case in 1.5.x, you have to use the RDD API to tell Spark that the join
should use the pre-partitioned data.
The way you did it won't work no matter what, because Spark won't know that
part_movies_1 and part_movies_2 are indeed partitioned the same way.
At the RDD level, you need to keep a (K, V) RDD and partition both sides' keys
the same way, as you did below, then do the join at the RDD level. Spark will
then know that it can reuse the pre-partitioned (K, V) data and join the two
RDDs partition by partition, without another shuffle (see the sketch below).
If you lose the key (you built the DF only on the values part), then there is
no way Spark can know that the values part is indeed already partitioned.
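Roughly like this (the HashPartitioner and its partition count are just
placeholders for whatever your my_partitioner does):

import org.apache.spark.HashPartitioner

// Placeholder partitioner; substitute your own my_partitioner.
val partitioner = new HashPartitioner(100)

// Keep the key: build (K, V) pair RDDs and partition BOTH sides with the same partitioner.
val left  = movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(partitioner)
val right = movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(partitioner)
left.persist()
right.persist()

// Because both sides share the same partitioner, the join becomes a narrow dependency:
// each partition of `left` is combined with the co-located partition of `right`,
// with no additional shuffle. Note it joins on the key only (here the movie id).
val joined = left.join(right)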
I don't think this can be done at the DataFrame level in 1.5.x. If this is
wrong, please let me know.
The execution plan is in fact doing a SortMergeJoin (which is correct in this
case), but I think Spark will shuffle and sort both DFs again, even though you
already partitioned them.
Yong

Date: Wed, 6 Apr 2016 20:10:14 +0100
Subject: Re: Plan issue with spark 1.5.2
From: darshan.m...@gmail.com
To: java8...@hotmail.com
CC: user@spark.apache.org

I used 1.5.2. I have used the movies data to reproduce the issue. Below is the
physical plan. I am not sure why it is hash-partitioning the data, then sorting,
and then joining. I expect the data to be joined first and then sent on for
further processing.
What I sort of expect is a common partitioner that works on, say, a column: it
would partition the dataframe on a given column into a given number of buckets
and try to keep this data as close as possible physically as well, i.e.
colocated. Then, if it sees 2 tables with the same partition columns, and the
partition column is part of the join condition, it would try to join them at
the partition level first and only shuffle data for further processing
afterwards.

Below are queries
// read movies from parquet files
val movies_1 = sqlContext.sql("select * from movies")
val movies_2 = sqlContext.sql("select * from movies")

val part_movies_1 = sqlContext.createDataFrame(
  movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
  movies_1.schema)
val part_movies_2 = sqlContext.createDataFrame(
  movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
  movies_2.schema)

part_movies_1.persist()
part_movies_2.persist()
part_movies_1.registerTempTable("part_movies_1")
part_movies_2.registerTempTable("part_movies_2")
// look at storage in the Spark UI

val sql1 = sqlContext.sql("select * from part_movies_1 pm1 inner join part_movies_2 pm2 on pm1.movie = pm2.movie and pm1.title = pm2.title")

sql1.count() // the plan below is for this count statement

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
output=[count#750L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], 
functions=[(count(1),mode=Partial,isDistinct=false)], 
output=[currentCount#783L])
   TungstenProject
    SortMergeJoin [movie#622,title#623], [movie#640,title#641]
     TungstenSort [movie#622 ASC,title#623 ASC], false, 0
      TungstenExchange hashpartitioning(movie#622,title#623)
       ConvertToUnsafe
        InMemoryColumnarTableScan [movie#622,title#623], (InMemoryRelation 
[movie#622,title#623,genres#624], true, 10000, StorageLevel(true, true, false, 
true, 1), (Scan PhysicalRDD[movie#622,title#623,genres#624]), None)
     TungstenSort [movie#640 ASC,title#641 ASC], false, 0
      TungstenExchange hashpartitioning(movie#640,title#641)
       ConvertToUnsafe
        InMemoryColumnarTableScan [movie#640,title#641], (InMemoryRelation 
[movie#640,title#641,genres#642], true, 10000, StorageLevel(true, true, false, 
true, 1), (Scan PhysicalRDD[movie#640,title#641,genres#642]), None)

Please let me know if you need further information.

On Tue, Apr 5, 2016 at 6:33 PM, Yong Zhang <java8...@hotmail.com> wrote:



You need to show us the execution plan, so we can understand what your issue is.
Use spark-shell code to show how your DataFrames are built and how you partition
them, then call explain(true) on your join DataFrame and paste the output here
(see the example below), so we can help you better.
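For example (df1, df2 and the column names below are just placeholders for your
actual DataFrames and join keys):

// Placeholder DataFrames and columns -- replace with your own.
val joined = df1.join(df2,
  df1("country") === df2("country") && df1("other_key") === df2("other_key"))

joined.explain(true)  // prints the parsed, analyzed, optimized and physical plans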
Yong

> Date: Tue, 5 Apr 2016 09:46:59 -0700
> From: darshan.m...@gmail.com
> To: user@spark.apache.org
> Subject: Plan issue with spark 1.5.2
> 
> 
> I am using Spark 1.5.2. I have a question regarding the plan generated by Spark.
> I have 3 data-frames which hold the data for different countries. I have
> around 150 countries and the data is skewed.
> 
> 95% of my queries will have country as a criterion. However, I have seen issues
> with the plans generated for queries which have country as a join column.
> 
> The data-frames are partitioned based on the country. Not only are these
> dataframes co-partitioned, they are co-located as well. E.g. the data for the
> UK in data-frames df1, df2 and df3 will be on the same HDFS datanode.
> 
> Then, when I join these 3 tables and country is one of the join columns, I
> assume that the join should be a map-side join, but Spark shuffles the data
> from the 3 dataframes and then joins using the shuffled data. Apart from
> country there are other columns in the join.
> 
> Is this the correct behavior? If it is an issue, is it fixed in the latest versions?
> 
> Thanks
> 
> 
> 