GitHub user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
I set up a Spark cluster with 5 nodes on EC2 and ran the following benchmark in spark-shell:
```scala
def benchmark(func: () => Unit): Unit = {
  val t0 = System.nanoTime()
  func()
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
}
val N = 100000000L
// Write two identical bucketed (3 buckets on "key") and sorted tables.
spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as t2")
  .repartition(col("key")).write.mode("overwrite")
  .bucketBy(3, "key").sortBy("t1").saveAsTable("a1")
spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as t2")
  .repartition(col("key")).write.mode("overwrite")
  .bucketBy(3, "key").sortBy("t1").saveAsTable("a2")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val df = sql("select key, count(*) from (select * from a1 union all select * from a2) z group by key")
// df2 involves no shuffle; df3/df4 layer sample and filter on top of df/df2.
val df2 = sql("select * from a1 union all select * from a2")
val df3 = df.sample(0.8).filter($"key" > 1000000).sample(0.4)
val df4 = df2.sample(0.8).filter($"key" > 1000000).sample(0.4)
benchmark(() => df.collect)
benchmark(() => df2.collect)
benchmark(() => df3.collect)
benchmark(() => df4.collect)
```
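As a quick sanity check that the tables really were written bucketed, one can inspect the catalog metadata; below is a sketch, assuming the Spark 2.x `DESCRIBE EXTENDED` output format (the row names may differ across versions):
```scala
// The detailed table info should report 3 buckets on `key` and `t1` as the
// sort column for both a1 and a2.
sql("DESCRIBE EXTENDED a1")
  .filter($"col_name".isin("Num Buckets", "Bucket Columns", "Sort Columns"))
  .show(false)
```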
Before:
```
scala> benchmark(() => df.collect)
Elapsed time: 371007018ns
scala> benchmark(() => df2.collect)
Elapsed time: 93056619ns
scala> benchmark(() => df3.collect)
Elapsed time: 477603242ns
scala> benchmark(() => df4.collect)
Elapsed time: 150106354ns
```
After:
```
scala> benchmark(() => df.collect)
Elapsed time: 101199791ns
scala> benchmark(() => df2.collect)
Elapsed time: 80275158ns
scala> benchmark(() => df3.collect)
Elapsed time: 292775244ns
scala> benchmark(() => df4.collect)
Elapsed time: 151129518ns
```
It improves the `df` and `df3` queries by eliminating the shuffle. For `df2` and `df4`, which don't involve a shuffle, there is no performance regression.
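To see where the win comes from, one can compare the physical plans; here is a sketch, assuming Spark 2.x internals (the `Exchange` base class lives in `org.apache.spark.sql.execution.exchange`; operator names may differ across versions):
```scala
// Count the shuffle (Exchange) operators in df's executed plan. Before this
// patch the plan contains one ahead of the aggregation; after, it should be
// gone. df.explain() shows the same thing in textual form.
import org.apache.spark.sql.execution.exchange.Exchange

val numShuffles = df.queryExecution.executedPlan.collect {
  case e: Exchange => e
}.size
println(s"Exchange operators in df's physical plan: $numShuffles")
```
Running this before and after the patch should show the count for `df` dropping, while `df2`'s plan stays Exchange-free throughout.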