Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/12729#issuecomment-215013688
  
        /* 196 */   private void agg_doAggregateWithKeys() throws 
java.io.IOException {
        /* 197 */     agg_hashMap = agg_plan.createHashMap();
        /* 198 */
        /* 199 */     /*** PRODUCE: INPUT */
        /* 200 */
        /* 201 */     while (inputadapter_input.hasNext()) {
        /* 202 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
        /* 203 */       /*** CONSUME: TungstenAggregate(key=[value#502], 
functions=[(TypedSumDouble(scala.Tuple2),mode=Partial,isDistinct=false),(TypedSumDouble(scala... */
        /* 204 */       /* input[0, string] */
        /* 205 */       boolean inputadapter_isNull = 
inputadapter_row.isNullAt(0);
        /* 206 */       UTF8String inputadapter_value = inputadapter_isNull ? 
null : (inputadapter_row.getUTF8String(0));
        /* 207 */       /* input[1, int] */
        /* 208 */       int inputadapter_value1 = inputadapter_row.getInt(1);
        /* 209 */       /* input[2, string] */
        /* 210 */       boolean inputadapter_isNull2 = 
inputadapter_row.isNullAt(2);
        /* 211 */       UTF8String inputadapter_value2 = inputadapter_isNull2 ? 
null : (inputadapter_row.getUTF8String(2));
        /* 212 */
        /* 213 */       UnsafeRow agg_unsafeRowAggBuffer = null;
        /* 214 */       
org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
agg_vectorizedAggBuffer = null;
        /* 215 */
        /* 216 */       if (true) {
        /* 217 */         if (!inputadapter_isNull2) {
        /* 218 */           agg_vectorizedAggBuffer = 
agg_vectorizedHashMap.findOrInsert(
        /* 219 */             inputadapter_value2);
        /* 220 */         }
        /* 221 */       }
        /* 222 */
        /* 223 */       if (agg_vectorizedAggBuffer == null) {
        /* 224 */         // generate grouping key
        /* 225 */         agg_holder.reset();
        /* 226 */
        /* 227 */         agg_rowWriter.zeroOutNullBytes();
        /* 228 */
        /* 229 */         if (inputadapter_isNull2) {
        /* 230 */           agg_rowWriter.setNullAt(0);
        /* 231 */         } else {
        /* 232 */           agg_rowWriter.write(0, inputadapter_value2);
        /* 233 */         }
        /* 234 */         agg_result1.setTotalSize(agg_holder.totalSize());
        /* 235 */         /* hash(input[2, string], 42) */
        /* 236 */         int agg_value8 = 42;
        /* 237 */
        /* 238 */         if (!inputadapter_isNull2) {
        /* 239 */           agg_value8 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value2.getBaseObject(), 
inputadapter_value2.getBaseOffset(), 
inputadapter_value2.numBytes(), agg_value8);
        /* 240 */         }
        /* 241 */         if (true) {
        /* 242 */           // try to get the buffer from hash map
        /* 243 */           agg_unsafeRowAggBuffer =
        /* 244 */           
agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value8);
        /* 245 */         }
        /* 246 */         if (agg_unsafeRowAggBuffer == null) {
        /* 247 */           if (agg_sorter == null) {
        /* 248 */             agg_sorter = 
agg_hashMap.destructAndCreateExternalSorter();
        /* 249 */           } else {
        /* 250 */             
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
        /* 251 */           }
        /* 252 */
        /* 253 */           // the hash map had be spilled, it should have 
enough memory now,
        /* 254 */           // try  to allocate buffer again.
        /* 255 */           agg_unsafeRowAggBuffer =
        /* 256 */           
agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value8);
        /* 257 */           if (agg_unsafeRowAggBuffer == null) {
        /* 258 */             // failed to allocate the first page
        /* 259 */             throw new OutOfMemoryError("No enough memory for 
aggregation");
        /* 260 */           }
        /* 261 */         }
        /* 262 */       }
        /* 263 */
        /* 264 */       if (agg_vectorizedAggBuffer != null) {
        /* 265 */         // update vectorized row
        /* 266 */
        /* 267 */         // evaluate aggregate function
        /* 268 */
        /* 269 */         /* newInstance(class scala.Tuple2) */
        /* 270 */         /* input[2, string].toString */
        /* 271 */         java.lang.String agg_value11 = inputadapter_isNull ? 
null : (java.lang.String) inputadapter_value
        .toString();
        /* 272 */         boolean agg_isNull9 = agg_value11 == null;
        /* 273 */
        /* 274 */         final scala.Tuple2 agg_value10 = new 
scala.Tuple2(agg_value11, inputadapter_value1);
        /* 275 */         final boolean agg_isNull8 = false;
        /* 276 */         agg_evalExprIsNull = agg_isNull8;
        /* 277 */         agg_evalExprValue = agg_value10;
        /* 278 */
        /* 279 */         /* referencetoexpressions(input[0, double] AS 
value#507, 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05.reduce) */
        /* 280 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05.reduce */
        /* 281 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05 */
        /* 282 */         /* expression: 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05 */
        /* 283 */         Object agg_obj = ((Expression) 
references[3]).eval(null);
        /* 284 */         org.apache.spark.sql.expressions.Aggregator 
agg_value16 = (org.apache.spark.sql.expressions.Aggregator) agg_obj;
        /* 285 */         /* input[0, double] */
        /* 286 */         double agg_value17 = 
agg_vectorizedAggBuffer.getDouble(0);
        /* 287 */         /* newInstance(class scala.Tuple2) */
        /* 288 */         double agg_value15 = false ? -1.0 : (double) 
((java.lang.Double)agg_value16.reduce(agg_value17, 
agg_evalExprValue)).doubleValue();
        /* 289 */         /* referencetoexpressions(input[0, double] AS 
value#511, 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41.reduce) */
        /* 290 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41.reduce */
        /* 291 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41 */
        /* 292 */         /* expression: 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41 */
        /* 293 */         Object agg_obj1 = ((Expression) 
references[4]).eval(null);
        /* 294 */         org.apache.spark.sql.expressions.Aggregator 
agg_value20 = (org.apache.spark.sql.expressions.Aggregator) agg_obj1;
        /* 295 */         /* input[1, double] */
        /* 296 */         double agg_value21 = 
agg_vectorizedAggBuffer.getDouble(1);
        /* 297 */         /* newInstance(class scala.Tuple2) */
        /* 298 */         double agg_value19 = false ? -1.0 : (double) 
((java.lang.Double)agg_value20.reduce(agg_value21, 
agg_evalExprValue)).doubleValue();
        /* 299 */         // update vectorized row
        /* 300 */         agg_vectorizedAggBuffer.setDouble(0, agg_value15);
        /* 301 */         agg_vectorizedAggBuffer.setDouble(1, agg_value19);
        /* 302 */
        /* 303 */       } else {
        /* 304 */         // update unsafe row
        /* 305 */
        /* 306 */         // evaluate aggregate function
        /* 307 */
        /* 308 */         /* referencetoexpressions(input[0, double] AS 
value#507, 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05.reduce) */
        /* 309 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05.reduce */
        /* 310 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05 */
        /* 311 */         /* expression: 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05 */
        /* 312 */         Object agg_obj2 = ((Expression) 
references[5]).eval(null);
        /* 313 */         org.apache.spark.sql.expressions.Aggregator 
agg_value24 = (org.apache.spark.sql.expressions.Aggregator) agg_obj2;
        /* 314 */         /* input[0, double] */
        /* 315 */         double agg_value25 = 
agg_unsafeRowAggBuffer.getDouble(0);
        /* 316 */         /* newInstance(class scala.Tuple2) */
        /* 317 */         double agg_value23 = false ? -1.0 : (double) 
((java.lang.Double)agg_value24.reduce(agg_value25, 
agg_evalExprValue)).doubleValue();
        /* 318 */         agg_evalExpr1IsNull = false;
        /* 319 */         agg_evalExpr1Value = agg_value23;
        /* 320 */
        /* 321 */         /* referencetoexpressions(input[0, double] AS 
value#511, 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41.reduce) */
        /* 322 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41.reduce */
        /* 323 */         /* 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41 */
        /* 324 */         /* expression: 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41 */
        /* 325 */         Object agg_obj3 = ((Expression) 
references[6]).eval(null);
        /* 326 */         org.apache.spark.sql.expressions.Aggregator 
agg_value28 = (org.apache.spark.sql.expressions.Aggregator) agg_obj3;
        /* 327 */         /* input[1, double] */
        /* 328 */         double agg_value29 = 
agg_unsafeRowAggBuffer.getDouble(1);
        /* 329 */         /* newInstance(class scala.Tuple2) */
        /* 330 */         double agg_value27 = false ? -1.0 : (double) 
((java.lang.Double)agg_value28.reduce(agg_value29, 
agg_evalExprValue)).doubleValue();
        /* 331 */         agg_evalExpr2IsNull = false;
        /* 332 */         agg_evalExpr2Value = agg_value27;
        /* 333 */
        /* 334 */         /* newInstance(class scala.Tuple2) */
        /* 335 */         agg_evalExpr3IsNull = agg_evalExprIsNull;
        /* 336 */         agg_evalExpr3Value = agg_evalExprValue;
        /* 337 */
        /* 338 */         /* referencetoexpressions(input[0, double] AS 
value#507, 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05.reduce) */
        /* 339 */         /* referencetoexpressions(input[0, double] AS 
value#511, 
org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41.reduce) */
        /* 340 */         // update unsafe row buffer
        /* 341 */         agg_unsafeRowAggBuffer.setDouble(0, 
agg_evalExpr1Value);
        /* 342 */         agg_unsafeRowAggBuffer.setDouble(1, 
agg_evalExpr2Value);
        /* 343 */
        /* 344 */       }
        /* 345 */       if (shouldStop()) return;
        /* 346 */     }
        /* 347 */
        /* 348 */     agg_vectorizedHashMapIter = 
agg_vectorizedHashMap.rowIterator();
        /* 349 */
        /* 350 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, 
agg_sorter, agg_metricValue, agg_metricValue1);
        /* 351 */   }



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to