Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/12729#issuecomment-215016989
  
    
    Generated codes before this patch: 
    
       /* 180 */   private void agg_doAggregateWithKeys() throws 
java.io.IOException {
        /* 181 */     agg_hashMap = agg_plan.createHashMap();
        /* 182 */
        /* 183 */     /*** PRODUCE: INPUT */ 
        /* 184 */
        /* 185 */     while (inputadapter_input.hasNext()) {
        /* 186 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
        /* 187 */       /*** CONSUME: TungstenAggregate(key=[value#502], 
functions=[(TypedSumDouble(scala.Tuple2),mode=Part
        ial,isDistinct=false),(TypedSumDouble(scala... */
        /* 188 */       /* input[0, string] */
        /* 189 */       boolean inputadapter_isNull = 
inputadapter_row.isNullAt(0);
        /* 190 */       UTF8String inputadapter_value = inputadapter_isNull ? 
null : (inputadapter_row.getUTF8String(0)); 
        /* 191 */       /* input[1, int] */
        /* 192 */       int inputadapter_value1 = inputadapter_row.getInt(1);
        /* 193 */       /* input[2, string] */
        /* 194 */       boolean inputadapter_isNull2 = 
inputadapter_row.isNullAt(2);
        /* 195 */       UTF8String inputadapter_value2 = inputadapter_isNull2 ? 
null : (inputadapter_row.getUTF8String(2));
        /* 196 */
        /* 197 */       UnsafeRow agg_unsafeRowAggBuffer = null;
        /* 198 */       
org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
agg_vectorizedAggBuffer = null;
        /* 199 */
        /* 200 */       if (true) {
        /* 201 */         if (!inputadapter_isNull2) {
        /* 202 */           agg_vectorizedAggBuffer = 
agg_vectorizedHashMap.findOrInsert(
        /* 203 */             inputadapter_value2);
        /* 204 */         }
        /* 205 */       }
        /* 206 */                                                               
                                 [204/1982]
        /* 207 */       if (agg_vectorizedAggBuffer == null) {
        /* 208 */         // generate grouping key
        /* 209 */         agg_holder.reset();
        /* 210 */
        /* 211 */         agg_rowWriter.zeroOutNullBytes();
        /* 212 */
        /* 213 */         if (inputadapter_isNull2) {
        /* 214 */           agg_rowWriter.setNullAt(0);
        /* 215 */         } else {
        /* 216 */           agg_rowWriter.write(0, inputadapter_value2);
        /* 217 */         }
        /* 218 */         agg_result1.setTotalSize(agg_holder.totalSize());
        /* 219 */         /* hash(input[2, string], 42) */
        /* 220 */         int agg_value8 = 42;
        /* 221 */
        /* 222 */         if (!inputadapter_isNull2) {
        /* 223 */           agg_value8 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value2.getBaseObject(),
 inputadapter_value2.getBaseOffset(), inputadapter_value2.numBytes(), 
agg_value8);
        /* 224 */         }
        /* 225 */         if (true) {
        /* 226 */           // try to get the buffer from hash map
        /* 227 */           agg_unsafeRowAggBuffer =
        /* 228 */           
agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value8);
        /* 229 */         }
        /* 230 */         if (agg_unsafeRowAggBuffer == null) {
        /* 231 */           if (agg_sorter == null) {
        /* 232 */             agg_sorter = 
agg_hashMap.destructAndCreateExternalSorter();
        /* 233 */           } else {
        /* 234 */             
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
        /* 235 */           }
        /* 236 */
        /* 237 */           // the hash map had be spilled, it should have 
enough memory now,
        /* 238 */           // try  to allocate buffer again.
        /* 239 */           agg_unsafeRowAggBuffer =
        /* 240 */           
agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value8);
        /* 241 */           if (agg_unsafeRowAggBuffer == null) {
        /* 242 */             // failed to allocate the first page
        /* 243 */             throw new OutOfMemoryError("No enough memory for 
aggregation");
        /* 244 */           }
        /* 245 */         }
        /* 246 */       }
        /* 247 */
        /* 248 */       if (agg_vectorizedAggBuffer != null) {
        /* 249 */         // update vectorized row
        /* 250 */
        /* 251 */         // evaluate aggregate function
        /* 252 */         /* referencetoexpressions(input[0, double] AS 
value#507, 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb.reduce) */
        /* 253 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb.reduce */
        /* 254 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb */
        /* 255 */         /* expression: 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb */
        /* 256 */         Object agg_obj = ((Expression) 
references[3]).eval(null);
        /* 257 */         org.apache.spark.sql.expressions.Aggregator 
agg_value12 = (org.apache.spark.sql.expressions.Aggregator) agg_obj;
        /* 258 */         /* input[0, double] */
        /* 259 */         double agg_value13 = 
agg_vectorizedAggBuffer.getDouble(0);
        /* 260 */         /* newInstance(class scala.Tuple2) */
        /* 261 */         /* input[2, string].toString */
        /* 262 */         java.lang.String agg_value15 = inputadapter_isNull ? 
null : (java.lang.String) inputadapter_value.toString();
        /* 263 */         boolean agg_isNull13 = agg_value15 == null;
        /* 264 */
        /* 265 */         final scala.Tuple2 agg_value14 = new 
scala.Tuple2(agg_value15, inputadapter_value1);
        /* 266 */         final boolean agg_isNull12 = false;
        /* 267 */         double agg_value11 = false ? -1.0 : (double) 
((java.lang.Double)agg_value12.reduce(agg_value13, agg_value14)).doubleValue();
        /* 268 */         /* referencetoexpressions(input[0, double] AS 
value#511, 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e.reduce) */
        /* 269 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e.reduce */
        /* 270 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e */
        /* 271 */         /* expression: 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e */
        /* 272 */         Object agg_obj1 = ((Expression) 
references[4]).eval(null);
        /* 273 */         org.apache.spark.sql.expressions.Aggregator 
agg_value20 = (org.apache.spark.sql.expressions.Aggregator) agg_obj1;
        /* 274 */         /* input[1, double] */
        /* 275 */         double agg_value21 = 
agg_vectorizedAggBuffer.getDouble(1);
        /* 276 */         /* newInstance(class scala.Tuple2) */
        /* 277 */         /* input[2, string].toString */
        /* 278 */         java.lang.String agg_value23 = inputadapter_isNull ? 
null : (java.lang.String) inputadapter_value.toString();
        /* 279 */         boolean agg_isNull21 = agg_value23 == null;
        /* 280 */
        /* 281 */         final scala.Tuple2 agg_value22 = new 
scala.Tuple2(agg_value23, inputadapter_value1);
        /* 282 */         final boolean agg_isNull20 = false;
        /* 283 */         double agg_value19 = false ? -1.0 : (double) 
((java.lang.Double)agg_value20.reduce(agg_value21, agg_value22)).doubleValue();
        /* 284 */         // update vectorized row
        /* 285 */         agg_vectorizedAggBuffer.setDouble(0, agg_value11);
        /* 286 */         agg_vectorizedAggBuffer.setDouble(1, agg_value19);
        /* 287 */
        /* 288 */       } else {
        /* 289 */         // update unsafe row
        /* 290 */
        /* 291 */         // evaluate aggregate function
        /* 292 */         /* referencetoexpressions(input[0, double] AS 
value#507, 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb.reduce) */
        /* 293 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb.reduce */
        /* 294 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb */
        /* 295 */         /* expression: 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb */
        /* 296 */         Object agg_obj2 = ((Expression) 
references[5]).eval(null);
        /* 297 */         org.apache.spark.sql.expressions.Aggregator 
agg_value28 = (org.apache.spark.sql.expressions.Aggregator) agg_obj2;
        /* 298 */         /* input[0, double] */
        /* 299 */         double agg_value29 = 
agg_unsafeRowAggBuffer.getDouble(0);
        /* 300 */         /* newInstance(class scala.Tuple2) */
        /* 301 */         /* input[2, string].toString */
        /* 302 */         java.lang.String agg_value31 = inputadapter_isNull ? 
null : (java.lang.String) inputadapter_value.toString();
        /* 303 */         boolean agg_isNull29 = agg_value31 == null;
        /* 304 */
        /* 305 */         final scala.Tuple2 agg_value30 = new 
scala.Tuple2(agg_value31, inputadapter_value1);
        /* 306 */         final boolean agg_isNull28 = false;
        /* 307 */         double agg_value27 = false ? -1.0 : (double) 
((java.lang.Double)agg_value28.reduce(agg_value29, agg_value30)).doubleValue();
        /* 308 */         /* referencetoexpressions(input[0, double] AS 
value#511, 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e.reduce) */
        /* 309 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e.reduce */
        /* 310 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e */
        /* 311 */         /* expression: 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e */
        /* 312 */         Object agg_obj3 = ((Expression) 
references[6]).eval(null);
        /* 313 */         org.apache.spark.sql.expressions.Aggregator 
agg_value36 = (org.apache.spark.sql.expressions.Aggregator) agg_obj3;
        /* 314 */         /* input[1, double] */
        /* 315 */         double agg_value37 = 
agg_unsafeRowAggBuffer.getDouble(1);
        /* 316 */         /* newInstance(class scala.Tuple2) */
        /* 317 */         /* input[2, string].toString */
        /* 318 */         java.lang.String agg_value39 = inputadapter_isNull ? 
null : (java.lang.String) inputadapter_value.toString();
        /* 319 */         boolean agg_isNull37 = agg_value39 == null;
        /* 320 */
        /* 321 */         final scala.Tuple2 agg_value38 = new 
scala.Tuple2(agg_value39, inputadapter_value1);
        /* 322 */         final boolean agg_isNull36 = false;
        /* 323 */         double agg_value35 = false ? -1.0 : (double) 
((java.lang.Double)agg_value36.reduce(agg_value37, agg_value38)).doubleValue();
        /* 324 */         // update unsafe row buffer
        /* 325 */         agg_unsafeRowAggBuffer.setDouble(0, agg_value27);
        /* 326 */         agg_unsafeRowAggBuffer.setDouble(1, agg_value35);
        /* 327 */
        /* 328 */       }
        /* 329 */       if (shouldStop()) return;
        /* 330 */     }
        /* 331 */
        /* 332 */     agg_vectorizedHashMapIter = 
agg_vectorizedHashMap.rowIterator();
        /* 333 */
        /* 334 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, 
agg_sorter, agg_metricValue, agg_metricValue1);
        /* 335 */   }
        /* 336 */



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to