Hi, I'm trying to understand SortMergeJoin (SPARK-2213).
1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For example, in the code below the two datasets have different numbers of partitions, but it still does a SortMergeJoin after a "hashpartitioning".

CODE:

    val sparkConf = new SparkConf()
      .setAppName("SortMergeJoinTest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.eventLog.enabled", "true")
      .set("spark.sql.planner.sortMergeJoin", "true")
    sparkConf.setMaster("local-cluster[3,1,1024]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val inputpath = "input.gz.parquet"
    val df1 = sqlContext.read.parquet(inputpath).repartition(3)
    val df2 = sqlContext.read.parquet(inputpath).repartition(5)
    val result = df1.join(df2.withColumnRenamed("foo", "foo2"), $"foo" === $"foo2")
    result.explain()

OUTPUT:

    == Physical Plan ==
    SortMergeJoin [foo#0], [foo2#8]
     TungstenSort [foo#0 ASC], false, 0
      TungstenExchange hashpartitioning(foo#0)
       ConvertToUnsafe
        Repartition 3, true
         Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
     TungstenSort [foo2#8 ASC], false, 0
      TungstenExchange hashpartitioning(foo2#8)
       TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
        Repartition 5, true
         Scan ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]

2) If both datasets have already been partitioned and sorted the same way and stored on the file system (e.g. by a previous job), is there a way to tell Spark this so that it won't do a "hashpartitioning" on them? It looks like Spark considers datasets that have just been read from the file system to have UnknownPartitioning. In the example below, I try to join a dataframe to itself, and it still wants to hash-repartition both sides.

CODE:

    ...
    val df1 = sqlContext.read.parquet(inputpath)
    val result = df1.join(df1.withColumnRenamed("foo", "foo2"), $"foo" === $"foo2")
    result.explain()

OUTPUT:

    == Physical Plan ==
    SortMergeJoin [foo#0], [foo2#4]
     TungstenSort [foo#0 ASC], false, 0
      TungstenExchange hashpartitioning(foo#0)
       ConvertToUnsafe
        Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
     TungstenSort [foo2#4 ASC], false, 0
      TungstenExchange hashpartitioning(foo2#4)
       ConvertToUnsafe
        Project [foo#5 AS foo2#4,bar#6L,somefield#7,anotherfield#8]
         Scan ParquetRelation[file:input.gz.parquet][foo#5,bar#6L,somefield#7,anotherfield#8]

Thanks.
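P.S. To frame question 1, here is the comparison run I have in mind: the same join from the first example, but with spark.sql.planner.sortMergeJoin set to "false". This is just a sketch (I haven't posted its output here); inputpath is the same placeholder as above, and I'd expect the planner to consider ShuffledHashJoin with the flag off, but I may be wrong about that.

```scala
// Sketch only: same join as example 1, with the SortMergeJoin flag off,
// to compare the physical plan against the one shown above.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf()
  .setAppName("SortMergeJoinCompare")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Flag off: the planner is free to pick a hash-based join instead.
  .set("spark.sql.planner.sortMergeJoin", "false")
sparkConf.setMaster("local-cluster[3,1,1024]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val inputpath = "input.gz.parquet" // placeholder path, as above
val df1 = sqlContext.read.parquet(inputpath).repartition(3)
val df2 = sqlContext.read.parquet(inputpath).repartition(5)
// Print the physical plan so the chosen join strategy is visible.
df1.join(df2.withColumnRenamed("foo", "foo2"), $"foo" === $"foo2").explain()
```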