Bruce Robbins created SPARK-23560: ------------------------------------- Summary: A joinWith followed by groupBy requires extra shuffle Key: SPARK-23560 URL: https://issues.apache.org/jira/browse/SPARK-23560 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Environment: debian 8.9, macos x high sierra Reporter: Bruce Robbins
Depending on the size of the input, a joinWith followed by a groupBy requires more shuffles than a join followed by a groupBy. For example, here's a joinWith on two CSV files, followed by a groupBy: {noformat} import org.apache.spark.sql.types._ val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil) val df1 = spark.read.schema(schema).csv("ds1.csv") val df2 = spark.read.schema(schema).csv("ds2.csv") val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count result1.explain == Physical Plan == *(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)]) +- Exchange hashpartitioning(_1#8.id1#19L, 200) +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)]) +- *(5) Project [_1#8] +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(_1#8.id1, 200) : +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8] : +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint> +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(_2#9.id2, 200) +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9] +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint> {noformat} Using join, there is one less shuffle: {noformat} val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count result2.explain == Physical Plan == *(5) HashAggregate(keys=[id1#0L], functions=[count(1)]) +- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)]) +- *(5) Project [id1#0L] +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id1#0L, 200) : +- *(1) Project [id1#0L] : +- *(1) Filter isnotnull(id1#0L) : +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint> +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id2#5L, 200) +- *(3) Project [id2#5L] +- *(3) Filter isnotnull(id2#5L) +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint> {noformat} The extra exchange is reflected in the run time of the query. My tests were on inputs with more than 2 million records. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org