What you are looking for is https://issues.apache.org/jira/browse/SPARK-4849. That feature is available in Spark 1.6.0, where the DataFrame can reuse the partitioned data in the join. For your case on 1.5.x, you have to use the RDD API to tell Spark that the join should use the pre-partitioned data.

The way you did it won't work no matter what, because Spark won't know that part_movies_1 and part_movies_2 are in fact partitioned the same way. At the RDD level, you need to keep the (K, V) RDDs, partition both sides by the same partitioner (as you did below), and then do the join at the RDD level. Spark will then know it can reuse the pre-partitioned (K, V) data and just merge-join the two RDDs. Once you lose the key (you built the DataFrame only on the values part), there is no way Spark can know that the values are indeed already partitioned.

I don't think this can be done at the DataFrame level in 1.5.x. If that is wrong, please let me know. The execution plan is in fact doing a SortMergeJoin (which is correct in this case), but I think Spark will sort and shuffle both DataFrames again, even though you already partitioned them.

Yong
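To see why keeping the (K, V) form matters, here is a minimal sketch of the mechanism in plain Python (a model of what the RDD-level join with a shared partitioner does, not the Spark API): when both sides were split by the same hash partitioner, matching keys are guaranteed to sit at the same partition index on both sides, so the join runs partition-by-partition with no further shuffle. The data values are made up for illustration.

```python
NUM_PARTITIONS = 4

def partition_by(pairs, num_partitions):
    # Hash-partition (key, value) pairs, like RDD.partitionBy with a HashPartitioner.
    parts = [[] for _ in range(num_partitions)]
    for k, v in pairs:
        parts[hash(k) % num_partitions].append((k, v))
    return parts

def copartitioned_join(parts_a, parts_b):
    # Join two co-partitioned datasets partition-by-partition: a narrow
    # dependency, since no record ever crosses partition boundaries.
    out = []
    for pa, pb in zip(parts_a, parts_b):
        index = {}
        for k, v in pa:
            index.setdefault(k, []).append(v)
        for k, w in pb:
            for v in index.get(k, []):
                out.append((k, (v, w)))
    return out

movies_1 = [(1, "Toy Story"), (2, "Jumanji"), (3, "Heat")]
movies_2 = [(1, "Animation"), (3, "Crime"), (4, "Drama")]

parts_1 = partition_by(movies_1, NUM_PARTITIONS)
parts_2 = partition_by(movies_2, NUM_PARTITIONS)

print(sorted(copartitioned_join(parts_1, parts_2)))
```

Dropping to `.values` throws away exactly the partition index that makes this work, which is why the 1.5.x DataFrame planner has to insert an Exchange.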
Date: Wed, 6 Apr 2016 20:10:14 +0100
Subject: Re: Plan issue with spark 1.5.2
From: darshan.m...@gmail.com
To: java8...@hotmail.com
CC: user@spark.apache.org

I used 1.5.2. I have used the movies data to reproduce the issue. Below is the physical plan. I am not sure why it hash-partitions the data, then sorts, and then joins. I expected the data to be joined first and then sent on for further processing.

I sort of expect a common partitioner which works on, say, a column: it would partition the dataframe on that column into a given number of buckets and try to keep the data as close together as possible physically, i.e. co-located. Then, if it sees two tables with the same partition columns, it would try to join them at the partition level whenever the partition column is part of the join condition, and only shuffle data for further processing afterwards.

Below are the queries:

//read movies from parquet files
val movies_1 = sqlContext.sql("select * from movies")
val movies_2 = sqlContext.sql("select * from movies")

val part_movies_1 = sqlContext.createDataFrame(
  movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
  movies_1.schema)
val part_movies_2 = sqlContext.createDataFrame(
  movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
  movies_2.schema)

part_movies_1.persist()
part_movies_2.persist()
part_movies_1.registerTempTable("part_movies_1")
part_movies_2.registerTempTable("part_movies_2")
//look at storage in sparkUI

val sql1 = sqlContext.sql("select * from part_movies_1 pm1 inner join part_movies_2 pm2 on pm1.movie=pm2.movie and pm1.title = pm2.title")
sql1.count() //plan is for this count statement
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#750L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#783L])
   TungstenProject
    SortMergeJoin [movie#622,title#623], [movie#640,title#641]
     TungstenSort [movie#622 ASC,title#623 ASC], false, 0
      TungstenExchange hashpartitioning(movie#622,title#623)
       ConvertToUnsafe
        InMemoryColumnarTableScan [movie#622,title#623], (InMemoryRelation [movie#622,title#623,genres#624], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#622,title#623,genres#624]), None)
     TungstenSort [movie#640 ASC,title#641 ASC], false, 0
      TungstenExchange hashpartitioning(movie#640,title#641)
       ConvertToUnsafe
        InMemoryColumnarTableScan [movie#640,title#641], (InMemoryRelation [movie#640,title#641,genres#642], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#640,title#641,genres#642]), None)

Please let me know if you need further information.

On Tue, Apr 5, 2016 at 6:33 PM, Yong Zhang <java8...@hotmail.com> wrote:

You need to show us the execution plan, so we can understand what your issue is. Use spark-shell code to show how your DataFrames are built and how you partition them, then call explain(true) on your join DataFrame and show the output here, so we can better help you.

Yong

> Date: Tue, 5 Apr 2016 09:46:59 -0700
> From: darshan.m...@gmail.com
> To: user@spark.apache.org
> Subject: Plan issue with spark 1.5.2
>
> I am using spark 1.5.2. I have a question regarding the plan generated by spark.
> I have 3 data-frames which hold the data for different countries. I have
> around 150 countries and the data is skewed.
>
> 95% of my queries will have country as a criterion. However, I have seen issues
> with the plans generated for queries which have country as a join column.
>
> The data-frames are partitioned based on the country. Not only are these dataframes
> co-partitioned, they are co-located as well. E.g. the data for the UK in
> data-frames df1, df2 and df3 will be on the same hdfs datanode.
>
> When I join these 3 tables and country is one of the join columns, I
> assume that the join should be a map-side join, but it shuffles the data
> from the 3 dataframes and then joins using the shuffled data. Apart from country
> there are other columns in the join.
>
> Is this correct behavior? If it is an issue, is it fixed in the latest versions?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Plan-issue-with-spark-1-5-2-tp26681.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
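As a footnote on the SortMergeJoin that appears in the physical plan earlier in the thread: after each side is hash-partitioned and sorted on the join keys (the TungstenExchange + TungstenSort operators), the join itself is a merge of two sorted streams. Here is a minimal plain-Python sketch of that merge (a model of the algorithm, not Spark internals), emitting the full cross product for each run of equal keys; the sample rows are made up.

```python
def sort_merge_join(left, right, key=lambda row: row):
    # Sort both sides on the join key (the TungstenSort step), then
    # advance two cursors through the sorted streams in lockstep.
    left, right = sorted(left, key=key), sorted(right, key=key)
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        ki, kj = key(left[i]), key(right[j])
        if ki < kj:
            i += 1
        elif ki > kj:
            j += 1
        else:
            # Find the run of equal keys on each side and emit all pairs.
            i2 = i
            while i2 < len(left) and key(left[i2]) == ki:
                i2 += 1
            j2 = j
            while j2 < len(right) and key(right[j2]) == ki:
                j2 += 1
            for l in left[i:i2]:
                for r in right[j:j2]:
                    out.append((l, r))
            i, j = i2, j2
    return out

# Join on the whole (movie, title) tuple, as in the query's
# pm1.movie = pm2.movie and pm1.title = pm2.title condition.
pm1 = [(1, "Toy Story"), (2, "Jumanji"), (2, "Jumanji")]
pm2 = [(2, "Jumanji"), (3, "Heat")]
print(sort_merge_join(pm1, pm2))
```

The merge itself is cheap; the cost Yong points out on 1.5.x is that the Exchange and Sort feeding it are re-done even when the inputs were already partitioned.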