skambha commented on issue #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
URL: https://github.com/apache/spark/pull/27627#issuecomment-591185451
 
 
Thanks @cloud-fan and @kiszk for your comments.
   
> These expressions are used in non-whole-stage-codegen as well, why only whole-stage-codegen has the problem?
   
When whole-stage codegen is disabled, the exception comes from the following call chain:

AggregationIterator -> ... -> JoinedRow -> UnsafeRow -> Decimal.set, where Decimal.set checks whether the value fits within the target precision and scale.
```
Caused by: java.lang.ArithmeticException: Decimal precision 39 exceeds max precision 38
  at org.apache.spark.sql.types.Decimal.set(Decimal.scala:122)
  at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:574)
  at org.apache.spark.sql.types.Decimal.apply(Decimal.scala)
  at org.apache.spark.sql.catalyst.expressions.UnsafeRow.getDecimal(UnsafeRow.java:385)
  at org.apache.spark.sql.catalyst.expressions.JoinedRow.getDecimal(JoinedRow.scala:95)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7(AggregationIterator.scala:209)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7$adapted(AggregationIterator.scala:207)
  at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:187)
  at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:362)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$2(HashAggregateExec.scala:136)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$2$adapted(HashAggregateExec.scala:111)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:889)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:889)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```
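For reference, that check can be exercised directly. Below is a minimal sketch (my own illustration, not code from this PR) that reproduces the same ArithmeticException from Decimal.set by asking for one more digit of precision than decimal(38, 18) allows:

```scala
import org.apache.spark.sql.types.Decimal

// 21 integer digits + 18 fractional digits = precision 39, one more than
// the (38, 18) target. Decimal.set rejects it rather than silently truncating.
val tooBig = BigDecimal("9" * 21 + "." + "9" * 18)

// Throws: java.lang.ArithmeticException: Decimal precision 39 exceeds max precision 38
Decimal(tooBig, precision = 38, scale = 18)
```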
---
   
When whole-stage codegen is enabled, here is the code Spark generates today (current Spark, with no changes from this PR):

a) [codegen agg sum - all defaults (ANSI mode false)](https://github.com/skambha/notes/blob/master/spark28067_ansifalse_wholestagetrue.txt)
b) [codegen agg sum - ANSI mode true](https://github.com/skambha/notes/blob/master/spark28067_ansitrue_wholestagetrue.txt)
   
In the whole-stage codegen case, you can see that the decimal ['+' expressions](https://github.com/skambha/notes/blob/master/spark28067_ansifalse_wholestagetrue.txt#L164) will at some point produce a value larger than decimal(38, 18) can hold, but that value gets written out as null. This corrupts the running total, so the sum returns wrong results. A hedged reproduction is sketched below.
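As a sketch of the kind of query affected (my own illustration, assuming a local SparkSession bound to `spark`; it is not the exact reproduction from the JIRA): each input row fits decimal(38, 18) exactly, but the running sum overflows after a few rows, and with whole-stage codegen enabled the result comes back wrong instead of null or an error.

```scala
import org.apache.spark.sql.functions.{expr, sum}

// Each value uses all 38 digits of decimal(38, 18): 20 integer digits plus
// 18 fractional digits. Adding 12 of them needs more than 38 digits, so the
// partial sums overflow during aggregation.
val big = "9" * 20
val df = spark.range(0, 12, 1, 1)
  .select(expr(s"cast('$big' as decimal(38, 18)) as d"))

// With whole-stage codegen on, the overflowed partial sum is written as null
// by UnsafeRowWriter, and the final sum is silently incorrect.
df.agg(sum("d")).show(false)
```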
The decimal values computed by the '+' expressions are written out via UnsafeRowWriter.write:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java#L184
Here is a snippet highlighting the behavior observed for the use case in this issue: https://github.com/skambha/notes/blob/master/UnsafeRowWriterTestSnippet
The relevant code is here:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java#L202
This is the path triggered under whole-stage codegen; a minimal sketch of it follows.
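The sketch below is my own distillation of the linked UnsafeRowWriterTestSnippet, not code from this PR: when the Decimal cannot be changed to the target precision, UnsafeRowWriter.write stores null instead of failing.

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.types.Decimal

val writer = new UnsafeRowWriter(1)  // a row with a single decimal field
writer.resetRowWriter()

// Precision 39 cannot be represented as decimal(38, 18), so
// changePrecision(38, 18) fails inside write() and the field is nulled out
// rather than raising an exception.
val overflowed = Decimal(BigDecimal("9" * 21 + "." + "9" * 18))
writer.write(0, overflowed, 38, 18)

assert(writer.getRow.isNullAt(0))  // the overflow silently became null
```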
   
