Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154528498 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -784,86 +774,65 @@ case class HashAggregateExec( ("true", "true", "", "") } - // We first generate code to probe and update the fast hash map. If the probe is - // successful the corresponding fast row buffer will hold the mutable row - val findOrInsertFastHashMap: Option[String] = { + val findOrInsertRegularHashMap: String = + s""" + |// generate grouping key + |${unsafeRowKeyCode.code.trim} + |${hashEval.code.trim} + |if ($checkFallbackForBytesToBytesMap) { + | // try to get the buffer from hash map + | $unsafeRowBuffer = + | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value}); + |} + |// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based + |// aggregation after processing all input rows. + |if ($unsafeRowBuffer == null) { + | if ($sorterTerm == null) { + | $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter(); + | } else { + | $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter()); + | } + | $resetCounter + | // the hash map had be spilled, it should have enough memory now, + | // try to allocate buffer again. + | $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow( + | $unsafeRowKeys, ${hashEval.value}); + | if ($unsafeRowBuffer == null) { + | // failed to allocate the first page + | throw new OutOfMemoryError("No enough memory for aggregation"); + | } + |} + """.stripMargin + + val findOrInsertHashMap: String = { if (isFastHashMapEnabled) { - Option( - s""" - | - |if ($checkFallbackForGeneratedHashMap) { - | ${fastRowKeys.map(_.code).mkString("\n")} - | if (${fastRowKeys.map("!" 
+ _.isNull).mkString(" && ")}) { - | $fastRowBuffer = $fastHashMapTerm.findOrInsert( - | ${fastRowKeys.map(_.value).mkString(", ")}); - | } - |} - """.stripMargin) + // If fast hash map is on, we first generate code to probe and update the fast hash map. + // If the probe is successful the corresponding fast row buffer will hold the mutable row. + s""" + |if ($checkFallbackForGeneratedHashMap) { --- End diff -- This code was moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L794
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org