Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
I also benchmarked this on a 5-node Spark cluster on EC2:
```scala
// Run in spark-shell, which already imports spark.implicits._
// and org.apache.spark.sql.functions._.
def benchmark(func: () => Unit): Unit = {
  val t0 = System.nanoTime()
  func()
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
}

val N = 10000
val data = (0 until N).map { i =>
  (i, i % 2, i % 3, Array.fill(10)(i), Array.fill(10)(i.toString),
    Array.fill(10)(i.toDouble), (i, i.toString, i.toDouble))
}
// Two dataframes with identical schemas, both repartitioned by "key".
val df1 = sc.parallelize(data)
  .toDF("key", "t1", "t2", "t3", "t4", "t5", "t6")
  .repartition($"key")
val df2 = sc.parallelize(data)
  .toDF("key", "t1", "t2", "t3", "t4", "t5", "t6")
  .repartition($"key")

// Disable auto broadcast joins and enable the behavior added by this PR.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.unionInSamePartition", "true")

val df3 = df1.union(df2).groupBy("key").agg(count("*"))
val df4 = df1.union(df2)
val df5 = df3.sample(0.8).filter($"key" > 1000000).sample(0.4)
val df6 = df4.sample(0.8).filter($"key" > 1000000).sample(0.4)

benchmark(() => df3.collect)
benchmark(() => df4.collect)
benchmark(() => df5.collect)
benchmark(() => df6.collect)
```
Before this patch:
```scala
scala> benchmark(() => df3.collect)
Elapsed time: 663668585ns
scala> benchmark(() => df4.collect)
Elapsed time: 547487953ns
scala> benchmark(() => df5.collect)
Elapsed time: 712634187ns
scala> benchmark(() => df6.collect)
Elapsed time: 491917400ns
```
After this patch:
```scala
scala> benchmark(() => df3.collect)
Elapsed time: 516797788ns
scala> benchmark(() => df4.collect)
Elapsed time: 557499803ns
scala> benchmark(() => df5.collect)
Elapsed time: 611327782ns
scala> benchmark(() => df6.collect)
Elapsed time: 495387557ns
```
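
For reference, a quick way to double-check that the new code path is exercised is to compare the physical plans with `spark.sql.unionInSamePartition` off and on: with the flag enabled, the union of the two dataframes repartitioned by `key` should let the aggregation reuse that partitioning instead of adding another shuffle. This is just a sketch against the dataframes defined above; the exact `explain` output depends on the build.
```scala
// Sketch: inspect the physical plan with the flag off vs. on.
// Reuses df1/df2 from the same spark-shell session as above.
spark.conf.set("spark.sql.unionInSamePartition", "false")
df1.union(df2).groupBy("key").agg(count("*")).explain()
// expect an Exchange before the final aggregate

spark.conf.set("spark.sql.unionInSamePartition", "true")
df1.union(df2).groupBy("key").agg(count("*")).explain()
// with this patch applied, the extra Exchange should be gone
```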