karuppayya commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r466750508



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##########
@@ -877,50 +903,111 @@ case class HashAggregateExec(
       ("true", "true", "", "")
     }
 
+    val skipPartialAggregateThreshold = 
sqlContext.conf.skipPartialAggregateThreshold
+    val skipPartialAggRatio = sqlContext.conf.skipPartialAggregateRatio
+
+    val countTerm = ctx.addMutableState(CodeGenerator.JAVA_LONG, "count")
     val oomeClassName = classOf[SparkOutOfMemoryError].getName
+    val findOrInsertRegularHashMap: String = {
+      def getAggBufferFromMap = {
+        s"""
+           |// generate grouping key
+           |${unsafeRowKeyCode.code}
+           |int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
+           |if ($checkFallbackForBytesToBytesMap) {
+           |  // try to get the buffer from hash map
+           |  $unsafeRowBuffer =
+           |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, 
$unsafeRowKeyHash);
+           |}
+          """.stripMargin
+      }
 
-    val findOrInsertRegularHashMap: String =
-      s"""
-         |// generate grouping key
-         |${unsafeRowKeyCode.code}
-         |int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
-         |if ($checkFallbackForBytesToBytesMap) {
-         |  // try to get the buffer from hash map
-         |  $unsafeRowBuffer =
-         |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, 
$unsafeRowKeyHash);
-         |}
-         |// Can't allocate buffer from the hash map. Spill the map and 
fallback to sort-based
-         |// aggregation after processing all input rows.
-         |if ($unsafeRowBuffer == null) {
-         |  if ($sorterTerm == null) {
-         |    $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
-         |  } else {
-         |    
$sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
-         |  }
-         |  $resetCounter
-         |  // the hash map had be spilled, it should have enough memory now,
-         |  // try to allocate buffer again.
-         |  $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
-         |    $unsafeRowKeys, $unsafeRowKeyHash);
-         |  if ($unsafeRowBuffer == null) {
-         |    // failed to allocate the first page
-         |    throw new $oomeClassName("No enough memory for aggregation");
-         |  }
-         |}
+      def addToSorter: String = {
+        s"""
+          |if ($sorterTerm == null) {
+          |  $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
+          |} else {
+          |  $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
+          |}
+          |$resetCounter
+          |// the hash map had be spilled, it should have enough memory now,
+          |// try to allocate buffer again.
+          |$unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
+          |  $unsafeRowKeys, $unsafeRowKeyHash);
+          |if ($unsafeRowBuffer == null) {
+          |  // failed to allocate the first page
+          |  throw new $oomeClassName("No enough memory for aggregation");
+          |}""".stripMargin
+      }
+
+      def getHeuristicToAvoidAgg: String = {
+        s"""
+          |!($rowCountTerm < $skipPartialAggregateThreshold) &&
+          |      ((float)$countTerm/$rowCountTerm) > $skipPartialAggRatio;
+          |""".stripMargin
+      }
+
+      if (skipPartialAggregate) {
+        s"""
+           |if (!$avoidSpillInPartialAggregateTerm) {
+           |  $getAggBufferFromMap
+           |  // Can't allocate buffer from the hash map.
+           |  // Check if we can avoid partial aggregation.
+           |  //  Otherwise, Spill the map and fallback to sort-based
+           |  // aggregation after processing all input rows.
+           |  if ($unsafeRowBuffer == null && 
!$avoidSpillInPartialAggregateTerm) {

Review comment:
       Removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to