I think I gave you some misleading information. If you have two already partitioned (K, V) RDDs and join them by K, the plan you should see is a HashJoin, not a SortMerge join. My guess is that when you see SortMergeJoin in the DataFrame plan, Spark is not using the most efficient way of joining in this case. At the RDD level this is already mature, but I don't know about the DataFrame level, so someone else may be able to tell you more about how to achieve that with DataFrames. I am not sure I understand your map-side join question. If one DF is very small and the other is much bigger, then you want to try a map-side join. But you have already partitioned both DFs, so why do you want a map-side join? Yong
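To make the small-vs-big map-side join idea concrete: the small side is built once into an in-memory hash map (in Spark it would be broadcast to every task), and each partition of the big side is streamed through that map locally, so the big side is never shuffled. The plain-Scala sketch below models only that idea, with no Spark involved; the `Movie`/`Rating` case classes, `joinPartition`, and the sample data are hypothetical. At the DataFrame level, Spark exposes this as the `broadcast(df)` hint in `org.apache.spark.sql.functions`.

```scala
// Plain-Scala sketch of a map-side (broadcast hash) join; no Spark required.
// Movie, Rating, joinPartition and the sample rows are hypothetical.
case class Movie(movie: Int, title: String)
case class Rating(movie: Int, stars: Int)

// Small side: built once into a hash map keyed by the join key
// (in Spark this map would be shipped to every task).
val smallSide: Seq[Movie] = Seq(Movie(1, "Heat"), Movie(2, "Alien"))
val lookup: Map[Int, Seq[Movie]] = smallSide.groupBy(_.movie)

// Big side: already split into "partitions"; each one is joined locally
// against the map, so no big-side row moves between partitions.
val bigSidePartitions: Seq[Seq[Rating]] = Seq(
  Seq(Rating(1, 5), Rating(3, 2)),
  Seq(Rating(2, 4), Rating(1, 3))
)

def joinPartition(part: Seq[Rating]): Seq[(Int, String, Int)] =
  for {
    r <- part
    m <- lookup.getOrElse(r.movie, Seq.empty) // no match => row is dropped (inner join)
  } yield (r.movie, m.title, r.stars)

val joined: Seq[(Int, String, Int)] = bigSidePartitions.flatMap(joinPartition)
joined.foreach(println)
```

This only pays off when the small side comfortably fits in memory on every executor; when both sides are large and already co-partitioned, a partition-wise join is the better fit, which is Yong's point above.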
Date: Wed, 6 Apr 2016 21:03:16 +0100
Subject: Re: Plan issue with spark 1.5.2
From: darshan.m...@gmail.com
To: java8...@hotmail.com
CC: user@spark.apache.org

Thanks a lot for this. I was thinking of using cogrouped RDDs. We will try to move to 1.6, as there are other issues in 1.5.2 as well. The same code is much faster in 1.6.1, but plan-wise I do not see much difference. Why is it still partitioning, then sorting, then joining? I expect it to sort within the same partition based on title (the other join column), then join the partitions, and only then shuffle the results.

Another question: how can we specify the same partitioner when saving these 2 DataFrames as Parquet files, or when reading them back from Parquet files that were partitioned on the movie column, so that a map-side join can happen? I do not want to repartition the data I read from Parquet using a HashPartitioner on the movie column.

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#409L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#442L])
      +- Project
         +- SortMergeJoin [movie#290,title#291], [movie#308,title#309]
            :- Sort [movie#290 ASC,title#291 ASC], false, 0
            :  +- TungstenExchange hashpartitioning(movie#290,title#291,200), None
            :     +- InMemoryColumnarTableScan [movie#290,title#291], InMemoryRelation [movie#290,title#291,genres#292], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
            +- Sort [movie#308 ASC,title#309 ASC], false, 0
               +- TungstenExchange hashpartitioning(movie#308,title#309,200), None
                  +- InMemoryColumnarTableScan [movie#308,title#309], InMemoryRelation [movie#308,title#309,genres#310], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None

Thanks

On Wed, Apr 6, 2016 at 8:37 PM, Yong Zhang <java8...@hotmail.com> wrote:
What you are looking for is
https://issues.apache.org/jira/browse/SPARK-4849

This feature is available in Spark 1.6.0, so a DataFrame can reuse the partitioned data in the join. For your case in 1.5.x, you have to use the RDD API to tell Spark that the join should utilize the pre-partitioned data. The way you did it won't work no matter what, because Spark has no way of knowing that part_movies_1 and part_movies_2 are indeed partitioned the same way. At the RDD level, you need to keep the (K, V) RDDs, partition both sides by key with the same partitioner (as you did below), and then do the join at the RDD level. Spark will then know that it can reuse the pre-partitioned (K, V) data and just merge-join the 2 RDDs. If you drop the key (you built the DF only from the values part), there is no way for Spark to know that the values are already partitioned. I don't think this can be done at the DataFrame level in 1.5.x; if I'm wrong, please let me know. The execution plan is in fact doing a SortMerge join (which is correct in this case), but I think Spark will sort both DFs again, even though you already partitioned them.

Yong

Date: Wed, 6 Apr 2016 20:10:14 +0100
Subject: Re: Plan issue with spark 1.5.2
From: darshan.m...@gmail.com
To: java8...@hotmail.com
CC: user@spark.apache.org

I used 1.5.2, with the movies data to reproduce the issue. Below is the physical plan. I am not sure why it hash partitions the data, then sorts, then joins. I expect the data to be joined first and then sent for further processing. I expected a common partitioner that works on a column: it would partition the DataFrame on that column into a given number of buckets and try to keep that data physically close as well, i.e. co-located. If it sees 2 tables with the same partition columns, and the partition column is part of the join condition, it would join them at the partition level and only then shuffle the data for further processing.
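Why keeping the key and using the same partitioner matters, and why the partition-level join described above is possible at all: if both sides are hash-partitioned by the same function into the same number of partitions, every pair of matching rows already sits in partitions with the same index, so partition i of one side only ever needs partition i of the other. The plain-Scala sketch below models this without Spark, using a hash-mod partitioner in the spirit of Spark's `HashPartitioner` (non-negative `hashCode` modulo the partition count). `MovieRow`, `partitionOf`, `hashPartition` and the sample rows are hypothetical. It also shows that partitioning on `movie` alone is enough for a join on `(movie, title)`, since rows with equal `(movie, title)` necessarily have equal `movie`.

```scala
// Plain-Scala model of a co-partitioned join (no Spark).
// MovieRow, partitionOf, hashPartition and the data are hypothetical.
case class MovieRow(movie: Int, title: String, genres: String)

val numPartitions = 4

// Same idea as a hash partitioner: non-negative hashCode modulo n.
def partitionOf(key: Any): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

// Bucket rows into partitions by the movie column only.
def hashPartition(rows: Seq[MovieRow]): Vector[Seq[MovieRow]] = {
  val byIndex = rows.groupBy(r => partitionOf(r.movie))
  Vector.tabulate(numPartitions)(i => byIndex.getOrElse(i, Seq.empty))
}

val movies1 = Seq(MovieRow(1, "Heat", "Crime"), MovieRow(2, "Alien", "Sci-Fi"), MovieRow(7, "Up", "Animation"))
val movies2 = Seq(MovieRow(1, "Heat", "Drama"), MovieRow(7, "Up", "Family"), MovieRow(9, "Big", "Comedy"))

val left = hashPartition(movies1)
val right = hashPartition(movies2)

// Join partition i with partition i only, on the full (movie, title) condition.
// No row ever has to move to another partition.
val coPartitioned: Seq[(MovieRow, MovieRow)] =
  (0 until numPartitions).flatMap { i =>
    for {
      a <- left(i)
      b <- right(i)
      if a.movie == b.movie && a.title == b.title
    } yield (a, b)
  }

// Reference: a naive all-pairs join over the unpartitioned data.
val naive: Seq[(MovieRow, MovieRow)] =
  for {
    a <- movies1
    b <- movies2
    if a.movie == b.movie && a.title == b.title
  } yield (a, b)

println(coPartitioned.toSet == naive.toSet) // prints true
```

The model only holds if both sides use the same partitioning function and the same partition count; that is the information Spark loses when the key is dropped with `.values`.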
Below are the queries:

// Read movies from Parquet files.
val movies_1 = sqlContext.sql("select * from movies")
val movies_2 = sqlContext.sql("select * from movies")

// my_partitioner is not defined in this snippet; presumably a HashPartitioner
// on the movie column (e.g. new org.apache.spark.HashPartitioner(n)).
val part_movies_1 = sqlContext.createDataFrame(
  movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
  movies_1.schema)
val part_movies_2 = sqlContext.createDataFrame(
  movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
  movies_2.schema)

part_movies_1.persist()
part_movies_2.persist()
part_movies_1.registerTempTable("part_movies_1")
part_movies_2.registerTempTable("part_movies_2")
// Look at Storage in the Spark UI.

val sql1 = sqlContext.sql("select * from part_movies_1 pm1 inner join part_movies_2 pm2 on pm1.movie = pm2.movie and pm1.title = pm2.title")
sql1.count() // The plan below is for this count statement.

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#750L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#783L])
   TungstenProject
    SortMergeJoin [movie#622,title#623], [movie#640,title#641]
     TungstenSort [movie#622 ASC,title#623 ASC], false, 0
      TungstenExchange hashpartitioning(movie#622,title#623)
       ConvertToUnsafe
        InMemoryColumnarTableScan [movie#622,title#623], (InMemoryRelation [movie#622,title#623,genres#624], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#622,title#623,genres#624]), None)
     TungstenSort [movie#640 ASC,title#641 ASC], false, 0
      TungstenExchange hashpartitioning(movie#640,title#641)
       ConvertToUnsafe
        InMemoryColumnarTableScan [movie#640,title#641], (InMemoryRelation [movie#640,title#641,genres#642], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#640,title#641,genres#642]), None)

Please let me know if you need further information.
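For reference, once rows with equal keys share a partition, the `TungstenSort` and `SortMergeJoin` steps in the plan above amount to: sort each side on the join key, then merge the two sorted runs in a single pass. The plain-Scala sketch below does this for ONE partition, with `(Int, String)` keys standing in for `(movie, title)`. It is an illustration of the technique, not Spark's actual operator; `sortMergeJoin` and the sample rows are hypothetical.

```scala
// Plain-Scala sketch of a sort-merge inner join over ONE partition.
// Keys stand in for (movie, title); sortMergeJoin and the rows are hypothetical.
type Key = (Int, String)

def sortMergeJoin[A, B](left: Seq[(Key, A)], right: Seq[(Key, B)]): Vector[(Key, A, B)] = {
  val ord = implicitly[Ordering[Key]]
  // The "Sort" step of the plan: order both sides by the join key.
  val l = left.sortBy(_._1).toVector
  val r = right.sortBy(_._1).toVector
  val out = Vector.newBuilder[(Key, A, B)]
  var i = 0
  var j = 0
  // The "merge" step: advance whichever side has the smaller key.
  while (i < l.length && j < r.length) {
    val c = ord.compare(l(i)._1, r(j)._1)
    if (c < 0) i += 1
    else if (c > 0) j += 1
    else {
      // Equal keys: find the run of this key on each side, emit their cross product.
      val k = l(i)._1
      val iEnd = l.indexWhere(_._1 != k, i) match { case -1 => l.length; case n => n }
      val jEnd = r.indexWhere(_._1 != k, j) match { case -1 => r.length; case n => n }
      for (a <- i until iEnd; b <- j until jEnd) out += ((k, l(a)._2, r(b)._2))
      i = iEnd
      j = jEnd
    }
  }
  out.result()
}

// Usage with hypothetical rows from one partition.
val left = Seq(((1, "Heat"), "genre:crime"), ((2, "Alien"), "genre:sci-fi"), ((1, "Heat"), "genre:drama"))
val right = Seq(((1, "Heat"), 1995), ((3, "Up"), 2009))
val joined = sortMergeJoin(left, right)
joined.foreach(println)
```

The merge itself needs no data movement; the shuffle in the plan comes from the `TungstenExchange hashpartitioning(...)` step, which Spark inserts because it cannot tell that the inputs are already partitioned.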
On Tue, Apr 5, 2016 at 6:33 PM, Yong Zhang <java8...@hotmail.com> wrote:
You need to show us the execution plan so we can understand what your issue is. Use spark-shell code to show how your DF is built and how you partition it, then call explain(true) on your joined DF and post the output here, so we can help you better.

Yong

> Date: Tue, 5 Apr 2016 09:46:59 -0700
> From: darshan.m...@gmail.com
> To: user@spark.apache.org
> Subject: Plan issue with spark 1.5.2
>
> I am using Spark 1.5.2. I have a question regarding the plan generated by Spark.
> I have 3 DataFrames that hold data for different countries. I have
> around 150 countries, and the data is skewed.
>
> 95% of my queries will have country as a criterion. However, I have seen issues
> with the plans generated for queries that have country as a join column.
>
> The DataFrames are partitioned by country. Not only are these DataFrames
> co-partitioned, they are co-located as well, e.g. the data for UK in
> DataFrames df1, df2 and df3 will be on the same HDFS datanode.
>
> When I join these 3 tables and country is one of the join columns, I
> assume the join should be a map-side join, but it shuffles the data
> from the 3 DataFrames and then joins using the shuffled data. Apart from country,
> there are other columns in the join.
>
> Is this correct behavior? If it is an issue, is it fixed in the latest versions?
>
> Thanks
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Plan-issue-with-spark-1-5-2-tp26681.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.