GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/19938
[SPARK-22747][SQL] Localize lifetime of mutable states in HashAggregateExec ## What changes were proposed in this pull request? This PR localizes the lifetime of mutable states, which are used for `isNull` and `value` of aggregation results, in code generated by `HashAggregateExec`. These states are passed to successor operations through the `consume()` method. This assumption may be violated in #19865 when operations that use these variables are split. In the following example, `agg_localBufValue` and `agg_localBufisNull` are passed to a successor operation (`projection`). This PR is based on @cloud-fan 's [suggestion](https://github.com/apache/spark/pull/19865#issuecomment-348776654). Without this PR ``` /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private scala.collection.Iterator inputadapter_input; ... /* 039 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 040 */ // initialize aggregation buffer /* 041 */ final long agg_value = -1L; /* 042 */ agg_bufIsNull = true; /* 043 */ agg_bufValue = agg_value; /* 044 */ /* 045 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 046 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 047 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 048 */ long inputadapter_value = inputadapter_isNull ? -1L : (inputadapter_row.getLong(0)); ... 
/* 100 */ } while (false); /* 101 */ final boolean agg_isNull3 = agg_coalesceTmpIsNull; /* 102 */ // update aggregation buffer /* 103 */ agg_bufIsNull = agg_isNull3; /* 104 */ agg_bufValue = agg_value3; /* 105 */ if (shouldStop()) return; /* 106 */ } /* 107 */ /* 108 */ } /* 109 */ /* 110 */ protected void processNext() throws java.io.IOException { /* 111 */ while (!agg_initAgg) { /* 112 */ agg_initAgg = true; /* 113 */ long agg_beforeAgg = System.nanoTime(); /* 114 */ agg_doAggregateWithoutKey(); /* 115 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 116 */ /* 117 */ // output the result /* 118 */ /* 119 */ agg_numOutputRows.add(1); /* 120 */ agg_rowWriter.zeroOutNullBytes(); /* 121 */ /* 122 */ if (agg_bufisNull) { /* 123 */ agg_rowWriter.setNullAt(0); /* 124 */ } else { /* 125 */ agg_rowWriter.write(0, agg_bufValue); /* 126 */ } /* 127 */ append(agg_result); /* 128 */ } /* 129 */ } ``` With this PR ``` /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private scala.collection.Iterator inputadapter_input; ... /* 039 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 040 */ // initialize aggregation buffer /* 041 */ final long agg_value = -1L; /* 042 */ agg_bufIsNull = true; /* 043 */ agg_bufValue = agg_value; /* 044 */ /* 045 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 046 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 047 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 048 */ long inputadapter_value = inputadapter_isNull ? -1L : (inputadapter_row.getLong(0)); ... 
/* 100 */ } while (false); /* 101 */ final boolean agg_isNull3 = agg_coalesceTmpIsNull; /* 102 */ // update aggregation buffer /* 103 */ agg_bufIsNull = agg_isNull3; /* 104 */ agg_bufValue = agg_value3; /* 105 */ if (shouldStop()) return; /* 106 */ } /* 107 */ /* 108 */ } /* 109 */ /* 110 */ protected void processNext() throws java.io.IOException { /* 111 */ while (!agg_initAgg) { /* 112 */ agg_initAgg = true; /* 113 */ long agg_beforeAgg = System.nanoTime(); /* 114 */ agg_doAggregateWithoutKey(); /* 115 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 116 */ /* 117 */ // output the result /* 118 */ /* 119 */ boolean agg_localBufisNull = agg_bufIsNull; /* 120 */ long agg_localBufValue = agg_bufValue; /* 121 */ /* 122 */ agg_numOutputRows.add(1); /* 123 */ agg_rowWriter.zeroOutNullBytes(); /* 124 */ /* 125 */ if (agg_localBufisNull) { /* 126 */ agg_rowWriter.setNullAt(0); /* 127 */ } else { /* 128 */ agg_rowWriter.write(0, agg_localBufValue); /* 129 */ } /* 130 */ append(agg_result); /* 131 */ } /* 132 */ } ``` ## How was this patch tested? Existing test suites You can merge this pull request into a Git repository by running: $ git pull https://github.com/kiszk/spark SPARK-22747 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19938.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19938 ---- commit ac65dd21b975481f5c4feb5f0745a2c45d000728 Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com> Date: 2017-12-10T07:06:43Z initial commit ---- --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org