[ 
https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Bellchambers updated SPARK-22446:
--------------------------------------
    Description: 
In the following, the `indexer` UDF defined inside the 
`org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an 
"Unseen label" error, despite the label not being present in the transformed 
DataFrame.

Here is the definition of the indexer UDF in the transform method:

{code:java}
    val indexer = udf { label: String =>
      if (labelToIndex.contains(label)) {
        labelToIndex(label)
      } else {
        throw new SparkException(s"Unseen label: $label.")
      }
    }
{code}
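
For context, `labelToIndex` is built from the labels collected during `fit()`; any label that was not present in the fitted data falls into the exception branch. A simplified sketch of that mapping (not the actual Spark source):

{code:java}
// Simplified sketch only: the real StringIndexerModel builds an equivalent
// lookup structure from the labels seen during fit().
val labels: Array[String] = Array("London", "New York") // labels seen during fit()
val labelToIndex: Map[String, Double] =
  labels.zipWithIndex.map { case (label, idx) => label -> idx.toDouble }.toMap
// labelToIndex.contains("Bristol") == false, so the UDF above would throw
{code}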

We can demonstrate the error with a very simple example DataFrame.

{code:java}
scala> import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.StringIndexer

scala> // first we create a DataFrame with three cities

scala> val df = List(
     | ("A", "London", "StrA"),
     | ("B", "Bristol", null),
     | ("C", "New York", "StrC")
     | ).toDF("ID", "CITY", "CONTENT")
df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more field]

scala> df.show
+---+--------+-------+
| ID|    CITY|CONTENT|
+---+--------+-------+
|  A|  London|   StrA|
|  B| Bristol|   null|
|  C|New York|   StrC|
+---+--------+-------+


scala> // then we remove the row with null in CONTENT column, which removes Bristol

scala> val dfNoBristol = df.filter($"CONTENT".isNotNull)
dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: string, CITY: string ... 1 more field]

scala> dfNoBristol.show
+---+--------+-------+
| ID|    CITY|CONTENT|
+---+--------+-------+
|  A|  London|   StrA|
|  C|New York|   StrC|
+---+--------+-------+


scala> // now create a StringIndexer for the CITY column and fit to dfNoBristol

scala> val model = {
     | new StringIndexer()
     | .setInputCol("CITY")
     | .setOutputCol("CITYIndexed")
     | .fit(dfNoBristol)
     | }
model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa23333fb

scala> // the StringIndexerModel has only two labels: "London" and "New York"

scala> model.labels foreach println
London
New York

scala> // transform our DataFrame to add an index column

scala> val dfWithIndex = model.transform(dfNoBristol)
dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 more fields]

scala> dfWithIndex.show
+---+--------+-------+-----------+
| ID|    CITY|CONTENT|CITYIndexed|
+---+--------+-------+-----------+
|  A|  London|   StrA|        0.0|
|  C|New York|   StrC|        1.0|
+---+--------+-------+-----------+
{code}

The unexpected behaviour appears when we filter `dfWithIndex` for `CITYIndexed` equal to 1.0 and perform an action. The `indexer` UDF in the `transform` method throws an exception reporting the unseen label "Bristol". This is baffling from the API user's point of view, because the value "Bristol" does not appear anywhere when we show all rows of `dfWithIndex`:

{code:java}
scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (string) => double)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Unseen label: Bristol.  To handle unseen labels, set Param handleInvalid to keep.
        at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$5.apply(StringIndexer.scala:222)
        at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$5.apply(StringIndexer.scala:208)
        ... 13 more
{code}
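
As the exception message itself suggests, one mitigation (separate from the optimizer question below) is to relax the model's handling of unseen labels. A hedged sketch, assuming a Spark version whose StringIndexer supports the `handleInvalid` values "skip" and "keep":

{code:java}
// Mitigation sketch: configure the indexer to tolerate unseen labels instead
// of throwing. "skip" drops rows with unseen labels; "keep" (where supported)
// assigns them an extra index.
val tolerantModel = new StringIndexer()
  .setInputCol("CITY")
  .setOutputCol("CITYIndexed")
  .setHandleInvalid("keep") // or "skip"
  .fit(dfNoBristol)

tolerantModel.transform(dfNoBristol).filter($"CITYIndexed" === 1.0).count
{code}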

To understand what is happening here, note that `StringIndexer.fit()` triggers its own action over `dfNoBristol`, before the `CITYIndexed === 1.0` filter exists, so the StringIndexerModel sees only London and New York, as expected. Now compare the query plans for `dfWithIndex` and `dfWithIndex.filter($"CITYIndexed" === 1.0)`:

{noformat}
scala> dfWithIndex.explain
== Physical Plan ==
*Project [_1#3 AS ID#7, _2#4 AS CITY#8, _3#5 AS CONTENT#9, UDF(_2#4) AS CITYIndexed#159]
+- *Filter isnotnull(_3#5)
   +- LocalTableScan [_1#3, _2#4, _3#5]

scala> dfWithIndex.filter($"CITYIndexed" === 1.0).explain
== Physical Plan ==
*Project [_1#3 AS ID#7, _2#4 AS CITY#8, _3#5 AS CONTENT#9, UDF(_2#4) AS CITYIndexed#159]
+- *Filter (isnotnull(_3#5) && (UDF(_2#4) = 1.0))
   +- LocalTableScan [_1#3, _2#4, _3#5]
{noformat}

Note that in the latter plan, the optimizer has pushed the `$"CITYIndexed" === 1.0` predicate down so that it is evaluated in the same Filter as our null check (`Filter (isnotnull(_3#5) && (UDF(_2#4) = 1.0))`).

With a debugger I have seen that both operands of `&&` are executed on each row 
of `df`: `isnotnull(_3#5)` and `UDF(_2#4) = 1.0`. Therefore, the UDF is passed 
the label `Bristol` despite isnotnull returning false for that row.
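
The behaviour does not appear to be specific to ML. Here is a minimal sketch (the UDF and variable names are made up for illustration) of how the same pushdown can surface with a plain Spark SQL UDF that, like the indexer, throws on unexpected values:

{code:java}
import org.apache.spark.sql.functions.udf

// A strict UDF that, like StringIndexerModel's indexer, only accepts the
// cities that remain after the CONTENT null filter.
val seenCities = Map("London" -> 0.0, "New York" -> 1.0)
val cityIndex = udf { city: String =>
  seenCities.getOrElse(city, throw new RuntimeException(s"Unseen city: $city"))
}

df.filter($"CONTENT".isNotNull)             // drops the Bristol row
  .withColumn("CITYIndexed", cityIndex($"CITY"))
  .filter($"CITYIndexed" === 1.0)           // may be pushed down next to isnotnull(CONTENT)
  .count                                    // and may therefore evaluate the UDF on "Bristol"
{code}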

If we cache the DataFrame `dfNoBristol` immediately after creating it, then 
there is no longer an error because the optimizer does not attempt to call the 
UDF on unseen data. The fact that we get different results depending on whether 
or not we call cache is a cause for concern.
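
For reference, a sketch of the caching workaround just described, using the same setup as above:

{code:java}
// Caching workaround sketch: materialise dfNoBristol before fitting and
// transforming. With the cached data the CITYIndexed predicate is no longer
// evaluated against the filtered-out Bristol row, and no exception is thrown.
val dfNoBristolCached = df.filter($"CONTENT".isNotNull).cache()
dfNoBristolCached.count // force materialisation

val cachedModel = new StringIndexer()
  .setInputCol("CITY")
  .setOutputCol("CITYIndexed")
  .fit(dfNoBristolCached)

cachedModel.transform(dfNoBristolCached).filter($"CITYIndexed" === 1.0).count
{code}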

I have seen similar issues with pure Spark SQL DataFrame operations when the DAG gets complicated (many joins and aggregations). These are harder to reduce to such a simple example, but I plan to report them in the near future.

> Optimizer causing StringIndexer's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22446
>                 URL: https://issues.apache.org/jira/browse/SPARK-22446
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, Optimizer
>    Affects Versions: 2.0.0, 2.2.0
>         Environment: spark-shell, local mode, macOS Sierra 10.12.6
>            Reporter: Greg Bellchambers
>            Priority: Normal
>


