[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19869
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154682180

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -596,7 +596,7 @@ case class HashAggregateExec(
         ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
           s"$fastHashMapTerm = new $fastHashMapClassName();")
         ctx.addMutableState(
-          classOf[java.util.Iterator[ColumnarRow]].getName,
+          s"java.util.Iterator<${classOf[ColumnarRow]}>",
--- End diff --

damn...
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154580311

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -596,7 +596,7 @@ case class HashAggregateExec(
         ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
           s"$fastHashMapTerm = new $fastHashMapClassName();")
         ctx.addMutableState(
-          classOf[java.util.Iterator[ColumnarRow]].getName,
+          s"java.util.Iterator<${classOf[ColumnarRow]}>",
--- End diff --

```scala
scala> s"java.util.Iterator<${classOf[ColumnarRow]}>"
res2: String = java.util.Iterator<class org.apache.spark.sql.execution.vectorized.ColumnarRow>
```
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154576368

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -573,94 +574,84 @@ case class HashAggregateExec(
       enableTwoLevelHashMap(ctx)
     } else {
       sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match {
-        case "true" => logWarning("Two level hashmap is disabled but vectorized hashmap is " +
-          "enabled.")
-        case null | "" | "false" => None
+        case "true" =>
+          logWarning("Two level hashmap is disabled but vectorized hashmap is enabled.")
+        case _ =>
       }
     }
-    fastHashMapTerm = ctx.freshName("fastHashMap")
-    val fastHashMapClassName = ctx.freshName("FastHashMap")
-    val fastHashMapGenerator =
-      if (isVectorizedHashMapEnabled) {
-        new VectorizedHashMapGenerator(ctx, aggregateExpressions,
-          fastHashMapClassName, groupingKeySchema, bufferSchema)
-      } else {
-        new RowBasedHashMapGenerator(ctx, aggregateExpressions,
-          fastHashMapClassName, groupingKeySchema, bufferSchema)
-      }
     val thisPlan = ctx.addReferenceObj("plan", this)

-    // Create a name for iterator from vectorized HashMap
+    // Create a name for the iterator from the fast hash map.
     val iterTermForFastHashMap = ctx.freshName("fastHashMapIter")
     if (isFastHashMapEnabled) {
+      // Generates the fast hash map class and creates the fash hash map term.
+      fastHashMapTerm = ctx.freshName("fastHashMap")
+      val fastHashMapClassName = ctx.freshName("FastHashMap")
       if (isVectorizedHashMapEnabled) {
+        val generatedMap = new VectorizedHashMapGenerator(ctx, aggregateExpressions,
+          fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
+        ctx.addInnerClass(generatedMap)
+
         ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
           s"$fastHashMapTerm = new $fastHashMapClassName();")
         ctx.addMutableState(
-          "java.util.Iterator",
+          classOf[java.util.Iterator[ColumnarRow]].getName,
--- End diff --

Is this the same as before?

```scala
scala> classOf[java.util.Iterator[Int]].getName
res2: String = java.util.Iterator
```
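For readers following along, a quick REPL sketch of the behavior under discussion, using `java.lang.String` as a stand-in for `ColumnarRow`: generic type arguments are erased from `getName`, and interpolating a `Class` object calls its `toString`, which prepends `class `. Interpolating `getName` is one way to obtain a valid parameterized Java type name for codegen; this is only an illustration, not necessarily the fix the PR ultimately adopted.

```scala
// Scala REPL sketch; java.lang.String stands in for ColumnarRow.

// Type arguments are erased, so getName drops them entirely:
classOf[java.util.Iterator[String]].getName
// res0: String = java.util.Iterator

// Interpolating the Class object itself uses toString, which prepends "class ":
s"java.util.Iterator<${classOf[String]}>"
// res1: String = java.util.Iterator<class java.lang.String>

// Interpolating getName yields a Java type name usable in generated code:
s"java.util.Iterator<${classOf[String].getName}>"
// res2: String = java.util.Iterator<java.lang.String>
```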
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154569858

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -768,12 +762,8 @@ case class HashAggregateExec(
     // generate hash code for key
     val hashExpr = Murmur3Hash(groupingExpressions, 42)
-    ctx.currentVars = input
     val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
-    val inputAttr = aggregateBufferAttributes ++ child.output
-    ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
-
     val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
       incCounter) = if (testFallbackStartsAt.isDefined) {
--- End diff --

ok, thanks
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154569741

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -768,12 +762,8 @@ case class HashAggregateExec(
     // generate hash code for key
     val hashExpr = Murmur3Hash(groupingExpressions, 42)
-    ctx.currentVars = input
     val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
-    val inputAttr = aggregateBufferAttributes ++ child.output
-    ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
-
     val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
       incCounter) = if (testFallbackStartsAt.isDefined) {
--- End diff --

I'll have another PR for this part and will reformat it at that time.
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154566445

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -882,45 +851,65 @@ case class HashAggregateExec(
         |${evaluateVariables(unsafeRowBufferEvals)}
         |// update unsafe row buffer
         |${updateUnsafeRowBuffer.mkString("\n").trim}
-      """.stripMargin
+       """.stripMargin
     }

+    val updateRowInHashMap: String = {
+      if (isFastHashMapEnabled) {
+        ctx.INPUT_ROW = fastRowBuffer
+        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+        val effectiveCodes = subExprs.codes.mkString("\n")
+        val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+          boundUpdateExpr.map(_.genCode(ctx))
+        }
+        val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+          val dt = updateExpr(i).dataType
+          ctx.updateColumn(
+            fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
+        }
+
+        // If fast hash map is on, we first generate code to update row in fast hash map, if the
+        // previous loop up hit fast hash map. Otherwise, update row in regular hash map.
+        s"""
+           |if ($fastRowBuffer != null) {
+           |  // common sub-expressions
+           |  $effectiveCodes
+           |  // evaluate aggregate function
+           |  ${evaluateVariables(fastRowEvals)}
+           |  // update fast row
+           |  ${updateFastRow.mkString("\n").trim}
+           |} else {
+           |  $updateRowInRegularHashMap
+           |}
+         """.stripMargin
+      } else {
+        updateRowInRegularHashMap
+      }
+    }
+
+    val declareFastRowBuffer: String = if (isFastHashMapEnabled) {
+      val rowType = if (isVectorizedHashMapEnabled) {
+        classOf[MutableColumnarRow].getName
+      } else {
+        "UnsafeRow"
+      }
+      s"$rowType $fastRowBuffer = null;"
+    } else ""

     // We try to do hash map based in-memory aggregation first. If there is not enough memory (the
     // hash map will return null for new key), we spill the hash map to disk to free memory, then
     // continue to do in-memory aggregation and spilling until all the rows had been processed.
     // Finally, sort the spilled aggregate buffers by key, and merge them together for same key.
     s"""
      UnsafeRow $unsafeRowBuffer = null;
--- End diff --

nit: How about this, just for consistency?

```
s"""
  $declareRowBuffer
  $findOrInsertHashMap
  $incCounter
  $updateRowInHashMap
"""
```

Then,

```
val declareRowBuffer: String = if (isFastHashMapEnabled) {
  val rowType = if (isVectorizedHashMapEnabled) {
    classOf[MutableColumnarRow].getName
  } else {
    "UnsafeRow"
  }
  s"""
     |UnsafeRow $unsafeRowBuffer = null;
     |$rowType $fastRowBuffer = null;
   """.stripMargin
} else {
  s"UnsafeRow $unsafeRowBuffer = null;"
}
```
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154565659

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -768,12 +762,8 @@ case class HashAggregateExec(
     // generate hash code for key
     val hashExpr = Murmur3Hash(groupingExpressions, 42)
-    ctx.currentVars = input
     val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
-    val inputAttr = aggregateBufferAttributes ++ child.output
-    ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
-
     val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
       incCounter) = if (testFallbackStartsAt.isDefined) {
--- End diff --

nit: need some indentation at the head?
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154561232

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -621,34 +622,30 @@ case class HashAggregateExec(
     val iterTerm = ctx.freshName("mapIter")
     ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm)

-    def generateGenerateCode(): String = {
-      if (isFastHashMapEnabled) {
-        if (isVectorizedHashMapEnabled) {
-          s"""
-             | ${fastHashMapGenerator.asInstanceOf[VectorizedHashMapGenerator].generate()}
-           """.stripMargin
-        } else {
-          s"""
-             | ${fastHashMapGenerator.asInstanceOf[RowBasedHashMapGenerator].generate()}
-           """.stripMargin
-        }
-      } else ""
+    if (isFastHashMapEnabled) {
+      val generatedMap = if (isVectorizedHashMapEnabled) {
+        fastHashMapGenerator.asInstanceOf[VectorizedHashMapGenerator].generate()
+      } else {
+        fastHashMapGenerator.asInstanceOf[RowBasedHashMapGenerator].generate()
+      }
+      ctx.addInnerClass(generatedMap)
     }
-    ctx.addInnerClass(generateGenerateCode())

     val doAgg = ctx.freshName("doAggregateWithKeys")
     val peakMemory = metricTerm(ctx, "peakMemory")
     val spillSize = metricTerm(ctx, "spillSize")
     val avgHashProbe = metricTerm(ctx, "avgHashProbe")

+    val finishFashHashMap = if (isFastHashMapEnabled) {
--- End diff --

nit: can we merge this branch with the branch above (line 625) for the hash map stuff?
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154562413

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
     def outputFromRowBasedMap: String = {
       s"""
-       while ($iterTermForFastHashMap.next()) {
-         $numOutput.add(1);
-         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
-         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
-         $outputFunc($keyTerm, $bufferTerm);
-
-         if (shouldStop()) return;
-       }
-       $fastHashMapTerm.close();
-       """
+        |while ($iterTermForFastHashMap.next()) {
--- End diff --

super nit: can we also drop the unnecessary leading spaces elsewhere in this file? e.g.,

```
s"""
  | private void $doAgg() throws java.io.IOException {
```

```
s"""
  |private void $doAgg() throws java.io.IOException {
```

https://github.com/cloud-fan/spark/blob/9b8ae3d6635c5ed0323bf088e20d0de55dd1c098/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L233
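As a small illustration of this nit (a self-contained sketch, not the PR's code): with `stripMargin`, whitespace before the `|` margin character is stripped, but any space placed after it survives into the generated Java source, so removing the space after `|` keeps the emitted code flush left. The `doAgg` value below is just a placeholder name.

```scala
object StripMarginDemo extends App {
  val doAgg = "doAggregateWithKeys"

  // A space after '|' is kept and shows up in the generated code...
  val withSpace =
    s"""
       | private void $doAgg() throws java.io.IOException {
     """.stripMargin

  // ...while text placed directly after '|' is emitted flush left.
  val flushLeft =
    s"""
       |private void $doAgg() throws java.io.IOException {
     """.stripMargin

  println(withSpace)
  println(flushLeft)
}
```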
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154556638

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
     def outputFromRowBasedMap: String = {
       s"""
-       while ($iterTermForFastHashMap.next()) {
-         $numOutput.add(1);
-         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
-         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
-         $outputFunc($keyTerm, $bufferTerm);
-
-         if (shouldStop()) return;
-       }
-       $fastHashMapTerm.close();
-       """
+        |while ($iterTermForFastHashMap.next()) {
+        |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
+        |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
+        |  $outputFunc($keyTerm, $bufferTerm);
+        |
+        |  if (shouldStop()) return;
+        |}
+        |$fastHashMapTerm.close();
+      """.stripMargin
     }

     // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
     def outputFromVectorizedMap: String = {
       val row = ctx.freshName("fastHashMapRow")
--- End diff --

ah good catch!
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154534333

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
     def outputFromRowBasedMap: String = {
       s"""
-       while ($iterTermForFastHashMap.next()) {
-         $numOutput.add(1);
-         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
-         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
-         $outputFunc($keyTerm, $bufferTerm);
-
-         if (shouldStop()) return;
-       }
-       $fastHashMapTerm.close();
-       """
+        |while ($iterTermForFastHashMap.next()) {
+        |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
+        |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
+        |  $outputFunc($keyTerm, $bufferTerm);
+        |
+        |  if (shouldStop()) return;
+        |}
+        |$fastHashMapTerm.close();
+      """.stripMargin
     }

     // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
     def outputFromVectorizedMap: String = {
       val row = ctx.freshName("fastHashMapRow")
--- End diff --

nit: is it better to fix the indentation of these three lines, too?
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154528578

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -882,45 +851,65 @@ case class HashAggregateExec(
         |${evaluateVariables(unsafeRowBufferEvals)}
         |// update unsafe row buffer
         |${updateUnsafeRowBuffer.mkString("\n").trim}
-      """.stripMargin
+       """.stripMargin
     }

+    val updateRowInHashMap: String = {
+      if (isFastHashMapEnabled) {
+        ctx.INPUT_ROW = fastRowBuffer
+        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+        val effectiveCodes = subExprs.codes.mkString("\n")
+        val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+          boundUpdateExpr.map(_.genCode(ctx))
+        }
+        val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+          val dt = updateExpr(i).dataType
+          ctx.updateColumn(
+            fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
+        }
+
+        // If fast hash map is on, we first generate code to update row in fast hash map, if the
+        // previous loop up hit fast hash map. Otherwise, update row in regular hash map.
+        s"""
+           |if ($fastRowBuffer != null) {
+           |  // common sub-expressions
+           |  $effectiveCodes
+           |  // evaluate aggregate function
+           |  ${evaluateVariables(fastRowEvals)}
+           |  // update fast row
+           |  ${updateFastRow.mkString("\n").trim}
+           |} else {
+           |  $updateRowInRegularHashMap
+           |}
+         """.stripMargin
+      } else {
+        updateRowInRegularHashMap
--- End diff --

Previously we always declared the `fastRowBuffer` and had the `if (fastRowBuffer != null)` check. Now we don't generate them if the fast hash map is not enabled.
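A condensed sketch of the pattern described here may help; it mirrors the diff above, with the real evaluation code elided and the helper purely illustrative: the buffer declaration and the `fastRowBuffer != null` branch are only emitted when the fast hash map is enabled.

```scala
// Illustrative only: emit the fast-path branch when enabled, otherwise fall
// straight through to the regular hash map update.
def updateRowInHashMap(isFastHashMapEnabled: Boolean,
                       fastRowBuffer: String,
                       updateFastRow: String,
                       updateRowInRegularHashMap: String): String = {
  if (isFastHashMapEnabled) {
    s"""
       |if ($fastRowBuffer != null) {
       |  ${updateFastRow.trim}
       |} else {
       |  ${updateRowInRegularHashMap.trim}
       |}
     """.stripMargin
  } else {
    // No fast hash map: no extra declaration, no branch.
    updateRowInRegularHashMap
  }
}
```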
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154528498

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -784,86 +774,65 @@ case class HashAggregateExec(
       ("true", "true", "", "")
     }

-    // We first generate code to probe and update the fast hash map. If the probe is
-    // successful the corresponding fast row buffer will hold the mutable row
-    val findOrInsertFastHashMap: Option[String] = {
+    val findOrInsertRegularHashMap: String =
+      s"""
+         |// generate grouping key
+         |${unsafeRowKeyCode.code.trim}
+         |${hashEval.code.trim}
+         |if ($checkFallbackForBytesToBytesMap) {
+         |  // try to get the buffer from hash map
+         |  $unsafeRowBuffer =
+         |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
+         |}
+         |// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
+         |// aggregation after processing all input rows.
+         |if ($unsafeRowBuffer == null) {
+         |  if ($sorterTerm == null) {
+         |    $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
+         |  } else {
+         |    $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
+         |  }
+         |  $resetCounter
+         |  // the hash map had be spilled, it should have enough memory now,
+         |  // try to allocate buffer again.
+         |  $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
+         |    $unsafeRowKeys, ${hashEval.value});
+         |  if ($unsafeRowBuffer == null) {
+         |    // failed to allocate the first page
+         |    throw new OutOfMemoryError("No enough memory for aggregation");
+         |  }
+         |}
+       """.stripMargin
+
+    val findOrInsertHashMap: String = {
       if (isFastHashMapEnabled) {
-        Option(
-          s"""
-             |
-             |if ($checkFallbackForGeneratedHashMap) {
-             |  ${fastRowKeys.map(_.code).mkString("\n")}
-             |  if (${fastRowKeys.map("!" + _.isNull).mkString(" && ")}) {
-             |    $fastRowBuffer = $fastHashMapTerm.findOrInsert(
-             |      ${fastRowKeys.map(_.value).mkString(", ")});
-             |  }
-             |}
-           """.stripMargin)
+        // If fast hash map is on, we first generate code to probe and update the fast hash map.
+        // If the probe is successful the corresponding fast row buffer will hold the mutable row.
+        s"""
+           |if ($checkFallbackForGeneratedHashMap) {
--- End diff --

moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L794
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154528455

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -784,86 +774,65 @@ case class HashAggregateExec(
       ("true", "true", "", "")
     }

-    // We first generate code to probe and update the fast hash map. If the probe is
-    // successful the corresponding fast row buffer will hold the mutable row
-    val findOrInsertFastHashMap: Option[String] = {
+    val findOrInsertRegularHashMap: String =
--- End diff --

moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L833
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154528431

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
     def outputFromRowBasedMap: String = {
       s"""
-       while ($iterTermForFastHashMap.next()) {
-         $numOutput.add(1);
-         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
-         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
-         $outputFunc($keyTerm, $bufferTerm);
-
-         if (shouldStop()) return;
-       }
-       $fastHashMapTerm.close();
-       """
+        |while ($iterTermForFastHashMap.next()) {
+        |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
+        |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
+        |  $outputFunc($keyTerm, $bufferTerm);
+        |
+        |  if (shouldStop()) return;
+        |}
+        |$fastHashMapTerm.close();
+      """.stripMargin
     }

     // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
     def outputFromVectorizedMap: String = {
       val row = ctx.freshName("fastHashMapRow")
       ctx.currentVars = null
       ctx.INPUT_ROW = row
-      val generateKeyRow = GenerateUnsafeProjection.createCode(ctx,
-        groupingKeySchema.toAttributes.zipWithIndex
+      val generateKeyRow = GenerateUnsafeProjection.createCode(ctx,
+        groupingKeySchema.toAttributes.zipWithIndex
           .map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) }
-      )
-      val generateBufferRow = GenerateUnsafeProjection.createCode(ctx,
-        bufferSchema.toAttributes.zipWithIndex
-          .map { case (attr, i) =>
-            BoundReference(groupingKeySchema.length + i, attr.dataType, attr.nullable) })
-      s"""
-         | while ($iterTermForFastHashMap.hasNext()) {
-         |   $numOutput.add(1);
-         |   org.apache.spark.sql.execution.vectorized.ColumnarRow $row =
-         |     (org.apache.spark.sql.execution.vectorized.ColumnarRow)
-         |     $iterTermForFastHashMap.next();
-         |   ${generateKeyRow.code}
-         |   ${generateBufferRow.code}
-         |   $outputFunc(${generateKeyRow.value}, ${generateBufferRow.value});
-         |
-         |   if (shouldStop()) return;
-         | }
-         |
-         | $fastHashMapTerm.close();
-       """.stripMargin
+      )
+      val generateBufferRow = GenerateUnsafeProjection.createCode(ctx,
+        bufferSchema.toAttributes.zipWithIndex.map { case (attr, i) =>
+          BoundReference(groupingKeySchema.length + i, attr.dataType, attr.nullable)
+        })
+      val columnarRowCls = classOf[ColumnarRow].getName
+      s"""
+         |while ($iterTermForFastHashMap.hasNext()) {
+         |  $columnarRowCls $row = ($columnarRowCls) $iterTermForFastHashMap.next();
+         |  ${generateKeyRow.code}
+         |  ${generateBufferRow.code}
+         |  $outputFunc(${generateKeyRow.value}, ${generateBufferRow.value});
+         |
+         |  if (shouldStop()) return;
+         |}
+         |
+         |$fastHashMapTerm.close();
+       """.stripMargin
     }

+    def outputFromRegularHashMap: String = {
+      s"""
+         |while ($iterTerm.next()) {
--- End diff --

moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L731
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154528409

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
     def outputFromRowBasedMap: String = {
       s"""
-       while ($iterTermForFastHashMap.next()) {
-         $numOutput.add(1);
-         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
-         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
-         $outputFunc($keyTerm, $bufferTerm);
-
-         if (shouldStop()) return;
-       }
-       $fastHashMapTerm.close();
-       """
+        |while ($iterTermForFastHashMap.next()) {
+        |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
+        |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
+        |  $outputFunc($keyTerm, $bufferTerm);
+        |
+        |  if (shouldStop()) return;
+        |}
+        |$fastHashMapTerm.close();
+      """.stripMargin
     }

     // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
     def outputFromVectorizedMap: String = {
       val row = ctx.freshName("fastHashMapRow")
       ctx.currentVars = null
       ctx.INPUT_ROW = row
-      val generateKeyRow = GenerateUnsafeProjection.createCode(ctx,
-        groupingKeySchema.toAttributes.zipWithIndex
--- End diff --

The indentation was wrong previously.
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154528386

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -444,6 +444,7 @@ case class HashAggregateExec(
     val funcName = ctx.freshName("doAggregateWithKeysOutput")
     val keyTerm = ctx.freshName("keyTerm")
     val bufferTerm = ctx.freshName("bufferTerm")
+    val numOutput = metricTerm(ctx, "numOutputRows")
--- End diff --

Update `numOutputRows` in the result function instead of updating it separately for both the fast hash map and the regular hash map.
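The idea reads roughly as the sketch below (illustrative only, not the actual Spark source): the generated output function bumps the row-count metric once, so neither the fast-map loop nor the regular-map loop needs its own `$numOutput.add(1)`. The `emitRow` parameter stands in for the elided row-projection code.

```scala
// Sketch of a generated output function that owns the metric update. funcName,
// keyTerm, bufferTerm and numOutput correspond to the fresh names in the diff above.
def generateOutputFunc(funcName: String, keyTerm: String, bufferTerm: String,
                       numOutput: String, emitRow: String): String = {
  s"""
     |private void $funcName(UnsafeRow $keyTerm, UnsafeRow $bufferTerm)
     |    throws java.io.IOException {
     |  $numOutput.add(1);
     |  $emitRow
     |}
   """.stripMargin
}
```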
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/19869

[SPARK-22677][SQL] cleanup whole stage codegen for hash aggregate

## What changes were proposed in this pull request?

The `HashAggregateExec` whole stage codegen path is a little messy and hard to understand; this PR cleans it up a bit, especially the fast hash map part.

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark hash-agg

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19869.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19869

commit 174f4ec2ea000de01da6f494db366dc3ff58ccc4
Author: Wenchen Fan
Date: 2017-11-24T12:43:37Z

    cleanup whole stage codegen for hash aggregate