I think I gave you some misleading information.
If you have two (K, V) RDDs that are already partitioned the same way and you join them by K, then the 
correct plan you should see is a hash join, instead of a sort-merge join.
My guess is that when you see the SortMergeJoin in the DataFrame plan, Spark doesn't use 
the most efficient way of joining in this case.
At the RDD level this is already mature, but I don't know about the DataFrame 
level, so someone else may be able to give you more info on how to achieve that at the 
DataFrame level.
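Roughly, a minimal sketch of the RDD-level approach (kv1, kv2 and the partition count are just placeholders):

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(200)
// kv1 and kv2 are hypothetical (K, V) RDDs keyed by the join column
val part1 = kv1.partitionBy(partitioner).persist()
val part2 = kv2.partitionBy(partitioner).persist()
// both sides now carry the same partitioner, so the join can be done
// partition-by-partition without another shuffle
val joined = part1.join(part2)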
I am not sure I understand the map-side join question you have. If one 
DataFrame is very small and the other one is much bigger, then you want to try a map-side join. 
But since you already partitioned both DataFrames, why do you want a map-side join then?
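(In case it helps: the usual way to force a map-side join when one side is small is a broadcast hint; bigDf and smallDf below are just placeholder names.)

import org.apache.spark.sql.functions.broadcast

// broadcast the small DataFrame so the join happens map-side,
// without shuffling the big side
val joined = bigDf.join(broadcast(smallDf), "movie")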
Yong 

Date: Wed, 6 Apr 2016 21:03:16 +0100
Subject: Re: Plan issue with spark 1.5.2
From: darshan.m...@gmail.com
To: java8...@hotmail.com
CC: user@spark.apache.org

Thanks a lot for this. I was thinking of using cogrouped RDDs. We will try to 
move to 1.6, as there are other issues as well in 1.5.2.
The same code is much faster in 1.6.1, but plan-wise I do not see much difference. Why 
is it still partitioning, then sorting, and then joining?
I expect it to sort within the same partition based on title (another column in the 
join), then join the partitions, and then shuffle the results.
Another question is how we can specify the same partitioner when saving 
these 2 dataframes as parquet files, or when reading from parquet files which 
were partitioned on the movie column, so that the map-side join can happen. I do not 
want to repartition the data that I will read from the parquet files using a 
hash partitioner on the movie column.

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#409L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#442L])
      +- Project
         +- SortMergeJoin [movie#290,title#291], [movie#308,title#309]
            :- Sort [movie#290 ASC,title#291 ASC], false, 0
            :  +- TungstenExchange hashpartitioning(movie#290,title#291,200), None
            :     +- InMemoryColumnarTableScan [movie#290,title#291], InMemoryRelation [movie#290,title#291,genres#292], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
            +- Sort [movie#308 ASC,title#309 ASC], false, 0
               +- TungstenExchange hashpartitioning(movie#308,title#309,200), None
                  +- InMemoryColumnarTableScan [movie#308,title#309], InMemoryRelation [movie#308,title#309,genres#310], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
Thanks
On Wed, Apr 6, 2016 at 8:37 PM, Yong Zhang <java8...@hotmail.com> wrote:



What you are looking for is https://issues.apache.org/jira/browse/SPARK-4849
This feature is available in Spark 1.6.0, so the DataFrame can reuse the 
partitioned data in the join.
For your case in 1.5.x, you have to use the RDD way to tell Spark that the join 
should utilize the presorted data.
The way you did it won't work no matter what, as Spark won't know that 
part_movies_1 and part_movies_2 are indeed partitioned the same way.
At the RDD level, you need to keep the (K, V) RDDs, partition both sides' keys 
the same way (as you did below), and then do the join at the RDD level; Spark will 
then know it can reuse the pre-partitioned (K, V) data and just merge-join the 2 
RDDs.
If you lose the key (you built the DF only on the values part), then there is 
no way Spark will know that the values part is indeed already partitioned.
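A rough illustration of the difference, reusing the names from your snippet (my_partitioner is whatever partitioner you built):

// joining while the key is still attached lets Spark see that both sides
// share the same partitioner, so no extra shuffle is needed at join time
val kv1 = movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner)
val kv2 = movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner)
val joinedRdd = kv1.join(kv2)

// by contrast, .values drops the key (and the partitioner), so a DataFrame
// built from it carries no partitioning information the planner can reuse
val df1 = sqlContext.createDataFrame(kv1.values, movies_1.schema)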
I don't think this can be done at the DataFrame level in 1.5.x. If this is 
wrong, please let me know.
The execution plan is in fact doing a SortMergeJoin (which is correct in this case), 
but I think Spark will sort both DFs again, even though you already partitioned them.
Yong

Date: Wed, 6 Apr 2016 20:10:14 +0100
Subject: Re: Plan issue with spark 1.5.2
From: darshan.m...@gmail.com
To: java8...@hotmail.com
CC: user@spark.apache.org

I used 1.5.2. I have used the movies data to reproduce the issue. Below is the 
physical plan. I am not sure why it is hash partitioning the data, then sorting, 
and then joining. I expect the data to be joined first and then sent on for further 
processing.
I sort of expect a common partitioner which works on, say, a column: it would 
partition the dataframe on a given column into a given number of buckets, 
try to keep this data as physically close together as possible (i.e. co-located), 
and, if it sees 2 tables with the same partition columns and the partition column 
is part of the join condition, join them at the partition level and only then 
shuffle data for further processing.

Below are the queries:

// read movies from parquet files
val movies_1 = sqlContext.sql("select * from movies")
val movies_2 = sqlContext.sql("select * from movies")

val part_movies_1 = sqlContext.createDataFrame(
  movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
  movies_1.schema)
val part_movies_2 = sqlContext.createDataFrame(
  movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
  movies_2.schema)

part_movies_1.persist()
part_movies_2.persist()
part_movies_1.registerTempTable("part_movies_1")
part_movies_2.registerTempTable("part_movies_2")
// look at storage in the Spark UI

val sql1 = sqlContext.sql("select * from part_movies_1 pm1 inner join part_movies_2 pm2 on pm1.movie = pm2.movie and pm1.title = pm2.title")

sql1.count() // the plan below is for this count statement

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#750L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#783L])
   TungstenProject
    SortMergeJoin [movie#622,title#623], [movie#640,title#641]
     TungstenSort [movie#622 ASC,title#623 ASC], false, 0
      TungstenExchange hashpartitioning(movie#622,title#623)
       ConvertToUnsafe
        InMemoryColumnarTableScan [movie#622,title#623], (InMemoryRelation [movie#622,title#623,genres#624], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#622,title#623,genres#624]), None)
     TungstenSort [movie#640 ASC,title#641 ASC], false, 0
      TungstenExchange hashpartitioning(movie#640,title#641)
       ConvertToUnsafe
        InMemoryColumnarTableScan [movie#640,title#641], (InMemoryRelation [movie#640,title#641,genres#642], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#640,title#641,genres#642]), None)

Please let me know if you need further information.

On Tue, Apr 5, 2016 at 6:33 PM, Yong Zhang <java8...@hotmail.com> wrote:



You need to show us the execution plan, so we can understand what your issue is.
Use spark-shell code to show how your DFs are built and how you partition them, 
then use explain(true) on your join DF and show the output here, so we can 
better help you.
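Something along these lines (df1/df2 are placeholders for your DataFrames):

val joined = df1.join(df2, df1("movie") === df2("movie"))
// explain(true) prints the parsed, analyzed, optimized and physical plans
joined.explain(true)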
Yong

> Date: Tue, 5 Apr 2016 09:46:59 -0700
> From: darshan.m...@gmail.com
> To: user@spark.apache.org
> Subject: Plan issue with spark 1.5.2
> 
> 
> I am using spark 1.5.2. I have a question regarding the plan generated by Spark.
> I have 3 data-frames which hold the data for different countries. I have
> around 150 countries and the data is skewed.
> 
> 95% of my queries will have country as a criterion. However, I have seen issues
> with the plans generated for queries which have country as a join column.
> 
> The data-frames are partitioned based on the country. Not only are these dataframes
> co-partitioned, they are co-located as well. E.g. data for UK in
> data-frames df1, df2 and df3 will be on the same HDFS datanode. 
> 
> Then when I join these 3 tables, country is one of the join columns. I
> assume that the join should be a map-side join, but it shuffles the data
> from the 3 dataframes and then joins using the shuffled data. Apart from country
> there are other columns in the join.
> 
> Is this correct behavior? If it is an issue, is it fixed in the latest versions?
> 
> Thanks
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Plan-issue-with-spark-1-5-2-tp26681.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 