Github user viirya commented on the issue:
    I set up a Spark cluster with 5 nodes on EC2.
    // Times a single invocation of `func` and prints the elapsed wall-clock time.
    def benchmark(func: () => Unit): Unit = {
        val t0 = System.nanoTime()
        func()
        val t1 = System.nanoTime()
        println("Elapsed time: " + (t1 - t0) + "ns")
    }
    val N = 100000000L
    spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as 
    spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as 
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    val df = sql("select key,count(*) from (select * from a1 union all select * from a2)z group by key")
    val df2 = sql("select * from a1 union all select * from a2")
    val df3 = df.sample(0.8).filter($"key" > 1000000).sample(0.4)
    val df4 = df2.sample(0.8).filter($"key" > 1000000).sample(0.4)
    benchmark(() => df.collect)
    benchmark(() => df2.collect)
    benchmark(() => df3.collect)
    benchmark(() => df4.collect)
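The `benchmark` helper times exactly one run, so individual numbers can be noisy. For example, `df2` measured about 93 ms in one run and 80 ms in the other. The following is a minimal plain-Scala sketch of a steadier variant. It runs a few untimed warm-up iterations and then reports the median of several timed runs. `benchmarkMedian`, `warmups` and `runs` are hypothetical names and are not part of the original comment.

```scala
// Hypothetical helper (not from the original comment): runs `func` a few times
// untimed to warm up, then times `runs` executions and returns the median
// elapsed time in nanoseconds.
def benchmarkMedian(warmups: Int, runs: Int)(func: () => Unit): Long = {
  require(runs > 0, "runs must be positive")
  // Warm-up iterations let JIT compilation and caches settle; they are not timed.
  (1 to warmups).foreach(_ => func())
  // One System.nanoTime() pair per timed run, then sort the samples.
  val samples = (1 to runs).map { _ =>
    val t0 = System.nanoTime()
    func()
    System.nanoTime() - t0
  }.sorted
  // Median: middle sample for an odd count, mean of the two middle ones otherwise.
  val mid = runs / 2
  if (runs % 2 == 1) samples(mid) else (samples(mid - 1) + samples(mid)) / 2
}
```

In the shell it would be called as `println("Median: " + benchmarkMedian(warmups = 2, runs = 5)(() => df.collect) + "ns")`. The median is used instead of the mean so that a single slow outlier, such as a GC pause, does not skew the result.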
    Without this change:

    scala> benchmark(() => df.collect)
    Elapsed time: 371007018ns
    scala> benchmark(() => df2.collect)
    Elapsed time: 93056619ns
    scala> benchmark(() => df3.collect)
    Elapsed time: 477603242ns
    scala> benchmark(() => df4.collect)
    Elapsed time: 150106354ns

    With this change:

    scala> benchmark(() => df.collect)
    Elapsed time: 101199791ns
    scala> benchmark(() => df2.collect)
    Elapsed time: 80275158ns
    scala> benchmark(() => df3.collect)
    Elapsed time: 292775244ns
    scala> benchmark(() => df4.collect)
    Elapsed time: 151129518ns
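The following small plain-Scala sketch compares the two runs above by computing the per-query speedup from the reported nanosecond timings. It assumes the first run is the baseline without the change and the second run is with it. `before`, `after` and `speedup` are illustrative names.

```scala
// Elapsed times in ns, copied from the two runs above.
// Assumption: the first run is the baseline, the second includes the change.
val before = Map(
  "df"  -> 371007018L,
  "df2" -> 93056619L,
  "df3" -> 477603242L,
  "df4" -> 150106354L
)
val after = Map(
  "df"  -> 101199791L,
  "df2" -> 80275158L,
  "df3" -> 292775244L,
  "df4" -> 151129518L
)

// Speedup = baseline time / new time; a value above 1.0 means the query got faster.
val speedup: Map[String, Double] =
  before.map { case (q, t) => q -> t.toDouble / after(q) }

for (q <- Seq("df", "df2", "df3", "df4"))
  println(f"$q%-4s ${before(q) / 1e6}%7.1f ms -> ${after(q) / 1e6}%7.1f ms  (x${speedup(q)}%.2f)")
```

With these numbers, `df` is roughly 3.7x faster and `df3` roughly 1.6x faster. `df4` stays within about 1% of the baseline. Because each figure comes from a single run, the small differences for `df2` and `df4` are likely within run-to-run noise.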
    It improves the `df` and `df3` queries by eliminating a shuffle. For `df2` and `df4`, which don't involve a shuffle, there is no performance regression.