Github user sameeragarwal commented on the pull request:
https://github.com/apache/spark/pull/12440#issuecomment-211484774
Generated `VectorizedHashMap` class for a contrived query that groups on keys spanning a range of datatypes:
```scala
sqlContext.range(N)
  .selectExpr(
    "id",
    "(id & 65535) as k0",
    "cast(id & 65535 as string) as k1",
    "cast(id & 65535 as int) as k2",
    "cast(id & 65535 as double) as k3",
    "cast(id & 65535 as float) as k4",
    "id > 100 as k5")
  .groupBy("k1", "k2", "k3", "k4", "k5")
  .sum()
  .collect()
```
looks like:
```java
public class agg_VectorizedHashMap {
  // Grouping keys live in columns 0-4 of `batch`; the five aggregate buffers
  // live in columns 5-9. `aggregateBufferBatch` is a projection over just the
  // buffer columns (see the constructor below).
  private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
  private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
  private int[] buckets;
  private int capacity = 1 << 16;
  private double loadFactor = 0.5;
  private int numBuckets = (int) (capacity / loadFactor);
  private int maxSteps = 2;
  private int numRows = 0;
  private org.apache.spark.sql.types.StructType schema =
    new org.apache.spark.sql.types.StructType()
      .add("k1", org.apache.spark.sql.types.DataTypes.StringType)
      .add("k2", org.apache.spark.sql.types.DataTypes.IntegerType)
      .add("k3", org.apache.spark.sql.types.DataTypes.DoubleType)
      .add("k4", org.apache.spark.sql.types.DataTypes.FloatType)
      .add("k5", org.apache.spark.sql.types.DataTypes.BooleanType)
      .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
      .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
      .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
      .add("sum", org.apache.spark.sql.types.DataTypes.DoubleType)
      .add("sum", org.apache.spark.sql.types.DataTypes.DoubleType);

  private org.apache.spark.sql.types.StructType aggregateBufferSchema =
    new org.apache.spark.sql.types.StructType()
      .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
      .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
      .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
      .add("sum", org.apache.spark.sql.types.DataTypes.DoubleType)
      .add("sum", org.apache.spark.sql.types.DataTypes.DoubleType);

  public agg_VectorizedHashMap() {
    batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
      org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
    // TODO: Possibly generate this projection in TungstenAggregate directly
    aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
      aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
    // Alias the buffer batch's columns onto columns 5-9 of `batch`, so both
    // batches share the same underlying aggregate buffer data.
    for (int i = 0; i < aggregateBufferBatch.numCols(); i++) {
      aggregateBufferBatch.setColumn(i, batch.column(i + 5));
    }

    buckets = new int[numBuckets];
    java.util.Arrays.fill(buckets, -1); // -1 marks an empty bucket
  }

  public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(
      UTF8String agg_key, int agg_key1, double agg_key2, float agg_key3, boolean agg_key4) {
    long h = hash(agg_key, agg_key1, agg_key2, agg_key3, agg_key4);
    int step = 0;
    // numBuckets is a power of two, so masking with (numBuckets - 1) is
    // equivalent to taking the hash modulo numBuckets.
    int idx = (int) h & (numBuckets - 1);
    while (step < maxSteps) {
      // Return bucket index if it's either an empty slot or already contains the key
      if (buckets[idx] == -1) {
        if (numRows < capacity) {
          batch.column(0).putByteArray(numRows, agg_key.getBytes());
          batch.column(1).putInt(numRows, agg_key1);
          batch.column(2).putDouble(numRows, agg_key2);
          batch.column(3).putFloat(numRows, agg_key3);
          batch.column(4).putBoolean(numRows, agg_key4);
          batch.column(5).putNull(numRows);
          batch.column(6).putNull(numRows);
          batch.column(7).putNull(numRows);
          batch.column(8).putNull(numRows);
          batch.column(9).putNull(numRows);
          buckets[idx] = numRows++;
          batch.setNumRows(numRows);
          aggregateBufferBatch.setNumRows(numRows);
          return aggregateBufferBatch.getRow(buckets[idx]);
        } else {
          // No more space
          return null;
        }
      } else if (equals(idx, agg_key, agg_key1, agg_key2, agg_key3, agg_key4)) {
        return aggregateBufferBatch.getRow(buckets[idx]);
      }
      // Collision: probe linearly, giving up after maxSteps attempts
      idx = (idx + 1) & (numBuckets - 1);
      step++;
    }
    // Didn't find it
    return null;
  }

  private boolean equals(int idx, UTF8String agg_key, int agg_key1, double agg_key2,
      float agg_key3, boolean agg_key4) {
    return batch.column(0).getUTF8String(buckets[idx]).equals(agg_key) &&
      batch.column(1).getInt(buckets[idx]) == agg_key1 &&
      batch.column(2).getDouble(buckets[idx]) == agg_key2 &&
      batch.column(3).getFloat(buckets[idx]) == agg_key3 &&
      batch.column(4).getBoolean(buckets[idx]) == agg_key4;
  }

  private long hash(UTF8String agg_key, int agg_key1, double agg_key2, float agg_key3,
      boolean agg_key4) {
    long agg_hash = 0;

    // hash_combine-style mixing; 0x9e3779b9 is the 32-bit golden-ratio
    // constant familiar from boost::hash_combine.
    int agg_result = 0;
    byte[] agg_bytes = agg_key.getBytes();
    for (int i = 0; i < agg_bytes.length; i++) {
      int agg_hash1 = agg_bytes[i];
      agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + (agg_result << 6) + (agg_result >>> 2);
    }

    agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash << 6) + (agg_hash >>> 2);

    int agg_result1 = agg_key1;
    agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result1 + (agg_hash << 6) + (agg_hash >>> 2);

    long agg_result2 = Double.doubleToLongBits(agg_key2);
    agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result2 + (agg_hash << 6) + (agg_hash >>> 2);

    int agg_result3 = Float.floatToIntBits(agg_key3);
    agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result3 + (agg_hash << 6) + (agg_hash >>> 2);

    int agg_result4 = agg_key4 ? 1 : 0;
    agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result4 + (agg_hash << 6) + (agg_hash >>> 2);

    return agg_hash;
  }

  public java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
      rowIterator() {
    return batch.rowIterator();
  }

  public void close() {
    batch.close();
  }
}
```
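To put the generated class in context: the intent is that the aggregate's generated consume path probes this map once per input row, updates the returned buffer row in place on a hit, and falls back to the regular aggregation hash map whenever `findOrInsert` returns `null` (map full, or the probe chain exhausted `maxSteps`). Here is a minimal sketch of such a driver; the driver class, its method names, the choice of buffer column, and the `isNullAt`/`getLong`/`setLong` accessors on `ColumnarBatch.Row` are assumptions for illustration, not code from this patch:
```java
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.unsafe.types.UTF8String;

// Hypothetical driver for the generated map above.
class VectorizedHashMapDriver {
  private final agg_VectorizedHashMap map = new agg_VectorizedHashMap();

  /** Consume one input row; returns false if the caller must use the slow path. */
  boolean consume(UTF8String k1, int k2, double k3, float k4, boolean k5, long id) {
    ColumnarBatch.Row buf = map.findOrInsert(k1, k2, k3, k4, k5);
    if (buf == null) {
      return false; // map full or probe chain exhausted: use the regular hash map
    }
    // Buffers start out null (see the putNull calls above), so treat null as 0.
    // Assumes column 0 of the buffer row is the sum(id) buffer and that the
    // row exposes isNullAt/getLong/setLong-style accessors.
    long current = buf.isNullAt(0) ? 0L : buf.getLong(0);
    buf.setLong(0, current + id);
    return true;
  }

  /** Emit (key, buffer) pairs once all input has been consumed. */
  void emitAndClose() {
    java.util.Iterator<ColumnarBatch.Row> it = map.rowIterator();
    while (it.hasNext()) {
      ColumnarBatch.Row row = it.next();
      // hand `row` off to the rest of the generated produce/consume pipeline
    }
    map.close();
  }
}
```
Note that `findOrInsert` hands back a row from `aggregateBufferBatch`, whose columns alias columns 5-9 of `batch`; that is what lets update code address the buffers starting at ordinal 0 regardless of how many grouping keys precede them.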