[jira] [Commented] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF

2018-02-28 Thread Ravneet Popli (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16381077#comment-16381077 ]

Ravneet Popli commented on SPARK-22942:
---------------------------------------

Matthew, were you able to resolve this? We are running into a similar problem, 
and we know for sure that the input arguments to our UDF cannot be null. If it 
helps, we are on Spark 2.0.2.
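
(Note, per the discussion below: even when the data itself has no nulls, the 
optimizer can evaluate a UDF with a null argument while reasoning about the 
plan, so the UDF body still needs a guard. A generic sketch of such a guard; 
`nullSafe` is a hypothetical helper name, not Spark API:)

{code:java}
// Hypothetical helper: wraps a Seq-taking predicate so that a null
// input yields false instead of a NullPointerException.
def nullSafe[T](f: Seq[T] => Boolean): Seq[T] => Boolean =
  s => s != null && f(s)
{code}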

> Spark Sql UDF throwing NullPointer when adding a filter on a columns that 
> uses that UDF
> -------------------------------------------------------------------------
>
>                 Key: SPARK-22942
>                 URL: https://issues.apache.org/jira/browse/SPARK-22942
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, SQL
>    Affects Versions: 2.2.0
>            Reporter: Matthew Fishkin
>            Priority: Major
>
> I ran into an interesting issue when trying to do a `filter` on a dataframe 
> that has columns that were added using a UDF. I am able to replicate the 
> problem with a smaller set of data.
> Given the dummy case classes:
> {code:java}
> case class Info(number: Int, color: String)
> case class Record(name: String, infos: Seq[Info])
> {code}
> And the following data:
> {code:java}
> val blue = Info(1, "blue")
> val black = Info(2, "black")
> val yellow = Info(3, "yellow")
> val orange = Info(4, "orange")
> val white = Info(5, "white")
> val a  = Record("a", Seq(blue, black, white))
> val a2 = Record("a", Seq(yellow, white, orange))
> val b = Record("b", Seq(blue, black))
> val c = Record("c", Seq(white, orange))
> val d = Record("d", Seq(orange, black))
> {code}
> Create two dataframes (we will call them left and right)
> {code:java}
> val left = Seq(a, b).toDF
> val right = Seq(a2, c, d).toDF
> {code}
> Join those two dataframes with an outer join (so two of our columns are
> nullable now).
> {code:java}
> val joined = left.alias("l").join(right.alias("r"), Seq("name"), "full_outer")
> joined.show(false)
> res0:
> +----+--------------------------------+-----------------------------------+
> |name|infos                           |infos                              |
> +----+--------------------------------+-----------------------------------+
> |b   |[[1,blue], [2,black]]           |null                               |
> |a   |[[1,blue], [2,black], [5,white]]|[[3,yellow], [5,white], [4,orange]]|
> |c   |null                            |[[5,white], [4,orange]]            |
> |d   |null                            |[[4,orange], [2,black]]            |
> +----+--------------------------------+-----------------------------------+
> {code}
> Then, take only the `name`s that exist only in the right Dataframe
> {code:java}
> val rightOnly = joined.filter("l.infos is null").select($"name", 
> $"r.infos".as("r_infos"))
> rightOnly.show(false)
> res1:
> +----+-----------------------+
> |name|r_infos                |
> +----+-----------------------+
> |c   |[[5,white], [4,orange]]|
> |d   |[[4,orange], [2,black]]|
> +----+-----------------------+
> {code}
> Now, add a new column called `has_black` which will be true if the `r_infos` 
> contains _black_ as a color
> {code:java}
> def hasBlack = (s: Seq[Row]) => {
>   s.exists{ case Row(num: Int, color: String) =>
> color == "black"
>   }
> }
> val rightBreakdown = rightOnly.withColumn("has_black", 
> udf(hasBlack).apply($"r_infos"))
> rightBreakdown.show(false)
> res2:
> +----+-----------------------+---------+
> |name|r_infos                |has_black|
> +----+-----------------------+---------+
> |c   |[[5,white], [4,orange]]|false    |
> |d   |[[4,orange], [2,black]]|true     |
> +----+-----------------------+---------+
> {code}
> So far, _exactly_ what we expected. 
> *However*, when I try to filter `rightBreakdown`, it fails.
> {code:java}
> rightBreakdown.filter("has_black == true").show(false)
> org.apache.spark.SparkException: Failed to execute user defined
> function($anonfun$hasBlack$1: (array<struct<number:int,color:string>>) =>
> boolean)
>   at 
> org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1075)
>   at 
> org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:411)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:127)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at 
> scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>   at scala.collection.immutable.List.exists(List.scala:84)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$lzycompute$1(joins.scala:138)
>   at 
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$1(joins.scala:138)
>   at ...
> {code}

[jira] [Commented] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF

2018-01-03 Thread Matthew Fishkin (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309663#comment-16309663 ]

Matthew Fishkin commented on SPARK-22942:
-----------------------------------------

So the fact that

{code:java}
val rightBreakdown = rightOnly.withColumn("has_black", udf(hasBlack).apply($"r_infos"))

rightBreakdown.show(false)

res2:
+----+-----------------------+---------+
|name|r_infos                |has_black|
+----+-----------------------+---------+
|c   |[[5,white], [4,orange]]|false    |
|d   |[[4,orange], [2,black]]|true     |
+----+-----------------------+---------+
{code}

correctly uses the UDF, but

`rightBreakdown.filter("has_black == true").show(false)`

says that the UDF hit a NullPointerException (even though we saw the values 
computed correctly without the filter)... is that all due to the optimizer's 
order of operations?
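
One way to check is to ask for the plans directly (a sketch; if the failure 
really comes from an optimizer rule, even `explain` should hit it, since it 
runs the optimizer without touching any data):

{code:java}
// Fine: no problematic filter, so the plans print normally.
rightBreakdown.explain(true)

// Expected to throw the same SparkException: the optimizer rule
// (EliminateOuterJoin) evaluates the UDF with a null input while
// reasoning about the join, before any rows are processed.
rightBreakdown.filter("has_black == true").explain(true)
{code}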

[jira] [Commented] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF

2018-01-02 Thread Takeshi Yamamuro (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309037#comment-16309037 ]

Takeshi Yamamuro commented on SPARK-22942:
------------------------------------------

Since Spark passes null to UDFs when it evaluates them inside optimizer rules, 
you need to make your UDFs null-safe.
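
A minimal sketch of one null-safe variant (using `Option` to absorb the null; 
equivalent in effect to the explicit null check below):

{code:java}
import org.apache.spark.sql.Row

// Option(null) is None, so a null input yields false instead of
// throwing a NullPointerException inside the optimizer.
val hasBlack: Seq[Row] => Boolean = (s: Seq[Row]) =>
  Option(s).exists(_.exists { case Row(_, color: String) => color == "black" })
{code}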

[jira] [Commented] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF

2018-01-02 Thread Matthew Fishkin (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308882#comment-16308882 ]

Matthew Fishkin commented on SPARK-22942:
-----------------------------------------

I would expect that to work too. I'm more curious why the NullPointerException 
occurs when none of the data is null.

Interestingly, I found the following. When you change from 
{code:java}
val rightOnly = joined.filter("l.infos is null").select($"name", 
$"r.infos".as("r_infos"))
{code}
to
{code:java}
val rightOnly = joined.filter("l.infos is null and r.infos is not 
null").select($"name", $"r.infos".as("r_infos"))
{code}

the rest of the code above works.

But I am pretty sure "l.infos is null and r.infos is not null" is equivalent 
to "l.infos is null": in a full outer join, every output row comes from at 
least one side, so if one side's columns are null the other side's must be 
defined.
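
A quick sanity check of that equivalence (a sketch; it counts the rows where 
both sides are null, which should be zero for this full outer join):

{code:java}
// Should print 0: a full outer join on "name" never yields a row
// with both sides null at once.
println(joined.filter("l.infos is null and r.infos is null").count())
{code}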

[jira] [Commented] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF

2018-01-02 Thread Takeshi Yamamuro (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308830#comment-16308830 ]

Takeshi Yamamuro commented on SPARK-22942:
------------------------------------------

I think you just need NULL checks:
{code}
val hasBlack: Seq[Row] => Boolean = (s: Seq[Row]) => {
  if (s != null) {
s.exists{ case Row(num: Int, color: String) =>
  color == "black"
}
  } else {
false
  }
}
{code}
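
With that null-safe version the original pipeline should go through unchanged 
(a sketch, reusing the names from the issue description):

{code:java}
val rightBreakdown = rightOnly.withColumn("has_black", udf(hasBlack).apply($"r_infos"))

// No longer throws: the optimizer may still evaluate the UDF with a
// null argument, but the guard now returns false instead of failing.
rightBreakdown.filter("has_black == true").show(false)
{code}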
