GitHub user maropu opened a pull request:
https://github.com/apache/spark/pull/20965
[SPARK-21870][SQL] Split aggregation code into small functions
## What changes were proposed in this pull request?
This pr proposes to split the aggregation code into small pieces in
`HashAggregateExec`. In #18810, we observed a performance regression when the
JVM refused to JIT-compile functions that were too long. I checked and found
that the code generated by `HashAggregateExec` frequently goes over this
limit, for example:
```
scala> spark.range(1).selectExpr("id % 1024 AS a", "id AS
b").write.saveAsTable("t")
scala> sql("SELECT a, KURTOSIS(b) FROM t GROUP BY a")
```
This query goes over the limit and the actual bytecode size is `12356`.
This pr splits the aggregation code into small separate functions; for
example, in a simple case:
```
scala> sql("SELECT SUM(a), AVG(a) FROM VALUES(1) t(a)").debugCodegen
```
- generated code with this pr:
```
/* 083 */ private void agg_doAggregateWithoutKey() throws
java.io.IOException {
/* 084 */ // initialize aggregation buffer
/* 085 */ final long agg_value = -1L;
/* 086 */ agg_bufIsNull = true;
/* 087 */ agg_bufValue = agg_value;
/* 088 */ boolean agg_isNull1 = false;
/* 089 */ double agg_value1 = -1.0;
/* 090 */ if (!false) {
/* 091 */ agg_value1 = (double) 0;
/* 092 */ }
/* 093 */ agg_bufIsNull1 = agg_isNull1;
/* 094 */ agg_bufValue1 = agg_value1;
/* 095 */ agg_bufIsNull2 = false;
/* 096 */ agg_bufValue2 = 0L;
/* 097 */
/* 098 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 099 */ InternalRow inputadapter_row = (InternalRow)
inputadapter_input.next();
/* 100 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 101 */ long inputadapter_value = inputadapter_isNull ? -1L :
(inputadapter_row.getLong(0));
/* 102 */ boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 103 */ double inputadapter_value1 = inputadapter_isNull1 ? -1.0 :
(inputadapter_row.getDouble(1));
/* 104 */ boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
/* 105 */ long inputadapter_value2 = inputadapter_isNull2 ? -1L :
(inputadapter_row.getLong(2));
/* 106 */
/* 107 */ // do aggregate
/* 108 */ // copy aggregation buffer to the local
/* 109 */ boolean agg_localBufIsNull = agg_bufIsNull;
/* 110 */ long agg_localBufValue = agg_bufValue;
/* 111 */ boolean agg_localBufIsNull1 = agg_bufIsNull1;
/* 112 */ double agg_localBufValue1 = agg_bufValue1;
/* 113 */ boolean agg_localBufIsNull2 = agg_bufIsNull2;
/* 114 */ long agg_localBufValue2 = agg_bufValue2;
/* 115 */ // common sub-expressions
/* 116 */
/* 117 */ // process aggregate functions to update aggregation buffer
/* 118 */ agg_doAggregateVal_coalesce(agg_localBufIsNull,
agg_localBufValue, inputadapter_value, inputadapter_isNull);
/* 119 */ agg_doAggregateVal_add(agg_localBufValue1,
inputadapter_isNull1, inputadapter_value1, agg_localBufIsNull1);
/* 120 */ agg_doAggregateVal_add1(inputadapter_isNull2,
inputadapter_value2, agg_localBufIsNull2, agg_localBufValue2);
/* 121 */ if (shouldStop()) return;
/* 122 */ }
```
- generated code in the current master
```
/* 083 */ private void agg_doAggregateWithoutKey() throws
java.io.IOException {
/* 084 */ // initialize aggregation buffer
/* 085 */ final long agg_value = -1L;
/* 086 */ agg_bufIsNull = true;
/* 087 */ agg_bufValue = agg_value;
/* 088 */ boolean agg_isNull1 = false;
/* 089 */ double agg_value1 = -1.0;
/* 090 */ if (!false) {
/* 091 */ agg_value1 = (double) 0;
/* 092 */ }
/* 093 */ agg_bufIsNull1 = agg_isNull1;
/* 094 */ agg_bufValue1 = agg_value1;
/* 095 */ agg_bufIsNull2 = false;
/* 096 */ agg_bufValue2 = 0L;
/* 097 */
/* 098 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 099 */ InternalRow inputadapter_row = (InternalRow)
inputadapter_input.next();
/* 100 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 101 */ long inputadapter_value = inputadapter_isNull ? -1L :
(inputadapter_row.getLong(0));
/* 102 */ boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 103 */ double inputadapter_value1 = inputadapter_isNull1 ? -1.0 :
(inputadapter_row.getDouble(1));
/* 104 */ boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
/* 105 */ long inputadapter_value2 = inputadapter_isNull2 ? -1L :
(inputadapter_row.getLong(2));
/* 106 */
/* 107 */ // do aggregate
/* 108 */ // common sub-expressions
/* 109 */
/* 110 */ // evaluate aggregate function
/* 111 */ boolean agg_isNull12 = true;
/* 112 */ long agg_value12 = -1L;
/* 113 */
/* 114 */ boolean agg_isNull13 = agg_bufIsNull;
/* 115 */ long agg_value13 = agg_bufValue;
/* 116 */ if (agg_isNull13) {
/* 117 */ boolean agg_isNull15 = false;
/* 118 */ long agg_value15 = -1L;
/* 119 */ if (!false) {
/* 120 */ agg_value15 = (long) 0;
/* 121 */ }
/* 122 */ if (!agg_isNull15) {
/* 123 */ agg_isNull13 = false;
/* 124 */ agg_value13 = agg_value15;
/* 125 */ }
/* 126 */ }
/* 127 */
/* 128 */ if (!inputadapter_isNull) {
/* 129 */ agg_isNull12 = false; // resultCode could change
nullability.
/* 130 */ agg_value12 = agg_value13 + inputadapter_value;
/* 131 */
/* 132 */ }
/* 133 */ boolean agg_isNull11 = agg_isNull12;
/* 134 */ long agg_value11 = agg_value12;
/* 135 */ if (agg_isNull11) {
/* 136 */ if (!agg_bufIsNull) {
/* 137 */ agg_isNull11 = false;
/* 138 */ agg_value11 = agg_bufValue;
/* 139 */ }
/* 140 */ }
/* 141 */ boolean agg_isNull19 = true;
/* 142 */ double agg_value19 = -1.0;
/* 143 */
/* 144 */ if (!agg_bufIsNull1) {
/* 145 */ if (!inputadapter_isNull1) {
/* 146 */ agg_isNull19 = false; // resultCode could change
nullability.
/* 147 */ agg_value19 = agg_bufValue1 + inputadapter_value1;
/* 148 */
/* 149 */ }
/* 150 */
/* 151 */ }
/* 152 */ boolean agg_isNull22 = true;
/* 153 */ long agg_value22 = -1L;
/* 154 */
/* 155 */ if (!agg_bufIsNull2) {
/* 156 */ if (!inputadapter_isNull2) {
/* 157 */ agg_isNull22 = false; // resultCode could change
nullability.
/* 158 */ agg_value22 = agg_bufValue2 + inputadapter_value2;
/* 159 */
/* 160 */ }
/* 161 */
/* 162 */ }
/* 163 */ // update aggregation buffer
/* 164 */ agg_bufIsNull = agg_isNull11;
/* 165 */ agg_bufValue = agg_value11;
/* 166 */
/* 167 */ agg_bufIsNull1 = agg_isNull19;
/* 168 */ agg_bufValue1 = agg_value19;
/* 169 */
/* 170 */ agg_bufIsNull2 = agg_isNull22;
/* 171 */ agg_bufValue2 = agg_value22;
/* 172 */ if (shouldStop()) return;
/* 173 */ }
/* 174 */
/* 175 */ }
/* 176 */
/* 177 */ }
```
I also ran performance checks:
```
$ ./bin/spark-shell --master=local[1]
scala> sql("SET spark.sql.shuffle.partitions=4")
scala> spark.range(10000000).selectExpr("id % 1024 AS a", "id AS
b").write.saveAsTable("t")
scala> timer { sql("SELECT a, KURTOSIS(b) FROM t GROUP BY a").collect }
master w/ this pr, Avg. Elapsed Time: 2.520551837s
master, Avg. Elapsed Time: 54.029893146199996s
```
## How was this patch tested?
Added tests in `WholeStageCodegenSuite`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/maropu/spark SPARK-21870-2
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20965.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #20965
----
commit 289b6d31727277e6e201acb1fb450cddc5c6892a
Author: Takeshi Yamamuro <yamamuro@...>
Date: 2017-08-28T12:46:00Z
Split aggregation into small functions
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]