GitHub user maropu opened a pull request:

    https://github.com/apache/spark/pull/20965

    [SPARK-21870][SQL] Split aggregation code into small functions

    ## What changes were proposed in this pull request?
    This pr proposes to split aggregation code into pieces in 
`HashAggregateExec`. In #18810, we saw a performance regression when the JVM 
refused to compile functions that were too long. I checked and found that the 
codegen of `HashAggregateExec` frequently goes over the limit, for example:
    
    ```
    scala> spark.range(1).selectExpr("id % 1024 AS a", "id AS 
b").write.saveAsTable("t")
    scala> sql("SELECT a, KURTOSIS(b) FROM t GROUP BY a")
    ```
    
    This query goes over the limit and the actual bytecode size is `12356`.
    This pr splits the aggregation code into small separate functions; for a 
simple example:
    
    ```
    scala> sql("SELECT SUM(a), AVG(a) FROM VALUES(1) t(a)").debugCodegen
    ```
    
    - generated code with this pr:
    ```
    /* 083 */   private void agg_doAggregateWithoutKey() throws 
java.io.IOException {
    /* 084 */     // initialize aggregation buffer
    /* 085 */     final long agg_value = -1L;
    /* 086 */     agg_bufIsNull = true;
    /* 087 */     agg_bufValue = agg_value;
    /* 088 */     boolean agg_isNull1 = false;
    /* 089 */     double agg_value1 = -1.0;
    /* 090 */     if (!false) {
    /* 091 */       agg_value1 = (double) 0;
    /* 092 */     }
    /* 093 */     agg_bufIsNull1 = agg_isNull1;
    /* 094 */     agg_bufValue1 = agg_value1;
    /* 095 */     agg_bufIsNull2 = false;
    /* 096 */     agg_bufValue2 = 0L;
    /* 097 */
    /* 098 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 099 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 100 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 101 */       long inputadapter_value = inputadapter_isNull ? -1L : 
(inputadapter_row.getLong(0));
    /* 102 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
    /* 103 */       double inputadapter_value1 = inputadapter_isNull1 ? -1.0 : 
(inputadapter_row.getDouble(1));
    /* 104 */       boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
    /* 105 */       long inputadapter_value2 = inputadapter_isNull2 ? -1L : 
(inputadapter_row.getLong(2));
    /* 106 */
    /* 107 */       // do aggregate
    /* 108 */       // copy aggregation buffer to the local
    /* 109 */       boolean agg_localBufIsNull = agg_bufIsNull;
    /* 110 */       long agg_localBufValue = agg_bufValue;
    /* 111 */       boolean agg_localBufIsNull1 = agg_bufIsNull1;
    /* 112 */       double agg_localBufValue1 = agg_bufValue1;
    /* 113 */       boolean agg_localBufIsNull2 = agg_bufIsNull2;
    /* 114 */       long agg_localBufValue2 = agg_bufValue2;
    /* 115 */       // common sub-expressions
    /* 116 */
    /* 117 */       // process aggregate functions to update aggregation buffer
    /* 118 */       agg_doAggregateVal_coalesce(agg_localBufIsNull, 
agg_localBufValue, inputadapter_value, inputadapter_isNull);
    /* 119 */       agg_doAggregateVal_add(agg_localBufValue1, 
inputadapter_isNull1, inputadapter_value1, agg_localBufIsNull1);
    /* 120 */       agg_doAggregateVal_add1(inputadapter_isNull2, 
inputadapter_value2, agg_localBufIsNull2, agg_localBufValue2);
    /* 121 */       if (shouldStop()) return;
    /* 122 */     }
    ```
    
    - generated code in the current master
    ```
    /* 083 */   private void agg_doAggregateWithoutKey() throws 
java.io.IOException {
    /* 084 */     // initialize aggregation buffer
    /* 085 */     final long agg_value = -1L;
    /* 086 */     agg_bufIsNull = true;
    /* 087 */     agg_bufValue = agg_value;
    /* 088 */     boolean agg_isNull1 = false;
    /* 089 */     double agg_value1 = -1.0;
    /* 090 */     if (!false) {
    /* 091 */       agg_value1 = (double) 0;
    /* 092 */     }
    /* 093 */     agg_bufIsNull1 = agg_isNull1;
    /* 094 */     agg_bufValue1 = agg_value1;
    /* 095 */     agg_bufIsNull2 = false;
    /* 096 */     agg_bufValue2 = 0L;
    /* 097 */
    /* 098 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 099 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 100 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 101 */       long inputadapter_value = inputadapter_isNull ? -1L : 
(inputadapter_row.getLong(0));
    /* 102 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
    /* 103 */       double inputadapter_value1 = inputadapter_isNull1 ? -1.0 : 
(inputadapter_row.getDouble(1));
    /* 104 */       boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
    /* 105 */       long inputadapter_value2 = inputadapter_isNull2 ? -1L : 
(inputadapter_row.getLong(2));
    /* 106 */
    /* 107 */       // do aggregate
    /* 108 */       // common sub-expressions
    /* 109 */
    /* 110 */       // evaluate aggregate function
    /* 111 */       boolean agg_isNull12 = true;
    /* 112 */       long agg_value12 = -1L;
    /* 113 */
    /* 114 */       boolean agg_isNull13 = agg_bufIsNull;
    /* 115 */       long agg_value13 = agg_bufValue;
    /* 116 */       if (agg_isNull13) {
    /* 117 */         boolean agg_isNull15 = false;
    /* 118 */         long agg_value15 = -1L;
    /* 119 */         if (!false) {
    /* 120 */           agg_value15 = (long) 0;
    /* 121 */         }
    /* 122 */         if (!agg_isNull15) {
    /* 123 */           agg_isNull13 = false;
    /* 124 */           agg_value13 = agg_value15;
    /* 125 */         }
    /* 126 */       }
    /* 127 */
    /* 128 */       if (!inputadapter_isNull) {
    /* 129 */         agg_isNull12 = false; // resultCode could change 
nullability.
    /* 130 */         agg_value12 = agg_value13 + inputadapter_value;
    /* 131 */
    /* 132 */       }
    /* 133 */       boolean agg_isNull11 = agg_isNull12;
    /* 134 */       long agg_value11 = agg_value12;
    /* 135 */       if (agg_isNull11) {
    /* 136 */         if (!agg_bufIsNull) {
    /* 137 */           agg_isNull11 = false;
    /* 138 */           agg_value11 = agg_bufValue;
    /* 139 */         }
    /* 140 */       }
    /* 141 */       boolean agg_isNull19 = true;
    /* 142 */       double agg_value19 = -1.0;
    /* 143 */
    /* 144 */       if (!agg_bufIsNull1) {
    /* 145 */         if (!inputadapter_isNull1) {
    /* 146 */           agg_isNull19 = false; // resultCode could change 
nullability.
    /* 147 */           agg_value19 = agg_bufValue1 + inputadapter_value1;
    /* 148 */
    /* 149 */         }
    /* 150 */
    /* 151 */       }
    /* 152 */       boolean agg_isNull22 = true;
    /* 153 */       long agg_value22 = -1L;
    /* 154 */
    /* 155 */       if (!agg_bufIsNull2) {
    /* 156 */         if (!inputadapter_isNull2) {
    /* 157 */           agg_isNull22 = false; // resultCode could change 
nullability.
    /* 158 */           agg_value22 = agg_bufValue2 + inputadapter_value2;
    /* 159 */
    /* 160 */         }
    /* 161 */
    /* 162 */       }
    /* 163 */       // update aggregation buffer
    /* 164 */       agg_bufIsNull = agg_isNull11;
    /* 165 */       agg_bufValue = agg_value11;
    /* 166 */
    /* 167 */       agg_bufIsNull1 = agg_isNull19;
    /* 168 */       agg_bufValue1 = agg_value19;
    /* 169 */
    /* 170 */       agg_bufIsNull2 = agg_isNull22;
    /* 171 */       agg_bufValue2 = agg_value22;
    /* 172 */       if (shouldStop()) return;
    /* 173 */     }
    /* 174 */
    /* 175 */   }
    /* 176 */
    /* 177 */ }
    ```
    
    I also did performance checks:
    ```
    $ ./bin/spark-shell --master=local[1]
    scala> sql("SET spark.sql.shuffle.partitions=4")
    scala> spark.range(10000000).selectExpr("id % 1024 AS a", "id AS 
b").write.saveAsTable("t")
    scala> timer { sql("SELECT a, KURTOSIS(b) FROM t GROUP BY a").collect }
    
    master w/ this pr, Avg. Elapsed Time: 2.520551837s
    master, Avg. Elapsed Time: 54.029893146199996s
    ```
    
    ## How was this patch tested?
    Added tests in `WholeStageCodegenSuite`.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark SPARK-21870-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20965.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20965
    
----
commit 289b6d31727277e6e201acb1fb450cddc5c6892a
Author: Takeshi Yamamuro <yamamuro@...>
Date:   2017-08-28T12:46:00Z

    Split aggregation into small functions

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to