GitHub user maropu opened a pull request:

    https://github.com/apache/spark/pull/19082

    [SPARK-21870][SQL] Split aggregation code into small functions for the 
HotSpot

    ## What changes were proposed in this pull request?
    This PR proposes splitting the aggregation code generated by 
`HashAggregateExec` into smaller pieces so that the HotSpot JVM can compile it.
    In #18810, we saw a performance regression when HotSpot refused to compile 
overly long functions (the limit is 8000 bytes of bytecode). I checked and found that the 
codegen of `HashAggregateExec` frequently goes over the limit, for example:
    
    ```
    spark.range(10000000).selectExpr("id % 1024 AS a", "id AS 
b").write.saveAsTable("t")
    sql("SELECT a, KURTOSIS(b) FROM t GROUP BY a")
    ```
    
    This query goes over the limit: the actual bytecode size is `12356`.
    This PR splits the aggregation code into small separate functions. For a 
simple example:
    
    ```
    sql("SELECT SUM(a), AVG(a) FROM VALUES(1) t(a)").debugCodegen
    ```
    
    - generated code with this PR:
    ```
    /* 083 */   private void agg_doAggregateWithoutKey() throws 
java.io.IOException {
    /* 084 */     // initialize aggregation buffer
    /* 085 */     final long agg_value = -1L;
    /* 086 */     agg_bufIsNull = true;
    /* 087 */     agg_bufValue = agg_value;
    /* 088 */     boolean agg_isNull1 = false;
    /* 089 */     double agg_value1 = -1.0;
    /* 090 */     if (!false) {
    /* 091 */       agg_value1 = (double) 0;
    /* 092 */     }
    /* 093 */     agg_bufIsNull1 = agg_isNull1;
    /* 094 */     agg_bufValue1 = agg_value1;
    /* 095 */     agg_bufIsNull2 = false;
    /* 096 */     agg_bufValue2 = 0L;
    /* 097 */
    /* 098 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 099 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 100 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 101 */       long inputadapter_value = inputadapter_isNull ? -1L : 
(inputadapter_row.getLong(0));
    /* 102 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
    /* 103 */       double inputadapter_value1 = inputadapter_isNull1 ? -1.0 : 
(inputadapter_row.getDouble(1));
    /* 104 */       boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
    /* 105 */       long inputadapter_value2 = inputadapter_isNull2 ? -1L : 
(inputadapter_row.getLong(2));
    /* 106 */
    /* 107 */       // do aggregate
    /* 108 */       // copy aggregation buffer to the local
    /* 109 */       boolean agg_localBufIsNull = agg_bufIsNull;
    /* 110 */       long agg_localBufValue = agg_bufValue;
    /* 111 */       boolean agg_localBufIsNull1 = agg_bufIsNull1;
    /* 112 */       double agg_localBufValue1 = agg_bufValue1;
    /* 113 */       boolean agg_localBufIsNull2 = agg_bufIsNull2;
    /* 114 */       long agg_localBufValue2 = agg_bufValue2;
    /* 115 */       // common sub-expressions
    /* 116 */
    /* 117 */       // process aggregate functions to update aggregation buffer
    /* 118 */       agg_doAggregateVal_coalesce(agg_localBufIsNull, 
agg_localBufValue, inputadapter_value, inputadapter_isNull);
    /* 119 */       agg_doAggregateVal_add(agg_localBufValue1, 
inputadapter_isNull1, inputadapter_value1, agg_localBufIsNull1);
    /* 120 */       agg_doAggregateVal_add1(inputadapter_isNull2, 
inputadapter_value2, agg_localBufIsNull2, agg_localBufValue2);
    /* 121 */       if (shouldStop()) return;
    /* 122 */     }
    ```
    
    - generated code in the current master:
    ```
    /* 083 */   private void agg_doAggregateWithoutKey() throws 
java.io.IOException {
    /* 084 */     // initialize aggregation buffer
    /* 085 */     final long agg_value = -1L;
    /* 086 */     agg_bufIsNull = true;
    /* 087 */     agg_bufValue = agg_value;
    /* 088 */     boolean agg_isNull1 = false;
    /* 089 */     double agg_value1 = -1.0;
    /* 090 */     if (!false) {
    /* 091 */       agg_value1 = (double) 0;
    /* 092 */     }
    /* 093 */     agg_bufIsNull1 = agg_isNull1;
    /* 094 */     agg_bufValue1 = agg_value1;
    /* 095 */     agg_bufIsNull2 = false;
    /* 096 */     agg_bufValue2 = 0L;
    /* 097 */
    /* 098 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 099 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 100 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 101 */       long inputadapter_value = inputadapter_isNull ? -1L : 
(inputadapter_row.getLong(0));
    /* 102 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
    /* 103 */       double inputadapter_value1 = inputadapter_isNull1 ? -1.0 : 
(inputadapter_row.getDouble(1));
    /* 104 */       boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
    /* 105 */       long inputadapter_value2 = inputadapter_isNull2 ? -1L : 
(inputadapter_row.getLong(2));
    /* 106 */
    /* 107 */       // do aggregate
    /* 108 */       // common sub-expressions
    /* 109 */
    /* 110 */       // evaluate aggregate function
    /* 111 */       boolean agg_isNull12 = true;
    /* 112 */       long agg_value12 = -1L;
    /* 113 */
    /* 114 */       boolean agg_isNull13 = agg_bufIsNull;
    /* 115 */       long agg_value13 = agg_bufValue;
    /* 116 */       if (agg_isNull13) {
    /* 117 */         boolean agg_isNull15 = false;
    /* 118 */         long agg_value15 = -1L;
    /* 119 */         if (!false) {
    /* 120 */           agg_value15 = (long) 0;
    /* 121 */         }
    /* 122 */         if (!agg_isNull15) {
    /* 123 */           agg_isNull13 = false;
    /* 124 */           agg_value13 = agg_value15;
    /* 125 */         }
    /* 126 */       }
    /* 127 */
    /* 128 */       if (!inputadapter_isNull) {
    /* 129 */         agg_isNull12 = false; // resultCode could change 
nullability.
    /* 130 */         agg_value12 = agg_value13 + inputadapter_value;
    /* 131 */
    /* 132 */       }
    /* 133 */       boolean agg_isNull11 = agg_isNull12;
    /* 134 */       long agg_value11 = agg_value12;
    /* 135 */       if (agg_isNull11) {
    /* 136 */         if (!agg_bufIsNull) {
    /* 137 */           agg_isNull11 = false;
    /* 138 */           agg_value11 = agg_bufValue;
    /* 139 */         }
    /* 140 */       }
    /* 141 */       boolean agg_isNull19 = true;
    /* 142 */       double agg_value19 = -1.0;
    /* 143 */
    /* 144 */       if (!agg_bufIsNull1) {
    /* 145 */         if (!inputadapter_isNull1) {
    /* 146 */           agg_isNull19 = false; // resultCode could change 
nullability.
    /* 147 */           agg_value19 = agg_bufValue1 + inputadapter_value1;
    /* 148 */
    /* 149 */         }
    /* 150 */
    /* 151 */       }
    /* 152 */       boolean agg_isNull22 = true;
    /* 153 */       long agg_value22 = -1L;
    /* 154 */
    /* 155 */       if (!agg_bufIsNull2) {
    /* 156 */         if (!inputadapter_isNull2) {
    /* 157 */           agg_isNull22 = false; // resultCode could change 
nullability.
    /* 158 */           agg_value22 = agg_bufValue2 + inputadapter_value2;
    /* 159 */
    /* 160 */         }
    /* 161 */
    /* 162 */       }
    /* 163 */       // update aggregation buffer
    /* 164 */       agg_bufIsNull = agg_isNull11;
    /* 165 */       agg_bufValue = agg_value11;
    /* 166 */
    /* 167 */       agg_bufIsNull1 = agg_isNull19;
    /* 168 */       agg_bufValue1 = agg_value19;
    /* 169 */
    /* 170 */       agg_bufIsNull2 = agg_isNull22;
    /* 171 */       agg_bufValue2 = agg_value22;
    /* 172 */       if (shouldStop()) return;
    /* 173 */     }
    /* 174 */
    /* 175 */   }
    /* 176 */
    /* 177 */ }
    ```
    
    ## How was this patch tested?
    Added tests in `WholeStageCodegenSuite`.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark SPARK-21870

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19082.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19082
    
----
commit 6dacad4e6d0f588ce6f02e33da9625cc9ea116aa
Author: Takeshi Yamamuro <[email protected]>
Date:   2017-08-28T12:46:00Z

    Split aggregation into small functions

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to