[jira] [Commented] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF
[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16381077#comment-16381077 ]

Ravneet Popli commented on SPARK-22942:
----------------------------------------

Matthew, were you able to resolve this? We are running into a similar problem, and we know for sure that the input arguments to our UDF cannot be null. If it helps, we are using Spark 2.0.2.

> Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-22942
>                 URL: https://issues.apache.org/jira/browse/SPARK-22942
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, SQL
>    Affects Versions: 2.2.0
>            Reporter: Matthew Fishkin
>            Priority: Major
>
> I ran into an interesting issue when trying to do a `filter` on a dataframe that has columns that were added using a UDF. I am able to replicate the problem with a smaller set of data.
> Given the dummy case classes:
> {code:java}
> case class Info(number: Int, color: String)
> case class Record(name: String, infos: Seq[Info])
> {code}
> And the following data:
> {code:java}
> val blue = Info(1, "blue")
> val black = Info(2, "black")
> val yellow = Info(3, "yellow")
> val orange = Info(4, "orange")
> val white = Info(5, "white")
> val a = Record("a", Seq(blue, black, white))
> val a2 = Record("a", Seq(yellow, white, orange))
> val b = Record("b", Seq(blue, black))
> val c = Record("c", Seq(white, orange))
> val d = Record("d", Seq(orange, black))
> {code}
> Create two dataframes (we will call them left and right):
> {code:java}
> val left = Seq(a, b).toDF
> val right = Seq(a2, c, d).toDF
> {code}
> Join those two dataframes with a full outer join (so two of our columns are nullable now):
> {code:java}
> val joined = left.alias("l").join(right.alias("r"), Seq("name"), "full_outer")
> joined.show(false)
> res0:
> +----+--------------------------------+-----------------------------------+
> |name|infos                           |infos                              |
> +----+--------------------------------+-----------------------------------+
> |b   |[[1,blue], [2,black]]           |null                               |
> |a   |[[1,blue], [2,black], [5,white]]|[[3,yellow], [5,white], [4,orange]]|
> |c   |null                            |[[5,white], [4,orange]]            |
> |d   |null                            |[[4,orange], [2,black]]            |
> +----+--------------------------------+-----------------------------------+
> {code}
> Then, take only the `name`s that exist in the right dataframe:
> {code:java}
> val rightOnly = joined.filter("l.infos is null").select($"name", $"r.infos".as("r_infos"))
> rightOnly.show(false)
> res1:
> +----+-----------------------+
> |name|r_infos                |
> +----+-----------------------+
> |c   |[[5,white], [4,orange]]|
> |d   |[[4,orange], [2,black]]|
> +----+-----------------------+
> {code}
> Now, add a new column called `has_black` which will be true if the `r_infos` contains _black_ as a color:
> {code:java}
> def hasBlack = (s: Seq[Row]) => {
>   s.exists { case Row(num: Int, color: String) =>
>     color == "black"
>   }
> }
> val rightBreakdown = rightOnly.withColumn("has_black", udf(hasBlack).apply($"r_infos"))
> rightBreakdown.show(false)
> res2:
> +----+-----------------------+---------+
> |name|r_infos                |has_black|
> +----+-----------------------+---------+
> |c   |[[5,white], [4,orange]]|false    |
> |d   |[[4,orange], [2,black]]|true     |
> +----+-----------------------+---------+
> {code}
> So far, _exactly_ what we expected.
> *However*, when I try to filter `rightBreakdown`, it fails.
> {code:java}
> rightBreakdown.filter("has_black == true").show(false)
>
> org.apache.spark.SparkException: Failed to execute user defined function($anonfun$hasBlack$1: (array<struct<number:int,color:string>>) => boolean)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1075)
>   at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:411)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:127)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>   at scala.collection.immutable.List.exists(List.scala:84)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$lzycompute$1(joins.scala:138)
>   at ...
> {code}
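The stack trace is worth a second look: the failure happens inside the optimizer's EliminateOuterJoin rule (canFilterOutNull), not during row-by-row execution. To decide whether the outer join can be narrowed, that rule evaluates the filter predicate against a row whose attributes are all null, which is how the UDF ends up receiving a null array. A minimal sketch of what that probe amounts to for the UDF above (illustrative only, not code from the ticket):

{code:java}
import org.apache.spark.sql.Row

// The optimizer probes the predicate with a null input to check whether it
// filters out null rows; the original hasBlack is not null-safe:
val hasBlack = (s: Seq[Row]) => s.exists { case Row(_, color: String) => color == "black" }

hasBlack(null)  // throws NullPointerException: exists is invoked on a null Seq
{code}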
[jira] [Commented] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF
[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309663#comment-16309663 ]

Matthew Fishkin commented on SPARK-22942:
------------------------------------------

So the following correctly uses the UDF:

{code:java}
val rightBreakdown = rightOnly.withColumn("has_black", udf(hasBlack).apply($"r_infos"))
rightBreakdown.show(false)

res2:
+----+-----------------------+---------+
|name|r_infos                |has_black|
+----+-----------------------+---------+
|c   |[[5,white], [4,orange]]|false    |
|d   |[[4,orange], [2,black]]|true     |
+----+-----------------------+---------+
{code}

but when I do `rightBreakdown.filter("has_black == true").show(false)`, it says the UDF hit a NullPointerException, even though we can see the values were computed correctly without the filter. Is that all due to the optimizer's order of operations?
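One way to confirm that this is an optimizer-time failure (a sketch; explain is the standard Dataset method for printing plans):

{code:java}
// explain(true) prints the parsed, analyzed, optimized, and physical plans.
// Here even explain fails with the same SparkException: the exception is
// thrown while the optimized plan is being computed, before any data is
// read. That is why show() without the filter succeeds.
rightBreakdown.filter("has_black == true").explain(true)
{code}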
[jira] [Commented] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF
[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309037#comment-16309037 ]

Takeshi Yamamuro commented on SPARK-22942:
-------------------------------------------

Since Spark can pass null to UDFs when applying optimizer rules, you need to make your UDFs null-safe.
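A compact way to add that null check (a sketch of the same idea using Option; not code from this ticket):

{code:java}
import org.apache.spark.sql.Row

// Wrapping the argument in Option makes the UDF return false instead of
// throwing when the optimizer (or the data) hands it a null array:
val hasBlack: Seq[Row] => Boolean = s =>
  Option(s).exists(_.exists { case Row(_, color: String) => color == "black" })
{code}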
[jira] [Commented] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF
[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308882#comment-16308882 ]

Matthew Fishkin commented on SPARK-22942:
------------------------------------------

I would expect that to work too. I'm more curious why the null pointer is occurring when none of the data is null. Interestingly, I found the following. When you change

{code:java}
val rightOnly = joined.filter("l.infos is null").select($"name", $"r.infos".as("r_infos"))
{code}

to

{code:java}
val rightOnly = joined.filter("l.infos is null and r.infos is not null").select($"name", $"r.infos".as("r_infos"))
{code}

the rest of the code above works. But I am pretty sure "l.infos is null and r.infos is not null" is equivalent to "l.infos is null": if one column of an outer join is null, the other must be defined.
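For reference, the same workaround in the Column API (a sketch; isNull and isNotNull are the standard Column predicates). A plausible reading of why it helps, given the stack trace above: the explicit `r.infos is not null` conjunct lets the optimizer narrow the outer join from that simple predicate alone, so EliminateOuterJoin never needs to probe the UDF-based filter with a null row.

{code:java}
// Column-API equivalent of filter("l.infos is null and r.infos is not null"):
val rightOnly = joined
  .filter($"l.infos".isNull && $"r.infos".isNotNull)
  .select($"name", $"r.infos".as("r_infos"))
{code}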
[jira] [Commented] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF
[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308830#comment-16308830 ]

Takeshi Yamamuro commented on SPARK-22942:
-------------------------------------------

I think you just need NULL checks:

{code}
val hasBlack: Seq[Row] => Boolean = (s: Seq[Row]) => {
  if (s != null) {
    s.exists { case Row(num: Int, color: String) =>
      color == "black"
    }
  } else {
    false
  }
}
{code}
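For completeness, wiring the null-safe version back into the original repro (a sketch; assumes the rightOnly dataframe from the description above):

{code:java}
import org.apache.spark.sql.functions.udf

val rightBreakdown = rightOnly.withColumn("has_black", udf(hasBlack).apply($"r_infos"))

// With the null check in place, the optimizer's null probe evaluates to
// false instead of throwing, and the previously failing filter works:
rightBreakdown.filter("has_black == true").show(false)
{code}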