peter-toth opened a new pull request #31778:
URL: https://github.com/apache/spark/pull/31778


   ### What changes were proposed in this pull request?
   
   pyrolite 4.21 introduced value comparison during object memoization and serialization and enabled it by default (`valueCompare=true`): 
https://github.com/irmen/Pyrolite/blob/pyrolite-4.21/java/src/main/java/net/razorvine/pickle/Pickler.java#L112-L122
   This change has an undesired effect when we serialize a row (actually a 
`GenericRowWithSchema`) to pass it to Python: 
https://github.com/apache/spark/blob/branch-3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala#L60.
 A simple example is that 
   ```
   new GenericRowWithSchema(Array(1.0, 1.0), StructType(Seq(StructField("_1", 
DoubleType), StructField("_2", DoubleType))))
   ```
   and
   ```
   new GenericRowWithSchema(Array(1, 1), StructType(Seq(StructField("_1", 
IntegerType), StructField("_2", IntegerType))))
   ```
   currently compare equal, so during serialization the second instance is 
replaced by a short memo reference to the first one instead of being serialized 
itself.
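   The same value-equality trap can be sketched in plain Python, where `(1.0, 1.0) == (1, 1)` also holds: a memo table keyed by value hands out a back-reference for the second row even though its element types differ. This is a simplified illustration, not Pyrolite's actual code; `memoize_by_value` is a hypothetical helper:
   ```python
   # Simplified sketch of value-based memoization (NOT Pyrolite's actual code).
   # The memo is keyed by the object's value, so two rows that compare equal
   # share one serialized form even when their element types differ.
   def memoize_by_value(rows):
       memo = {}   # value -> memo index
       out = []
       for row in rows:
           if row in memo:
               # already seen an equal value: emit a back-reference only
               out.append(("memo_ref", memo[row]))
           else:
               memo[row] = len(memo)
               out.append(("serialized", row))
       return out

   double_row = (1.0, 1.0)  # stands in for GenericRowWithSchema(Array(1.0, 1.0), ...)
   int_row = (1, 1)         # stands in for GenericRowWithSchema(Array(1, 1), ...)

   assert double_row == int_row  # value comparison says they are "the same"
   result = memoize_by_value([double_row, int_row])
   # the integer row comes back as a mere reference to the double row
   ```
   With `valueCompare=true`, Pyrolite's real memo behaves analogously, which is why the integer row never reaches Python in its own representation.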
   
   ### Why are the changes needed?
   The above can cause nasty issues like the one described in 
https://issues.apache.org/jira/browse/SPARK-34545:
   
   ```
   >>> from pyspark.sql.functions import udf
   >>> from pyspark.sql.types import *
   >>> 
   >>> def udf1(data_type):
           def u1(e):
               return e[0]
           return udf(u1, data_type)
   >>> 
   >>> df = spark.createDataFrame([((1.0, 1.0), (1, 1))], ['c1', 'c2'])
   >>> 
   >>> df = df.withColumn("c3", udf1(DoubleType())("c1"))
   >>> df = df.withColumn("c4", udf1(IntegerType())("c2"))
   >>> 
   >>> df.select("c3").show()
   +---+
   | c3|
   +---+
   |1.0|
   +---+
   
   >>> df.select("c4").show()
   +---+
   | c4|
   +---+
   |  1|
   +---+
   
   >>> df.select("c3", "c4").show()
   +---+----+
   | c3|  c4|
   +---+----+
   |1.0|null|
   +---+----+
   ```
   This is because, during serialization from the JVM to Python, 
`GenericRowWithSchema(1.0, 1.0)` (`c1`) is memoized first, and when 
`GenericRowWithSchema(1, 1)` (`c2`) comes next it is replaced by a short memo 
reference to `c1` (instead of being serialized itself) because the two rows are 
`equal()`. The Python function then runs, but the return type of `c4` is 
expected to be `IntegerType`, and when a value of a different type 
(`DoubleType`) comes back from Python it is discarded: 
https://github.com/apache/spark/blob/branch-3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala#L108-L113
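   The discard on the return path can be mimicked in a few lines of Python. The hypothetical `from_java` helper below stands in for the type check linked above in `EvaluatePython.scala`; it is a sketch of the behavior, not the actual Scala code:
   ```python
   # Hedged sketch: a returned value whose runtime type does not match the
   # declared return type is dropped, which surfaces as null in the result.
   def from_java(value, expected_type):
       # expected_type is a Python type standing in for Spark's DataType check
       if isinstance(value, expected_type):
           return value
       return None  # mismatched type is discarded -> null

   # c4 is declared IntegerType, but because c2 was deserialized as the
   # memoized c1 row (doubles), the UDF returns a float and the value is lost:
   assert from_java(1.0, int) is None  # -> the null seen in the c4 column
   assert from_java(1, int) == 1       # a correctly typed value passes through
   ```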
   
   After this PR:
   ```
   >>> df.select("c3", "c4").show()
   +---+---+
   | c3| c4|
   +---+---+
   |1.0|  1|
   +---+---+
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, it fixes a correctness issue.
   
   ### How was this patch tested?
   Added new UT + manual tests.

