Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498

I set up a Spark cluster with 5 nodes on EC2.

```scala
def benchmark(func: () => Unit): Unit = {
  val t0 = System.nanoTime()
  func()
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
}

val N = 100000000L

spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as t2")
  .repartition(col("key"))
  .write.mode("overwrite").bucketBy(3, "key").sortBy("t1").saveAsTable("a1")
spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as t2")
  .repartition(col("key"))
  .write.mode("overwrite").bucketBy(3, "key").sortBy("t1").saveAsTable("a2")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val df = sql("select key, count(*) from (select * from a1 union all select * from a2) z group by key")
val df2 = sql("select * from a1 union all select * from a2")
val df3 = df.sample(0.8).filter($"key" > 1000000).sample(0.4)
val df4 = df2.sample(0.8).filter($"key" > 1000000).sample(0.4)

benchmark(() => df.collect)
benchmark(() => df2.collect)
benchmark(() => df3.collect)
benchmark(() => df4.collect)
```

Before:

```
scala> benchmark(() => df.collect)
Elapsed time: 371007018ns

scala> benchmark(() => df2.collect)
Elapsed time: 93056619ns

scala> benchmark(() => df3.collect)
Elapsed time: 477603242ns

scala> benchmark(() => df4.collect)
Elapsed time: 150106354ns
```

After:

```
scala> benchmark(() => df.collect)
Elapsed time: 101199791ns

scala> benchmark(() => df2.collect)
Elapsed time: 80275158ns

scala> benchmark(() => df3.collect)
Elapsed time: 292775244ns

scala> benchmark(() => df4.collect)
Elapsed time: 151129518ns
```

This change improves the queries for `df` and `df3` by eliminating the shuffle. For `df2` and `df4`, which don't involve a shuffle, there is no performance regression.
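A quick way to see whether a shuffle is present in a query like the ones above is to look for `Exchange` operators in the physical plan. A minimal sketch (using a local `SparkSession` and a plain group-by rather than the bucketed tables above, which do need a cluster setup):

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; the benchmark in the comment ran on a 5-node EC2 cluster.
val spark = SparkSession.builder().master("local[*]").appName("shuffle-check").getOrCreate()

// A group-by over an unpartitioned range forces a shuffle, so its physical
// plan contains an Exchange node. For the bucketed tables a1/a2, the point of
// the patch is that this Exchange disappears from the plan of `df`.
val grouped = spark.range(100).groupBy("id").count()
val plan = grouped.queryExecution.executedPlan.toString

println(plan.contains("Exchange"))
```

Comparing the output of `df.explain()` before and after the patch is what the "eliminating shuffle" claim refers to: the `Exchange hashpartitioning(key, ...)` node above the union should no longer appear.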