Bruce Robbins created SPARK-23560:
-------------------------------------

             Summary: A joinWith followed by groupBy requires extra shuffle
                 Key: SPARK-23560
                 URL: https://issues.apache.org/jira/browse/SPARK-23560
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.3.0
         Environment: Debian 8.9, macOS High Sierra
            Reporter: Bruce Robbins


Depending on the size of the input (large enough that the join is not 
broadcast), a joinWith followed by a groupBy requires one more shuffle than 
the equivalent join followed by a groupBy.
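
To reproduce, inputs of this shape can be generated with something like the 
following sketch (the generator and row count are assumptions; the exact data 
does not matter, only that it is large enough to avoid a broadcast join):
{noformat}
// Hypothetical input generator: two CSV files, each with two long columns.
val n = 3000000L
spark.range(n).selectExpr("id AS id1", "id AS id2").write.csv("ds1.csv")
spark.range(n).selectExpr("id AS id1", "id AS id2").write.csv("ds2.csv")
{noformat}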

For example, here's a joinWith on two CSV files, followed by a groupBy:
{noformat}
import org.apache.spark.sql.types._
val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)

val df1 = spark.read.schema(schema).csv("ds1.csv")
val df2 = spark.read.schema(schema).csv("ds2.csv")

val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count

result1.explain
== Physical Plan ==
*(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
+- Exchange hashpartitioning(_1#8.id1#19L, 200)
   +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
      +- *(5) Project [_1#8]
         +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
            :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(_1#8.id1, 200)
            :     +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8]
            :        +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
            +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(_2#9.id2, 200)
                   +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9]
                      +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
{noformat}
In the plan above, the aggregation keys reference the join key through the 
struct field (_1#8.id1, aliased as _1#8.id1#19L), and the planner apparently 
does not recognize this as matching the join output's hash partitioning, so it 
inserts an extra Exchange before the final aggregation. Using join instead, 
there is one less shuffle:
{noformat}
val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count

result2.explain
== Physical Plan ==
*(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
+- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
   +- *(5) Project [id1#0L]
      +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
         :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id1#0L, 200)
         :     +- *(1) Project [id1#0L]
         :        +- *(1) Filter isnotnull(id1#0L)
         :           +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
         +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id2#5L, 200)
               +- *(3) Project [id2#5L]
                  +- *(3) Filter isnotnull(id2#5L)
                      +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
{noformat}
The extra exchange is reflected in the run time of the query.

My tests were on inputs with more than 2 million records.
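
The difference can also be checked programmatically by counting the shuffle 
exchange nodes in each executed plan. A sketch (the ShuffleExchangeExec class 
name is as of Spark 2.3; earlier versions name this class ShuffleExchange):
{noformat}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Count the shuffle exchanges in a Dataset's physical plan.
def countShuffles(ds: Dataset[_]): Int =
  ds.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size

countShuffles(result1)  // 3: two for the join inputs plus one for the groupBy
countShuffles(result2)  // 2: only the two join-side exchanges
{noformat}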


