YuzhouSun commented on code in PR #35806:
URL: https://github.com/apache/spark/pull/35806#discussion_r860541420
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2907,6 +2907,18 @@ object SQLConf {
      .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in [10, 30].")
      .createWithDefault(16)
+  val ADAPTIVE_PARTIAL_AGGREGATION_THRESHOLD =
+    buildConf("spark.sql.aggregate.adaptivePartialAggregationThreshold")
+      .internal()
+      .doc("Minimum number of processed rows before partial aggregation can be skipped. " +
+        "By setting this value to 0 adaptive partial aggregation can be disabled.")
+      .version("3.3.0")
+      .intConf
+      .checkValue(threshold => threshold >= 0 && threshold < (1 << 16),
+        "The threshold value must be bigger than or equal to 0 and less than " +
+        s"1 << ${FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT.key}.")
+      .createWithDefault(10000)
Review Comment:
Just curious: is there a reason the default value of
adaptivePartialAggregationThreshold is 10,000? [In Trino the default is
100,000](https://github.com/trinodb/trino/blob/ab7bc3fa534110893b96232b6b1c6d36286dcbb6/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java#L86)
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala:
##########
@@ -146,10 +155,32 @@ case class HashAggregateExec(
  // but the vectorized hashmap can still be switched on for testing and benchmarking purposes.
  private var isVectorizedHashMapEnabled: Boolean = false
+  // VisibleForTesting
+  private[sql] lazy val isAdaptivePartialAggregationEnabled = {
+    isPartialAgg && groupingAttributes.nonEmpty &&
+      conf.adaptivePartialAggregationThreshold > 0 &&
+      conf.adaptivePartialAggregationThreshold < (1 << conf.fastHashAggregateRowMaxCapacityBit) && {
+      child
+        .collectUntil(p => p.isInstanceOf[WholeStageCodegenExec] ||
+          !p.isInstanceOf[CodegenSupport] ||
+          p.isInstanceOf[LeafExecNode]).forall {
+          case _: ProjectExec | _: FilterExec | _: ColumnarToRowExec => true
+          case _: SerializeFromObjectExec => true
+          case _: InputAdapter => true
+          // HashAggregateExec, ExpandExec, SortMergeJoinExec ...
+          case _ => false
Review Comment:
Why does this need to limit the child node types? Is it for a performance
reason? If so, skipping the partial aggregate may still benefit performance
even below nodes like aggregate, join, and expand, so why isn't
adaptivePartialAggregationThreshold alone good enough?
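
For context, the gating check quoted above can be sketched outside Spark. This is a
hypothetical miniature (PlanNode, ProjectNode, FilterNode, SortMergeJoinNode, ScanLeaf,
and childrenAllowSkipping are illustrative stand-ins, not Spark's types), with
`collectUntil` modeled as a top-down collect that stops at, and excludes, the first node
matching the stop predicate:

```scala
// Hypothetical stand-ins for Spark's SparkPlan node types; names are illustrative only.
sealed trait PlanNode { def children: Seq[PlanNode] }
final case class ProjectNode(child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}
final case class FilterNode(child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}
final case class SortMergeJoinNode(left: PlanNode, right: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(left, right)
}
case object ScanLeaf extends PlanNode { def children: Seq[PlanNode] = Nil }

// Modeled on the PR's collectUntil usage: walk top-down, stopping at (and
// excluding) the first node on each branch that matches the stop predicate.
def collectUntil(p: PlanNode)(stop: PlanNode => Boolean): Seq[PlanNode] =
  if (stop(p)) Nil else p +: p.children.flatMap(c => collectUntil(c)(stop))

// The whitelist in miniature: adaptive partial aggregation stays enabled only
// when every node between the aggregate and the boundary is a cheap,
// row-preserving operator (Project/Filter here); anything else disables it.
def childrenAllowSkipping(root: PlanNode): Boolean =
  collectUntil(root)(_ == ScanLeaf).forall {
    case _: ProjectNode | _: FilterNode => true
    case _ => false
  }
```

With this model, `childrenAllowSkipping(ProjectNode(FilterNode(ScanLeaf)))` holds, while
inserting a `SortMergeJoinNode` anywhere on the path makes it false.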
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala:
##########
@@ -847,18 +904,60 @@ case class HashAggregateExec(
s"UnsafeRow $unsafeRowBuffer = null;"
}
+    val adaptivePartialAggregation = if (isAdaptivePartialAggregationEnabled) {
+      val numberOfConsumedKeysTerm =
+        ctx.addMutableState(CodeGenerator.JAVA_LONG, "numberOfConsumedKeys")
+      val numAggSkippedRows = metricTerm(ctx, "numSkippedParAggRows")
+      val initExpr = declFunctions.flatMap(f => f.initialValues)
+      val emptyBufferKeyCode = GenerateUnsafeProjection.createCode(ctx, initExpr)
+
+      val numberOfKeys = if (fastHashMapTerm != null) {
+        s"$numberOfConsumedKeysTerm = $fastHashMapTerm.getNumKeys();"
+      } else if (hashMapTerm != null) {
+        s"$numberOfConsumedKeysTerm = $hashMapTerm.getNumKeys();"
Review Comment:
When fastHashMap is enabled, this implementation doesn’t count keys in
regular hashMap. However, even when number of processed keys is less than
fastHashMap capacity, the regularHashMap seems can still be non-empty when the
fastHashMap
1. unable to find group key within maxSteps (maxSteps = 2)
2. run out of max supported page size.
So even though adaptivePartialAggregationThreshold must be smaller than
fastHashMap capacity, it's still possible that both fastHashMap and regular
hashMap are non-empty. A potential fix can be `numberOfKeys =
fastHashMap.getNumKeys() + hashMap.getNumKeys()`, and don't enable
skipPartialAggregate once the aggregate spills. With this fix we can also
remove [the adaptivePartialAggregationThreshold < fastHashMap's capacity
check](https://github.com/apache/spark/pull/35806/files#diff-13c5b65678b327277c68d17910ae93629801af00117a0e3da007afd95b6c6764R2917-R2919)
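
The proposed fix above can be sketched abstractly. This is not Spark's generated code:
AggMapsState, shouldSkipPartialAggregation, and the 0.5 reduction cutoff are hypothetical
names and values used only to illustrate "count both maps, and never skip after a spill":

```scala
// Hypothetical snapshot of the two hash maps' state; not Spark's actual types.
final case class AggMapsState(fastMapKeys: Long, regularMapKeys: Long, hasSpilled: Boolean)

def shouldSkipPartialAggregation(
    state: AggMapsState,
    consumedRows: Long,
    threshold: Long): Boolean = {
  // The proposed fix: count keys in BOTH maps, since the regular map can be
  // non-empty even below the fast map's capacity (maxSteps probe failures,
  // page-size limits).
  val numberOfKeys = state.fastMapKeys + state.regularMapKeys
  // Never skip once the aggregate has spilled: spilled keys are no longer
  // visible in either map, so the count would undercount distinct groups.
  !state.hasSpilled &&
    consumedRows >= threshold &&
    // Illustrative skip criterion: partial aggregation is barely reducing the
    // data. The exact ratio Spark's codegen would use is not shown here.
    numberOfKeys.toDouble / consumedRows > 0.5
}
```

Under this sketch, the aggregate with 9,000 fast-map keys and 2,000 regular-map keys after
12,000 rows would now (correctly) qualify for skipping, whereas counting only the fast map
could miss the regular-map keys; a spilled aggregate never qualifies.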
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]