GitHub user viirya commented on the pull request:
https://github.com/apache/spark/pull/12729#issuecomment-215016989
Generated code before this patch:
  private void agg_doAggregateWithKeys() throws java.io.IOException {
    agg_hashMap = agg_plan.createHashMap();

    /*** PRODUCE: INPUT */

    while (inputadapter_input.hasNext()) {
      InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /*** CONSUME: TungstenAggregate(key=[value#502], functions=[(TypedSumDouble(scala.Tuple2),mode=Partial,isDistinct=false),(TypedSumDouble(scala... */
      /* input[0, string] */
      boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
      UTF8String inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getUTF8String(0));
      /* input[1, int] */
      int inputadapter_value1 = inputadapter_row.getInt(1);
      /* input[2, string] */
      boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
      UTF8String inputadapter_value2 = inputadapter_isNull2 ? null : (inputadapter_row.getUTF8String(2));

      UnsafeRow agg_unsafeRowAggBuffer = null;
      org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row agg_vectorizedAggBuffer = null;

      if (true) {
        if (!inputadapter_isNull2) {
          agg_vectorizedAggBuffer = agg_vectorizedHashMap.findOrInsert(inputadapter_value2);
        }
      }

      if (agg_vectorizedAggBuffer == null) {
        // generate grouping key
        agg_holder.reset();

        agg_rowWriter.zeroOutNullBytes();

        if (inputadapter_isNull2) {
          agg_rowWriter.setNullAt(0);
        } else {
          agg_rowWriter.write(0, inputadapter_value2);
        }
        agg_result1.setTotalSize(agg_holder.totalSize());
        /* hash(input[2, string], 42) */
        int agg_value8 = 42;

        if (!inputadapter_isNull2) {
          agg_value8 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(
            inputadapter_value2.getBaseObject(), inputadapter_value2.getBaseOffset(),
            inputadapter_value2.numBytes(), agg_value8);
        }
        if (true) {
          // try to get the buffer from hash map
          agg_unsafeRowAggBuffer =
            agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value8);
        }
        if (agg_unsafeRowAggBuffer == null) {
          if (agg_sorter == null) {
            agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
          } else {
            agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
          }

          // the hash map had been spilled, it should have enough memory now,
          // try to allocate buffer again.
          agg_unsafeRowAggBuffer =
            agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value8);
          if (agg_unsafeRowAggBuffer == null) {
            // failed to allocate the first page
            throw new OutOfMemoryError("No enough memory for aggregation");
          }
        }
      }

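      // [annotation, not in the generated code] At this point exactly one of
      // the two buffers is non-null: either the vectorized buffer from the
      // fast path above, or an UnsafeRow buffer obtained from agg_hashMap
      // (after an optional spill-and-retry through the external sorter).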
      if (agg_vectorizedAggBuffer != null) {
        // update vectorized row

        // evaluate aggregate function
        /* referencetoexpressions(input[0, double] AS value#507, org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb.reduce) */
        /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb.reduce */
        /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb */
        /* expression: org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb */
        Object agg_obj = ((Expression) references[3]).eval(null);
        org.apache.spark.sql.expressions.Aggregator agg_value12 = (org.apache.spark.sql.expressions.Aggregator) agg_obj;
        /* input[0, double] */
        double agg_value13 = agg_vectorizedAggBuffer.getDouble(0);
        /* newInstance(class scala.Tuple2) */
        /* input[2, string].toString */
        java.lang.String agg_value15 = inputadapter_isNull ? null : (java.lang.String) inputadapter_value.toString();
        boolean agg_isNull13 = agg_value15 == null;

        final scala.Tuple2 agg_value14 = new scala.Tuple2(agg_value15, inputadapter_value1);
        final boolean agg_isNull12 = false;
        double agg_value11 = false ? -1.0 : (double) ((java.lang.Double) agg_value12.reduce(agg_value13, agg_value14)).doubleValue();
        /* referencetoexpressions(input[0, double] AS value#511, org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e.reduce) */
        /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e.reduce */
        /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e */
        /* expression: org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e */
        Object agg_obj1 = ((Expression) references[4]).eval(null);
        org.apache.spark.sql.expressions.Aggregator agg_value20 = (org.apache.spark.sql.expressions.Aggregator) agg_obj1;
        /* input[1, double] */
        double agg_value21 = agg_vectorizedAggBuffer.getDouble(1);
        /* newInstance(class scala.Tuple2) */
        /* input[2, string].toString */
        java.lang.String agg_value23 = inputadapter_isNull ? null : (java.lang.String) inputadapter_value.toString();
        boolean agg_isNull21 = agg_value23 == null;

        final scala.Tuple2 agg_value22 = new scala.Tuple2(agg_value23, inputadapter_value1);
        final boolean agg_isNull20 = false;
        double agg_value19 = false ? -1.0 : (double) ((java.lang.Double) agg_value20.reduce(agg_value21, agg_value22)).doubleValue();
        // update vectorized row
        agg_vectorizedAggBuffer.setDouble(0, agg_value11);
        agg_vectorizedAggBuffer.setDouble(1, agg_value19);

      } else {
        // update unsafe row

        // evaluate aggregate function
        /* referencetoexpressions(input[0, double] AS value#507, org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb.reduce) */
        /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb.reduce */
        /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb */
        /* expression: org.apache.spark.sql.execution.aggregate.TypedSumDouble@39528bbb */
        Object agg_obj2 = ((Expression) references[5]).eval(null);
        org.apache.spark.sql.expressions.Aggregator agg_value28 = (org.apache.spark.sql.expressions.Aggregator) agg_obj2;
        /* input[0, double] */
        double agg_value29 = agg_unsafeRowAggBuffer.getDouble(0);
        /* newInstance(class scala.Tuple2) */
        /* input[2, string].toString */
        java.lang.String agg_value31 = inputadapter_isNull ? null : (java.lang.String) inputadapter_value.toString();
        boolean agg_isNull29 = agg_value31 == null;

        final scala.Tuple2 agg_value30 = new scala.Tuple2(agg_value31, inputadapter_value1);
        final boolean agg_isNull28 = false;
        double agg_value27 = false ? -1.0 : (double) ((java.lang.Double) agg_value28.reduce(agg_value29, agg_value30)).doubleValue();
        /* referencetoexpressions(input[0, double] AS value#511, org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e.reduce) */
        /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e.reduce */
        /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e */
        /* expression: org.apache.spark.sql.execution.aggregate.TypedSumDouble@72cfd54e */
        Object agg_obj3 = ((Expression) references[6]).eval(null);
        org.apache.spark.sql.expressions.Aggregator agg_value36 = (org.apache.spark.sql.expressions.Aggregator) agg_obj3;
        /* input[1, double] */
        double agg_value37 = agg_unsafeRowAggBuffer.getDouble(1);
        /* newInstance(class scala.Tuple2) */
        /* input[2, string].toString */
        java.lang.String agg_value39 = inputadapter_isNull ? null : (java.lang.String) inputadapter_value.toString();
        boolean agg_isNull37 = agg_value39 == null;

        final scala.Tuple2 agg_value38 = new scala.Tuple2(agg_value39, inputadapter_value1);
        final boolean agg_isNull36 = false;
        double agg_value35 = false ? -1.0 : (double) ((java.lang.Double) agg_value36.reduce(agg_value37, agg_value38)).doubleValue();
        // update unsafe row buffer
        agg_unsafeRowAggBuffer.setDouble(0, agg_value27);
        agg_unsafeRowAggBuffer.setDouble(1, agg_value35);

      }
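      // [annotation, not in the generated code] The two branches above
      // duplicate the evaluation of both TypedSumDouble reduce calls; they
      // differ only in whether the aggregation buffer being read and updated
      // is the vectorized ColumnarBatch.Row or the UnsafeRow from the
      // regular hash map.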
      if (shouldStop()) return;
    }

    agg_vectorizedHashMapIter = agg_vectorizedHashMap.rowIterator();

    agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, agg_metricValue, agg_metricValue1);
  }
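
For context, here is a minimal sketch of the kind of query that produces a plan with this shape: a Dataset[(String, Int)] grouped by its string key with two typed sum aggregators, which show up above as the two TypedSumDouble(scala.Tuple2) functions. This is my reconstruction for illustration, not necessarily the exact query under test; the dataset contents and the second aggregating function are made up.

    // Assumes a SparkSession named `spark`.
    import org.apache.spark.sql.expressions.scalalang.typed
    import spark.implicits._

    // Dataset[(String, Int)]. In the generated code the tuple's two fields
    // arrive as input columns 0 and 1, and the groupByKey key is appended
    // as input column 2 (the string read into inputadapter_value2).
    val ds = Seq(("a", 1), ("b", 2), ("a", 3)).toDS()

    // Two typed aggregators over the tuple, matching the two
    // TypedSumDouble(scala.Tuple2) entries in the TungstenAggregate node.
    val result = ds.groupByKey(_._1)
      .agg(typed.sum(_._2), typed.sum(_._2 * 2.0))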