[jira] [Commented] (SPARK-23560) A joinWith followed by groupBy requires extra shuffle

2018-03-10 Thread Bruce Robbins (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394369#comment-16394369
 ] 

Bruce Robbins commented on SPARK-23560:
---

A simpler example that seems to reproduce this issue (without joinWith or join) 
is the following:

{noformat}
val df = Seq((20, 33), (20, 44), (99, 20), (33, 33), (-44, 99)).toDF("id1", "id2")

val result1 = df
  .select(struct('id1 as 'id1, 'id2 as 'id2) as 'x)
  .repartition($"x.id1")
  .groupBy($"x.id1")
  .count

result1.explain
== Physical Plan ==
*(2) HashAggregate(keys=[x#11.id1#25], functions=[count(1)])
+- Exchange hashpartitioning(x#11.id1#25, 200)
   +- *(1) HashAggregate(keys=[x#11.id1 AS x#11.id1#25], functions=[partial_count(1)])
      +- Exchange hashpartitioning(x#11.id1, 200)
         +- LocalTableScan [x#11]

val result2 = df
  .repartition('id1)
  .groupBy('id1).count

result2.explain
== Physical Plan ==
*(1) HashAggregate(keys=[id1#5], functions=[count(1)])
+- *(1) HashAggregate(keys=[id1#5], functions=[partial_count(1)])
   +- Exchange hashpartitioning(id1#5, 200)
      +- LocalTableScan [id1#5]
{noformat}

It seems joinWith is relevant only in the sense that it creates a struct column.
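If that is the case, a possible workaround is to project the key out of the struct as a top-level column before repartitioning, so the partitioning and grouping keys are both plain attributes. This is an untested sketch (assuming a Spark 2.3 shell with spark.implicits._ in scope; the column name "gid" is made up for illustration):

```scala
// Hypothetical workaround sketch: materialize the struct field as a
// top-level column first, so repartition and groupBy both see a plain
// AttributeReference rather than a GetStructField.
val df = Seq((20, 33), (20, 44), (99, 20), (33, 33), (-44, 99)).toDF("id1", "id2")

val result3 = df
  .select(struct('id1 as 'id1, 'id2 as 'id2) as 'x)
  .select($"x", $"x.id1" as "gid")  // extract the key before partitioning
  .repartition($"gid")
  .groupBy($"gid")
  .count

result3.explain  // may avoid the second Exchange, if the keys now match
```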

> A joinWith followed by groupBy requires extra shuffle
> -
>
> Key: SPARK-23560
> URL: https://issues.apache.org/jira/browse/SPARK-23560
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: debian 8.9, macos x high sierra
>Reporter: Bruce Robbins
>Priority: Major
>
> Depending on the size of the input, a joinWith followed by a groupBy requires 
> more shuffles than a join followed by a groupBy.
> For example, here's a joinWith on two CSV files, followed by a groupBy:
> {noformat}
> import org.apache.spark.sql.types._
> val schema = StructType(StructField("id1", LongType) :: StructField("id2", LongType) :: Nil)
> val df1 = spark.read.schema(schema).csv("ds1.csv")
> val df2 = spark.read.schema(schema).csv("ds2.csv")
> val result1 = df1.joinWith(df2, df1.col("id1") === df2.col("id2")).groupBy("_1.id1").count
> result1.explain
> == Physical Plan ==
> *(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
> +- Exchange hashpartitioning(_1#8.id1#19L, 200)
>    +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L], functions=[partial_count(1)])
>       +- *(5) Project [_1#8]
>          +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
>             :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
>             :  +- Exchange hashpartitioning(_1#8.id1, 200)
>             :     +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS _1#8]
>             :        +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
>             +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(_2#9.id2, 200)
>                   +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS _2#9]
>                      +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
> {noformat}
> Using join, there is one less shuffle:
> {noformat}
> val result2 = df1.join(df2, df1.col("id1") === df2.col("id2")).groupBy(df1("id1")).count
> result2.explain
> == Physical Plan ==
> *(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
> +- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
>    +- *(5) Project [id1#0L]
>       +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
>          :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id1#0L, 200)
>          :     +- *(1) Project [id1#0L]
>          :        +- *(1) Filter isnotnull(id1#0L)
>          :           +- *(1) FileScan csv [id1#0L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
>          +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id2#5L, 200)
>                +- *(3) Project [id2#5L]
>                   +- *(3) Filter isnotnull(id2#5L)
>                      +- *(3) FileScan csv [id2#5L] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
> {noformat}
> -The extra exchange is reflected in the run time of the query.- Actually, I recant this bit. In my particular tests, the extra exchange has negligible impact on run time. All the difference is in stage 2.
> My tests were on inputs 

[jira] [Commented] (SPARK-23560) A joinWith followed by groupBy requires extra shuffle

2018-03-07 Thread Bruce Robbins (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390111#comment-16390111
 ] 

Bruce Robbins commented on SPARK-23560:
---

The main issue is that an AttributeReference instance never considers a 
GetStructField instance to be semantically equal, even if they both refer to 
the same field.

When I patched AttributeReference.semanticEquals to allow comparison with a 
GetStructField instance, the extra exchange drops out of the plan. It's not a 
production-quality patch, as the check is rudimentary, and I am not sure two 
such things should ever match. But it does confirm how the extra Exchange is 
getting added to the plan.

In a nutshell, the HashAggregate gets its output partitioning from the Exchange below the Sort in each of the two branches under the SortMergeJoin, which contains the expressions for table1.id1 and table2.id2 as GetStructField instances. However, the HashAggregate's required distribution (held in its parent's requiredChildDistributions field) contains the expression for table1.id1 as an AttributeReference. When EnsureRequirements tries to determine whether an Exchange must be added to the plan, the two expressions for table1.id1 don't match (using AttributeReference.semanticEquals), so an extra Exchange gets added.

In the "join" case, the two expressions for table1.id1 are both 
AttributeReference instances, so they match.
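The mismatch can be illustrated with a toy model of the comparison EnsureRequirements performs. Everything below is a simplified stand-in, not Spark's actual Catalyst classes; the names AttrRef and GetField are made up to avoid implying this is the real implementation:

```scala
// Toy model: an attribute reference never considers a struct-field
// extraction semantically equal, even when both denote the same column.
sealed trait Expr { def semanticEquals(other: Expr): Boolean }

// Direct reference to a top-level column, e.g. id1.
case class AttrRef(name: String) extends Expr {
  def semanticEquals(other: Expr): Boolean = other match {
    case AttrRef(n) => n == name   // only another plain attribute can match
    case _          => false       // a struct-field extraction never matches
  }
}

// A field pulled out of a struct column, e.g. _1.id1.
case class GetField(struct: AttrRef, field: String) extends Expr {
  def semanticEquals(other: Expr): Boolean = other match {
    case GetField(s, f) => s.semanticEquals(struct) && f == field
    case _              => false
  }
}

// joinWith case: required distribution (plain attribute) vs. the
// Exchange's output partitioning (struct field) -> no match -> new Exchange.
val required    = AttrRef("id1")
val partitioned = GetField(AttrRef("_1"), "id1")
assert(!required.semanticEquals(partitioned))

// join case: both sides are plain attributes -> match -> Exchange reused.
assert(AttrRef("id1").semanticEquals(AttrRef("id1")))
```

Under this model, fixing the issue means either making the two expression shapes comparable or ensuring both sides of the comparison use the same shape, which matches the two directions mentioned below.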

As for a fix, I need to explore the planner/optimizer code some more and try to 
determine if two expressions with different implementations should be 
comparable, or maybe if these two expressions should have had the same 
implementation.
