skambha commented on issue #27627: [WIP][SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
URL: https://github.com/apache/spark/pull/27627#issuecomment-591185451

Thanks @cloud-fan and @kiszk for your comments.

> These expressions are used in non-whole-stage-codegen as well, why only whole-stage-codegen has the problem?

When whole-stage codegen is disabled, the exception comes from the following path: AggregationIterator -> ... -> JoinedRow -> UnsafeRow -> Decimal.set, where Decimal.set checks whether the value fits within the precision and scale.

```
Caused by: java.lang.ArithmeticException: Decimal precision 39 exceeds max precision 38
	at org.apache.spark.sql.types.Decimal.set(Decimal.scala:122)
	at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:574)
	at org.apache.spark.sql.types.Decimal.apply(Decimal.scala)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.getDecimal(UnsafeRow.java:385)
	at org.apache.spark.sql.catalyst.expressions.JoinedRow.getDecimal(JoinedRow.scala:95)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7(AggregationIterator.scala:209)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7$adapted(AggregationIterator.scala:207)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:187)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:362)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$2(HashAggregateExec.scala:136)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$2$adapted(HashAggregateExec.scala:111)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:889)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:889)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

---

When whole-stage codegen is enabled, here is the code generated for it (Spark as of today, without the changes from this PR):

a) [codegen agg sum - all default (ansi mode false)](https://github.com/skambha/notes/blob/master/spark28067_ansifalse_wholestagetrue.txt)
b) [codegen agg sum - with ansi mode true](https://github.com/skambha/notes/blob/master/spark28067_ansitrue_wholestagetrue.txt)

In the whole-stage codegen case, you can see that the result of the decimal ['+' expressions](https://github.com/skambha/notes/blob/master/spark28067_ansifalse_wholestagetrue.txt#L164) will at some point be larger than what fits in decimal(38,18), but instead of raising an error the value gets written out as null. This corrupts the running sum, so the final result is wrong.
The decimal values computed from the '+' expressions are written using UnsafeRowWriter.write:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java#L184

Here is a snippet highlighting the behavior observed for the use case in this issue:
https://github.com/skambha/notes/blob/master/UnsafeRowWriterTestSnippet

The relevant code is here:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java#L202

This write path is the one triggered under whole-stage codegen.
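
To make the difference between the two paths concrete, here is a minimal, self-contained sketch in plain Java. It is not Spark code: `DecimalOverflowSketch`, `setStrict`, `writeOrNull`, `sumStrict` and `sumLenient` are hypothetical names, `java.math.BigDecimal` stands in for Spark's `Decimal`, and the sum loop assumes, purely for illustration, that a null buffer is treated as zero on the next update. Under those assumptions, the strict check fails loudly while the null-on-overflow write silently restarts the running sum.

```java
import java.math.BigDecimal;

final class DecimalOverflowSketch {
    // Maximum decimal precision, as in the error message quoted above.
    static final int MAX_PRECISION = 38;

    // Strict path (hypothetical model of the check reached via Decimal.set):
    // reject any value whose precision does not fit.
    static BigDecimal setStrict(BigDecimal v) {
        if (v.precision() > MAX_PRECISION) {
            throw new ArithmeticException("Decimal precision " + v.precision()
                + " exceeds max precision " + MAX_PRECISION);
        }
        return v;
    }

    // Lenient path (hypothetical model of a writer that stores null
    // when the value does not fit, instead of failing).
    static BigDecimal writeOrNull(BigDecimal v) {
        return v.precision() > MAX_PRECISION ? null : v;
    }

    // Running sum using the strict check: overflow surfaces as an exception.
    static BigDecimal sumStrict(BigDecimal... values) {
        BigDecimal buffer = BigDecimal.ZERO;
        for (BigDecimal v : values) {
            buffer = setStrict(buffer.add(v));
        }
        return buffer;
    }

    // Running sum using the lenient write. ASSUMPTION: a null buffer is
    // read back as zero on the next update, so an overflow is forgotten.
    static BigDecimal sumLenient(BigDecimal... values) {
        BigDecimal buffer = null;
        for (BigDecimal v : values) {
            BigDecimal current = (buffer == null) ? BigDecimal.ZERO : buffer;
            buffer = writeOrNull(current.add(v));
        }
        return buffer;
    }

    public static void main(String[] args) {
        // Largest value with 38 digits of precision and scale 18.
        BigDecimal big = new BigDecimal("99999999999999999999.999999999999999999");
        BigDecimal one = new BigDecimal("1.000000000000000000");

        // big + big needs 39 digits, so the lenient sum drops it and keeps only `one`.
        System.out.println("lenient: " + sumLenient(big, big, one));
        try {
            System.out.println("strict: " + sumStrict(big, big, one));
        } catch (ArithmeticException e) {
            System.out.println("strict: " + e.getMessage());
        }
    }
}
```

In this sketch the lenient sum returns a small, plausible-looking number even though the true total cannot be represented, which is the kind of wrong result the issue describes; the strict sum reports the overflow instead.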
