karuppayya commented on a change in pull request #28804:
URL: https://github.com/apache/spark/pull/28804#discussion_r449790519



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
##########
@@ -877,44 +901,61 @@ case class HashAggregateExec(
       ("true", "true", "", "")
     }
 
-    val oomeClassName = classOf[SparkOutOfMemoryError].getName
+    val skipPartialAggregateThreshold = sqlContext.conf.skipPartialAggregateThreshold
+    val skipPartialAggRatio = sqlContext.conf.skipPartialAggregateRatio
 
+    val oomeClassName = classOf[SparkOutOfMemoryError].getName
+    val countTerm = ctx.addMutableState(CodeGenerator.JAVA_LONG, "count")
     val findOrInsertRegularHashMap: String =
       s"""
-         |// generate grouping key
-         |${unsafeRowKeyCode.code}
-         |int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
-         |if ($checkFallbackForBytesToBytesMap) {
-         |  // try to get the buffer from hash map
-         |  $unsafeRowBuffer =
-         |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash);
-         |}
-         |// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
-         |// aggregation after processing all input rows.
-         |if ($unsafeRowBuffer == null) {
-         |  if ($sorterTerm == null) {
-         |    $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
-         |  } else {
-         |    $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
+         |if (!$avoidSpillInPartialAggregateTerm) {
+         |  // generate grouping key
+         |  ${unsafeRowKeyCode.code}
+         |  int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
+         |  if ($checkFallbackForBytesToBytesMap) {
+         |    // try to get the buffer from hash map
+         |    $unsafeRowBuffer =
+         |      $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash);
          |  }
-         |  $resetCounter
-         |  // the hash map had be spilled, it should have enough memory now,
-         |  // try to allocate buffer again.
-         |  $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
-         |    $unsafeRowKeys, $unsafeRowKeyHash);
-         |  if ($unsafeRowBuffer == null) {
-         |    // failed to allocate the first page
-         |    throw new $oomeClassName("No enough memory for aggregation");
+         |  // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
+         |  // aggregation after processing all input rows.
+         |  if ($unsafeRowBuffer == null && !$avoidSpillInPartialAggregateTerm) {

Review comment:
       @prakharjain09 
   The first check kicks in once we have decided to avoid partial aggregation, in which case we won't attempt to fetch from the map.
   The second check kicks in only when the map is completely exhausted, and this is when we decide whether we have to skip partial aggregation.
   I think both checks are required.
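
   To make the two checks concrete, here is a rough, standalone Java sketch of the control flow described above. It is not the generated code from this PR: `PartialAggSketch`, the fixed `capacity` that stands in for the map running out of memory, and the rule used to decide when to skip (row count at or above the threshold and distinct-key ratio at or above the configured ratio) are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the generated per-row aggregation code; not Spark code.
final class PartialAggSketch {
    private final Map<String, Long> map = new HashMap<>();
    private final int capacity;     // assumed: number of keys the map can hold before it is exhausted
    private final long threshold;   // stands in for skipPartialAggregateThreshold
    private final double ratio;     // stands in for skipPartialAggRatio
    private boolean avoidPartialAgg = false; // stands in for $avoidSpillInPartialAggregateTerm
    private long rowsSeen = 0;      // stands in for the "count" mutable state
    long mapLookups = 0;            // rows that attempted a map lookup
    long passedThrough = 0;         // rows emitted without partial aggregation
    long spills = 0;                // times the map was spilled (modelled as a clear)

    PartialAggSketch(int capacity, long threshold, double ratio) {
        this.capacity = capacity;
        this.threshold = threshold;
        this.ratio = ratio;
    }

    boolean isSkipping() {
        return avoidPartialAgg;
    }

    // Returns the current count for the key, or null when the map is full and the key is new.
    private Long lookupOrInsert(String key) {
        Long current = map.get(key);
        if (current != null) {
            return current;
        }
        if (map.size() >= capacity) {
            return null;
        }
        map.put(key, 0L);
        return 0L;
    }

    void processRow(String key) {
        rowsSeen++;
        Long buffer = null;
        // First check: once skipping has been decided, the map is never consulted again.
        if (!avoidPartialAgg) {
            mapLookups++;
            buffer = lookupOrInsert(key);
            // Second check (written as in the diff): only reached when the map is exhausted.
            if (buffer == null && !avoidPartialAgg) {
                // Assumed decision rule; the real condition is not shown in this hunk.
                if (rowsSeen >= threshold && (double) map.size() / rowsSeen >= ratio) {
                    avoidPartialAgg = true;
                } else {
                    spills++;
                    map.clear(); // models spilling the map to free memory
                    buffer = lookupOrInsert(key);
                }
            }
        }
        if (buffer == null) {
            passedThrough++; // row goes downstream unaggregated
        } else {
            map.put(key, buffer + 1);
        }
    }

    public static void main(String[] args) {
        PartialAggSketch sketch = new PartialAggSketch(2, 3, 0.5);
        for (String key : new String[] {"a", "b", "c", "d", "e"}) {
            sketch.processRow(key);
        }
        System.out.println("skipping=" + sketch.isSkipping()
            + " lookups=" + sketch.mapLookups
            + " passedThrough=" + sketch.passedThrough);
    }
}
```

   In this sketch the second check is what flips the flag, and the first check is what keeps every later row away from the map, which is why dropping either one changes the behaviour.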



