Github user sameeragarwal commented on the pull request:
https://github.com/apache/spark/pull/12161#issuecomment-206021282
```java
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * TungstenAggregate(key=[k1#29L,k2#30L], functions=[(sum(id#26L),mode=Final,isDistinct=false)], output=[k1#29L,k2#30L,sum(id)#34L]...
/* 007 */ */
/* 008 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 009 */   private Object[] references;
/* 010 */   private boolean agg_initAgg;
/* 011 */   private agg_AggregateHashMap agg_aggregateHashMap;
/* 012 */   private org.apache.spark.sql.execution.aggregate.TungstenAggregate agg_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 014 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 015 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 016 */   private scala.collection.Iterator inputadapter_input;
/* 017 */   private UnsafeRow agg_result;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 020 */   private UnsafeRow agg_result1;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter1;
/* 023 */   private org.apache.spark.sql.execution.metric.LongSQLMetric wholestagecodegen_numOutputRows;
/* 024 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue wholestagecodegen_metricValue;
/* 025 */
/* 026 */   public GeneratedIterator(Object[] references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 031 */     partitionIndex = index;
/* 032 */     agg_initAgg = false;
/* 033 */     agg_aggregateHashMap = new agg_AggregateHashMap();
/* 034 */     this.agg_plan = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0];
/* 035 */     agg_hashMap = agg_plan.createHashMap();
/* 036 */
/* 037 */     inputadapter_input = inputs[0];
/* 038 */     agg_result = new UnsafeRow(2);
/* 039 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 040 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 2);
/* 041 */     agg_result1 = new UnsafeRow(3);
/* 042 */     this.agg_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 0);
/* 043 */     this.agg_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 3);
/* 044 */     this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 045 */     wholestagecodegen_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) wholestagecodegen_numOutputRows.localValue();
/* 046 */   }
/* 047 */
/* 048 */   public class agg_AggregateHashMap {
/* 049 */     private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
/* 050 */     private int[] buckets;
/* 051 */     private int numBuckets;
/* 052 */     private int maxSteps;
/* 053 */     private int numRows = 0;
/* 054 */     private org.apache.spark.sql.types.StructType schema =
/* 055 */       new org.apache.spark.sql.types.StructType()
/* 056 */         .add("k1", org.apache.spark.sql.types.DataTypes.LongType)
/* 057 */         .add("k2", org.apache.spark.sql.types.DataTypes.LongType)
/* 058 */         .add("sum", org.apache.spark.sql.types.DataTypes.LongType);
/* 059 */
/* 060 */     public agg_AggregateHashMap(int capacity, double loadFactor, int maxSteps) {
/* 061 */       assert (capacity > 0 && ((capacity & (capacity - 1)) == 0));
/* 062 */       this.maxSteps = maxSteps;
/* 063 */       numBuckets = (int) (capacity / loadFactor);
/* 064 */       batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
/* 065 */         org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
/* 066 */       buckets = new int[numBuckets];
/* 067 */       java.util.Arrays.fill(buckets, -1);
/* 068 */     }
/* 069 */
/* 070 */     public agg_AggregateHashMap() {
/* 071 */       new agg_AggregateHashMap(1 << 16, 0.25, 5);
/* 072 */     }
/* 073 */
/* 074 */     public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(long k1, long k2) {
/* 075 */       int idx = find(k1, k2);
/* 076 */       if (idx != -1 && buckets[idx] == -1) {
/* 077 */         batch.column(0).putLong(numRows, k1);
/* 078 */         batch.column(1).putLong(numRows, k2);
/* 079 */         batch.column(2).putLong(numRows, 0);
/* 080 */         buckets[idx] = numRows++;
/* 081 */       }
/* 082 */       return batch.getRow(buckets[idx]);
/* 083 */     }
/* 084 */
/* 085 */     private int find(long k1, long k2) {
/* 086 */       long h = hash(k1, k2);
/* 087 */       int step = 0;
/* 088 */       int idx = (int) h & (numBuckets - 1);
/* 089 */       while (step < maxSteps) {
/* 090 */         // Return bucket index if it's either an empty slot or already contains the key
/* 091 */         if (buckets[idx] == -1) {
/* 092 */           return idx;
/* 093 */         } else if (equals(idx, k1, k2)) {
/* 094 */           return idx;
/* 095 */         }
/* 096 */         idx = (idx + 1) & (numBuckets - 1);
/* 097 */         step++;
/* 098 */       }
/* 099 */       // Didn't find it
/* 100 */       return -1;
/* 101 */     }
/* 102 */
/* 103 */     private boolean equals(int idx, long k1, long k2) {
/* 104 */       return batch.column(0).getLong(buckets[idx]) == k1 && batch.column(1).getLong(buckets[idx]) == k2;
/* 105 */     }
/* 106 */
/* 107 */     // TODO: Improve this Hash Function
/* 108 */     private long hash(long k1, long k2) {
/* 109 */       return k1 & k2;
/* 110 */     }
/* 111 */
/* 112 */   }
/* 113 */
/* 114 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 115 */     /*** PRODUCE: INPUT */
/* 116 */
/* 117 */     while (inputadapter_input.hasNext()) {
/* 118 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 119 */       /*** CONSUME: TungstenAggregate(key=[k1#29L,k2#30L], functions=[(sum(id#26L),mode=Final,isDistinct=false)], output=[k1#29L,k2#30L,sum(id)#34L]... */
/* 120 */       /* input[0, bigint] */
/* 121 */       long inputadapter_value = inputadapter_row.getLong(0);
/* 122 */       /* input[1, bigint] */
/* 123 */       long inputadapter_value1 = inputadapter_row.getLong(1);
/* 124 */       /* input[2, bigint] */
/* 125 */       boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
/* 126 */       long inputadapter_value2 = inputadapter_isNull2 ? -1L : (inputadapter_row.getLong(2));
/* 127 */
/* 128 */       // generate grouping key
/* 129 */       agg_rowWriter.write(0, inputadapter_value);
/* 130 */
/* 131 */       agg_rowWriter.write(1, inputadapter_value1);
/* 132 */       /* hash(input[0, bigint], input[1, bigint], 42) */
/* 133 */       int agg_value2 = 42;
/* 134 */
/* 135 */       agg_value2 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(inputadapter_value, agg_value2);
/* 136 */
/* 137 */       agg_value2 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(inputadapter_value1, agg_value2);
/* 138 */       UnsafeRow agg_aggBuffer = null;
/* 139 */       if (true) {
/* 140 */         // try to get the buffer from hash map
/* 141 */         agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value2);
/* 142 */       }
/* 143 */       if (agg_aggBuffer == null) {
/* 144 */         if (agg_sorter == null) {
/* 145 */           agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 146 */         } else {
/* 147 */           agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 148 */         }
/* 149 */
/* 150 */         // the hash map had be spilled, it should have enough memory now,
/* 151 */         // try to allocate buffer again.
/* 152 */         agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value2);
/* 153 */         if (agg_aggBuffer == null) {
/* 154 */           // failed to allocate the first page
/* 155 */           throw new OutOfMemoryError("No enough memory for aggregation");
/* 156 */         }
/* 157 */       }
/* 158 */
/* 159 */       // evaluate aggregate function
/* 160 */       /* coalesce((coalesce(input[0, bigint], cast(0 as bigint)) + input[3, bigint]), input[0, bigint]) */
/* 161 */       /* (coalesce(input[0, bigint], cast(0 as bigint)) + input[3, bigint]) */
/* 162 */       boolean agg_isNull6 = true;
/* 163 */       long agg_value6 = -1L;
/* 164 */       /* coalesce(input[0, bigint], cast(0 as bigint)) */
/* 165 */       /* input[0, bigint] */
/* 166 */       boolean agg_isNull8 = agg_aggBuffer.isNullAt(0);
/* 167 */       long agg_value8 = agg_isNull8 ? -1L : (agg_aggBuffer.getLong(0));
/* 168 */       boolean agg_isNull7 = agg_isNull8;
/* 169 */       long agg_value7 = agg_value8;
/* 170 */
/* 171 */       if (agg_isNull7) {
/* 172 */         /* cast(0 as bigint) */
/* 173 */         boolean agg_isNull9 = false;
/* 174 */         long agg_value9 = -1L;
/* 175 */         if (!false) {
/* 176 */           agg_value9 = (long) 0;
/* 177 */         }
/* 178 */         if (!agg_isNull9) {
/* 179 */           agg_isNull7 = false;
/* 180 */           agg_value7 = agg_value9;
/* 181 */         }
/* 182 */       }
/* 183 */
/* 184 */       if (!inputadapter_isNull2) {
/* 185 */         agg_isNull6 = false; // resultCode could change nullability.
/* 186 */         agg_value6 = agg_value7 + inputadapter_value2;
/* 187 */
/* 188 */       }
/* 189 */       boolean agg_isNull5 = agg_isNull6;
/* 190 */       long agg_value5 = agg_value6;
/* 191 */
/* 192 */       if (agg_isNull5) {
/* 193 */         /* input[0, bigint] */
/* 194 */         boolean agg_isNull12 = agg_aggBuffer.isNullAt(0);
/* 195 */         long agg_value12 = agg_isNull12 ? -1L : (agg_aggBuffer.getLong(0));
/* 196 */         if (!agg_isNull12) {
/* 197 */           agg_isNull5 = false;
/* 198 */           agg_value5 = agg_value12;
/* 199 */         }
/* 200 */       }
/* 201 */       // update aggregate buffer
/* 202 */       if (!agg_isNull5) {
/* 203 */         agg_aggBuffer.setLong(0, agg_value5);
/* 204 */       } else {
/* 205 */         agg_aggBuffer.setNullAt(0);
/* 206 */       }
/* 207 */       if (shouldStop()) return;
/* 208 */     }
/* 209 */
/* 210 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter);
/* 211 */   }
/* 212 */
/* 213 */   protected void processNext() throws java.io.IOException {
/* 214 */     /*** PRODUCE: TungstenAggregate(key=[k1#29L,k2#30L], functions=[(sum(id#26L),mode=Final,isDistinct=false)], output=[k1#29L,k2#30L,sum(id)#34L]... */
/* 215 */
/* 216 */     if (!agg_initAgg) {
/* 217 */       agg_initAgg = true;
/* 218 */       agg_doAggregateWithKeys();
/* 219 */     }
/* 220 */
/* 221 */     // output the result
/* 222 */     while (agg_mapIter.next()) {
/* 223 */       wholestagecodegen_metricValue.add(1);
/* 224 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 225 */       UnsafeRow agg_aggBuffer1 = (UnsafeRow) agg_mapIter.getValue();
/* 226 */
/* 227 */       /* input[0, bigint] */
/* 228 */       long agg_value13 = agg_aggKey.getLong(0);
/* 229 */       /* input[1, bigint] */
/* 230 */       long agg_value14 = agg_aggKey.getLong(1);
/* 231 */       /* input[0, bigint] */
/* 232 */       boolean agg_isNull15 = agg_aggBuffer1.isNullAt(0);
/* 233 */       long agg_value15 = agg_isNull15 ? -1L : (agg_aggBuffer1.getLong(0));
/* 234 */
/* 235 */       /*** CONSUME: WholeStageCodegen */
/* 236 */
/* 237 */       agg_rowWriter1.zeroOutNullBytes();
/* 238 */
/* 239 */       agg_rowWriter1.write(0, agg_value13);
/* 240 */
/* 241 */       agg_rowWriter1.write(1, agg_value14);
/* 242 */
/* 243 */       if (agg_isNull15) {
/* 244 */         agg_rowWriter1.setNullAt(2);
/* 245 */       } else {
/* 246 */         agg_rowWriter1.write(2, agg_value15);
/* 247 */       }
/* 248 */       append(agg_result1);
/* 249 */
/* 250 */       if (shouldStop()) return;
/* 251 */     }
/* 252 */
/* 253 */     agg_mapIter.close();
/* 254 */     if (agg_sorter == null) {
/* 255 */       agg_hashMap.free();
/* 256 */     }
/* 257 */   }
/* 258 */ }
```
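For readers skimming the dump: the generated `agg_AggregateHashMap` (lines 074–110 above) is an open-addressing map with linear probing capped at `maxSteps`. Below is a minimal, standalone sketch of that probing scheme, assuming plain `long[]` columns in place of the `ColumnarBatch`; the class and member names are hypothetical simplifications for illustration, not part of the generated code.

```java
import java.util.Arrays;

// Simplified sketch of the probing logic in the generated agg_AggregateHashMap.
// Plain long[] columns stand in for the ColumnarBatch; names are illustrative only.
public class ProbingSketch {
  private final long[] k1Col, k2Col, sumCol;  // one "row" per aggregation group
  private final int[] buckets;                // bucket -> row id, -1 means empty
  private final int numBuckets;
  private final int maxSteps;
  private int numRows = 0;

  public ProbingSketch(int capacity, double loadFactor, int maxSteps) {
    assert capacity > 0 && (capacity & (capacity - 1)) == 0;  // power of two
    this.maxSteps = maxSteps;
    this.numBuckets = (int) (capacity / loadFactor);
    this.k1Col = new long[capacity];
    this.k2Col = new long[capacity];
    this.sumCol = new long[capacity];
    this.buckets = new int[numBuckets];
    Arrays.fill(buckets, -1);
  }

  // Returns the row id for (k1, k2), inserting a zeroed row if absent,
  // or -1 if probing gave up after maxSteps (caller falls back to another path).
  public int findOrInsert(long k1, long k2) {
    int idx = find(k1, k2);
    if (idx == -1) return -1;
    if (buckets[idx] == -1) {
      k1Col[numRows] = k1;
      k2Col[numRows] = k2;
      sumCol[numRows] = 0;
      buckets[idx] = numRows++;
    }
    return buckets[idx];
  }

  private int find(long k1, long k2) {
    int idx = (int) hash(k1, k2) & (numBuckets - 1);
    for (int step = 0; step < maxSteps; step++) {
      // stop at an empty slot or a slot already holding this key
      if (buckets[idx] == -1 ||
          (k1Col[buckets[idx]] == k1 && k2Col[buckets[idx]] == k2)) {
        return idx;
      }
      idx = (idx + 1) & (numBuckets - 1);  // linear probe to the next bucket
    }
    return -1;  // give up after maxSteps probes
  }

  // Same placeholder hash as the generated code (flagged there with a TODO).
  private long hash(long k1, long k2) {
    return k1 & k2;
  }
}
```

Note that the `& (numBuckets - 1)` masking is only valid because the generated default constructor picks a power-of-two capacity (1 << 16) and a load factor of 0.25, which makes `numBuckets` a power of two as well.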