[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-06-01 Thread GitBox


skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r433405913



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
##
@@ -62,38 +62,74 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
 
   private lazy val sum = AttributeReference("sum", sumDataType)()
 
+  private lazy val isEmpty = AttributeReference("isEmpty", BooleanType, nullable = false)()
+
   private lazy val zero = Literal.default(sumDataType)
 
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = resultType match {
+    case _: DecimalType => sum :: isEmpty :: Nil
+    case _ => sum :: Nil
+  }
 
-  override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
-  )
+  override lazy val initialValues: Seq[Expression] = resultType match {
+    case _: DecimalType => Seq(Literal(null, resultType), Literal(true, BooleanType))
+    case _ => Seq(Literal(null, resultType))
+  }
 
   override lazy val updateExpressions: Seq[Expression] = {
     if (child.nullable) {
-      Seq(
-        /* sum = */
-        coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-      )
+      val updateSumExpr = coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
+      resultType match {
+        case _: DecimalType =>
+          Seq(updateSumExpr, isEmpty && child.isNull)
+        case _ => Seq(updateSumExpr)
+      }
     } else {
-      Seq(
-        /* sum = */
-        coalesce(sum, zero) + child.cast(sumDataType)
-      )
+      val updateSumExpr = coalesce(sum, zero) + child.cast(sumDataType)
+      resultType match {
+        case _: DecimalType =>
+          Seq(updateSumExpr, Literal(false, BooleanType))
+        case _ => Seq(updateSumExpr)
+      }
     }
   }
 
+  /**
+   * For decimal type:
+   * If isEmpty is false and if sum is null, then it means we have an overflow.
+   *
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed
+   * If it has, then the sum value will remain null.
+   * If it did not have overflow, then add the sum.left and sum.right and check for overflow.

Review comment:
   Thanks @cloud-fan, I have pushed the code comment changes. Please take a look.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-06-01 Thread GitBox


skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r433404148



##
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
##
@@ -200,14 +222,90 @@ class DataFrameSuite extends QueryTest
     Seq(true, false).foreach { ansiEnabled =>
       withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) {
         val structDf = largeDecimals.select("a").agg(sum("a"))
-        if (!ansiEnabled) {
-          checkAnswer(structDf, Row(null))
-        } else {
-          val e = intercept[SparkException] {
-            structDf.collect
+        assertDecimalSumOverflow(structDf, ansiEnabled, Row(null))
+      }
+    }
+  }
+
+  test("SPARK-28067: sum of null decimal values") {
+    Seq("true", "false").foreach { wholeStageEnabled =>
+      withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) {
+        Seq("true", "false").foreach { ansiEnabled =>
+          withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled)) {
+            val df = spark.range(1, 4, 1).select(expr(s"cast(null as decimal(38,18)) as d"))
+            checkAnswer(df.agg(sum($"d")), Row(null))
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-28067: Aggregate sum should not return wrong results for decimal overflow") {
+    Seq("true", "false").foreach { wholeStageEnabled =>
+      withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) {
+        Seq(true, false).foreach { ansiEnabled =>
+          withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) {
+            val df0 = Seq(
+              (BigDecimal("1000"), 1),
+              (BigDecimal("1000"), 1),
+              (BigDecimal("1000"), 2)).toDF("decNum", "intNum")
+            val df1 = Seq(
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2),
+              (BigDecimal("1000"), 2)).toDF("decNum", "intNum")
+            val df = df0.union(df1)
+            val df2 = df.withColumnRenamed("decNum", "decNum2").
+              join(df, "intNum").agg(sum("decNum"))

Review comment:
   Sure. Yes, I have added a simplified version of the repro, without the join, along with the other cases in the UT. All the cases we have discussed in the PR are covered there. I added this one because it is the original repro from the JIRA reporter who raised this issue.








[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-06-01 Thread GitBox


skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r433397053



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
##
@@ -62,38 +62,74 @@ case class Sum(child: Expression) extends 
DeclarativeAggregate with ImplicitCast
 
   private lazy val sum = AttributeReference("sum", sumDataType)()
 
+  private lazy val isEmpty = AttributeReference("isEmpty", BooleanType, 
nullable = false)()
+
   private lazy val zero = Literal.default(sumDataType)
 
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = resultType match {
+case _: DecimalType => sum :: isEmpty :: Nil
+case _ => sum :: Nil
+  }
 
-  override lazy val initialValues: Seq[Expression] = Seq(
-/* sum = */ Literal.create(null, sumDataType)
-  )
+  override lazy val initialValues: Seq[Expression] = resultType match {
+case _: DecimalType => Seq(Literal(null, resultType), Literal(true, 
BooleanType))
+case _ => Seq(Literal(null, resultType))
+  }
 
   override lazy val updateExpressions: Seq[Expression] = {
 if (child.nullable) {
-  Seq(
-/* sum = */
-coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-  )
+  val updateSumExpr = coalesce(coalesce(sum, zero) + 
child.cast(sumDataType), sum)
+  resultType match {
+case _: DecimalType =>
+  Seq(updateSumExpr, isEmpty && child.isNull)
+case _ => Seq(updateSumExpr)
+  }
 } else {
-  Seq(
-/* sum = */
-coalesce(sum, zero) + child.cast(sumDataType)
-  )
+  val updateSumExpr = coalesce(sum, zero) + child.cast(sumDataType)
+  resultType match {
+case _: DecimalType =>
+  Seq(updateSumExpr, Literal(false, BooleanType))
+case _ => Seq(updateSumExpr)
+  }
 }
   }
 
+  /**
+   * For decimal type:
+   * If isEmpty is false and if sum is null, then it means we have an overflow.
+   *
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed
+   * If it has, then the sum value will remain null.
+   * If it did not have overflow, then add the sum.left and sum.right and 
check for overflow.
+   *
+   * isEmpty:  Set to false if either one of the left or right is set to 
false. This
+   * means we have seen atleast a row that was not null.

Review comment:
   sure. done.








[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-06-01 Thread GitBox


skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r433397162



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
##
@@ -62,38 +62,74 @@ case class Sum(child: Expression) extends 
DeclarativeAggregate with ImplicitCast
 
   private lazy val sum = AttributeReference("sum", sumDataType)()
 
+  private lazy val isEmpty = AttributeReference("isEmpty", BooleanType, 
nullable = false)()
+
   private lazy val zero = Literal.default(sumDataType)
 
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = resultType match {
+case _: DecimalType => sum :: isEmpty :: Nil
+case _ => sum :: Nil
+  }
 
-  override lazy val initialValues: Seq[Expression] = Seq(
-/* sum = */ Literal.create(null, sumDataType)
-  )
+  override lazy val initialValues: Seq[Expression] = resultType match {
+case _: DecimalType => Seq(Literal(null, resultType), Literal(true, 
BooleanType))
+case _ => Seq(Literal(null, resultType))
+  }
 
   override lazy val updateExpressions: Seq[Expression] = {
 if (child.nullable) {
-  Seq(
-/* sum = */
-coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-  )
+  val updateSumExpr = coalesce(coalesce(sum, zero) + 
child.cast(sumDataType), sum)
+  resultType match {
+case _: DecimalType =>
+  Seq(updateSumExpr, isEmpty && child.isNull)
+case _ => Seq(updateSumExpr)
+  }
 } else {
-  Seq(
-/* sum = */
-coalesce(sum, zero) + child.cast(sumDataType)
-  )
+  val updateSumExpr = coalesce(sum, zero) + child.cast(sumDataType)
+  resultType match {
+case _: DecimalType =>
+  Seq(updateSumExpr, Literal(false, BooleanType))
+case _ => Seq(updateSumExpr)
+  }
 }
   }
 
+  /**
+   * For decimal type:
+   * If isEmpty is false and if sum is null, then it means we have an overflow.
+   *
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed
+   * If it has, then the sum value will remain null.
+   * If it did not have overflow, then add the sum.left and sum.right and 
check for overflow.

Review comment:
   done.








[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-05-15 Thread GitBox


skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r426027330



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
##
@@ -62,38 +62,113 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
 
   private lazy val sum = AttributeReference("sum", sumDataType)()
 
+  private lazy val isEmptyOrNulls = AttributeReference("isEmptyOrNulls", BooleanType, false)()
+
   private lazy val zero = Literal.default(sumDataType)
 
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: isEmptyOrNulls :: Nil
 
   override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
+    /* sum = */  zero,
+    /* isEmptyOrNulls = */ Literal.create(true, BooleanType)
   )
 
+  /**
+   * For decimal types and when child is nullable:
+   * isEmptyOrNulls flag is a boolean to represent if there are no rows or if all rows that
+   * have been seen are null.  This will be used to identify if the end result of sum in
+   * evaluateExpression should be null or not.
+   *
+   * Update of the isEmptyOrNulls flag:
+   * If this flag is false, then keep it as is.
+   * If this flag is true, then check if the incoming value is null and if it is null, keep it
+   * as true else update it to false.
+   * Once this flag is switched to false, it will remain false.
+   *
+   * The update of the sum is as follows:
+   * If sum is null, then we have a case of overflow, so keep sum as is.
+   * If sum is not null, and the incoming value is not null, then perform the addition along
+   * with the overflow checking. Note, that if overflow occurs, then sum will be null here.
+   * If the new incoming value is null, we will keep the sum in buffer as is and skip this
+   * incoming null
+   */
   override lazy val updateExpressions: Seq[Expression] = {
     if (child.nullable) {
-      Seq(
-        /* sum = */
-        coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-      )
+      resultType match {
+        case d: DecimalType =>
+          Seq(
+            /* sum */
+            If(IsNull(sum), sum,
+              If(IsNotNull(child.cast(sumDataType)),
+                CheckOverflow(sum + child.cast(sumDataType), d, true), sum)),
+            /* isEmptyOrNulls */
+            If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+          )
+        case _ =>
+          Seq(
+            coalesce(sum + child.cast(sumDataType), sum),
+            If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+          )
+      }
     } else {
-      Seq(
-        /* sum = */
-        coalesce(sum, zero) + child.cast(sumDataType)
-      )
+      resultType match {
+        case d: DecimalType =>
+          Seq(
+            /* sum */
+            If(IsNull(sum), sum, CheckOverflow(sum + child.cast(sumDataType), d, true)),
+            /* isEmptyOrNulls */
+            false
+          )
+        case _ => Seq(sum + child.cast(sumDataType), false)
+      }
     }
   }
 
+  /**
+   * For decimal type:
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed
+   * If it has, then the sum value will remain null.
+   * If it did not have overflow, then add the sum.left and sum.right and check for overflow.
+   *
+   * isEmptyOrNulls:  Set to false if either one of the left or right is set to false. This
+   * means we have seen atleast a row that was not null.
+   * If the value from bufferLeft and bufferRight are both true, then this will be true.
+   */
   override lazy val mergeExpressions: Seq[Expression] = {
-    Seq(
-      /* sum = */
-      coalesce(coalesce(sum.left, zero) + sum.right, sum.left)
-    )
+    resultType match {
+      case d: DecimalType =>
+        Seq(
+          /* sum = */
+          If(And(IsNull(sum.left), EqualTo(isEmptyOrNulls.left, false)) ||
+            And(IsNull(sum.right), EqualTo(isEmptyOrNulls.right, false)),
+              Literal.create(null, resultType),
+              CheckOverflow(sum.left + sum.right, d, true)),
+          /* isEmptyOrNulls = */
+          And(isEmptyOrNulls.left, isEmptyOrNulls.right)
+          )
+      case _ =>
+        Seq(
+          coalesce(sum.left + sum.right, sum.left),
+          And(isEmptyOrNulls.left, isEmptyOrNulls.right)
+        )
+    }
   }
 
+  /**
+   * If the isEmptyOrNulls is true, then it means either there are no rows, or all the rows were
+   * null, so the result will be null.
+   * If the isEmptyOrNulls is false, then if sum is null that means an overflow has happened.
+   * So now, if ansi is enabled, then throw exception, if not then return null.
+   * If sum is not null, then return the sum.

Review comment:
   Please see my comment above on why we need to check for overflow in mergeExpressions.
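
To make the final-evaluation rule in the doc comment above concrete, here is a minimal plain-Scala sketch. It is not the Spark implementation: `EvaluateSketch`, `Buffer`, `evaluate`, and the `Either` return type are hypothetical stand-ins for the aggregation buffer, the evaluate step, and the exception raised in ANSI mode.

```scala
import java.math.{BigDecimal => JBigDecimal}

object EvaluateSketch {
  // Hypothetical model of the two-field buffer: `sum` is None when the SQL value is null;
  // `isEmptyOrNulls` stays true until a non-null input row has been seen.
  final case class Buffer(sum: Option[JBigDecimal], isEmptyOrNulls: Boolean)

  // Right(None) models a SQL NULL result, Right(Some(v)) a value,
  // and Left(message) the point where ANSI mode would raise an error.
  def evaluate(buf: Buffer, ansiEnabled: Boolean): Either[String, Option[JBigDecimal]] =
    if (buf.isEmptyOrNulls) {
      Right(None) // no rows, or only null rows
    } else {
      buf.sum match {
        case Some(v) => Right(Some(v))
        case None if ansiEnabled => Left("decimal overflow in sum") // null sum + seen rows = overflow
        case None => Right(None)
      }
    }
}
```

The point of the flag is that a null `sum` alone is ambiguous; only together with `isEmptyOrNulls == false` does it signal an overflow.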

[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-05-15 Thread GitBox


skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r426026716



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
##
@@ -62,38 +62,113 @@ case class Sum(child: Expression) extends 
DeclarativeAggregate with ImplicitCast
 
   private lazy val sum = AttributeReference("sum", sumDataType)()
 
+  private lazy val isEmptyOrNulls = AttributeReference("isEmptyOrNulls", 
BooleanType, false)()
+
   private lazy val zero = Literal.default(sumDataType)
 
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: isEmptyOrNulls :: Nil
 
   override lazy val initialValues: Seq[Expression] = Seq(
-/* sum = */ Literal.create(null, sumDataType)
+/* sum = */  zero,
+/* isEmptyOrNulls = */ Literal.create(true, BooleanType)
   )
 
+  /**
+   * For decimal types and when child is nullable:
+   * isEmptyOrNulls flag is a boolean to represent if there are no rows or if 
all rows that
+   * have been seen are null.  This will be used to identify if the end result 
of sum in
+   * evaluateExpression should be null or not.
+   *
+   * Update of the isEmptyOrNulls flag:
+   * If this flag is false, then keep it as is.
+   * If this flag is true, then check if the incoming value is null and if it 
is null, keep it
+   * as true else update it to false.
+   * Once this flag is switched to false, it will remain false.
+   *
+   * The update of the sum is as follows:
+   * If sum is null, then we have a case of overflow, so keep sum as is.
+   * If sum is not null, and the incoming value is not null, then perform the 
addition along
+   * with the overflow checking. Note, that if overflow occurs, then sum will 
be null here.
+   * If the new incoming value is null, we will keep the sum in buffer as is 
and skip this
+   * incoming null
+   */
   override lazy val updateExpressions: Seq[Expression] = {
 if (child.nullable) {
-  Seq(
-/* sum = */
-coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-  )
+  resultType match {
+case d: DecimalType =>
+  Seq(
+/* sum */
+If(IsNull(sum), sum,
+  If(IsNotNull(child.cast(sumDataType)),
+CheckOverflow(sum + child.cast(sumDataType), d, true), sum)),
+/* isEmptyOrNulls */
+If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+  )
+case _ =>
+  Seq(
+coalesce(sum + child.cast(sumDataType), sum),
+If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+  )
+  }
 } else {
-  Seq(
-/* sum = */
-coalesce(sum, zero) + child.cast(sumDataType)
-  )
+  resultType match {
+case d: DecimalType =>
+  Seq(
+/* sum */
+If(IsNull(sum), sum, CheckOverflow(sum + child.cast(sumDataType), 
d, true)),
+/* isEmptyOrNulls */
+false
+  )
+case _ => Seq(sum + child.cast(sumDataType), false)
+  }
 }
   }
 
+  /**
+   * For decimal type:
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed
+   * If it has, then the sum value will remain null.
+   * If it did not have overflow, then add the sum.left and sum.right and 
check for overflow.

Review comment:
   We need this here because merge_sum and evaluate may not always be in the same whole-stage codegen stage.
   
   For example, consider a query that has both a distinct aggregate and a sum. Notice that the first merge_sum is in stage (2), whereas the final sum is in stage (3).
   
   ```
   == Physical Plan ==
   *(3) HashAggregate(keys=[intNum#8], functions=[sum(decNum#7), count(distinct decNum#7)], output=[count(DISTINCT decNum)#35L, sum(decNum)#36])
   +- Exchange hashpartitioning(intNum#8, 5), true, [id=#35]
      +- *(2) HashAggregate(keys=[intNum#8], functions=[merge_sum(decNum#7), partial_count(distinct decNum#7)], output=[intNum#8, sum#40, count#43L])
         +- *(2) HashAggregate(keys=[intNum#8, decNum#7], functions=[merge_sum(decNum#7)], output=[intNum#8, decNum#7, sum#40])
            +- Exchange hashpartitioning(intNum#8, decNum#7, 5), true, [id=#30]
               +- *(1) HashAggregate(keys=[intNum#8, decNum#7], functions=[partial_sum(decNum#7)], output=[intNum#8, decNum#7, sum#40])
                  +- *(1) Project [_1#2 AS decNum#7, _2#3 AS intNum#8]
                     +- *(1) LocalTableScan [_1#2, _2#3]
   ```
   
   I have added this scenario to the test suite as well.
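
As a rough illustration of the merge rule discussed here, the following plain-Scala sketch merges two partial buffers and re-checks for overflow. It is only a model under stated assumptions: `MergeSketch`, `Buffer`, `checkOverflow`, and `merge` are hypothetical names, and `checkOverflow` merely imitates a null-on-overflow check by rounding to the target scale and comparing digit counts.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object MergeSketch {
  // Hypothetical model of one partial aggregation buffer.
  final case class Buffer(sum: Option[JBigDecimal], isEmptyOrNulls: Boolean)

  // Assumed stand-in for a null-on-overflow check: None when the value
  // needs more than `precision` digits after rounding to `scale`.
  def checkOverflow(v: JBigDecimal, precision: Int, scale: Int): Option[JBigDecimal] = {
    val rounded = v.setScale(scale, RoundingMode.HALF_UP)
    if (rounded.precision() <= precision) Some(rounded) else None
  }

  def merge(left: Buffer, right: Buffer, precision: Int, scale: Int): Buffer = {
    // A null sum on a side that has seen non-null rows means that side overflowed.
    def overflowed(b: Buffer): Boolean = b.sum.isEmpty && !b.isEmptyOrNulls
    val merged: Option[JBigDecimal] =
      if (overflowed(left) || overflowed(right)) None
      else for {
        l <- left.sum
        r <- right.sum
        s <- checkOverflow(l.add(r), precision, scale) // the addition itself may overflow
      } yield s
    Buffer(merged, left.isEmptyOrNulls && right.isEmptyOrNulls)
  }
}
```

With a toy precision of 3 and scale 0, merging two partial sums of 600 yields a null sum with `isEmptyOrNulls == false`, which a later stage can still recognise as an overflow.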









[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-05-15 Thread GitBox


skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r426022883



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
##
@@ -62,38 +62,113 @@ case class Sum(child: Expression) extends 
DeclarativeAggregate with ImplicitCast
 
   private lazy val sum = AttributeReference("sum", sumDataType)()
 
+  private lazy val isEmptyOrNulls = AttributeReference("isEmptyOrNulls", 
BooleanType, false)()
+
   private lazy val zero = Literal.default(sumDataType)
 
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: isEmptyOrNulls :: Nil
 
   override lazy val initialValues: Seq[Expression] = Seq(
-/* sum = */ Literal.create(null, sumDataType)
+/* sum = */  zero,
+/* isEmptyOrNulls = */ Literal.create(true, BooleanType)
   )
 
+  /**
+   * For decimal types and when child is nullable:
+   * isEmptyOrNulls flag is a boolean to represent if there are no rows or if 
all rows that
+   * have been seen are null.  This will be used to identify if the end result 
of sum in
+   * evaluateExpression should be null or not.
+   *
+   * Update of the isEmptyOrNulls flag:
+   * If this flag is false, then keep it as is.
+   * If this flag is true, then check if the incoming value is null and if it 
is null, keep it
+   * as true else update it to false.
+   * Once this flag is switched to false, it will remain false.
+   *
+   * The update of the sum is as follows:
+   * If sum is null, then we have a case of overflow, so keep sum as is.
+   * If sum is not null, and the incoming value is not null, then perform the 
addition along
+   * with the overflow checking. Note, that if overflow occurs, then sum will 
be null here.

Review comment:
   Yes, it is necessary. Otherwise, when the HashAggregate has keys, it can throw an exception while building the hash map if an overflow has happened.
   
   For example, one of the cases we have already seen and discussed will not work (it throws an exception) if we remove this:
   
       df.select(expr(s"cast('$decimalStr' as decimal (38, 18)) as d"), lit(1).as("key")).groupBy("key").agg(sum($"d")).show
   
   Please see this comment, which explains the reasoning: https://github.com/apache/spark/pull/27627#issuecomment-597404017
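
The per-row update described in the doc comment can be sketched in plain Scala as follows. This is a simplified model, not the Catalyst expressions themselves: `UpdateSketch`, `Buffer`, `checkOverflow`, `update`, and `aggregate` are hypothetical names, and the overflow check is an assumed approximation. It shows the buffer storing null as soon as an addition overflows, so an out-of-range value is never kept in the buffer.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object UpdateSketch {
  // Hypothetical model of the aggregation buffer kept per grouping key.
  final case class Buffer(sum: Option[JBigDecimal], isEmptyOrNulls: Boolean)

  // Initial values from the diff: sum starts at zero, the flag starts as true.
  val initial: Buffer = Buffer(Some(JBigDecimal.ZERO), isEmptyOrNulls = true)

  // Assumed stand-in for a null-on-overflow check.
  def checkOverflow(v: JBigDecimal, precision: Int, scale: Int): Option[JBigDecimal] = {
    val rounded = v.setScale(scale, RoundingMode.HALF_UP)
    if (rounded.precision() <= precision) Some(rounded) else None
  }

  def update(buf: Buffer, input: Option[JBigDecimal], precision: Int, scale: Int): Buffer = {
    val newSum = (buf.sum, input) match {
      case (None, _) => None                                           // already overflowed: stay null
      case (Some(s), Some(v)) => checkOverflow(s.add(v), precision, scale)
      case (current, None) => current                                  // skip a null input row
    }
    // The flag stays true only while every row seen so far was null.
    Buffer(newSum, buf.isEmptyOrNulls && input.isEmpty)
  }

  def aggregate(rows: Seq[Option[JBigDecimal]], precision: Int, scale: Int): Buffer =
    rows.foldLeft(initial)((buf, row) => update(buf, row, precision, scale))
}
```

For instance, with a toy precision of 3 and scale 0, the rows 600, null, 600 leave the buffer with a null sum and the flag set to false.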








[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-05-15 Thread GitBox


skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r426015667



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
##
@@ -62,38 +62,113 @@ case class Sum(child: Expression) extends 
DeclarativeAggregate with ImplicitCast
 
   private lazy val sum = AttributeReference("sum", sumDataType)()
 
+  private lazy val isEmptyOrNulls = AttributeReference("isEmptyOrNulls", 
BooleanType, false)()
+
   private lazy val zero = Literal.default(sumDataType)
 
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: isEmptyOrNulls :: Nil
 
   override lazy val initialValues: Seq[Expression] = Seq(
-/* sum = */ Literal.create(null, sumDataType)
+/* sum = */  zero,
+/* isEmptyOrNulls = */ Literal.create(true, BooleanType)
   )
 
+  /**
+   * For decimal types and when child is nullable:
+   * isEmptyOrNulls flag is a boolean to represent if there are no rows or if 
all rows that
+   * have been seen are null.  This will be used to identify if the end result 
of sum in
+   * evaluateExpression should be null or not.
+   *
+   * Update of the isEmptyOrNulls flag:
+   * If this flag is false, then keep it as is.
+   * If this flag is true, then check if the incoming value is null and if it 
is null, keep it
+   * as true else update it to false.
+   * Once this flag is switched to false, it will remain false.
+   *
+   * The update of the sum is as follows:
+   * If sum is null, then we have a case of overflow, so keep sum as is.
+   * If sum is not null, and the incoming value is not null, then perform the 
addition along
+   * with the overflow checking. Note, that if overflow occurs, then sum will 
be null here.
+   * If the new incoming value is null, we will keep the sum in buffer as is 
and skip this
+   * incoming null
+   */
   override lazy val updateExpressions: Seq[Expression] = {
 if (child.nullable) {
-  Seq(
-/* sum = */
-coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum)
-  )
+  resultType match {
+case d: DecimalType =>
+  Seq(
+/* sum */
+If(IsNull(sum), sum,
+  If(IsNotNull(child.cast(sumDataType)),
+CheckOverflow(sum + child.cast(sumDataType), d, true), sum)),
+/* isEmptyOrNulls */
+If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+  )
+case _ =>
+  Seq(
+coalesce(sum + child.cast(sumDataType), sum),
+If(isEmptyOrNulls, IsNull(child.cast(sumDataType)), isEmptyOrNulls)
+  )
+  }
 } else {
-  Seq(
-/* sum = */
-coalesce(sum, zero) + child.cast(sumDataType)
-  )
+  resultType match {
+case d: DecimalType =>
+  Seq(
+/* sum */
+If(IsNull(sum), sum, CheckOverflow(sum + child.cast(sumDataType), 
d, true)),
+/* isEmptyOrNulls */
+false
+  )
+case _ => Seq(sum + child.cast(sumDataType), false)
+  }
 }
   }
 
+  /**
+   * For decimal type:
+   * update of the sum is as follows:
+   * Check if either portion of the left.sum or right.sum has overflowed

Review comment:
   Done.








[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-05-15 Thread GitBox


skambha commented on a change in pull request #27627:
URL: https://github.com/apache/spark/pull/27627#discussion_r426015765



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
##
@@ -62,38 +62,113 @@ case class Sum(child: Expression) extends 
DeclarativeAggregate with ImplicitCast
 
   private lazy val sum = AttributeReference("sum", sumDataType)()
 
+  private lazy val isEmptyOrNulls = AttributeReference("isEmptyOrNulls", 
BooleanType, false)()
+
   private lazy val zero = Literal.default(sumDataType)
 
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: isEmptyOrNulls :: Nil

Review comment:
   done.








[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-04-15 Thread GitBox
skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] 
Fix incorrect results for decimal aggregate sum by returning null on decimal 
overflow
URL: https://github.com/apache/spark/pull/27627#discussion_r408623119
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/DecimalSum.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+case class DecimalSum(child: Expression) extends DeclarativeAggregate with 
ImplicitCastInputTypes {
 
 Review comment:
   Please see the comment 
https://github.com/apache/spark/pull/27627#discussion_r408622772





[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-04-15 Thread GitBox
skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] 
Fix incorrect results for decimal aggregate sum by returning null on decimal 
overflow
URL: https://github.com/apache/spark/pull/27627#discussion_r408622772
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ##
 @@ -240,6 +240,7 @@ class Analyzer(
   ExtractWindowExpressions ::
   GlobalAggregates ::
   ResolveAggregateFunctions ::
+  UseDecimalSum ::
 
 Review comment:
   Thanks @maropu for your comments. Based on the discussion at https://github.com/apache/spark/pull/27627#issuecomment-611308104, the changes are now back in Sum only, which removes the need for this rule.





[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-03-02 Thread GitBox
skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] 
Fix incorrect results for decimal aggregate sum by returning null on decimal 
overflow
URL: https://github.com/apache/spark/pull/27627#discussion_r386739126
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
 ##
 @@ -132,3 +132,32 @@ case class CheckOverflow(
 
   override def sql: String = child.sql
 }
+
+case class HasOverflow(
+    child: Expression,
+    inputType: DecimalType) extends UnaryExpression {
+
+  override def dataType: DataType = BooleanType
+
+  override def nullable: Boolean = true
 
 Review comment:
   The child can be nullable, so the input value can be null. Setting 
nullable to false here would not work, because it could result in an NPE. We 
could add an explicit null check in doGenCode(), but nullSafeCodeGen in 
UnaryExpression already handles the null check when the expression is 
nullable, so a separate null check does not seem necessary here.
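
   As a rough illustration of the point above, here is a minimal sketch in 
plain Scala, not Spark code. NullSafeUnary and HasOverflowSketch are 
hypothetical names, and the precision comparison is an assumed stand-in for 
the real overflow check. It only shows why a shared null check in the base 
class lets the subclass skip its own.

```scala
import java.math.{BigDecimal => JBigDecimal}

// Hypothetical miniature of a null-safe unary expression.
// These are illustrative names, not Spark's UnaryExpression or HasOverflow.
abstract class NullSafeUnary[A <: AnyRef, B] {
  def child: () => A                       // may yield null, like a nullable child
  protected def nullSafeEval(input: A): B  // only ever called with a non-null input

  // The shared null check: a null input short-circuits to "no value",
  // so subclasses never need their own null handling.
  final def eval(): Option[B] = Option(child()).map(nullSafeEval)
}

// Assumption: "overflow" is modelled as exceeding a maximum precision.
final class HasOverflowSketch(val child: () => JBigDecimal, maxPrecision: Int)
    extends NullSafeUnary[JBigDecimal, Boolean] {
  protected def nullSafeEval(input: JBigDecimal): Boolean =
    input.precision() > maxPrecision
}
```

   With a null input, eval() yields None instead of throwing, which is why 
the result has to be treated as nullable.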





[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-02-18 Thread GitBox
skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] 
Fix incorrect results for decimal aggregate sum by returning null on decimal 
overflow
URL: https://github.com/apache/spark/pull/27627#discussion_r380941152
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
 ##
 @@ -60,38 +60,104 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sumDataType = resultType
 
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val overflow = AttributeReference("overflow", BooleanType, false)()
 
   private lazy val zero = Literal.default(resultType)
 
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: overflow :: Nil
 
   override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
+    /* sum = */ Literal.create(null, sumDataType),
+    /* overflow = */ Literal.create(false, BooleanType)
 
 Review comment:
   The overflow attribute in aggBufferAttributes records whether any of the 
intermediate additions in updateExpressions or mergeExpressions overflowed. 
If overflow is true and spark.sql.ansi.enabled is false, evaluateExpression 
returns null for the sum.
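
   To make the buffer handling concrete, here is a minimal sketch in plain 
Scala rather than Catalyst expressions. Buf, DecimalSumSketch, and the tiny 
MaxPrecision limit are assumptions made for illustration, and only the 
non-ANSI path (spark.sql.ansi.enabled = false) is modelled.

```scala
// Hypothetical model of the (sum, overflow) aggregation buffer.
// None stands in for SQL NULL; this is not the PR's actual implementation.
final case class Buf(sum: Option[BigDecimal], overflow: Boolean)

object DecimalSumSketch {
  // Assumed, deliberately small precision limit so overflow is easy to trigger.
  val MaxPrecision = 5

  val initial: Buf = Buf(None, overflow = false)

  private def fits(v: BigDecimal): Boolean = v.precision <= MaxPrecision

  // Update step: null inputs leave the buffer unchanged; an addition whose
  // result does not fit sets the overflow flag and keeps the previous sum.
  def update(buf: Buf, input: Option[BigDecimal]): Buf = input match {
    case None => buf
    case Some(v) =>
      val next = buf.sum.getOrElse(BigDecimal(0)) + v
      if (fits(next)) buf.copy(sum = Some(next)) else buf.copy(overflow = true)
  }

  // Merge step: combine two partial buffers; overflow is sticky.
  def merge(a: Buf, b: Buf): Buf = {
    val merged = (a.sum, b.sum) match {
      case (None, None) => None
      case _ => Some(a.sum.getOrElse(BigDecimal(0)) + b.sum.getOrElse(BigDecimal(0)))
    }
    Buf(merged, a.overflow || b.overflow || merged.exists(v => !fits(v)))
  }

  // Final step (non-ANSI path only): any recorded overflow yields null.
  def evaluate(buf: Buf): Option[BigDecimal] =
    if (buf.overflow) None else buf.sum
}
```

   Folding the inputs 999.99, null, 0.02 through update sets the flag, 
because 1000.01 needs six digits, so evaluate returns None for that group.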





[GitHub] [spark] skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow

2020-02-18 Thread GitBox
skambha commented on a change in pull request #27627: [WIP][SPARK-28067][SQL] 
Fix incorrect results for decimal aggregate sum by returning null on decimal 
overflow
URL: https://github.com/apache/spark/pull/27627#discussion_r380939010
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
 ##
 @@ -60,38 +60,104 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast
   private lazy val sumDataType = resultType
 
   private lazy val sum = AttributeReference("sum", sumDataType)()
+  private lazy val overflow = AttributeReference("overflow", BooleanType, false)()
 
   private lazy val zero = Literal.default(resultType)
 
-  override lazy val aggBufferAttributes = sum :: Nil
+  override lazy val aggBufferAttributes = sum :: overflow :: Nil
 
   override lazy val initialValues: Seq[Expression] = Seq(
-    /* sum = */ Literal.create(null, sumDataType)
+    /* sum = */ Literal.create(null, sumDataType),
+    /* overflow = */ Literal.create(false, BooleanType)
   )
 
   override lazy val updateExpressions: Seq[Expression] = {
-    if (child.nullable) {
+    if (!SQLConf.get.ansiEnabled) {
+      if (child.nullable) {
+        Seq(
+          /* sum = */
+          coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum),
 
 Review comment:
   Most of the changes in Sum check whether an overflow occurred during the 
additions in updateExpressions and mergeExpressions. The addition operations 
themselves are unchanged. That is not easy to see from the diff, so I wanted 
to note it here.

