GitHub user sameeragarwal commented on the pull request:

    https://github.com/apache/spark/pull/12161#issuecomment-206021282
  
    ```java
    public Object generate(Object[] references) {
      return new GeneratedIterator(references);
    }

    /**
     * Codegened pipeline for:
     * TungstenAggregate(key=[k1#29L,k2#30L], functions=[(sum(id#26L),mode=Final,isDistinct=false)], output=[k1#29L,k2#30L,sum(id)#34L]...
     */
    final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      private Object[] references;
      private boolean agg_initAgg;
      private agg_AggregateHashMap agg_aggregateHashMap;
      private org.apache.spark.sql.execution.aggregate.TungstenAggregate agg_plan;
      private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
      private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
      private org.apache.spark.unsafe.KVIterator agg_mapIter;
      private scala.collection.Iterator inputadapter_input;
      private UnsafeRow agg_result;
      private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
      private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
      private UnsafeRow agg_result1;
      private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1;
      private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter1;
      private org.apache.spark.sql.execution.metric.LongSQLMetric wholestagecodegen_numOutputRows;
      private org.apache.spark.sql.execution.metric.LongSQLMetricValue wholestagecodegen_metricValue;

      public GeneratedIterator(Object[] references) {
        this.references = references;
      }

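      // init() is called once per partition: `references` carries planner-side objects
      // (the TungstenAggregate node and its SQL metric) into this compiled class, and
      // `inputs` holds the upstream row iterators.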
      public void init(int index, scala.collection.Iterator inputs[]) {
        partitionIndex = index;
        agg_initAgg = false;
        agg_aggregateHashMap = new agg_AggregateHashMap();
        this.agg_plan = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0];
        agg_hashMap = agg_plan.createHashMap();

        inputadapter_input = inputs[0];
        agg_result = new UnsafeRow(2);
        this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
        this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 2);
        agg_result1 = new UnsafeRow(3);
        this.agg_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 0);
        this.agg_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 3);
        this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
        wholestagecodegen_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) wholestagecodegen_numOutputRows.localValue();
      }

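      // agg_AggregateHashMap is a specialized, code-generated hash map: rows live in an
      // on-heap ColumnarBatch (columns k1, k2, sum), and `buckets` maps a hash slot to a
      // row index in that batch (-1 marks an empty slot). Collisions are resolved by
      // open addressing with linear probing, capped at `maxSteps` probes.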
      public class agg_AggregateHashMap {
        private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
        private int[] buckets;
        private int numBuckets;
        private int maxSteps;
        private int numRows = 0;
        private org.apache.spark.sql.types.StructType schema =
          new org.apache.spark.sql.types.StructType()
            .add("k1", org.apache.spark.sql.types.DataTypes.LongType)
            .add("k2", org.apache.spark.sql.types.DataTypes.LongType)
            .add("sum", org.apache.spark.sql.types.DataTypes.LongType);

        public agg_AggregateHashMap(int capacity, double loadFactor, int maxSteps) {
          assert (capacity > 0 && ((capacity & (capacity - 1)) == 0));
          this.maxSteps = maxSteps;
          numBuckets = (int) (capacity / loadFactor);
          batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
            org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
          buckets = new int[numBuckets];
          java.util.Arrays.fill(buckets, -1);
        }

        public agg_AggregateHashMap() {
          // Delegate to the primary constructor; the original `new agg_AggregateHashMap(...)`
          // allocated a throwaway instance and left this one uninitialized.
          this(1 << 16, 0.25, 5);
        }

        public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(long k1, long k2) {
          int idx = find(k1, k2);
          if (idx == -1) {
            // Probing gave up after maxSteps; the original fell through and indexed buckets[-1] here.
            return null;
          }
          if (buckets[idx] == -1) {
            batch.column(0).putLong(numRows, k1);
            batch.column(1).putLong(numRows, k2);
            batch.column(2).putLong(numRows, 0);
            buckets[idx] = numRows++;
          }
          return batch.getRow(buckets[idx]);
        }

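        // Linear probe: start at hash & (numBuckets - 1) and walk forward at most
        // maxSteps slots. Masking only works because numBuckets is a power of two
        // (with the default capacity 1 << 16 and load factor 0.25, numBuckets is 1 << 18).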
        private int find(long k1, long k2) {
          long h = hash(k1, k2);
          int step = 0;
          int idx = (int) h & (numBuckets - 1);
          while (step < maxSteps) {
            // Return bucket index if it's either an empty slot or already contains the key
            if (buckets[idx] == -1) {
              return idx;
            } else if (equals(idx, k1, k2)) {
              return idx;
            }
            idx = (idx + 1) & (numBuckets - 1);
            step++;
          }
          // Didn't find it
          return -1;
        }

        private boolean equals(int idx, long k1, long k2) {
          return batch.column(0).getLong(buckets[idx]) == k1 && batch.column(1).getLong(buckets[idx]) == k2;
        }

        // TODO: Improve this Hash Function
        private long hash(long k1, long k2) {
          return k1 & k2;
        }

      }

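      // Build phase: for every input row, assemble the grouping key as an UnsafeRow,
      // hash it with Murmur3, and look up (or create) its aggregation buffer in the
      // UnsafeFixedWidthAggregationMap. If the map cannot allocate, its contents are
      // spilled into an UnsafeKVExternalSorter and the lookup is retried.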
      private void agg_doAggregateWithKeys() throws java.io.IOException {
        /*** PRODUCE: INPUT */

        while (inputadapter_input.hasNext()) {
          InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
          /*** CONSUME: TungstenAggregate(key=[k1#29L,k2#30L], functions=[(sum(id#26L),mode=Final,isDistinct=false)], output=[k1#29L,k2#30L,sum(id)#34L]... */
          /* input[0, bigint] */
          long inputadapter_value = inputadapter_row.getLong(0);
          /* input[1, bigint] */
          long inputadapter_value1 = inputadapter_row.getLong(1);
          /* input[2, bigint] */
          boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
          long inputadapter_value2 = inputadapter_isNull2 ? -1L : (inputadapter_row.getLong(2));

          // generate grouping key
          agg_rowWriter.write(0, inputadapter_value);

          agg_rowWriter.write(1, inputadapter_value1);
          /* hash(input[0, bigint], input[1, bigint], 42) */
          int agg_value2 = 42;

          agg_value2 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(inputadapter_value, agg_value2);

          agg_value2 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(inputadapter_value1, agg_value2);
          UnsafeRow agg_aggBuffer = null;
          if (true) {
            // try to get the buffer from hash map
            agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value2);
          }
          if (agg_aggBuffer == null) {
            if (agg_sorter == null) {
              agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
            } else {
              agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
            }

            // the hash map has been spilled, so it should have enough memory now;
            // try to allocate a buffer again.
            agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value2);
            if (agg_aggBuffer == null) {
              // failed to allocate the first page
              throw new OutOfMemoryError("Not enough memory for aggregation");
            }
          }

          // evaluate aggregate function
          /* coalesce((coalesce(input[0, bigint], cast(0 as bigint)) + input[3, bigint]), input[0, bigint]) */
          /* (coalesce(input[0, bigint], cast(0 as bigint)) + input[3, bigint]) */
          boolean agg_isNull6 = true;
          long agg_value6 = -1L;
          /* coalesce(input[0, bigint], cast(0 as bigint)) */
          /* input[0, bigint] */
          boolean agg_isNull8 = agg_aggBuffer.isNullAt(0);
          long agg_value8 = agg_isNull8 ? -1L : (agg_aggBuffer.getLong(0));
          boolean agg_isNull7 = agg_isNull8;
          long agg_value7 = agg_value8;

          if (agg_isNull7) {
            /* cast(0 as bigint) */
            boolean agg_isNull9 = false;
            long agg_value9 = -1L;
            if (!false) {
              agg_value9 = (long) 0;
            }
            if (!agg_isNull9) {
              agg_isNull7 = false;
              agg_value7 = agg_value9;
            }
          }

          if (!inputadapter_isNull2) {
            agg_isNull6 = false; // resultCode could change nullability.
            agg_value6 = agg_value7 + inputadapter_value2;

          }
          boolean agg_isNull5 = agg_isNull6;
          long agg_value5 = agg_value6;

          if (agg_isNull5) {
            /* input[0, bigint] */
            boolean agg_isNull12 = agg_aggBuffer.isNullAt(0);
            long agg_value12 = agg_isNull12 ? -1L : (agg_aggBuffer.getLong(0));
            if (!agg_isNull12) {
              agg_isNull5 = false;
              agg_value5 = agg_value12;
            }
          }
          // update aggregate buffer
          if (!agg_isNull5) {
            agg_aggBuffer.setLong(0, agg_value5);
          } else {
            agg_aggBuffer.setNullAt(0);
          }
          if (shouldStop()) return;
        }

        agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter);
      }

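      // Drain phase: processNext() runs the build exactly once, then streams the
      // aggregated (key, buffer) pairs out of agg_mapIter, writing each result row
      // with agg_rowWriter1 and handing it to the parent via append().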
      protected void processNext() throws java.io.IOException {
        /*** PRODUCE: TungstenAggregate(key=[k1#29L,k2#30L], functions=[(sum(id#26L),mode=Final,isDistinct=false)], output=[k1#29L,k2#30L,sum(id)#34L]... */

        if (!agg_initAgg) {
          agg_initAgg = true;
          agg_doAggregateWithKeys();
        }

        // output the result
        while (agg_mapIter.next()) {
          wholestagecodegen_metricValue.add(1);
          UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
          UnsafeRow agg_aggBuffer1 = (UnsafeRow) agg_mapIter.getValue();

          /* input[0, bigint] */
          long agg_value13 = agg_aggKey.getLong(0);
          /* input[1, bigint] */
          long agg_value14 = agg_aggKey.getLong(1);
          /* input[0, bigint] */
          boolean agg_isNull15 = agg_aggBuffer1.isNullAt(0);
          long agg_value15 = agg_isNull15 ? -1L : (agg_aggBuffer1.getLong(0));

          /*** CONSUME: WholeStageCodegen */

          agg_rowWriter1.zeroOutNullBytes();

          agg_rowWriter1.write(0, agg_value13);

          agg_rowWriter1.write(1, agg_value14);

          if (agg_isNull15) {
            agg_rowWriter1.setNullAt(2);
          } else {
            agg_rowWriter1.write(2, agg_value15);
          }
          append(agg_result1);

          if (shouldStop()) return;
        }

        agg_mapIter.close();
        if (agg_sorter == null) {
          agg_hashMap.free();
        }
      }
    }
    ```
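
    For reference, the plan in the header is just a grouped sum over two long keys. A minimal sketch of a query with that shape (the `k1`/`k2`/`id` names come from the plan; the key expressions and a spark-shell `sqlContext` are assumptions, not the actual benchmark behind this PR):

    ```scala
    // Hypothetical repro: any two-key grouped sum compiles to a plan of this shape.
    val df = sqlContext.range(1L << 20)
      .selectExpr("id", "id & 1023 as k1", "id & 255 as k2")
    df.groupBy("k1", "k2").sum("id").collect()
    ```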