GitHub user viirya commented on the pull request:
https://github.com/apache/spark/pull/12729#issuecomment-215013688
/* 196 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 197 */     agg_hashMap = agg_plan.createHashMap();
/* 198 */
/* 199 */     /*** PRODUCE: INPUT */
/* 200 */
/* 201 */     while (inputadapter_input.hasNext()) {
/* 202 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 203 */       /*** CONSUME: TungstenAggregate(key=[value#502], functions=[(TypedSumDouble(scala.Tuple2),mode=Partial,isDistinct=false),(TypedSumDouble(scala... */
/* 204 */       /* input[0, string] */
/* 205 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 206 */       UTF8String inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getUTF8String(0));
/* 207 */       /* input[1, int] */
/* 208 */       int inputadapter_value1 = inputadapter_row.getInt(1);
/* 209 */       /* input[2, string] */
/* 210 */       boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
/* 211 */       UTF8String inputadapter_value2 = inputadapter_isNull2 ? null : (inputadapter_row.getUTF8String(2));
/* 212 */
/* 213 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 214 */       org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row agg_vectorizedAggBuffer = null;
/* 215 */
/* 216 */       if (true) {
/* 217 */         if (!inputadapter_isNull2) {
/* 218 */           agg_vectorizedAggBuffer = agg_vectorizedHashMap.findOrInsert(
/* 219 */             inputadapter_value2);
/* 220 */         }
/* 221 */       }
/* 222 */
/* 223 */       if (agg_vectorizedAggBuffer == null) {
/* 224 */         // generate grouping key
/* 225 */         agg_holder.reset();
/* 226 */
/* 227 */         agg_rowWriter.zeroOutNullBytes();
/* 228 */
/* 229 */         if (inputadapter_isNull2) {
/* 230 */           agg_rowWriter.setNullAt(0);
/* 231 */         } else {
/* 232 */           agg_rowWriter.write(0, inputadapter_value2);
/* 233 */         }
/* 234 */         agg_result1.setTotalSize(agg_holder.totalSize());
/* 235 */         /* hash(input[2, string], 42) */
/* 236 */         int agg_value8 = 42;
/* 237 */
/* 238 */         if (!inputadapter_isNull2) {
/* 239 */           agg_value8 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value2.getBaseObject(), inputadapter_value2.getBaseOffset(), inputadapter_value2.numBytes(), agg_value8);
/* 240 */         }
/* 241 */         if (true) {
/* 242 */           // try to get the buffer from hash map
/* 243 */           agg_unsafeRowAggBuffer =
/* 244 */             agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value8);
/* 245 */         }
/* 246 */         if (agg_unsafeRowAggBuffer == null) {
/* 247 */           if (agg_sorter == null) {
/* 248 */             agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 249 */           } else {
/* 250 */             agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 251 */           }
/* 252 */
/* 253 */           // the hash map had be spilled, it should have enough memory now,
/* 254 */           // try to allocate buffer again.
/* 255 */           agg_unsafeRowAggBuffer =
/* 256 */             agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value8);
/* 257 */           if (agg_unsafeRowAggBuffer == null) {
/* 258 */             // failed to allocate the first page
/* 259 */             throw new OutOfMemoryError("No enough memory for aggregation");
/* 260 */           }
/* 261 */         }
/* 262 */       }
/* 263 */
/* 264 */       if (agg_vectorizedAggBuffer != null) {
/* 265 */         // update vectorized row
/* 266 */
/* 267 */         // evaluate aggregate function
/* 268 */
/* 269 */         /* newInstance(class scala.Tuple2) */
/* 270 */         /* input[2, string].toString */
/* 271 */         java.lang.String agg_value11 = inputadapter_isNull ? null : (java.lang.String) inputadapter_value.toString();
/* 272 */         boolean agg_isNull9 = agg_value11 == null;
/* 273 */
/* 274 */         final scala.Tuple2 agg_value10 = new scala.Tuple2(agg_value11, inputadapter_value1);
/* 275 */         final boolean agg_isNull8 = false;
/* 276 */         agg_evalExprIsNull = agg_isNull8;
/* 277 */         agg_evalExprValue = agg_value10;
/* 278 */
/* 279 */         /* referencetoexpressions(input[0, double] AS value#507, org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05.reduce) */
/* 280 */         /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05.reduce */
/* 281 */         /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05 */
/* 282 */         /* expression: org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05 */
/* 283 */         Object agg_obj = ((Expression) references[3]).eval(null);
/* 284 */         org.apache.spark.sql.expressions.Aggregator agg_value16 = (org.apache.spark.sql.expressions.Aggregator) agg_obj;
/* 285 */         /* input[0, double] */
/* 286 */         double agg_value17 = agg_vectorizedAggBuffer.getDouble(0);
/* 287 */         /* newInstance(class scala.Tuple2) */
/* 288 */         double agg_value15 = false ? -1.0 : (double) ((java.lang.Double)agg_value16.reduce(agg_value17, agg_evalExprValue)).doubleValue();
/* 289 */         /* referencetoexpressions(input[0, double] AS value#511, org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41.reduce) */
/* 290 */         /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41.reduce */
/* 291 */         /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41 */
/* 292 */         /* expression: org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41 */
/* 293 */         Object agg_obj1 = ((Expression) references[4]).eval(null);
/* 294 */         org.apache.spark.sql.expressions.Aggregator agg_value20 = (org.apache.spark.sql.expressions.Aggregator) agg_obj1;
/* 295 */         /* input[1, double] */
/* 296 */         double agg_value21 = agg_vectorizedAggBuffer.getDouble(1);
/* 297 */         /* newInstance(class scala.Tuple2) */
/* 298 */         double agg_value19 = false ? -1.0 : (double) ((java.lang.Double)agg_value20.reduce(agg_value21, agg_evalExprValue)).doubleValue();
/* 299 */         // update vectorized row
/* 300 */         agg_vectorizedAggBuffer.setDouble(0, agg_value15);
/* 301 */         agg_vectorizedAggBuffer.setDouble(1, agg_value19);
/* 302 */
/* 303 */       } else {
/* 304 */         // update unsafe row
/* 305 */
/* 306 */         // evaluate aggregate function
/* 307 */
/* 308 */         /* referencetoexpressions(input[0, double] AS value#507, org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05.reduce) */
/* 309 */         /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05.reduce */
/* 310 */         /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05 */
/* 311 */         /* expression: org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05 */
/* 312 */         Object agg_obj2 = ((Expression) references[5]).eval(null);
/* 313 */         org.apache.spark.sql.expressions.Aggregator agg_value24 = (org.apache.spark.sql.expressions.Aggregator) agg_obj2;
/* 314 */         /* input[0, double] */
/* 315 */         double agg_value25 = agg_unsafeRowAggBuffer.getDouble(0);
/* 316 */         /* newInstance(class scala.Tuple2) */
/* 317 */         double agg_value23 = false ? -1.0 : (double) ((java.lang.Double)agg_value24.reduce(agg_value25, agg_evalExprValue)).doubleValue();
/* 318 */         agg_evalExpr1IsNull = false;
/* 319 */         agg_evalExpr1Value = agg_value23;
/* 320 */
/* 321 */         /* referencetoexpressions(input[0, double] AS value#511, org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41.reduce) */
/* 322 */         /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41.reduce */
/* 323 */         /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41 */
/* 324 */         /* expression: org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41 */
/* 325 */         Object agg_obj3 = ((Expression) references[6]).eval(null);
/* 326 */         org.apache.spark.sql.expressions.Aggregator agg_value28 = (org.apache.spark.sql.expressions.Aggregator) agg_obj3;
/* 327 */         /* input[1, double] */
/* 328 */         double agg_value29 = agg_unsafeRowAggBuffer.getDouble(1);
/* 329 */         /* newInstance(class scala.Tuple2) */
/* 330 */         double agg_value27 = false ? -1.0 : (double) ((java.lang.Double)agg_value28.reduce(agg_value29, agg_evalExprValue)).doubleValue();
/* 331 */         agg_evalExpr2IsNull = false;
/* 332 */         agg_evalExpr2Value = agg_value27;
/* 333 */
/* 334 */         /* newInstance(class scala.Tuple2) */
/* 335 */         agg_evalExpr3IsNull = agg_evalExprIsNull;
/* 336 */         agg_evalExpr3Value = agg_evalExprValue;
/* 337 */
/* 338 */         /* referencetoexpressions(input[0, double] AS value#507, org.apache.spark.sql.execution.aggregate.TypedSumDouble@30c50f05.reduce) */
/* 339 */         /* referencetoexpressions(input[0, double] AS value#511, org.apache.spark.sql.execution.aggregate.TypedSumDouble@16a4ce41.reduce) */
/* 340 */         // update unsafe row buffer
/* 341 */         agg_unsafeRowAggBuffer.setDouble(0, agg_evalExpr1Value);
/* 342 */         agg_unsafeRowAggBuffer.setDouble(1, agg_evalExpr2Value);
/* 343 */
/* 344 */       }
/* 345 */       if (shouldStop()) return;
/* 346 */     }
/* 347 */
/* 348 */     agg_vectorizedHashMapIter = agg_vectorizedHashMap.rowIterator();
/* 349 */
/* 350 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, agg_metricValue, agg_metricValue1);
/* 351 */   }
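
For context, here is a minimal Scala sketch (not taken from the PR; the object name and the exact data are made up) of the kind of query that produces a partial TungstenAggregate like the one above: a Dataset grouped by a string key with two typed sum aggregators, each of which is backed by TypedSumDouble(scala.Tuple2) and surfaces in the generated code as a pair of Aggregator.reduce calls. This assumes the Spark 2.x typed aggregation API in org.apache.spark.sql.expressions.scalalang.typed.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.scalalang.typed

    object TypedSumExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("typed-sum-example")
          .getOrCreate()
        import spark.implicits._

        // Dataset[(String, Int)]: the string is the grouping key.
        val ds = Seq(("a", 1), ("a", 2), ("b", 3)).toDS()

        // Two typed.sum calls yield two TypedSumDouble(scala.Tuple2)
        // functions in the plan, matching the two reduce calls above.
        val result = ds.groupByKey(_._1)
          .agg(typed.sum(_._2.toDouble), typed.sum(_._2 * 2.0))

        result.show()
        spark.stop()
      }
    }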