Github user sameeragarwal commented on the pull request:

    https://github.com/apache/spark/pull/12440#issuecomment-211484774
  
    Generated VectorizedHashMap class for a contrived query that groups by 
keys spanning a range of data types: 
    
    ```scala
    sqlContext.range(N)
      .selectExpr(
        "id" ,
        "(id & 65535) as k0",
        "cast(id & 65535 as string) as k1",
        "cast(id & 65535 as int) as k2",
        "cast(id & 65535 as double) as k3",
        "cast(id & 65535 as float) as k4",
        "id > 100 as k5")
      .groupBy("k1", "k2", "k3", "k4", "k5")
      .sum()
      .collect()
    ```
    
    looks like:
    
    ```java
    /* 075 */   public class agg_VectorizedHashMap {
    /* 076 */     private 
org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
    /* 077 */     private 
org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
    /* 078 */     private int[] buckets;
    /* 079 */     private int capacity = 1 << 16;
    /* 080 */     private double loadFactor = 0.5;
    /* 081 */     private int numBuckets = (int) (capacity / loadFactor);
    /* 082 */     private int maxSteps = 2;
    /* 083 */     private int numRows = 0;
    /* 084 */     private org.apache.spark.sql.types.StructType schema =
    /* 085 */     new org.apache.spark.sql.types.StructType()
    /* 086 */     .add("k1", org.apache.spark.sql.types.DataTypes.StringType)
    /* 087 */     .add("k2", org.apache.spark.sql.types.DataTypes.IntegerType)
    /* 088 */     .add("k3", org.apache.spark.sql.types.DataTypes.DoubleType)
    /* 089 */     .add("k4", org.apache.spark.sql.types.DataTypes.FloatType)
    /* 090 */     .add("k5", org.apache.spark.sql.types.DataTypes.BooleanType)
    /* 091 */     .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
    /* 092 */     .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
    /* 093 */     .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
    /* 094 */     .add("sum", org.apache.spark.sql.types.DataTypes.DoubleType)
    /* 095 */     .add("sum", org.apache.spark.sql.types.DataTypes.DoubleType);
    /* 096 */     
    /* 097 */     private org.apache.spark.sql.types.StructType 
aggregateBufferSchema =
    /* 098 */     
    /* 099 */     new org.apache.spark.sql.types.StructType()
    /* 100 */     .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
    /* 101 */     .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
    /* 102 */     .add("sum", org.apache.spark.sql.types.DataTypes.LongType)
    /* 103 */     .add("sum", org.apache.spark.sql.types.DataTypes.DoubleType)
    /* 104 */     .add("sum", org.apache.spark.sql.types.DataTypes.DoubleType);
    /* 105 */     
    /* 106 */     public agg_VectorizedHashMap() {
    /* 107 */       batch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
    /* 108 */         org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
    /* 109 */       // TODO: Possibly generate this projection in 
TungstenAggregate directly
    /* 110 */       aggregateBufferBatch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
    /* 111 */         aggregateBufferSchema, 
org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
    /* 112 */       for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
    /* 113 */         aggregateBufferBatch.setColumn(i, batch.column(i+5));
    /* 114 */       }
    /* 115 */       
    /* 116 */       buckets = new int[numBuckets];
    /* 117 */       java.util.Arrays.fill(buckets, -1);
    /* 118 */     }
    /* 119 */     
    /* 120 */     public 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
findOrInsert(UTF8String agg_key, int agg_key1, double agg_key2, float agg_key3, 
boolean agg_key4) {
    /* 121 */       long h = hash(agg_key, agg_key1, agg_key2, agg_key3, 
agg_key4);
    /* 122 */       int step = 0;
    /* 123 */       int idx = (int) h & (numBuckets - 1);
    /* 124 */       while (step < maxSteps) {
    /* 125 */         // Return bucket index if it's either an empty slot or 
already contains the key
    /* 126 */         if (buckets[idx] == -1) {
    /* 127 */           if (numRows < capacity) {
    /* 128 */             batch.column(0).putByteArray(numRows, 
agg_key.getBytes());
    /* 129 */             batch.column(1).putInt(numRows, agg_key1);
    /* 130 */             batch.column(2).putDouble(numRows, agg_key2);
    /* 131 */             batch.column(3).putFloat(numRows, agg_key3);
    /* 132 */             batch.column(4).putBoolean(numRows, agg_key4);
    /* 133 */             batch.column(5).putNull(numRows);
    /* 134 */             batch.column(6).putNull(numRows);
    /* 135 */             batch.column(7).putNull(numRows);
    /* 136 */             batch.column(8).putNull(numRows);
    /* 137 */             batch.column(9).putNull(numRows);
    /* 138 */             buckets[idx] = numRows++;
    /* 139 */             batch.setNumRows(numRows);
    /* 140 */             aggregateBufferBatch.setNumRows(numRows);
    /* 141 */             return aggregateBufferBatch.getRow(buckets[idx]);
    /* 142 */           } else {
    /* 143 */             // No more space
    /* 144 */             return null;
    /* 145 */           }
    /* 146 */         } else if (equals(idx, agg_key, agg_key1, agg_key2, 
agg_key3, agg_key4)) {
    /* 147 */           return aggregateBufferBatch.getRow(buckets[idx]);
    /* 148 */         }
    /* 149 */         idx = (idx + 1) & (numBuckets - 1);
    /* 150 */         step++;
    /* 151 */       }
    /* 152 */       // Didn't find it
    /* 153 */       return null;
    /* 154 */     }
    /* 155 */     
    /* 156 */     private boolean equals(int idx, UTF8String agg_key, int 
agg_key1, double agg_key2, float agg_key3, boolean agg_key4) {
    /* 157 */       return batch.column(0).getUTF8String(buckets[idx]) == 
agg_key && batch.column(1).getInt(buckets[idx]) == agg_key1 && 
batch.column(2).getDouble(buckets[idx]) == agg_key2 && 
batch.column(3).getFloat(buckets[idx]) == agg_key3 && 
batch.column(4).getBoolean(buckets[idx]) == agg_key4;
    /* 158 */     }
    /* 159 */     
    /* 160 */     private long hash(UTF8String agg_key, int agg_key1, double 
agg_key2, float agg_key3, boolean agg_key4) {
    /* 161 */       long agg_hash = 0;
    /* 162 */       
    /* 163 */       int agg_result = 0;
    /* 164 */       byte[] agg_bytes = agg_key.getBytes();
    /* 165 */       for (int i = 0; i < agg_bytes.length; i++) {
    /* 166 */         int agg_hash1 = agg_bytes[i];
    /* 167 */         agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + 
(agg_result << 6) + (agg_result >>> 2);
    /* 168 */       }
    /* 169 */       
    /* 170 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + 
(agg_hash << 6) + (agg_hash >>> 2);
    /* 171 */       
    /* 172 */       int agg_result1 = agg_key1;
    /* 173 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result1 + 
(agg_hash << 6) + (agg_hash >>> 2);
    /* 174 */       
    /* 175 */       long agg_result2 = Double.doubleToLongBits(agg_key2);
    /* 176 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result2 + 
(agg_hash << 6) + (agg_hash >>> 2);
    /* 177 */       
    /* 178 */       int agg_result3 = Float.floatToIntBits(agg_key3);
    /* 179 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result3 + 
(agg_hash << 6) + (agg_hash >>> 2);
    /* 180 */       
    /* 181 */       int agg_result4 = agg_key4 ? 1 : 0;
    /* 182 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result4 + 
(agg_hash << 6) + (agg_hash >>> 2);
    /* 183 */       
    /* 184 */       return agg_hash;
    /* 185 */     }
    /* 186 */     
    /* 187 */     public 
java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
    /* 188 */     rowIterator() {
    /* 189 */       return batch.rowIterator();
    /* 190 */     }
    /* 191 */     
    /* 192 */     public void close() {
    /* 193 */       batch.close();
    /* 194 */     }
    /* 195 */     
    /* 196 */   }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to