[jira] [Commented] (SPARK-35371) Scala UDF returning string or complex type applied to array members returns wrong data

2021-05-12 Thread David Benedeki (Jira)


[ https://issues.apache.org/jira/browse/SPARK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17343636#comment-17343636 ]

David Benedeki commented on SPARK-35371:


I can confirm that with Spark 3.1.2-SNAPSHOT the issue is gone. Thank you. (y)

> Scala UDF returning string or complex type applied to array members returns wrong data
> --------------------------------------------------------------------------------------
>
> Key: SPARK-35371
> URL: https://issues.apache.org/jira/browse/SPARK-35371
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: David Benedeki
>Priority: Major
>
> When using a UDF that returns a string or a complex type (struct) on array members, the resulting array consists of the UDF result for the last array member, repeated for every position.
> h3. *Example code:*
> {code:scala}
> import org.apache.spark.sql.{Column, SparkSession}
> import org.apache.spark.sql.functions.{callUDF, col, transform, udf}
> val sparkBuilder: SparkSession.Builder = SparkSession.builder()
>   .master("local[*]")
>   .appName(s"Udf Bug Demo")
>   .config("spark.ui.enabled", "false")
>   .config("spark.debug.maxToStringFields", 100)
> val spark: SparkSession = sparkBuilder
>   .config("spark.driver.bindAddress", "127.0.0.1")
>   .config("spark.driver.host", "127.0.0.1")
>   .getOrCreate()
> import spark.implicits._
> case class Foo(num: Int, s: String)
> val src  = Seq(
>   (1, 2, Array(1, 2, 3)),
>   (2, 2, Array(2, 2, 2)),
>   (3, 4, Array(3, 4, 3, 4))
> ).toDF("A", "B", "C")
> val udfStringName = "UdfString"
> val udfIntName = "UdfInt"
> val udfStructName = "UdfStruct"
> val udfString = udf((num: Int) => {
>   (num + 1).toString
> })
> spark.udf.register(udfStringName, udfString)
> val udfInt = udf((num: Int) => {
>   num + 1
> })
> spark.udf.register(udfIntName, udfInt)
> val udfStruct = udf((num: Int) => {
>   Foo(num + 1, (num + 1).toString)
> })
> spark.udf.register(udfStructName, udfStruct)
> val lambdaString = (forCol: Column) => callUDF(udfStringName, forCol)
> val lambdaInt = (forCol: Column) => callUDF(udfIntName, forCol)
> val lambdaStruct = (forCol: Column) => callUDF(udfStructName, forCol)
> val cA = callUDF(udfStringName, col("A"))
> val cB = callUDF(udfStringName, col("B"))
> val cCString: Column = transform(col("C"), lambdaString)
> val cCInt: Column = transform(col("C"), lambdaInt)
> val cCStruc: Column = transform(col("C"), lambdaStruct)
> val dest = src.withColumn("AStr", cA)
>   .withColumn("BStr", cB)
>   .withColumn("CString (Wrong)", cCString)
>   .withColumn("CInt (OK)", cCInt)
>   .withColumn("CStruct (Wrong)", cCStruc)
> dest.show(false)
> dest.printSchema()
> {code}
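> For reference, the same higher-order transform can also be written as a SQL lambda; a minimal sketch, assuming the session and the registered "UdfString" above (the column name "CStringSql" is illustrative):
> {code:scala}
> // The transform over column C expressed as a SQL lambda; "UdfString"
> // must already be registered, as in the snippet above.
> val destSql = src.selectExpr("A", "C", "transform(C, x -> UdfString(x)) AS CStringSql")
> destSql.show(false)
> {code}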
> h3. *Expected:*
> {noformat}
> +---+---+------------+----+----+------------+------------+--------------------------------+
> |A  |B  |C           |AStr|BStr|CString     |CInt        |CStruct                         |
> +---+---+------------+----+----+------------+------------+--------------------------------+
> |1  |2  |[1, 2, 3]   |2   |3   |[2, 3, 4]   |[2, 3, 4]   |[{2, 2}, {3, 3}, {4, 4}]        |
> |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]   |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
> |3  |4  |[3, 4, 3, 4]|4   |5   |[4, 5, 4, 5]|[4, 5, 4, 5]|[{4, 4}, {5, 5}, {4, 4}, {5, 5}]|
> +---+---+------------+----+----+------------+------------+--------------------------------+
> {noformat}
> h3. *Got:*
> {noformat}
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |A  |B  |C           |AStr|BStr|CString (Wrong)|CInt (OK)   |CStruct (Wrong)                 |
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |1  |2  |[1, 2, 3]   |2   |3   |[4, 4, 4]      |[2, 3, 4]   |[{4, 4}, {4, 4}, {4, 4}]        |
> |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
> |3  |4  |[3, 4, 3, 4]|4   |5   |[5, 5, 5, 5]   |[4, 5, 4, 5]|[{5, 5}, {5, 5}, {5, 5}, {5, 5}]|
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> {noformat}
> h3. *Observation*
>  * Works correctly on Spark 3.0.2
>  * When the UDF is registered as a Java UDF, it works as expected (see the sketch after the code block below)
>  * The UDF is called the appropriate number of times (regardless of whether the UDF is marked deterministic or non-deterministic)
>  * When debugged, the correct value is initially saved into the result array, but the processing of every subsequent item overwrites the previously stored values as well; the array filled with the last item's values is therefore the final result
>  * When the UDF returns NULL/None, it neither "overwrites" the prior array values nor is "overwritten" by subsequent non-NULL values, as can be seen with the following UDF implementation:
> {code:scala}
> val udfString = udf((num: Int) => {
>   if (num == 3) {
>     None
>   } else {
>     Some((num + 1).toString)
>   }
> })
> {code}

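> The Java UDF registration mentioned in the observations above, as a minimal sketch (the name "UdfStringJava" and the column wiring are illustrative, not part of the original reproduction):
> {code:scala}
> import org.apache.spark.sql.api.java.UDF1
> import org.apache.spark.sql.types.StringType
>
> // The same increment-and-stringify function, registered through the
> // Java UDF interface; this code path produced correct array results.
> spark.udf.register("UdfStringJava", new UDF1[java.lang.Integer, String] {
>   override def call(num: java.lang.Integer): String = (num + 1).toString
> }, StringType)
>
> val cCStringJava: Column = transform(col("C"), c => callUDF("UdfStringJava", c))
> {code}
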
[jira] [Commented] (SPARK-35371) Scala UDF returning string or complex type applied to array members returns wrong data

2021-05-11 Thread L. C. Hsieh (Jira)


[ https://issues.apache.org/jira/browse/SPARK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342937#comment-17342937 ]

L. C. Hsieh commented on SPARK-35371:

Oh, I think it was fixed by SPARK-34829.


[jira] [Commented] (SPARK-35371) Scala UDF returning string or complex type applied to array members returns wrong data

2021-05-11 Thread L. C. Hsieh (Jira)


[ https://issues.apache.org/jira/browse/SPARK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342936#comment-17342936 ]

L. C. Hsieh commented on SPARK-35371:

I just ran the example on both the current master branch and branch-3.1. Both got the correct results. Would you like to test it too?
