YuzhouSun commented on code in PR #35806:
URL: https://github.com/apache/spark/pull/35806#discussion_r860541420


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2907,6 +2907,18 @@ object SQLConf {
      .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in [10, 30].")
       .createWithDefault(16)
 
+  val ADAPTIVE_PARTIAL_AGGREGATION_THRESHOLD =
+    buildConf("spark.sql.aggregate.adaptivePartialAggregationThreshold")
+      .internal()
+      .doc("Minimum number of processed rows before partial aggregation can be skipped. " +
+        "By setting this value to 0 adaptive partial aggregation can be disabled.")
+      .version("3.3.0")
+      .intConf
+      .checkValue(threshold => threshold >= 0 && threshold < (1 << 16),
+        "The threshold value must be bigger than or equal to 0 and less than " +
+          s"1 << ${FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT.key}.")
+      .createWithDefault(10000)

Review Comment:
   Just curious, any reason that the default value of adaptivePartialAggregationThreshold is 10,000? [In Trino the default is 100,000](https://github.com/trinodb/trino/blob/ab7bc3fa534110893b96232b6b1c6d36286dcbb6/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java#L86)
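   To make the question concrete, here is a small Python sketch (not Spark's actual implementation; the function name and the `min_reduction` ratio are illustrative assumptions) of how a threshold like this is typically used: only after at least `threshold` rows have been consumed does the operator compare distinct group keys against rows processed, and it skips partial aggregation when aggregation isn't reducing the data. A larger default like Trino's 100,000 simply demands more evidence before skipping.

```python
# Conceptual sketch of an adaptive partial-aggregation skip decision.
# `threshold` mirrors spark.sql.aggregate.adaptivePartialAggregationThreshold
# (default 10,000 in this PR); `min_reduction` is an assumed illustrative knob.
def should_skip_partial_agg(rows_consumed, distinct_keys,
                            threshold=10_000, min_reduction=0.5):
    """Return True when partial aggregation should be bypassed."""
    if threshold <= 0 or rows_consumed < threshold:
        return False  # feature disabled, or not enough rows observed yet
    # Fraction of rows that collapsed into already-seen keys.
    reduction = 1 - distinct_keys / rows_consumed
    return reduction < min_reduction

# High cardinality: 10,000 rows produced 9,500 distinct keys -> skip.
print(should_skip_partial_agg(10_000, 9_500))  # True
# Low cardinality: 10,000 rows collapsed into 100 keys -> keep aggregating.
print(should_skip_partial_agg(10_000, 100))    # False
```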



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala:
##########
@@ -146,10 +155,32 @@ case class HashAggregateExec(
   // but the vectorized hashmap can still be switched on for testing and benchmarking purposes.
   private var isVectorizedHashMapEnabled: Boolean = false
 
+  // VisibleForTesting
+  private[sql] lazy val isAdaptivePartialAggregationEnabled = {
+    isPartialAgg && groupingAttributes.nonEmpty && conf.adaptivePartialAggregationThreshold > 0 &&
+      conf.adaptivePartialAggregationThreshold < (1 << conf.fastHashAggregateRowMaxCapacityBit) && {
+      child
+        .collectUntil(p => p.isInstanceOf[WholeStageCodegenExec] ||
+          !p.isInstanceOf[CodegenSupport] ||
+          p.isInstanceOf[LeafExecNode]).forall {
+        case _: ProjectExec | _: FilterExec | _: ColumnarToRowExec => true
+        case _: SerializeFromObjectExec => true
+        case _: InputAdapter => true
+        // HashAggregateExec, ExpandExec, SortMergeJoinExec ...
+        case _ => false

Review Comment:
   Why does it need to limit child node types? Is this for a performance reason? If so, skipping partial aggregation may still benefit performance even after nodes like aggregate, join, and expand. Why isn't adaptivePartialAggregationThreshold good enough on its own?
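   For reference, a simplified Python sketch of what the check above appears to do (the operator class names mirror Spark's, but the traversal and the dict-based plan representation are illustrative assumptions, not Spark's `TreeNode` API): collect plan nodes top-down until a codegen boundary, then require every visited node to be a cheap row-at-a-time operator.

```python
# Hypothetical sketch of the child-node allow-list check being questioned.
ALLOWED = {"ProjectExec", "FilterExec", "ColumnarToRowExec",
           "SerializeFromObjectExec", "InputAdapter"}
BOUNDARY = {"WholeStageCodegenExec", "LeafExecNode"}

def collect_until(node, stop):
    """Collect nodes top-down, excluding the first node matching stop()."""
    if stop(node):
        return []
    return [node] + [n for c in node.get("children", [])
                     for n in collect_until(c, stop)]

def only_cheap_operators(child):
    nodes = collect_until(child, lambda n: n["kind"] in BOUNDARY)
    return all(n["kind"] in ALLOWED for n in nodes)

plan = {"kind": "ProjectExec", "children": [
        {"kind": "FilterExec", "children": [
        {"kind": "WholeStageCodegenExec", "children": []}]}]}
print(only_cheap_operators(plan))  # True: only Project/Filter before the boundary
```

   A plan containing, say, a `SortMergeJoinExec` below the aggregate would fail this check, which is exactly the restriction being asked about.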



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala:
##########
@@ -847,18 +904,60 @@ case class HashAggregateExec(
       s"UnsafeRow $unsafeRowBuffer = null;"
     }
 
+    val adaptivePartialAggregation = if (isAdaptivePartialAggregationEnabled) {
+      val numberOfConsumedKeysTerm =
+        ctx.addMutableState(CodeGenerator.JAVA_LONG, "numberOfConsumedKeys")
+      val numAggSkippedRows = metricTerm(ctx, "numSkippedParAggRows")
+      val initExpr = declFunctions.flatMap(f => f.initialValues)
+      val emptyBufferKeyCode = GenerateUnsafeProjection.createCode(ctx, initExpr)
+
+      val numberOfKeys = if (fastHashMapTerm != null) {
+        s"$numberOfConsumedKeysTerm = $fastHashMapTerm.getNumKeys();"
+      } else if (hashMapTerm != null) {
+        s"$numberOfConsumedKeysTerm = $hashMapTerm.getNumKeys();"

Review Comment:
   When fastHashMap is enabled, this implementation doesn't count the keys in the regular hashMap. However, even when the number of processed keys is less than the fastHashMap capacity, the regular hashMap can still be non-empty, because the fastHashMap may
           1. be unable to find the group key within maxSteps (maxSteps = 2), or
           2. run out of the max supported page size.
   
   So even though adaptivePartialAggregationThreshold must be smaller than the fastHashMap capacity, it's still possible that both the fastHashMap and the regular hashMap are non-empty. A potential fix could be `numberOfKeys = fastHashMap.getNumKeys() + hashMap.getNumKeys()`, combined with not enabling skipPartialAggregate once the aggregate spills. With this fix we could also remove [the adaptivePartialAggregationThreshold < fastHashMap's capacity check](https://github.com/apache/spark/pull/35806/files#diff-13c5b65678b327277c68d17910ae93629801af00117a0e3da007afd95b6c6764R2917-R2919).
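   A minimal Python sketch of the proposed fix, under assumed map interfaces (the `get_num_keys` / `spilled` names are illustrative stand-ins, not Spark's exact API): sum the keys of both maps, and treat a spill as "unknown key count" so skipping is never enabled after spilling.

```python
# Illustrative stub for either hash map; only exposes a key count.
class HashMapStub:
    def __init__(self, num_keys):
        self._num_keys = num_keys
    def get_num_keys(self):
        return self._num_keys

def consumed_keys(fast_map, regular_map, spilled):
    """Total distinct keys seen so far; None disables skipping after a spill."""
    if spilled:
        return None  # spilled keys are no longer countable, so never skip
    total = 0
    if fast_map is not None:
        total += fast_map.get_num_keys()
    if regular_map is not None:
        total += regular_map.get_num_keys()
    return total

# Fast map overflowed into the regular map (e.g. maxSteps probing failed):
print(consumed_keys(HashMapStub(60_000), HashMapStub(5_000), spilled=False))  # 65000
print(consumed_keys(HashMapStub(60_000), HashMapStub(5_000), spilled=True))   # None
```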



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

