karuppayya commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r468725811
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##########
```diff
@@ -838,13 +880,17 @@ case class HashAggregateExec(
        | long $beforeAgg = System.nanoTime();
        | $doAggFuncName();
        | $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);
+       | if (shouldStop()) return;
        |}
+      |$genCodePostInitCode
        |// output the result
        |$outputFromFastHashMap
        |$outputFromRegularHashMap
      """.stripMargin
   }
+  override def needStopCheck: Boolean = skipPartialAggregateEnabled
```
Review comment:
When the ratio of cardinality to numRows:

- **does not go beyond the threshold**, the optimization has not kicked in yet. In that case `org.apache.spark.sql.execution.CodegenSupport#shouldStopCheckCode` returns false, and we continue iterating over the remaining items of the iterator.
- **goes beyond the threshold**, we add the item (since the addition to the map is skipped) to `org.apache.spark.sql.execution.BufferedRowIterator#currentRows`, which gets consumed by the parent. See the sketch after the generated code below.

Since this is an inexpensive operation that is already used in many places in HashAggregateExec, and I didn't see any performance penalties, this approach seemed OK to me.
Please let me know if you have any other suggestions.
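To make the threshold decision concrete, here is a minimal hypothetical sketch. The method name `skipPartialAgg` and the parameters `cardinality`, `numRows`, and `threshold` are illustrative placeholders, not the actual fields or config keys used in this PR:

```java
// Hypothetical sketch, not the PR's actual code: while building the partial
// (map-side) aggregate, compare how many distinct keys the hash map has seen
// against how many input rows have been processed so far.
static boolean skipPartialAgg(long cardinality, long numRows, double threshold) {
  // A ratio close to 1.0 means nearly every row is a distinct group, so the
  // map-side aggregate barely reduces the data; pass rows through instead.
  return numRows > 0 && ((double) cardinality / numRows) > threshold;
}
```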
Generated code:
```java
private void agg_doAggregateWithKeys_0() throws java.io.IOException {
/* 318 */   while ( localtablescan_input_0.hasNext()) {
/* 319 */     InternalRow localtablescan_row_0 = (InternalRow) localtablescan_input_0.next();
/* 320 */     ((org.apache.spark.sql.execution.metric.SQLMetric) references[7] /* numOutputRows */).add(1);
/* 321 */     boolean localtablescan_isNull_0 = localtablescan_row_0.isNullAt(0);
/* 322 */     UTF8String localtablescan_value_0 = localtablescan_isNull_0 ?
/* 323 */       null : (localtablescan_row_0.getUTF8String(0));
/* 324 */     int localtablescan_value_1 = localtablescan_row_0.getInt(1);
/* 325 */
/* 326 */     agg_doConsume_0(localtablescan_value_0, localtablescan_isNull_0, localtablescan_value_1);
/* 327 */     if (shouldStop()) return; // code added as part of needStopCheck
/* 328 */   }
/* 329 */
/* 330 */   agg_childrenConsumed_0 = true;
/* 331 */
/* 332 */   agg_fastHashMapIter_0 = agg_fastHashMap_0.rowIterator();
/* 333 */   agg_mapIter_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap_0, agg_sorter_0, ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* avgHashProbe */));
/* 334 */
/* 335 */ }
```
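To show why returning on `shouldStop()` is cheap, here is a simplified sketch of the produce/consume contract of `org.apache.spark.sql.execution.BufferedRowIterator`. This is a paraphrase with a placeholder `Row` type parameter, not the real class, which works on `InternalRow` and has more members:

```java
import java.io.IOException;
import java.util.LinkedList;

// Simplified sketch of the BufferedRowIterator contract; not the real class.
abstract class BufferedRowIteratorSketch<Row> {
  // Rows appended by generated code (e.g. when the addition to the map is
  // skipped) buffer here until the parent operator drains them.
  protected final LinkedList<Row> currentRows = new LinkedList<>();

  // The generated loop calls this after appending; when it returns true,
  // the generated processNext() hands control back to the caller via `return`.
  protected boolean shouldStop() {
    return !currentRows.isEmpty();
  }

  protected void append(Row row) {
    currentRows.add(row);
  }

  // Implemented by the generated code (e.g. agg_doAggregateWithKeys_0 above).
  protected abstract void processNext() throws IOException;

  // The parent consumes buffered rows; once the buffer is empty it re-enters
  // processNext(), which resumes where the generated loop left off.
  public boolean hasNext() throws IOException {
    if (currentRows.isEmpty()) {
      processNext();
    }
    return !currentRows.isEmpty();
  }

  public Row next() {
    return currentRows.remove();
  }
}
```

Since `shouldStop()` amounts to an emptiness check on a linked list, adding it to the hot loop costs very little, which matches the observation above that no performance penalties showed up.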