[ https://issues.apache.org/jira/browse/SPARK-47024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nicholas Chammas updated SPARK-47024:
-------------------------------------
    Description: 
I found this problem using [Hypothesis|https://hypothesis.readthedocs.io/en/latest/]. Here's a reproduction that fails on {{master}}, 3.5.0, 3.4.2, and 3.3.4 (and probably all prior versions as well):

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

SUM_EXAMPLE = [
    (1.0,),
    (0.0,),
    (1.0,),
    (9007199254740992.0,),
]

spark = (
    SparkSession.builder
    .config("spark.log.level", "ERROR")
    .getOrCreate()
)

def compare_sums(data, num_partitions):
    df = spark.createDataFrame(data, "val double").coalesce(1)
    result1 = df.agg(sum(col("val"))).collect()[0][0]
    df = spark.createDataFrame(data, "val double").repartition(num_partitions)
    result2 = df.agg(sum(col("val"))).collect()[0][0]
    assert result1 == result2, f"{result1}, {result2}"

if __name__ == "__main__":
    print(compare_sums(SUM_EXAMPLE, 2))
{code}

This fails as follows:

{code:python}
AssertionError: 9007199254740994.0, 9007199254740992.0
{code}

I suspected some kind of problem related to code generation, so I tried setting all of these to {{false}}:
* {{spark.sql.codegen.wholeStage}}
* {{spark.sql.codegen.aggregate.map.twolevel.enabled}}
* {{spark.sql.codegen.aggregate.splitAggregateFunc.enabled}}

But this did not change the behavior. Somehow, the partitioning of the data affects the computed sum.

        was: Will fill in the details shortly.

    Summary: Sum of floats/doubles may be incorrect depending on partitioning  (was: Sum is incorrect (exact cause currently unknown))

Sadly, I think this is a case where we may not be able to do anything. The problem appears to be a classic case of floating-point arithmetic going wrong:

{code:scala}
scala> 9007199254740992.0 + 1.0
val res0: Double = 9.007199254740992E15

scala> 9007199254740992.0 + 2.0
val res1: Double = 9.007199254740994E15
{code}

Notice how adding {{1.0}} did not change the large value, whereas adding {{2.0}} did.
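The corner case above can be reproduced in plain Python, with no Spark involved; this is a minimal sketch of why 9007199254740992.0 (which is 2^53) behaves this way:

```python
import sys

# 9007199254740992.0 is 2**53. An IEEE 754 double has a 53-bit significand,
# so this is the last point at which every integer is exactly representable;
# above it, consecutive doubles are 2 apart.
assert sys.float_info.mant_dig == 53

big = 2.0 ** 53
assert big == 9007199254740992.0

# Adding 1.0 is lost to rounding (the result rounds back down to big) ...
assert big + 1.0 == big

# ... while adding 2.0 lands exactly on the next representable double.
assert big + 2.0 == 9007199254740994.0
```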
So what I believe is happening is that, depending on the order in which the rows happen to be added, we either hit or do not hit this corner case. In other words, if the aggregation goes like this:

{code:java}
(1.0 + 1.0) + (0.0 + 9007199254740992.0)
2.0 + 9007199254740992.0
9007199254740994.0
{code}

then there is no problem. However, if we are unlucky and it goes like this:

{code:java}
(1.0 + 0.0) + (1.0 + 9007199254740992.0)
1.0 + 9007199254740992.0
9007199254740992.0
{code}

then we get the incorrect result shown in the description above.

This violates what I believe should be an invariant in Spark: that declarative aggregates like {{sum}} do not compute different results depending on accidents of row order or partitioning. However, given that this is a basic problem of floating-point arithmetic, I doubt we can really do anything here.

Note that there are many such "special" numbers that have this problem, not just 9007199254740992.0:

{code:scala}
scala> 1.7168917017330176e+16 + 1.0
val res2: Double = 1.7168917017330176E16

scala> 1.7168917017330176e+16 + 2.0
val res3: Double = 1.7168917017330178E16
{code}

> Sum of floats/doubles may be incorrect depending on partitioning
> ----------------------------------------------------------------
>
>                 Key: SPARK-47024
>                 URL: https://issues.apache.org/jira/browse/SPARK-47024
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.2, 3.5.0, 3.3.4
>            Reporter: Nicholas Chammas
>            Priority: Major
>              Labels: correctness
>
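The order dependence described in the comment above can also be demonstrated without Spark, by summing every permutation of the example data. As a point of comparison only (not a claim about what Spark's {{sum}} does or should do), Python's {{math.fsum}} uses compensated summation and returns the same correctly rounded result for every ordering:

```python
import math
from itertools import permutations

data = [1.0, 0.0, 1.0, 9007199254740992.0]

# Naive left-to-right addition gives different answers depending on order:
# orderings where the two 1.0s combine before meeting the big value yield
# ...994.0, while orderings where each 1.0 is absorbed separately yield
# ...992.0 -- exactly the two results seen in the bug report.
naive = {sum(p) for p in permutations(data)}
assert naive == {9007199254740992.0, 9007199254740994.0}

# Compensated (error-tracking) summation is insensitive to ordering:
compensated = {math.fsum(p) for p in permutations(data)}
assert compensated == {9007199254740994.0}
```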
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org