[jira] [Commented] (SPARK-35371) Scala UDF returning string or complex type applied to array members returns wrong data
[ https://issues.apache.org/jira/browse/SPARK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17343636#comment-17343636 ]

David Benedeki commented on SPARK-35371:

I can confirm that with Spark 3.1.2-SNAPSHOT the issue is gone. Thank you. (y)

> Scala UDF returning string or complex type applied to array members returns
> wrong data
> ---------------------------------------------------------------------------
>
> Key: SPARK-35371
> URL: https://issues.apache.org/jira/browse/SPARK-35371
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.1.1
> Reporter: David Benedeki
> Priority: Major
>
> When using a UDF returning string or complex type (Struct) on array members,
> the resulting array consists of the last array member's UDF result.
>
> h3. *Example code:*
> {code:scala}
> import org.apache.spark.sql.{Column, SparkSession}
> import org.apache.spark.sql.functions.{callUDF, col, transform, udf}
>
> val sparkBuilder: SparkSession.Builder = SparkSession.builder()
>   .master("local[*]")
>   .appName("Udf Bug Demo")
>   .config("spark.ui.enabled", "false")
>   .config("spark.debug.maxToStringFields", 100)
>
> val spark: SparkSession = sparkBuilder
>   .config("spark.driver.bindAddress", "127.0.0.1")
>   .config("spark.driver.host", "127.0.0.1")
>   .getOrCreate()
>
> import spark.implicits._
>
> case class Foo(num: Int, s: String)
>
> val src = Seq(
>   (1, 2, Array(1, 2, 3)),
>   (2, 2, Array(2, 2, 2)),
>   (3, 4, Array(3, 4, 3, 4))
> ).toDF("A", "B", "C")
>
> val udfStringName = "UdfString"
> val udfIntName = "UdfInt"
> val udfStructName = "UdfStruct"
>
> val udfString = udf((num: Int) => {
>   (num + 1).toString
> })
> spark.udf.register(udfStringName, udfString)
>
> val udfInt = udf((num: Int) => {
>   num + 1
> })
> spark.udf.register(udfIntName, udfInt)
>
> val udfStruct = udf((num: Int) => {
>   Foo(num + 1, (num + 1).toString)
> })
> spark.udf.register(udfStructName, udfStruct)
>
> val lambdaString = (forCol: Column) => callUDF(udfStringName, forCol)
> val lambdaInt = (forCol: Column) => callUDF(udfIntName, forCol)
> val lambdaStruct = (forCol: Column) => callUDF(udfStructName, forCol)
>
> val cA = callUDF(udfStringName, col("A"))
> val cB = callUDF(udfStringName, col("B"))
> val cCString: Column = transform(col("C"), lambdaString)
> val cCInt: Column = transform(col("C"), lambdaInt)
> val cCStruct: Column = transform(col("C"), lambdaStruct)
>
> val dest = src.withColumn("AStr", cA)
>   .withColumn("BStr", cB)
>   .withColumn("CString (Wrong)", cCString)
>   .withColumn("CInt (OK)", cCInt)
>   .withColumn("CStruct (Wrong)", cCStruct)
>
> dest.show(false)
> dest.printSchema()
> {code}
> h3. *Expected:*
> {noformat}
> +---+---+------------+----+----+------------+------------+--------------------------------+
> |A  |B  |C           |AStr|BStr|CString     |CInt        |CStruct                         |
> +---+---+------------+----+----+------------+------------+--------------------------------+
> |1  |2  |[1, 2, 3]   |2   |3   |[2, 3, 4]   |[2, 3, 4]   |[{2, 2}, {3, 3}, {4, 4}]        |
> |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]   |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
> |3  |4  |[3, 4, 3, 4]|4   |5   |[4, 5, 4, 5]|[4, 5, 4, 5]|[{4, 4}, {5, 5}, {4, 4}, {5, 5}]|
> +---+---+------------+----+----+------------+------------+--------------------------------+
> {noformat}
> h3. *Got:*
> {noformat}
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |A  |B  |C           |AStr|BStr|CString (Wrong)|CInt (OK)   |CStruct (Wrong)                 |
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |1  |2  |[1, 2, 3]   |2   |3   |[4, 4, 4]      |[2, 3, 4]   |[{4, 4}, {4, 4}, {4, 4}]        |
> |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
> |3  |4  |[3, 4, 3, 4]|4   |5   |[5, 5, 5, 5]   |[4, 5, 4, 5]|[{5, 5}, {5, 5}, {5, 5}, {5, 5}]|
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> {noformat}
> h3. *Observation*
> * Works correctly on Spark 3.0.2
> * When the UDF is registered as a Java UDF, it works as expected
> * The UDF is called the appropriate number of times (regardless of whether the
> UDF is marked as deterministic or non-deterministic)
> * When debugged, the correct value is actually saved into the result array at
> first, but every subsequent item's processing overwrites the previous result
> values as well. Therefore the last item's values fill the whole array in the
> final result.
> * When the UDF returns NULL/None it does not "overwrite" the prior array
> values, nor is it "overwritten" by subsequent non-NULL values. See with the
> following UDF implementation:
> {code:scala}
> val udfString = udf((num: Int) => {
>   if (num == 3) {
>     None
> ...
> {code}
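The overwrite behaviour described in the observations (the correct value lands in the result array first, then later items clobber it) is consistent with a shared mutable result holder being stored by reference. The following is a plain-Scala sketch of that mechanism only, with no Spark involved; `Holder` and the surrounding names are illustrative, not Spark internals, and this is not claimed to be the actual code path fixed by SPARK-34829.

```scala
// Sketch (hypothetical, no Spark): if an evaluator reuses one mutable holder
// for every element's UDF result and the result array stores references to
// that holder, every slot ends up showing the last element's value.
object HolderReuseDemo {
  final class Holder(var value: String)

  def main(args: Array[String]): Unit = {
    val input = Array(1, 2, 3)

    // Buggy pattern: a single shared holder is mutated and stored by reference.
    val shared = new Holder("")
    val buggy = input.map { n =>
      shared.value = (n + 1).toString
      shared
    }
    // All three slots alias the same object, so all show the last value.
    println(buggy.map(_.value).mkString("[", ", ", "]")) // [4, 4, 4]

    // Safe pattern: allocate (or copy) a fresh result per element.
    val fixed = input.map(n => new Holder((n + 1).toString))
    println(fixed.map(_.value).mkString("[", ", ", "]")) // [2, 3, 4]
  }
}
```

This also matches the symptom that an `Int`-returning UDF is unaffected: a primitive is copied into the array by value, while a string or struct result is stored as a reference.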
[jira] [Commented] (SPARK-35371) Scala UDF returning string or complex type applied to array members returns wrong data
[ https://issues.apache.org/jira/browse/SPARK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342937#comment-17342937 ]

L. C. Hsieh commented on SPARK-35371:

Oh, I think it was fixed by SPARK-34829.
[jira] [Commented] (SPARK-35371) Scala UDF returning string or complex type applied to array members returns wrong data
[ https://issues.apache.org/jira/browse/SPARK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342936#comment-17342936 ]

L. C. Hsieh commented on SPARK-35371:

I just ran the example on both the current master branch and branch-3.1. Both got the correct results. Would you like to test it too?