[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r433405913

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##
@@ -62,38 +62,74 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val isEmpty = AttributeReference("isEmpty", BooleanType, nullable = false)()
+
   private lazy val zero = Literal.default(sumDataType)
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = resultType match {
+    case _: DecimalType => sum :: isEmpty :: Nil
+    case _ => sum :: Nil
+  }
-  override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
-  )
+  override lazy val initialValues: Seq[Expression] = resultType match {
+    case _: DecimalType => Seq(Literal(null, resultType), Literal(true, BooleanType))
+    case _ => Seq(Literal(null, resultType))
+  }
   override lazy val updateExpressions: Seq[Expression] = {
     if (child.nullable) {
-      Seq(
-        /* sum = */
-        coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-      )
+      val updateSumExpr = coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
+      resultType match {
+        case _: DecimalType =>
+          Seq(updateSumExpr, isEmpty && child.isNull)
+        case _ => Seq(updateSumExpr)
+      }
     } else {
-      Seq(
-        /* sum = */
-        coalesce(sum, zero) + child.cast(sumDataType)
-      )
+      val updateSumExpr = coalesce(sum, zero) + child.cast(sumDataType)
+      resultType match {
+        case _: DecimalType =>
+          Seq(updateSumExpr, Literal(false, BooleanType))
+        case _ => Seq(updateSumExpr)
+      }
     }
   }
+  /**
+   * For decimal type:
+   * If isEmpty is false and if sum is null, then it means we have an overflow.
+   *
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed
+   * If it has, then the sum value will remain null.
+   * If it did not have overflow, then add the sum.left and sum.right and check for overflow.

Review comment:
Thanks @cloud-fan, I have pushed the code comment changes. Please take a look.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r433404148

## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ##
@@ -200,14 +222,90 @@ class DataFrameSuite extends QueryTest
     Seq(true, false).foreach { ansiEnabled =>
       withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) {
         val structDf = largeDecimals.select("a").agg(sum("a"))
-        if (!ansiEnabled) {
-          checkAnswer(structDf, Row(null))
-        } else {
-          val e = intercept[SparkException] {
-            structDf.collect
+        assertDecimalSumOverflow(structDf, ansiEnabled, Row(null))
+      }
+    }
+  }
+
+  test("SPARK-28067: sum of null decimal values") {
+    Seq("true", "false").foreach { wholeStageEnabled =>
+      withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) {
+        Seq("true", "false").foreach { ansiEnabled =>
+          withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled)) {
+            val df = spark.range(1, 4, 1).select(expr(s"cast(null as decimal(38,18)) as d"))
+            checkAnswer(df.agg(sum($"d")), Row(null))
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-28067: Aggregate sum should not return wrong results for decimal overflow") {
+    Seq("true", "false").foreach { wholeStageEnabled =>
+      withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) {
+        Seq(true, false).foreach { ansiEnabled =>
+          withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) {
+            val df0 = Seq(
+              (BigDecimal("1000"), 1),
+              (BigDecimal("1000"), 1),
+              (BigDecimal("1000"), 2)).toDF("decNum", "intNum")
+            val df1 = Seq(
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2)).toDF("decNum", "intNum")
+            val df = df0.union(df1)
+            val df2 = df.withColumnRenamed("decNum", "decNum2").
+              join(df, "intNum").agg(sum("decNum"))

Review comment:
Sure. Yes, I have also added a simplified version of the repro without the join, along with the other cases, to the unit tests. All the cases we have discussed in the PR are covered there. I added this test because it is the original repro from the JIRA creator who reported this issue.
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r433397053

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##
@@ -62,38 +62,74 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val isEmpty = AttributeReference("isEmpty", BooleanType, nullable = false)()
+
   private lazy val zero = Literal.default(sumDataType)
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = resultType match {
+    case _: DecimalType => sum :: isEmpty :: Nil
+    case _ => sum :: Nil
+  }
-  override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
-  )
+  override lazy val initialValues: Seq[Expression] = resultType match {
+    case _: DecimalType => Seq(Literal(null, resultType), Literal(true, BooleanType))
+    case _ => Seq(Literal(null, resultType))
+  }
   override lazy val updateExpressions: Seq[Expression] = {
     if (child.nullable) {
-      Seq(
-        /* sum = */
-        coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-      )
+      val updateSumExpr = coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
+      resultType match {
+        case _: DecimalType =>
+          Seq(updateSumExpr, isEmpty && child.isNull)
+        case _ => Seq(updateSumExpr)
+      }
     } else {
-      Seq(
-        /* sum = */
-        coalesce(sum, zero) + child.cast(sumDataType)
-      )
+      val updateSumExpr = coalesce(sum, zero) + child.cast(sumDataType)
+      resultType match {
+        case _: DecimalType =>
+          Seq(updateSumExpr, Literal(false, BooleanType))
+        case _ => Seq(updateSumExpr)
+      }
     }
   }
+  /**
+   * For decimal type:
+   * If isEmpty is false and if sum is null, then it means we have an overflow.
+   *
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed
+   * If it has, then the sum value will remain null.
+   * If it did not have overflow, then add the sum.left and sum.right and check for overflow.
+   *
+   * isEmpty: Set to false if either one of the left or right is set to false. This
+   * means we have seen atleast a row that was not null.

Review comment:
Sure, done.
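The merge rule described in the quoted Javadoc can be sketched outside Spark in plain Scala. This is only one reading of those comments, not the code from the pull request: `Buffer`, `checkOverflow`, `overflowed`, and `MaxPrecision` are hypothetical names, `None` stands in for a null `sum`, and overflow is approximated as "more than 38 digits" at scale 0.

```scala
import java.math.{BigDecimal => JBigDecimal}

object IsEmptySumSketch {
  // Assumption: overflow means the value needs more than 38 digits.
  val MaxPrecision = 38

  // None models a null sum in the aggregation buffer.
  final case class Buffer(sum: Option[JBigDecimal], isEmpty: Boolean)

  // Initial buffer: null sum, and no non-null row seen yet.
  val initial: Buffer = Buffer(None, isEmpty = true)

  // Returns None (null) when the value no longer fits.
  def checkOverflow(v: JBigDecimal): Option[JBigDecimal] =
    if (v.precision > MaxPrecision) None else Some(v)

  // "If isEmpty is false and if sum is null, then it means we have an overflow."
  def overflowed(b: Buffer): Boolean = !b.isEmpty && b.sum.isEmpty

  def merge(left: Buffer, right: Buffer): Buffer = {
    val sum =
      if (overflowed(left) || overflowed(right)) None // overflow stays null
      else (left.sum, right.sum) match {
        case (Some(a), Some(b)) => checkOverflow(a.add(b))
        case (a, b) => a.orElse(b) // at most one side has a value
      }
    // isEmpty stays true only if both sides saw no non-null row.
    Buffer(sum, left.isEmpty && right.isEmpty)
  }
}
```

In this model a null `sum` is ambiguous on its own, and the `isEmpty` flag is what distinguishes "nothing seen yet" from "overflowed", which is why the flag has to travel with the sum through every merge.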
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r433397162

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##
@@ -62,38 +62,74 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val isEmpty = AttributeReference("isEmpty", BooleanType, nullable = false)()
+
   private lazy val zero = Literal.default(sumDataType)
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = resultType match {
+    case _: DecimalType => sum :: isEmpty :: Nil
+    case _ => sum :: Nil
+  }
-  override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
-  )
+  override lazy val initialValues: Seq[Expression] = resultType match {
+    case _: DecimalType => Seq(Literal(null, resultType), Literal(true, BooleanType))
+    case _ => Seq(Literal(null, resultType))
+  }
   override lazy val updateExpressions: Seq[Expression] = {
     if (child.nullable) {
-      Seq(
-        /* sum = */
-        coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-      )
+      val updateSumExpr = coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
+      resultType match {
+        case _: DecimalType =>
+          Seq(updateSumExpr, isEmpty && child.isNull)
+        case _ => Seq(updateSumExpr)
+      }
     } else {
-      Seq(
-        /* sum = */
-        coalesce(sum, zero) + child.cast(sumDataType)
-      )
+      val updateSumExpr = coalesce(sum, zero) + child.cast(sumDataType)
+      resultType match {
+        case _: DecimalType =>
+          Seq(updateSumExpr, Literal(false, BooleanType))
+        case _ => Seq(updateSumExpr)
+      }
     }
   }
+  /**
+   * For decimal type:
+   * If isEmpty is false and if sum is null, then it means we have an overflow.
+   *
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed
+   * If it has, then the sum value will remain null.
+   * If it did not have overflow, then add the sum.left and sum.right and check for overflow.

Review comment:
Done.
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r426027330

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##
@@ -62,38 +62,113 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val isEmptyOrNulls = AttributeReference("isEmptyOrNulls", BooleanType, false)()
+
   private lazy val zero = Literal.default(sumDataType)
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: isEmptyOrNulls :: Nil
   override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
+    /* sum = */ zero,
+    /* isEmptyOrNulls = */ Literal.create(true, BooleanType)
   )
+  /**
+   * For decimal types and when child is nullable:
+   * isEmptyOrNulls flag is a boolean to represent if there are no rows or if all rows that
+   * have been seen are null. This will be used to identify if the end result of sum in
+   * evaluateExpression should be null or not.
+   *
+   * Update of the isEmptyOrNulls flag:
+   * If this flag is false, then keep it as is.
+   * If this flag is true, then check if the incoming value is null and if it is null, keep it
+   * as true else update it to false.
+   * Once this flag is switched to false, it will remain false.
+   *
+   * The update of the sum is as follows:
+   * If sum is null, then we have a case of overflow, so keep sum as is.
+   * If sum is not null, and the incoming value is not null, then perform the addition along
+   * with the overflow checking. Note, that if overflow occurs, then sum will be null here.
+   * If the new incoming value is null, we will keep the sum in buffer as is and skip this
+   * incoming null
+   */
   override lazy val updateExpressions: Seq[Expression] = {
     if (child.nullable) {
-      Seq(
-        /* sum = */
-        coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-      )
+      resultType match {
+        case d: DecimalType =>
+          Seq(
+            /* sum */
+            If(IsNull(sum), sum,
+              If(IsNotNull(child.cast(sumDataType)),
+                CheckOverflow(sum + child.cast(sumDataType), d, true), sum)),
+            /* isEmptyOrNulls */
+            If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+          )
+        case _ =>
+          Seq(
+            coalesce(sum + child.cast(sumDataType), sum),
+            If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+          )
+      }
     } else {
-      Seq(
-        /* sum = */
-        coalesce(sum, zero) + child.cast(sumDataType)
-      )
+      resultType match {
+        case d: DecimalType =>
+          Seq(
+            /* sum */
+            If(IsNull(sum), sum, CheckOverflow(sum + child.cast(sumDataType), d, true)),
+            /* isEmptyOrNulls */
+            false
+          )
+        case _ => Seq(sum + child.cast(sumDataType), false)
+      }
     }
   }
+  /**
+   * For decimal type:
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed
+   * If it has, then the sum value will remain null.
+   * If it did not have overflow, then add the sum.left and sum.right and check for overflow.
+   *
+   * isEmptyOrNulls: Set to false if either one of the left or right is set to false. This
+   * means we have seen atleast a row that was not null.
+   * If the value from bufferLeft and bufferRight are both true, then this will be true.
+   */
   override lazy val mergeExpressions: Seq[Expression] = {
-    Seq(
-      /* sum = */
-      coalesce(coalesce(sum.left, zero) + sum.right, sum.left)
-    )
+    resultType match {
+      case d: DecimalType =>
+        Seq(
+          /* sum = */
+          If(And(IsNull(sum.left), EqualTo(isEmptyOrNulls.left, false)) ||
+            And(IsNull(sum.right), EqualTo(isEmptyOrNulls.right, false)),
+            Literal.create(null, resultType),
+            CheckOverflow(sum.left + sum.right, d, true)),
+          /* isEmptyOrNulls = */
+          And(isEmptyOrNulls.left, isEmptyOrNulls.right)
+        )
+      case _ =>
+        Seq(
+          coalesce(sum.left + sum.right, sum.left),
+          And(isEmptyOrNulls.left, isEmptyOrNulls.right)
+        )
+    }
   }
+  /**
+   * If the isEmptyOrNulls is true, then it means either there are no rows, or all the rows were
+   * null, so the result will be null.
+   * If the isEmptyOrNulls is false, then if sum is null that means an overflow has happened.
+   * So now, if ansi is enabled, then throw exception, if not then return null.
+   * If sum is not null, then return the sum.

Review comment:
Please see my comment above on why we need to check for overflow in mergeExpressions.
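The update and evaluate rules from the Javadoc quoted in this message can be modeled in a few lines of plain Scala. This is a sketch under stated assumptions, not the Catalyst expressions themselves: `Buffer`, `checkOverflow`, `update`, `evaluate`, and `MaxPrecision` are hypothetical names, `None` models a null value, overflow is approximated as "more than 38 digits", and the ANSI error is represented by an `ArithmeticException`.

```scala
import java.math.{BigDecimal => JBigDecimal}

object IsEmptyOrNullsSketch {
  // Assumption: overflow means the value needs more than 38 digits.
  val MaxPrecision = 38

  // None models a null sum; the flag is true until a non-null row is seen.
  final case class Buffer(sum: Option[JBigDecimal], isEmptyOrNulls: Boolean)

  // Initial buffer as in the quoted diff: sum = zero, isEmptyOrNulls = true.
  val initial: Buffer = Buffer(Some(JBigDecimal.ZERO), isEmptyOrNulls = true)

  def checkOverflow(v: JBigDecimal): Option[JBigDecimal] =
    if (v.precision > MaxPrecision) None else Some(v)

  def update(b: Buffer, in: Option[JBigDecimal]): Buffer = {
    val sum = (b.sum, in) match {
      case (None, _) => None                          // already overflowed
      case (Some(s), Some(v)) => checkOverflow(s.add(v))
      case (s, None) => s                             // skip incoming null
    }
    // Once the flag switches to false it stays false.
    Buffer(sum, if (b.isEmptyOrNulls) in.isEmpty else false)
  }

  def evaluate(b: Buffer, ansiEnabled: Boolean): Option[JBigDecimal] =
    if (b.isEmptyOrNulls) None // no rows, or only null rows
    else b.sum match {
      case None if ansiEnabled =>
        throw new ArithmeticException("decimal sum overflow")
      case other => other      // the sum, or null on overflow
    }
}
```

Folding a sequence of inputs through `update` (for example `inputs.foldLeft(initial)(update)`) and then calling `evaluate` gives null for an empty or all-null input, and gives either null or an exception after an overflow, depending on the ANSI setting.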
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r426026716

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##
@@ -62,38 +62,113 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val isEmptyOrNulls = AttributeReference("isEmptyOrNulls", BooleanType, false)()
+
   private lazy val zero = Literal.default(sumDataType)
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: isEmptyOrNulls :: Nil
   override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
+    /* sum = */ zero,
+    /* isEmptyOrNulls = */ Literal.create(true, BooleanType)
   )
+  /**
+   * For decimal types and when child is nullable:
+   * isEmptyOrNulls flag is a boolean to represent if there are no rows or if all rows that
+   * have been seen are null. This will be used to identify if the end result of sum in
+   * evaluateExpression should be null or not.
+   *
+   * Update of the isEmptyOrNulls flag:
+   * If this flag is false, then keep it as is.
+   * If this flag is true, then check if the incoming value is null and if it is null, keep it
+   * as true else update it to false.
+   * Once this flag is switched to false, it will remain false.
+   *
+   * The update of the sum is as follows:
+   * If sum is null, then we have a case of overflow, so keep sum as is.
+   * If sum is not null, and the incoming value is not null, then perform the addition along
+   * with the overflow checking. Note, that if overflow occurs, then sum will be null here.
+   * If the new incoming value is null, we will keep the sum in buffer as is and skip this
+   * incoming null
+   */
   override lazy val updateExpressions: Seq[Expression] = {
     if (child.nullable) {
-      Seq(
-        /* sum = */
-        coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-      )
+      resultType match {
+        case d: DecimalType =>
+          Seq(
+            /* sum */
+            If(IsNull(sum), sum,
+              If(IsNotNull(child.cast(sumDataType)),
+                CheckOverflow(sum + child.cast(sumDataType), d, true), sum)),
+            /* isEmptyOrNulls */
+            If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+          )
+        case _ =>
+          Seq(
+            coalesce(sum + child.cast(sumDataType), sum),
+            If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+          )
+      }
     } else {
-      Seq(
-        /* sum = */
-        coalesce(sum, zero) + child.cast(sumDataType)
-      )
+      resultType match {
+        case d: DecimalType =>
+          Seq(
+            /* sum */
+            If(IsNull(sum), sum, CheckOverflow(sum + child.cast(sumDataType), d, true)),
+            /* isEmptyOrNulls */
+            false
+          )
+        case _ => Seq(sum + child.cast(sumDataType), false)
+      }
     }
   }
+  /**
+   * For decimal type:
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed
+   * If it has, then the sum value will remain null.
+   * If it did not have overflow, then add the sum.left and sum.right and check for overflow.

Review comment:
We need this here because merge_sum and evaluate may not always be in the same whole-stage codegen stage. For example, take a query that has a distinct and a sum. Notice that the first merge_sum is in a different stage (2) and the final sum is in stage (3).

```
== Physical Plan ==
*(3) HashAggregate(keys=[intNum#8], functions=[sum(decNum#7), count(distinct decNum#7)], output=[count(DISTINCT decNum)#35L, sum(decNum)#36])
+- Exchange hashpartitioning(intNum#8, 5), true, [id=#35]
   +- *(2) HashAggregate(keys=[intNum#8], functions=[merge_sum(decNum#7), partial_count(distinct decNum#7)], output=[intNum#8, sum#40, count#43L])
      +- *(2) HashAggregate(keys=[intNum#8, decNum#7], functions=[merge_sum(decNum#7)], output=[intNum#8, decNum#7, sum#40])
         +- Exchange hashpartitioning(intNum#8, decNum#7, 5), true, [id=#30]
            +- *(1) HashAggregate(keys=[intNum#8, decNum#7], functions=[partial_sum(decNum#7)], output=[intNum#8, decNum#7, sum#40])
               +- *(1) Project [_1#2 AS decNum#7, _2#3 AS intNum#8]
                  +- *(1) LocalTableScan [_1#2, _2#3]
```

I have added this scenario to the test suite as well.
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r426022883

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##
@@ -62,38 +62,113 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val isEmptyOrNulls = AttributeReference("isEmptyOrNulls", BooleanType, false)()
+
   private lazy val zero = Literal.default(sumDataType)
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: isEmptyOrNulls :: Nil
   override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
+    /* sum = */ zero,
+    /* isEmptyOrNulls = */ Literal.create(true, BooleanType)
   )
+  /**
+   * For decimal types and when child is nullable:
+   * isEmptyOrNulls flag is a boolean to represent if there are no rows or if all rows that
+   * have been seen are null. This will be used to identify if the end result of sum in
+   * evaluateExpression should be null or not.
+   *
+   * Update of the isEmptyOrNulls flag:
+   * If this flag is false, then keep it as is.
+   * If this flag is true, then check if the incoming value is null and if it is null, keep it
+   * as true else update it to false.
+   * Once this flag is switched to false, it will remain false.
+   *
+   * The update of the sum is as follows:
+   * If sum is null, then we have a case of overflow, so keep sum as is.
+   * If sum is not null, and the incoming value is not null, then perform the addition along
+   * with the overflow checking. Note, that if overflow occurs, then sum will be null here.

Review comment:
Yes, it is necessary. Otherwise, when the HashAggregate has keys, it can throw an exception while building the hash map if an overflow has happened. For example, one of the cases we have already seen and discussed will not work (it will throw an exception) if we remove this:

df.select(expr(s"cast('$decimalStr' as decimal (38, 18)) as d"), lit(1).as("key")).groupBy("key").agg(sum($"d")).show

Please see this comment, which explains the reasoning: https://github.com/apache/spark/pull/27627#issuecomment-597404017
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r426015667

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##
@@ -62,38 +62,113 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val isEmptyOrNulls = AttributeReference("isEmptyOrNulls", BooleanType, false)()
+
   private lazy val zero = Literal.default(sumDataType)
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: isEmptyOrNulls :: Nil
   override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
+    /* sum = */ zero,
+    /* isEmptyOrNulls = */ Literal.create(true, BooleanType)
   )
+  /**
+   * For decimal types and when child is nullable:
+   * isEmptyOrNulls flag is a boolean to represent if there are no rows or if all rows that
+   * have been seen are null. This will be used to identify if the end result of sum in
+   * evaluateExpression should be null or not.
+   *
+   * Update of the isEmptyOrNulls flag:
+   * If this flag is false, then keep it as is.
+   * If this flag is true, then check if the incoming value is null and if it is null, keep it
+   * as true else update it to false.
+   * Once this flag is switched to false, it will remain false.
+   *
+   * The update of the sum is as follows:
+   * If sum is null, then we have a case of overflow, so keep sum as is.
+   * If sum is not null, and the incoming value is not null, then perform the addition along
+   * with the overflow checking. Note, that if overflow occurs, then sum will be null here.
+   * If the new incoming value is null, we will keep the sum in buffer as is and skip this
+   * incoming null
+   */
   override lazy val updateExpressions: Seq[Expression] = {
     if (child.nullable) {
-      Seq(
-        /* sum = */
-        coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-      )
+      resultType match {
+        case d: DecimalType =>
+          Seq(
+            /* sum */
+            If(IsNull(sum), sum,
+              If(IsNotNull(child.cast(sumDataType)),
+                CheckOverflow(sum + child.cast(sumDataType), d, true), sum)),
+            /* isEmptyOrNulls */
+            If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+          )
+        case _ =>
+          Seq(
+            coalesce(sum + child.cast(sumDataType), sum),
+            If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+          )
+      }
     } else {
-      Seq(
-        /* sum = */
-        coalesce(sum, zero) + child.cast(sumDataType)
-      )
+      resultType match {
+        case d: DecimalType =>
+          Seq(
+            /* sum */
+            If(IsNull(sum), sum, CheckOverflow(sum + child.cast(sumDataType), d, true)),
+            /* isEmptyOrNulls */
+            false
+          )
+        case _ => Seq(sum + child.cast(sumDataType), false)
+      }
     }
   }
+  /**
+   * For decimal type:
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed

Review comment:
Done.
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r426015765

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##
@@ -62,38 +62,113 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val isEmptyOrNulls = AttributeReference("isEmptyOrNulls", BooleanType, false)()
+
   private lazy val zero = Literal.default(sumDataType)
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: isEmptyOrNulls :: Nil

Review comment:
Done.
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
URL: https://github.com/apache/spark/pull/27627#discussion_r408623119

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/DecimalSum.scala ##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+case class DecimalSum(child: Expression) extends DeclarativeAggregate with ImplicitCastInputTypes {

Review comment:
Please see the comment https://github.com/apache/spark/pull/27627#discussion_r408622772
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
URL: https://github.com/apache/spark/pull/27627#discussion_r408622772

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -240,6 +240,7 @@ class Analyzer(
       ExtractWindowExpressions ::
       GlobalAggregates ::
       ResolveAggregateFunctions ::
+      UseDecimalSum ::

Review comment:
Thanks @maropu for your comments. Based on the discussion here https://github.com/apache/spark/pull/27627#issuecomment-611308104, the changes are now back in Sum only, so this rule is no longer needed.
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
URL: https://github.com/apache/spark/pull/27627#discussion_r386739126

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala ##
@@ -132,3 +132,32 @@ case class CheckOverflow(
   override def sql: String = child.sql
 }
+
+case class HasOverflow(
+    child: Expression,
+    inputType: DecimalType) extends UnaryExpression {
+
+  override def dataType: DataType = BooleanType
+
+  override def nullable: Boolean = true

Review comment:
Since the child can be nullable, the input value can be null. Setting nullable to false in that case will not work, as it may result in an NPE. We could change doGenCode() here to check for null, but since nullSafeCodeGen in UnaryExpression already takes care of the nullable checks, there seems to be no need to add null checks here.
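The null-propagation point in this comment can be illustrated without Catalyst. The sketch below is not the `HasOverflow` expression from the pull request; `DecimalSpec` and `hasOverflow` are hypothetical names, `None` models a null input, and the rounding mode is an assumption. It only shows why a Boolean check over a nullable input is itself nullable.

```scala
import java.math.{BigDecimal => JBigDecimal}
import java.math.RoundingMode

object HasOverflowSketch {
  // Hypothetical stand-in for a target decimal(precision, scale) type.
  final case class DecimalSpec(precision: Int, scale: Int)

  // A null input (None) yields a null result (None) rather than a
  // Boolean, mirroring null-safe evaluation of a unary expression.
  def hasOverflow(input: Option[JBigDecimal], spec: DecimalSpec): Option[Boolean] =
    input.map { v =>
      // Assumption: round to the target scale, then count the digits.
      val rescaled = v.setScale(spec.scale, RoundingMode.HALF_UP)
      rescaled.precision > spec.precision
    }
}
```

For instance, `hasOverflow(None, DecimalSpec(5, 2))` is `None`, while `123.45` fits `DecimalSpec(5, 2)` but not `DecimalSpec(4, 2)`.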
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
URL: https://github.com/apache/spark/pull/27627#discussion_r380941152

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##

@@ -60,38 +60,104 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sumDataType = resultType
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val overflow = AttributeReference("overflow", BooleanType, false)()
   private lazy val zero = Literal.default(resultType)
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: overflow :: Nil
   override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
+    /* sum = */ Literal.create(null, sumDataType),
+    /* overflow = */ Literal.create(false, BooleanType)

Review comment:
We keep track of overflow using the overflow attribute in aggBufferAttributes, so that we know whether any of the intermediate add operations in updateExpressions and/or mergeExpressions overflowed. If overflow is true and the spark.sql.ansi.enabled flag is false, then we return null for the sum operation in evaluateExpression.
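The buffer bookkeeping described in the comment above can be sketched without Spark. This is a minimal model under stated assumptions, not the PR's implementation: `DecimalSumSketch`, `SumBuffer`, `MaxPrecision`, and the `update`, `merge`, and `evaluate` functions are hypothetical names, `Option` stands in for a nullable sum, a small fixed precision replaces the real decimal result type, and only the non-ANSI path (spark.sql.ansi.enabled = false) is modeled.

```scala
// Hypothetical, Spark-free model of the (sum, overflow) aggregation buffer.
object DecimalSumSketch {
  // Assumed result type for illustration: at most 5 decimal digits.
  val MaxPrecision = 5

  // sum = None models a null sum (no non-null input seen yet).
  final case class SumBuffer(sum: Option[BigDecimal], overflow: Boolean)

  // Mirrors initialValues: sum = null, overflow = false.
  val initial: SumBuffer = SumBuffer(sum = None, overflow = false)

  private def fits(v: BigDecimal): Boolean = v.precision <= MaxPrecision

  // Same addition as before, coalesce(sum, zero) + value; the only new
  // work is recording whether the intermediate result overflowed.
  def update(buf: SumBuffer, input: Option[BigDecimal]): SumBuffer = input match {
    case None => buf // a null input leaves the buffer unchanged
    case Some(v) =>
      val next = buf.sum.getOrElse(BigDecimal(0)) + v
      SumBuffer(Some(next), buf.overflow || !fits(next))
  }

  // Combines two partial buffers; the overflow flag is sticky.
  def merge(left: SumBuffer, right: SumBuffer): SumBuffer = {
    val merged: Option[BigDecimal] = (left.sum, right.sum) match {
      case (Some(l), Some(r)) => Some(l + r)
      case (l, r)             => l.orElse(r)
    }
    val overflowed = left.overflow || right.overflow || merged.exists(v => !fits(v))
    SumBuffer(merged, overflowed)
  }

  // Non-ANSI mode only: an overflowed buffer evaluates to null (None).
  def evaluate(buf: SumBuffer): Option[BigDecimal] =
    if (buf.overflow) None else buf.sum
}
```

Folding inputs through `update`, for example `inputs.foldLeft(DecimalSumSketch.initial)(DecimalSumSketch.update)`, and then calling `evaluate` yields `None` as soon as any intermediate addition exceeds the assumed precision, instead of a silently wrong value.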
[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
URL: https://github.com/apache/spark/pull/27627#discussion_r380939010

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala ##

@@ -60,38 +60,104 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sumDataType = resultType
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val overflow = AttributeReference("overflow", BooleanType, false)()
   private lazy val zero = Literal.default(resultType)
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: overflow :: Nil
   override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
+    /* sum = */ Literal.create(null, sumDataType),
+    /* overflow = */ Literal.create(false, BooleanType)
   )
   override lazy val updateExpressions: Seq[Expression] = {
-    if (child.nullable) {
+    if (!SQLConf.get.ansiEnabled) {
+      if (child.nullable) {
+        Seq(
+          /* sum = */
+          coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum),

Review comment:
The changes in Sum are mostly to check whether overflow has occurred when we do the different additions in updateExpressions and mergeExpressions. The actual addition operations are all unchanged. That may not be easy to see from the diff, so I wanted to make a note of it here.