This is interesting. See the notebook below; it is on 1.6:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5228339421202847/186877441366454/2805387300416006/latest.html

Create the two DataFrames from a partitioned parquet file and run the same query. If the two DataFrames are persisted, it uses a sort merge join; if they are not persisted, it uses a hash join :). I guess the in-memory partitions are somehow unable to carry this information.
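Roughly the comparison I ran, as a minimal sketch (the parquet path is a placeholder, and the join columns are the movie/title columns from the mails below):

  val df1 = sqlContext.read.parquet("/data/movies_parquet")
  val df2 = sqlContext.read.parquet("/data/movies_parquet")

  // Without persist, the plan came out as a hash join.
  df1.join(df2, df1("movie") === df2("movie") && df1("title") === df2("title")).explain(true)

  // After persisting (and materializing the cache), the same join came out as a sort merge join.
  df1.persist()
  df2.persist()
  df1.count()
  df2.count()
  df1.join(df2, df1("movie") === df2("movie") && df1("title") === df2("title")).explain(true)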
Thanks

On Wed, Apr 6, 2016 at 10:30 PM, Yong Zhang <java8...@hotmail.com> wrote:

> Got it.
>
> In the old MapReduce/Hive world, mapJoin means the broadcast join in Spark, so I thought you were looking for a broadcast join in this case.
>
> What you describe is exactly the hash join. That is the right plan here, provided that the DFs you are joining are already partitioned the same way on the join fields.
>
> You shouldn't see any more shuffle if it works.
>
> Yong
>
> ------------------------------
> Date: Wed, 6 Apr 2016 22:11:38 +0100
> Subject: Re: Plan issue with spark 1.5.2
> From: darshan.m...@gmail.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
>
> Thanks for the information. When I mentioned a map-side join, I meant that each partition of DF 1 is joined with the partition of DF 2 that holds the same keys, on the worker node, without shuffling the data. In other words, do as much work as possible within the worker node before shuffling the data.
>
> Thanks
> Darshan Singh
>
> On Wed, Apr 6, 2016 at 10:06 PM, Yong Zhang <java8...@hotmail.com> wrote:
>
> I think I gave you one piece of misleading information.
>
> If you have two already-partitioned (K, V) RDDs and join them by K, then the plan you should see is a HashJoin, not a SortMerge.
>
> My guess is that when you see the SortMergeJoin on the DataFrames, Spark is not using the most efficient way of joining in this case.
>
> At the RDD level this is already mature, but I don't know about the DataFrame level, so someone else can give you more info on how to achieve that at the DataFrame level.
>
> I am not sure I understand your map-side join question. If one DF is very small and the other one is much bigger, then you want to try a map join. But you have already partitioned both DFs, so why would you want a map-side join?
>
> Yong
>
> ------------------------------
> Date: Wed, 6 Apr 2016 21:03:16 +0100
> Subject: Re: Plan issue with spark 1.5.2
> From: darshan.m...@gmail.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
>
> Thanks a lot for this. I was thinking of using cogrouped RDDs. We will try to move to 1.6, as there are other issues in 1.5.2 as well.
>
> The same code is much faster on 1.6.1, but plan-wise I do not see much difference. Why is it still partitioning the data, then sorting, then joining?
>
> I expected it to sort within each partition based on title (the other join column), then join the partitions, and only then shuffle the results for further processing.
>
> Another question: how can we specify the same partitioner when saving these two DataFrames as parquet files, or when reading from parquet files that were partitioned on the movie column, so that the map-side join can happen? I do not want to repartition the data I read from the parquet files with a HashPartitioner on the movie column.
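> For example, the DataFrame writer's partitionBy below only gives directory-style partitioning, and as far as I can tell the join does not reuse that as a co-partitioning of the two sides (a sketch; the output path is a placeholder):
>
>   part_movies_1.write.partitionBy("movie").parquet("/data/part_movies_1")
>   val reread = sqlContext.read.parquet("/data/part_movies_1")
>   // reread is split into movie=... directories on disk, but joining two such
>   // DataFrames still plans an exchange (shuffle) on the join keys.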
> For reference, the plan I see now is:
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#409L])
> +- TungstenExchange SinglePartition, None
>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#442L])
>       +- Project
>          +- SortMergeJoin [movie#290,title#291], [movie#308,title#309]
>             :- Sort [movie#290 ASC,title#291 ASC], false, 0
>             :  +- TungstenExchange hashpartitioning(movie#290,title#291,200), None
>             :     +- InMemoryColumnarTableScan [movie#290,title#291], InMemoryRelation [movie#290,title#291,genres#292], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
>             +- Sort [movie#308 ASC,title#309 ASC], false, 0
>                +- TungstenExchange hashpartitioning(movie#308,title#309,200), None
>                   +- InMemoryColumnarTableScan [movie#308,title#309], InMemoryRelation [movie#308,title#309,genres#310], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None
>
> Thanks
>
> On Wed, Apr 6, 2016 at 8:37 PM, Yong Zhang <java8...@hotmail.com> wrote:
>
> What you are looking for is https://issues.apache.org/jira/browse/SPARK-4849
>
> This feature is available in Spark 1.6.0, so the DataFrame can reuse the partitioned data in the join.
>
> For your case in 1.5.x, you have to use the RDD way to tell Spark that the join should utilize the presorted data.
>
> The way you did it won't work no matter what, because Spark has no way of knowing that part_movies_1 and part_movies_2 are indeed partitioned the same way.
>
> At the RDD level, you need to keep the (K, V) RDDs and partition both sides' keys the same way, as you did below, and then do the join at the RDD level. Spark will then know that it can reuse the pre-partitioned (K, V) data and just merge-join the two RDDs.
>
> If you lose the key (you built the DF on the values part only), then there is no way for Spark to know that the values are indeed already partitioned.
>
> I don't think this can be done at the DataFrame level in 1.5.x. If this is wrong, please let me know.
>
> The execution plan is in fact doing a SortMerge join (which is correct in this case), but I think Spark will sort both DFs again, even though you already partitioned them.
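> A rough sketch of that RDD-level approach (illustrative only; the 200 is just an example partition count, and movies_1/movies_2 are the DataFrames from your code below):
>
>   import org.apache.spark.HashPartitioner
>
>   val partitioner = new HashPartitioner(200)
>
>   // Keep the key, and partition BOTH sides with the SAME partitioner instance.
>   val left  = movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(partitioner)
>   val right = movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(partitioner)
>
>   // Because both sides carry the same partitioner, this RDD join does not
>   // re-shuffle either side. (It joins on the movie key only; the title
>   // condition would have to be applied to the joined values afterwards.)
>   val joined = left.join(right)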
> Yong
>
> ------------------------------
> Date: Wed, 6 Apr 2016 20:10:14 +0100
> Subject: Re: Plan issue with spark 1.5.2
> From: darshan.m...@gmail.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
>
> I used 1.5.2. I have used the movies data to reproduce the issue. Below is the physical plan. I am not sure why it is hash-partitioning the data, then sorting, then joining. I expected the data to be joined first and then sent on for further processing.
>
> What I sort of expect is a common partitioner that works on a column: it would partition a DataFrame on that column into a given number of buckets, try to keep that data physically close together (i.e. colocated), and, if it sees two tables with the same partition columns, join them at the partition level whenever the partition column is part of the join condition, shuffling only afterwards for further processing.
>
> Below are the queries:
>
> // read movies from parquet files
> val movies_1 = sqlContext.sql("select * from movies")
> val movies_2 = sqlContext.sql("select * from movies")
>
> val part_movies_1 = sqlContext.createDataFrame(
>   movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
>   movies_1.schema
> )
> val part_movies_2 = sqlContext.createDataFrame(
>   movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
>   movies_2.schema
> )
>
> part_movies_1.persist()
> part_movies_2.persist()
> part_movies_1.registerTempTable("part_movies_1")
> part_movies_2.registerTempTable("part_movies_2")
> // look at storage in the Spark UI
>
> val sql1 = sqlContext.sql("select * from part_movies_1 pm1 inner join part_movies_2 pm2 on pm1.movie = pm2.movie and pm1.title = pm2.title")
>
> sql1.count() // the plan below is for this count statement
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#750L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#783L])
>    TungstenProject
>     SortMergeJoin [movie#622,title#623], [movie#640,title#641]
>      TungstenSort [movie#622 ASC,title#623 ASC], false, 0
>       TungstenExchange hashpartitioning(movie#622,title#623)
>        ConvertToUnsafe
>         InMemoryColumnarTableScan [movie#622,title#623], (InMemoryRelation [movie#622,title#623,genres#624], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#622,title#623,genres#624]), None)
>      TungstenSort [movie#640 ASC,title#641 ASC], false, 0
>       TungstenExchange hashpartitioning(movie#640,title#641)
>        ConvertToUnsafe
>         InMemoryColumnarTableScan [movie#640,title#641], (InMemoryRelation [movie#640,title#641,genres#642], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#640,title#641,genres#642]), None)
>
> Please let me know if you need further information.
>
> On Tue, Apr 5, 2016 at 6:33 PM, Yong Zhang <java8...@hotmail.com> wrote:
>
> You need to show us the execution plan, so we can understand what your issue is.
>
> Use spark-shell code to show how your DFs are built and how you partition them, then use explain(true) on your join DF and show the output here, so we can help you better.
>
> Yong
>
> > Date: Tue, 5 Apr 2016 09:46:59 -0700
> > From: darshan.m...@gmail.com
> > To: user@spark.apache.org
> > Subject: Plan issue with spark 1.5.2
> >
> > I am using spark 1.5.2. I have a question regarding the plans generated by Spark.
> > I have 3 data-frames which hold the data for different countries. I have around 150 countries and the data is skewed.
> >
> > 95% of my queries will have country as a criterion. However, I have seen issues with the plans generated for queries which have country as a join column.
> >
> > The data-frames are partitioned based on the country. Not only are these dataframes co-partitioned, they are co-located as well. E.g. the data for UK in data-frames df1, df2 and df3 will be on the same HDFS datanode.
> >
> > Then when I join these 3 tables and country is one of the join columns, I assume that the join should be a map-side join, but it shuffles the data from the 3 dataframes and then joins using the shuffled data. Apart from country there are other columns in the join (a simplified sketch is at the end of this message).
> >
> > Is this correct behavior? If it is an issue, is it fixed in later versions?
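> > The simplified sketch (the table paths, the second join column, and the reads themselves are made up for illustration; country is the real partition column):
> >
> >   val df1 = sqlContext.read.parquet("/data/t1")
> >   val df2 = sqlContext.read.parquet("/data/t2")
> >   val df3 = sqlContext.read.parquet("/data/t3")
> >
> >   // Join on country plus one more column. In my real data the three inputs
> >   // are already partitioned (and co-located) by country, yet the plan still
> >   // shuffles all three before joining.
> >   val joined = df1
> >     .join(df2, df1("country") === df2("country") && df1("txn_id") === df2("txn_id"))
> >     .join(df3, df1("country") === df3("country") && df1("txn_id") === df3("txn_id"))
> >
> >   // Exchange / TungstenExchange operators in this output indicate the shuffle.
> >   joined.explain(true)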
> > Thanks