skambha opened a new pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
URL: https://github.com/apache/spark/pull/27627

### What changes were proposed in this pull request?

JIRA SPARK-28067: Wrong results are returned for aggregate sum with decimals when whole-stage codegen is enabled.

**Repro:**
- WholeStage enabled -> wrong results
- WholeStage disabled -> exception `Decimal precision 39 exceeds max precision 38`

**Issues:**
1. Wrong results are returned, which is bad.
2. The behavior is inconsistent between whole-stage codegen enabled and disabled.

**Cause:**
Sum does not account for the possibility of overflow in the intermediate steps, i.e. the `updateExpressions` and `mergeExpressions`.

This PR makes the following changes:
- Add a check for decimal overflow in aggregate `Sum`. If an overflow occurs, the `Sum` returns null when `spark.sql.ansi.enabled` is false.
- When `spark.sql.ansi.enabled` is true, the `Sum` throws an exception if an overflow occurs in the decimal operation.
- This keeps the behavior consistent with what the `spark.sql.ansi.enabled` property defines.

**Before the fix, Scenario 1:** WRONG RESULTS

```
scala> val df = Seq(
     | (BigDecimal("10000000000000000000"), 1),
     | (BigDecimal("10000000000000000000"), 1),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]

scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum"))
df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]

scala> df2.show(40,false)
+---------------------------------------+
|sum(decNum)                            |
+---------------------------------------+
|20000000000000000000.000000000000000000|
+---------------------------------------+
```

--

**Before the fix, Scenario 2: setting `spark.sql.ansi.enabled` to true:** WRONG RESULTS

```
scala> spark.conf.set("spark.sql.ansi.enabled", "true")

scala> val df = Seq(
     | (BigDecimal("10000000000000000000"), 1),
     | (BigDecimal("10000000000000000000"), 1),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]

scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum"))
df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]

scala> df2.show(40,false)
+---------------------------------------+
|sum(decNum)                            |
+---------------------------------------+
|20000000000000000000.000000000000000000|
+---------------------------------------+
```

**After the fix, Scenario 1:**

```
scala> val df = Seq(
     | (BigDecimal("10000000000000000000"), 1),
     | (BigDecimal("10000000000000000000"), 1),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]

scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum"))
df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]

scala> df2.show(40,false)
+-----------+
|sum(decNum)|
+-----------+
|null       |
+-----------+
```

**After the fix, Scenario 2: setting `spark.sql.ansi.enabled` to true:**

```
scala> spark.conf.set("spark.sql.ansi.enabled", "true")

scala> val df = Seq(
     | (BigDecimal("10000000000000000000"), 1),
     | (BigDecimal("10000000000000000000"), 1),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2),
     | (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]

scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum"))
df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]

scala> df2.show(40,false)
20/02/18 10:57:43 ERROR Executor: Exception in task 5.0 in stage 4.0 (TID 30)
java.lang.ArithmeticException: Decimal(expanded,100000000000000000000.000000000000000000,39,18}) cannot be represented as Decimal(38, 18).
```

### Why are the changes needed?

The changes are needed to fix the wrong results that are returned for decimal aggregate sum.

### Does this PR introduce any user-facing change?

Yes. Before this change, an aggregate sum whose decimal result overflowed could silently return wrong results. Now it returns null instead. If the user sets `spark.sql.ansi.enabled` to true, the sum throws an exception rather than returning incorrect results.

### How was this patch tested?

A new test has been added, and the existing tests in the sql, catalyst and hive suites were run and pass.
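The arithmetic below shows why the Scenario 1 repro overflows. It uses plain `scala.math.BigDecimal` and does not need Spark. This is a sketch that assumes `join(df, "intNum")` is an inner self-join, so a key with n rows yields n × n joined rows. The object name `OverflowArithmetic` is hypothetical.

```scala
// Hypothetical helper: reproduces the repro's arithmetic without Spark.
// Assumes an inner self-join on intNum (n rows per key -> n * n joined rows).
object OverflowArithmetic {
  // Every input row carries 1e19 in the decimal(38,18) column decNum.
  val value: BigDecimal = BigDecimal("10000000000000000000")

  // intNum -> number of rows with that key in the repro DataFrame.
  val rowsPerKey: Map[Int, Int] = Map(1 -> 2, 2 -> 10)

  // Self-join: 2 * 2 + 10 * 10 = 104 joined rows.
  val joinedRows: Int = rowsPerKey.values.map(n => n * n).sum

  // Exact mathematical value of sum("decNum") over the joined rows.
  val exactSum: BigDecimal = value * BigDecimal(joinedRows)

  // decimal(38,18) keeps 18 fractional digits, leaving 38 - 18 for the integer part.
  val maxIntegerDigits: Int = 38 - 18

  // Number of integer digits the exact sum actually needs.
  val integerDigits: Int = exactSum.toBigInt.toString.length

  def main(args: Array[String]): Unit = {
    println(s"joined rows    = $joinedRows")
    println(s"exact sum      = $exactSum")
    println(s"integer digits = $integerDigits (decimal(38,18) allows $maxIntegerDigits)")
  }
}
```

The exact sum, 1.04 × 10^21, needs 22 integer digits, but `decimal(38,18)` leaves room for only 20. The correct answer therefore cannot be represented in the result type, and the value 20000000000000000000.000000000000000000 shown before the fix is wrong.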
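The following small Scala sketch illustrates the proposed null-versus-exception behavior. It is hypothetical and is not Spark's `Sum` implementation: `CheckedDecimalSum`, `fits` and `sum` are made-up names. The sketch only models the semantics. It adds values exactly with `java.math.BigDecimal` and, after every addition, checks whether the running total still fits in `decimal(precision, scale)`. On overflow it returns `None`, which stands in for SQL NULL, when ANSI mode is off, and throws `ArithmeticException` when it is on.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

// Hypothetical sketch of the intended semantics, NOT Spark's Sum implementation.
object CheckedDecimalSum {
  // True if v can be stored as decimal(precision, scale): after rescaling to
  // `scale` fractional digits, its total digit count must not exceed `precision`.
  def fits(v: JBigDecimal, precision: Int, scale: Int): Boolean =
    v.setScale(scale, RoundingMode.HALF_UP).precision() <= precision

  // Sums `values` exactly and checks for overflow after every addition,
  // because overflow can already happen in an intermediate step. On overflow:
  //   ansiEnabled = false -> None (models SQL NULL)
  //   ansiEnabled = true  -> throws ArithmeticException
  def sum(values: Seq[JBigDecimal], precision: Int, scale: Int,
          ansiEnabled: Boolean): Option[JBigDecimal] = {
    val it = values.iterator
    var acc = JBigDecimal.ZERO.setScale(scale)
    var overflowed = false
    while (it.hasNext && !overflowed) {
      acc = acc.add(it.next()) // exact add, no MathContext rounding
      overflowed = !fits(acc, precision, scale)
    }
    if (!overflowed) Some(acc)
    else if (ansiEnabled)
      throw new ArithmeticException(s"$acc cannot be represented as decimal($precision, $scale)")
    else None
  }

  def main(args: Array[String]): Unit = {
    val tenTo19 = new JBigDecimal("10000000000000000000")
    val rows = Seq.fill(104)(tenTo19) // 104 joined rows, as in the repro
    println(sum(rows, 38, 18, ansiEnabled = false)) // None
    try sum(rows, 38, 18, ansiEnabled = true)
    catch { case e: ArithmeticException => println(s"ArithmeticException: ${e.getMessage}") }
  }
}
```

With 104 values of 10^19, the running total in this sketch first overflows at the tenth addition, when it reaches 10^20 (precision 39 at scale 18). The sketch checks after every step rather than only at the end, which follows the cause described for the intermediate update and merge steps. One consequence is that once the total has overflowed, later negative values cannot bring it back into range.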
