skambha opened a new pull request #27627: [WIP][SPARK-28067][SQL] Fix incorrect 
results for decimal aggregate sum by returning null on decimal overflow
URL: https://github.com/apache/spark/pull/27627
 
 
   ### What changes were proposed in this pull request?
   
   JIRA SPARK-28067:  Wrong results are returned for aggregate sum with 
decimals with whole stage codegen enabled
   
   **Repro:** 
   WholeStage codegen enabled -> wrong results
   WholeStage codegen disabled -> exception: Decimal precision 39 exceeds max precision 38
   
   **Issues:** 
   1. Wrong results are returned, silently corrupting query output.
   2. Behavior is inconsistent between whole-stage codegen enabled and disabled.
   
   **Cause:**
   Sum does not account for the possibility of overflow in its intermediate steps, i.e. the updateExpressions and mergeExpressions.
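   To see why the intermediate steps matter, here is a standalone sketch (plain `java.math.BigDecimal`, not Spark's actual codegen path): the very first addition of two decimal(38,18) values from the repro already needs precision 39, which a decimal(38,18) result type cannot hold.

   ```scala
   // Standalone illustration of the failure mode; not Spark's internal API.
   import java.math.BigDecimal

   object OverflowCause {
     val MaxPrecision = 38 // Spark's DecimalType.MAX_PRECISION

     def main(args: Array[String]): Unit = {
       // Same value as the repro, at decimal(38,18)
       val v = new BigDecimal("10000000000000000000.000000000000000000")
       println(v.precision)        // 38 -- still fits
       val sum = v.add(v)          // one intermediate step of Sum
       println(sum.precision)      // 39 -- already exceeds MaxPrecision
       // Sum's updateExpressions/mergeExpressions perform such additions
       // without any precision check, so the out-of-range intermediate
       // value flows through and yields a wrong final result.
     }
   }
   ```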
   
   This PR makes the following changes:
   - Check for decimal overflow in aggregate Sum; on overflow, Sum returns null when spark.sql.ansi.enabled is false.
   - When spark.sql.ansi.enabled is true, Sum throws an exception on decimal overflow.
   - This keeps Sum consistent with the behavior defined by the spark.sql.ansi.enabled property.
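   The intended semantics can be sketched in plain Scala (a hypothetical helper for illustration, not the PR's actual implementation): a value whose precision exceeds 38 becomes null under the default config and raises ArithmeticException when ANSI mode is on.

   ```scala
   // Hedged sketch of the post-fix overflow semantics; names are illustrative.
   import java.math.BigDecimal

   object CheckOverflowSketch {
     val MaxPrecision = 38 // Spark's DecimalType.MAX_PRECISION

     // Analogue of wrapping Sum's result in an overflow check:
     // null on overflow when ANSI mode is off, exception when it is on.
     def checkOverflow(d: BigDecimal, ansiEnabled: Boolean): BigDecimal =
       if (d.precision <= MaxPrecision) d
       else if (ansiEnabled)
         throw new ArithmeticException(
           s"$d cannot be represented as Decimal($MaxPrecision, 18).")
       else null

     def main(args: Array[String]): Unit = {
       // Precision 39, like the overflowed sum in the repro
       val overflowed =
         new BigDecimal("100000000000000000000.000000000000000000")
       println(checkOverflow(overflowed, ansiEnabled = false)) // null
       // checkOverflow(overflowed, ansiEnabled = true) throws ArithmeticException
     }
   }
   ```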
   
   **Before the fix:  Scenario 1:** - WRONG RESULTS
   ```
   scala> val df = Seq(
        |  (BigDecimal("10000000000000000000"), 1),
        |  (BigDecimal("10000000000000000000"), 1),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
   df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]
   
   scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, 
"intNum").agg(sum("decNum"))
   df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]
   
   scala> df2.show(40,false)
   +---------------------------------------+
   |sum(decNum)                            |
   +---------------------------------------+
   |20000000000000000000.000000000000000000|
   +---------------------------------------+
   ```
   
   **Before the fix: Scenario 2: spark.sql.ansi.enabled set to true** - WRONG RESULTS
   ```
   scala> spark.conf.set("spark.sql.ansi.enabled", "true")
   
   scala> val df = Seq(
        |  (BigDecimal("10000000000000000000"), 1),
        |  (BigDecimal("10000000000000000000"), 1),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
   df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]
   
   scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, 
"intNum").agg(sum("decNum"))
   df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]
   
   scala> df2.show(40,false)
   +---------------------------------------+
   |sum(decNum)                            |
   +---------------------------------------+
   |20000000000000000000.000000000000000000|
   +---------------------------------------+
   ```
   
   
   **After the fix: Scenario 1:**  
   ```
   scala> val df = Seq(
        |  (BigDecimal("10000000000000000000"), 1),
        |  (BigDecimal("10000000000000000000"), 1),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
   df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]
   
   scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, 
"intNum").agg(sum("decNum"))
   df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]
   
   scala>  df2.show(40,false)
   +-----------+
   |sum(decNum)|
   +-----------+
   |null       |
   +-----------+
   
   ```
   
   **After the fix: Scenario 2: spark.sql.ansi.enabled set to true:** 
   ```
   scala> spark.conf.set("spark.sql.ansi.enabled", "true")
   
   scala> val df = Seq(
        |  (BigDecimal("10000000000000000000"), 1),
        |  (BigDecimal("10000000000000000000"), 1),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2),
        |  (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
   df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]
   
   scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, 
"intNum").agg(sum("decNum"))
   df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]
   
   scala>  df2.show(40,false)
   20/02/18 10:57:43 ERROR Executor: Exception in task 5.0 in stage 4.0 (TID 30)
   java.lang.ArithmeticException: 
Decimal(expanded,100000000000000000000.000000000000000000,39,18}) cannot be 
represented as Decimal(38, 18).
   
   ```
   
   ### Why are the changes needed?
   These changes fix the wrong results returned for decimal aggregate sum.
   
   ### Does this PR introduce any user-facing change?
   Yes. Prior to this change, an aggregate sum that overflowed a decimal returned wrong results; now it returns null. If the user sets spark.sql.ansi.enabled to true, the sum throws an exception instead of returning an incorrect result. 
   
   ### How was this patch tested?
   A new test has been added, and the existing sql, catalyst, and hive test suites pass.
   
