GitHub user chenghao-intel opened a pull request:

    https://github.com/apache/spark/pull/2113

    [SPARK-3197] [SQL] Reduce the Expressions creation for aggregation functions (min/max)

    The min/max aggregation functions in Catalyst currently create a new expression tree for every single input row. Expression tree creation is quite expensive, especially in a multithreaded environment, so min/max performance suffers badly.
    Here is a benchmark I ran locally:
    
    Master | Previous Result (ms) | Current Result (ms)
    ------------ | ------------- | ------------- 
    local | 3645 | 3416
    local[6] | 3602 | 1002
    
    The benchmark source code:
    ```scala
    case class Record(key: Int, value: Int)
    
    object TestHive2 extends HiveContext(new SparkContext("local[6]", "TestSQLContext", new SparkConf()))
    
    object DataPrepare extends App {
      import TestHive2._
    
      val rdd = sparkContext.parallelize((1 to 10000000).map(i => Record(i % 3000, i)), 12)
    
      runSqlHive("SHOW TABLES")
      runSqlHive("DROP TABLE if exists a")
      runSqlHive("DROP TABLE if exists result")
      rdd.registerAsTable("records")
    
      runSqlHive("""CREATE TABLE a (key INT, value INT)
                   | ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                   | STORED AS RCFILE
                 """.stripMargin)
      runSqlHive("""CREATE TABLE result (key INT, value INT)
                   | ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                   | STORED AS RCFILE
                 """.stripMargin)
    
      hql("""FROM records
            | INSERT INTO TABLE a
            | SELECT key, value
          """.stripMargin)
    }
    
    object PerformanceTest extends App {
      import TestHive2._
    
      hql("SHOW TABLES")
      hql("set spark.sql.shuffle.partitions=12")
    
      val cmd = "select min(value), max(value) from a group by key"
    
      val results = ("Result1", benchmark(cmd)) :: 
                    ("Result2", benchmark(cmd)) :: 
                    ("Result3", benchmark(cmd)) :: Nil
      results.foreach { case (prompt, (time, count)) =>
        println(s"$prompt: took $time ms ($count records)")
      }
    
      def benchmark(cmd: String) = {
        val begin = System.currentTimeMillis()
        val count = hql(cmd).count
        val end = System.currentTimeMillis()
        ((end - begin), count)
      }
    }
    ```
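
    For context, here is a sketch (not the actual Catalyst patch) of the pattern the change targets: instead of allocating a fresh expression object graph for every row, hoist the aggregation state out of the loop and update it in place. The `Max` node and names below are hypothetical stand-ins; real Catalyst trees are far larger, which is what makes per-row allocation so costly.

    ```scala
    object ExpressionCreationSketch extends App {
      // Stand-in for an expression node; a new one was built per row before.
      case class Max(left: Int, right: Int) { def eval: Int = math.max(left, right) }

      val values = Seq(4, 9, 2, 7)

      // Before: a new expression tree is allocated for every row.
      val perRow = values.reduce((a, b) => Max(a, b).eval)

      // After: reuse mutable state across rows, no per-row allocation.
      var current = Int.MinValue
      values.foreach(v => if (v > current) current = v)

      assert(perRow == current)
      println(current) // 9
    }
    ```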


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chenghao-intel/spark aggregation_expression_optimization

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2113
    
----
commit 03c6d4fa67eb8c07dacc2a241578da0ea364b4cd
Author: Cheng Hao <[email protected]>
Date:   2014-08-25T06:40:28Z

    Reduce the Expressions creation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
