[GitHub] spark pull request #19938: [SPARK-22747][SQL] Localize lifetime of mutable s...

2017-12-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19938


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19938: [SPARK-22747][SQL] Localize lifetime of mutable s...

2017-12-09 Thread kiszk
GitHub user kiszk opened a pull request:

https://github.com/apache/spark/pull/19938

[SPARK-22747][SQL] Localize lifetime of mutable states in HashAggregateExec

## What changes were proposed in this pull request?

This PR localizes the lifetime of the mutable states that hold the `isNull` 
and `value` of aggregation results in the code generated by `HashAggregateExec`. 

These states are passed to successor operations through the `consume()` method. 
This may violate the assumption made in #19865 when the operations that use 
these variables are split. In the following example, `agg_localBufValue` and 
`agg_localBufisNull` are passed to a successor operation (`projection`).

This PR is based on @cloud-fan 's 
[suggestion](https://github.com/apache/spark/pull/19865#issuecomment-348776654).

Without this PR
```
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator inputadapter_input;
...
/* 039 */   private void agg_doAggregateWithoutKey() throws 
java.io.IOException {
/* 040 */ // initialize aggregation buffer
/* 041 */ final long agg_value = -1L;
/* 042 */ agg_bufIsNull = true;
/* 043 */ agg_bufValue = agg_value;
/* 044 */
/* 045 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 046 */   InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
/* 047 */   boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 048 */   long inputadapter_value = inputadapter_isNull ? -1L : 
(inputadapter_row.getLong(0));
...
/* 100 */   } while (false);
/* 101 */   final boolean agg_isNull3 = agg_coalesceTmpIsNull;
/* 102 */   // update aggregation buffer
/* 103 */   agg_bufIsNull = agg_isNull3;
/* 104 */   agg_bufValue = agg_value3;
/* 105 */   if (shouldStop()) return;
/* 106 */ }
/* 107 */
/* 108 */   }
/* 109 */
/* 110 */   protected void processNext() throws java.io.IOException {
/* 111 */ while (!agg_initAgg) {
/* 112 */   agg_initAgg = true;
/* 113 */   long agg_beforeAgg = System.nanoTime();
/* 114 */   agg_doAggregateWithoutKey();
/* 115 */   agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 
100);
/* 116 */
/* 117 */   // output the result
/* 118 */
/* 119 */   agg_numOutputRows.add(1);
/* 120 */   agg_rowWriter.zeroOutNullBytes();
/* 121 */
/* 122 */   if (agg_bufIsNull) {
/* 123 */ agg_rowWriter.setNullAt(0);
/* 124 */   } else {
/* 125 */ agg_rowWriter.write(0, agg_bufValue);
/* 126 */   }
/* 127 */   append(agg_result);
/* 128 */ }
/* 129 */   }
```

With this PR
```
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator inputadapter_input;
...
/* 039 */   private void agg_doAggregateWithoutKey() throws 
java.io.IOException {
/* 040 */ // initialize aggregation buffer
/* 041 */ final long agg_value = -1L;
/* 042 */ agg_bufIsNull = true;
/* 043 */ agg_bufValue = agg_value;
/* 044 */
/* 045 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 046 */   InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
/* 047 */   boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 048 */   long inputadapter_value = inputadapter_isNull ? -1L : 
(inputadapter_row.getLong(0));
...
/* 100 */   } while (false);
/* 101 */   final boolean agg_isNull3 = agg_coalesceTmpIsNull;
/* 102 */   // update aggregation buffer
/* 103 */   agg_bufIsNull = agg_isNull3;
/* 104 */   agg_bufValue = agg_value3;
/* 105 */   if (shouldStop()) return;
/* 106 */ }
/* 107 */
/* 108 */   }
/* 109 */
/* 110 */   protected void processNext() throws java.io.IOException {
/* 111 */ while (!agg_initAgg) {
/* 112 */   agg_initAgg = true;
/* 113 */   long agg_beforeAgg = System.nanoTime();
/* 114 */   agg_doAggregateWithoutKey();
/* 115 */   agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 
100);
/* 116 */
/* 117 */   // output the result
/* 118 */
/* 119 */