[jira] [Assigned] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2017-11-08 Thread Wenchen Fan (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-22446:
---

Assignee: Liang-Chi Hsieh

> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> ---
>
> Key: SPARK-22446
> URL: https://issues.apache.org/jira/browse/SPARK-22446
> Project: Spark
>  Issue Type: Bug
>  Components: ML, Optimizer
>Affects Versions: 2.0.0, 2.2.0
> Environment: spark-shell, local mode, macOS Sierra 10.12.6
>Reporter: Greg Bellchambers
>Assignee: Liang-Chi Hsieh
> Fix For: 2.3.0
>
>
> In the following, the `indexer` UDF defined inside the 
> `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an 
> "Unseen label" error, despite the label not being present in the transformed 
> DataFrame.
> Here is the definition of the indexer UDF in the transform method:
> {code:java}
> val indexer = udf { label: String =>
>   if (labelToIndex.contains(label)) {
>     labelToIndex(label)
>   } else {
>     throw new SparkException(s"Unseen label: $label.")
>   }
> }
> {code}
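> Because this UDF is declared deterministic, the Catalyst optimizer is free to reorder it 
> relative to other deterministic predicates, so it can be evaluated on rows that an earlier 
> filter was meant to remove. A sketch of one possible mitigation, assuming the 
> `UserDefinedFunction.asNondeterministic` API introduced in Spark 2.3 (it is not available 
> in the affected versions):
> {code:java}
> // Sketch only: marking the UDF non-deterministic prevents Catalyst from
> // pushing it below, or merging it with, other filters, so it only ever
> // sees rows that survived the user's own filters.
> val indexer = udf { label: String =>
>   if (labelToIndex.contains(label)) {
>     labelToIndex(label)
>   } else {
>     throw new SparkException(s"Unseen label: $label.")
>   }
> }.asNondeterministic()
> {code}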
> We can demonstrate the error with a very simple example DataFrame.
> {code:java}
> scala> import org.apache.spark.ml.feature.StringIndexer
> import org.apache.spark.ml.feature.StringIndexer
> scala> // first we create a DataFrame with three cities
> scala> val df = List(
>  | ("A", "London", "StrA"),
>  | ("B", "Bristol", null),
>  | ("C", "New York", "StrC")
>  | ).toDF("ID", "CITY", "CONTENT")
> df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more 
> field]
> scala> df.show
> +---+--------+-------+
> | ID|    CITY|CONTENT|
> +---+--------+-------+
> |  A|  London|   StrA|
> |  B| Bristol|   null|
> |  C|New York|   StrC|
> +---+--------+-------+
> scala> // then we remove the row with null in the CONTENT column, which removes Bristol
> scala> val dfNoBristol = df.filter($"CONTENT".isNotNull)
> dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: 
> string, CITY: string ... 1 more field]
> scala> dfNoBristol.show
> +---+--------+-------+
> | ID|    CITY|CONTENT|
> +---+--------+-------+
> |  A|  London|   StrA|
> |  C|New York|   StrC|
> +---+--------+-------+
> scala> // now create a StringIndexer for the CITY column and fit to 
> dfNoBristol
> scala> val model = {
>  | new StringIndexer()
>  | .setInputCol("CITY")
>  | .setOutputCol("CITYIndexed")
>  | .fit(dfNoBristol)
>  | }
> model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa2fb
> scala> // the StringIndexerModel has only two labels: "London" and "New York"
> scala> model.labels foreach println
> London
> New York
> scala> // transform our DataFrame to add an index column
> scala> val dfWithIndex = model.transform(dfNoBristol)
> dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 
> more fields]
> scala> dfWithIndex.show
> +---+--------+-------+-----------+
> | ID|    CITY|CONTENT|CITYIndexed|
> +---+--------+-------+-----------+
> |  A|  London|   StrA|        0.0|
> |  C|New York|   StrC|        1.0|
> +---+--------+-------+-----------+
> {code}
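> Whether the `indexer` UDF ever sees the "Bristol" row is decided by the optimized plan, 
> not by the rows visible above. A hedged way to check this is to inspect the plans for the 
> filtered DataFrame (output omitted; the predicate on `CITYIndexed`, which calls the UDF, 
> is a candidate for pushdown):
> {code:java}
> // Print the parsed, analyzed, optimized and physical plans.
> dfWithIndex.filter($"CITYIndexed" === 1.0).explain(true)
> {code}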
> The unexpected behaviour comes when we filter `dfWithIndex` for `CITYIndexed` 
> equal to 1.0 and perform an action. The `indexer` UDF in the `transform` method 
> throws an exception reporting the unseen label "Bristol". This is irrational 
> behaviour as far as the user of the API is concerned, because there is no 
> such value as "Bristol" when we show all rows of `dfWithIndex`:
> {code:java}
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
> 17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
> org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (string) => double)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
> {code}
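> Until the optimizer behaviour is addressed, one possible workaround (a sketch, not 
> verified against every plan shape) is to materialize the filtered data, so that later 
> predicates can only be pushed down as far as the cached relation and never reach the 
> unfiltered rows:
> {code:java}
> // Caching cuts the lineage back to the raw data: once dfNoBristol is
> // materialized, the "Bristol" row no longer exists anywhere the optimizer
> // can push the indexer UDF to.
> val dfNoBristol = df.filter($"CONTENT".isNotNull).cache()
> dfNoBristol.count()  // force materialization
> {code}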

[jira] [Assigned] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2017-11-05 Thread Apache Spark (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-22446:


Assignee: (was: Apache Spark)


[jira] [Assigned] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2017-11-05 Thread Apache Spark (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-22446:


Assignee: Apache Spark
