Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19869#discussion_r154528498
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -784,86 +774,65 @@ case class HashAggregateExec(
           ("true", "true", "", "")
         }
     
    -    // We first generate code to probe and update the fast hash map. If the probe is
    -    // successful the corresponding fast row buffer will hold the mutable row
    -    val findOrInsertFastHashMap: Option[String] = {
    +    val findOrInsertRegularHashMap: String =
    +      s"""
    +         |// generate grouping key
    +         |${unsafeRowKeyCode.code.trim}
    +         |${hashEval.code.trim}
    +         |if ($checkFallbackForBytesToBytesMap) {
    +         |  // try to get the buffer from hash map
    +         |  $unsafeRowBuffer =
    +         |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
    +         |}
    +         |// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
    +         |// aggregation after processing all input rows.
    +         |if ($unsafeRowBuffer == null) {
    +         |  if ($sorterTerm == null) {
    +         |    $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
    +         |  } else {
    +         |    $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
    +         |  }
    +         |  $resetCounter
    +         |  // the hash map had be spilled, it should have enough memory now,
    +         |  // try to allocate buffer again.
    +         |  $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
    +         |    $unsafeRowKeys, ${hashEval.value});
    +         |  if ($unsafeRowBuffer == null) {
    +         |    // failed to allocate the first page
    +         |    throw new OutOfMemoryError("No enough memory for aggregation");
    +         |  }
    +         |}
    +       """.stripMargin
    +
    +    val findOrInsertHashMap: String = {
           if (isFastHashMapEnabled) {
    -        Option(
    -          s"""
    -             |
    -             |if ($checkFallbackForGeneratedHashMap) {
    -             |  ${fastRowKeys.map(_.code).mkString("\n")}
    -             |  if (${fastRowKeys.map("!" + _.isNull).mkString(" && ")}) {
    -             |    $fastRowBuffer = $fastHashMapTerm.findOrInsert(
    -             |        ${fastRowKeys.map(_.value).mkString(", ")});
    -             |  }
    -             |}
    -         """.stripMargin)
    +        // If fast hash map is on, we first generate code to probe and update the fast hash map.
    +        // If the probe is successful the corresponding fast row buffer will hold the mutable row.
    +        s"""
    +           |if ($checkFallbackForGeneratedHashMap) {
    --- End diff --
    
    moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L794
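    
    For readers skimming the moved block: the generated code probes the regular hash map for an aggregation buffer, and if the allocation fails it spills the map into an external sorter, retries the allocation once, and only then gives up. Below is a minimal Scala sketch of that spill-and-retry pattern; AggMap, ExternalSorter and getBuffer are hypothetical stand-ins for the generated identifiers ($hashMapTerm, $sorterTerm, $unsafeRowBuffer), not real Spark APIs.
    
    // Hypothetical sketch of the spill-and-retry fallback that the generated code implements.
    // AggMap / ExternalSorter / getBuffer are illustrative names, not actual Spark classes.
    object SpillAndRetrySketch {
      trait ExternalSorter {
        def merge(other: ExternalSorter): Unit
      }
      trait AggMap {
        def getBuffer(key: AnyRef): AnyRef                     // returns null when no memory is left
        def destructAndCreateExternalSorter(): ExternalSorter  // frees the map's memory, keeps its rows
      }
    
      def findOrInsert(map: AggMap, key: AnyRef, sorter: ExternalSorter): (AnyRef, ExternalSorter) = {
        var buffer = map.getBuffer(key)
        var currentSorter = sorter
        if (buffer == null) {
          // Can't allocate a buffer: spill the map so sort-based aggregation can finish later.
          val spilled = map.destructAndCreateExternalSorter()
          if (currentSorter == null) currentSorter = spilled else currentSorter.merge(spilled)
          // The spill should have released memory; retry the allocation once.
          buffer = map.getBuffer(key)
          if (buffer == null) {
            throw new OutOfMemoryError("No enough memory for aggregation")
          }
        }
        (buffer, currentSorter)
      }
    }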


---
