GitHub user hvanhovell opened a pull request:

    https://github.com/apache/spark/pull/12874

    [SPARK-10605][SQL] Create native collect_list/collect_set aggregates

    ## What changes were proposed in this pull request?
We currently use the Hive implementations for the collect_list/collect_set 
aggregate functions. This has two major drawbacks: the use of HiveUDAF (which 
carries quite a bit of overhead) and the lack of support for struct data types. 
This PR adds native implementations of these functions to Spark.
    
    The size of the collected list/set varies, which means we cannot use the 
fast Tungsten aggregation path and must fall back to the slower sort-based 
path. Another big issue with these operators is that when the collected 
list/set grows too large, we can start experiencing long GC pauses and OOMEs.
    
    The `collect*` aggregates implemented in this PR rely on the sort-based 
aggregation path for correctness. They maintain their own internal buffer, 
which holds the rows for one group at a time. The sort-based aggregation path 
is triggered by disabling `partialAggregation` for these aggregates (which is 
kinda funny); this technique is also employed in 
`org.apache.spark.sql.hive.HiveUDAFFunction`.
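    As a rough illustration (plain Scala, not the actual Spark internals), the buffering strategy can be sketched as follows: because the sort-based path feeds the aggregate rows already sorted by grouping key, only one group's values ever need to live in the buffer at a time. The function name and signature here are hypothetical, purely for illustration.

    ```scala
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical sketch of per-group buffering over input sorted by key:
    // the buffer is flushed whenever the grouping key changes, so at most
    // one group's values are held in memory at once.
    def collectListSorted[K, V](sorted: Seq[(K, V)]): Seq[(K, Seq[V])] = {
      val out = ArrayBuffer.empty[(K, Seq[V])]
      var buffer = ArrayBuffer.empty[V]
      var current: Option[K] = None
      for ((k, v) <- sorted) {
        if (current.exists(_ != k)) {
          // Key changed: emit the finished group and start a fresh buffer.
          out += ((current.get, buffer.toSeq))
          buffer = ArrayBuffer.empty[V]
        }
        current = Some(k)
        buffer += v
      }
      current.foreach(k => out += ((k, buffer.toSeq)))
      out.toSeq
    }
    ```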
    
    I have done some performance testing:
    ```scala
    import org.apache.spark.sql.{Dataset, Row}
    
    sql("create function collect_list2 as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList'")
    
    val df = range(0, 10000000).select($"id", (rand(213123L) * 100000).cast("int").as("grp"))
    df.select(countDistinct($"grp")).show
    
    def benchmark(name: String, plan: Dataset[Row], maxItr: Int = 5): Unit = {
       // Do not measure planning.
       plan.queryExecution.executedPlan
    
       // Execute the plan a number of times and average the result.
       val start = System.nanoTime
       var i = 0
       while (i < maxItr) {
         plan.rdd.foreach(_ => ())
         i += 1
       }
       val time = (System.nanoTime - start) / (maxItr * 1000000L)
       println(s"[$name] $maxItr iterations completed in an average time of $time ms.")
    }
    
    val plan1 = df.groupBy($"grp").agg(collect_list($"id"))
    val plan2 = df.groupBy($"grp").agg(callUDF("collect_list2", $"id"))
    
    benchmark("Spark collect_list", plan1)
    ...
    > [Spark collect_list] 5 iterations completed in an average time of 3371 ms.
    
    benchmark("Hive collect_list", plan2)
    ...
    > [Hive collect_list] 5 iterations completed in an average time of 9109 ms.
    ```
    Performance is improved by a factor of roughly 2.7 (9109 ms vs. 3371 ms per run).
    
    ## How was this patch tested?
    Added tests to `DataFrameAggregateSuite`.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hvanhovell/spark implode

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/12874.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #12874
    
----
commit 9d8aeed7b18d58258097526f85f07c37a10c28d8
Author: Herman van Hovell <[email protected]>
Date:   2016-02-01T15:15:16Z

    Add native collect_set/collect_list.

commit 8247d8eb6416b7b5d4eb61fa585c595d87930d63
Author: Herman van Hovell <[email protected]>
Date:   2016-02-01T15:43:24Z

    Add test for struct types.

commit 326a213dc014403aef1033e9d39206f5a873b7a6
Author: Herman van Hovell <[email protected]>
Date:   2016-02-01T18:14:10Z

    Add pretty names for SQL generation.

commit 9d3205d211405a598a4f24b2b5fef62cf0538f6e
Author: Herman van Hovell <[email protected]>
Date:   2016-05-03T16:32:37Z

    Merge remote-tracking branch 'apache-github/master' into implode
    
    # Conflicts:
    #   
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
    #   
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala

commit da6a13151e01d1789a18090a734ad512c56aecc5
Author: Herman van Hovell <[email protected]>
Date:   2016-05-03T16:34:01Z

    Merge remote-tracking branch 'apache-github/master' into implode

commit 8a4e7827d6b1f4c150ec29c35185e63c974762dd
Author: Herman van Hovell <[email protected]>
Date:   2016-05-03T18:32:22Z

    Merge remote-tracking branch 'apache-github/master' into implode
    
    # Conflicts:
    #   
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
    #   
sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala

commit 597f76bb350126bb3360b692720426a6d41c3c18
Author: Herman van Hovell <[email protected]>
Date:   2016-05-03T18:48:39Z

    Remove hardcoded Hive references.

----


