[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-03-05 Thread Liang-Chi Hsieh (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387191#comment-16387191 ]

Liang-Chi Hsieh commented on SPARK-22446:
-

Yeah, sounds good.

> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> ---
>
> Key: SPARK-22446
> URL: https://issues.apache.org/jira/browse/SPARK-22446
> Project: Spark
> Issue Type: Bug
> Components: ML, Optimizer
> Affects Versions: 2.0.2, 2.1.2, 2.2.1
> Environment: spark-shell, local mode, macOS Sierra 10.12.6
> Reporter: Greg Bellchambers
> Assignee: Liang-Chi Hsieh
> Priority: Major
> Fix For: 2.3.0
>
>
> In the following, the `indexer` UDF defined inside the 
> `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an 
> "Unseen label" error, despite the label not being present in the transformed 
> DataFrame.
> Here is the definition of the indexer UDF in the transform method:
> {code:java}
> val indexer = udf { label: String =>
>   if (labelToIndex.contains(label)) {
>     labelToIndex(label)
>   } else {
>     throw new SparkException(s"Unseen label: $label.")
>   }
> }
> {code}
> We can demonstrate the error with a very simple example DataFrame.
> {code:java}
> scala> import org.apache.spark.ml.feature.StringIndexer
> import org.apache.spark.ml.feature.StringIndexer
> scala> // first we create a DataFrame with three cities
> scala> val df = List(
>      | ("A", "London", "StrA"),
>      | ("B", "Bristol", null),
>      | ("C", "New York", "StrC")
>      | ).toDF("ID", "CITY", "CONTENT")
> df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more field]
> scala> df.show
> +---+--------+-------+
> | ID|    CITY|CONTENT|
> +---+--------+-------+
> |  A|  London|   StrA|
> |  B| Bristol|   null|
> |  C|New York|   StrC|
> +---+--------+-------+
> scala> // then we remove the row with null in CONTENT column, which removes Bristol
> scala> val dfNoBristol = df.filter($"CONTENT".isNotNull)
> dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: string, CITY: string ... 1 more field]
> scala> dfNoBristol.show
> +---+--------+-------+
> | ID|    CITY|CONTENT|
> +---+--------+-------+
> |  A|  London|   StrA|
> |  C|New York|   StrC|
> +---+--------+-------+
> scala> // now create a StringIndexer for the CITY column and fit to dfNoBristol
> scala> val model = {
>      | new StringIndexer()
>      | .setInputCol("CITY")
>      | .setOutputCol("CITYIndexed")
>      | .fit(dfNoBristol)
>      | }
> model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa2fb
> scala> // the StringIndexerModel has only two labels: "London" and "New York"
> scala> model.labels foreach println
> London
> New York
> scala> // transform our DataFrame to add an index column
> scala> val dfWithIndex = model.transform(dfNoBristol)
> dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 more fields]
> scala> dfWithIndex.show
> +---+--------+-------+-----------+
> | ID|    CITY|CONTENT|CITYIndexed|
> +---+--------+-------+-----------+
> |  A|  London|   StrA|        0.0|
> |  C|New York|   StrC|        1.0|
> +---+--------+-------+-----------+
> {code}
> The unexpected behaviour comes when we filter `dfWithIndex` for `CITYIndexed` 
> equal to 1.0 and perform an action. The `indexer` UDF in the `transform` method 
> throws an exception reporting the unseen label "Bristol". This is irrational 
> behaviour as far as the user of the API is concerned, because there is no 
> such value as "Bristol" when we show all rows of `dfWithIndex`:
> {code:java}
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
> 17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
> org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (string) => double)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>   at 
> 
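
A minimal, pure-Scala sketch of the failure mode described in the report, assuming (as the report suggests) that the optimizer may evaluate the `CITYIndexed === 1.0` predicate, and therefore the indexer, on rows that the `CONTENT` null filter would otherwise have removed. No Spark is involved: `UnseenLabelToy`, `asWritten` and `reordered` are hypothetical names, plain collections stand in for DataFrames, and `IllegalArgumentException` stands in for `SparkException`.

```scala
object UnseenLabelToy {
  // Hypothetical stand-in for the labels the model learned from dfNoBristol.
  val labelToIndex: Map[String, Double] = Map("London" -> 0.0, "New York" -> 1.0)

  // Same contract as the quoted indexer UDF: an unseen label throws.
  def indexer(label: String): Double =
    labelToIndex.getOrElse(label, throw new IllegalArgumentException(s"Unseen label: $label."))

  // (ID, CITY, CONTENT); Option models the nullable CONTENT column.
  val rows: List[(String, String, Option[String])] = List(
    ("A", "London", Some("StrA")),
    ("B", "Bristol", None),
    ("C", "New York", Some("StrC")))

  // Order the user wrote: drop null CONTENT, index CITY, then keep index 1.0.
  def asWritten: Int =
    rows.filter(_._3.isDefined).map(r => indexer(r._2)).count(_ == 1.0)

  // Reordered plan: the index predicate runs before the null check,
  // so indexer("Bristol") is evaluated and throws.
  def reordered: Int =
    rows.filter(r => indexer(r._2) == 1.0).count(_._3.isDefined)
}
```

`asWritten` evaluates to 1, while `reordered` throws `Unseen label: Bristol.`, which mirrors the error in the report even though "Bristol" never appears in `dfWithIndex`.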

[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-03-05 Thread Joseph K. Bradley (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386520#comment-16386520 ]

Joseph K. Bradley commented on SPARK-22446:
---

Maybe not then unless someone complains?

[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-03-03 Thread Liang-Chi Hsieh (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384618#comment-16384618 ]

Liang-Chi Hsieh commented on SPARK-22446:
-

This fix uses a new API, {{asNondeterministic}} of {{UserDefinedFunction}}, which was 
added in 2.3.0. If we want to backport this fix, we need to backport the API too. 
That is not hard, but it involves SQL code. Should we backport it for this fix?
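
As a rough illustration of why marking the indexer UDF with `asNondeterministic` helps, here is a toy model in plain Scala. It is not Catalyst's API: `NondeterministicToy`, `Pred`, `plan` and `run` are hypothetical, and the rule in `plan` (an optimizer may move a deterministic predicate ahead of an earlier filter but must leave a nondeterministic one where it was written) is a simplified assumption about how the optimizer treats nondeterministic expressions.

```scala
object NondeterministicToy {
  // (CITY, CONTENT); Option models the nullable CONTENT column.
  type Row = (String, Option[String])

  // Hypothetical stand-in for the labels the model learned from dfNoBristol.
  val labelToIndex: Map[String, Double] = Map("London" -> 0.0, "New York" -> 1.0)

  // Mirrors the quoted indexer UDF: an unseen label throws.
  def indexer(label: String): Double =
    labelToIndex.getOrElse(label, throw new IllegalArgumentException(s"Unseen label: $label."))

  // A filter predicate plus the flag the toy optimizer consults.
  final case class Pred(name: String, deterministic: Boolean, test: Row => Boolean)

  // Toy rule (an assumption, not Catalyst's code): a deterministic upper predicate
  // may be moved ahead of the lower one; a nondeterministic one keeps its position.
  def plan(lower: Pred, upper: Pred): List[Pred] =
    if (upper.deterministic) List(upper, lower) else List(lower, upper)

  // Apply the predicates in plan order, one eager filter pass each.
  def run(rows: Seq[Row], preds: List[Pred]): Seq[Row] =
    preds.foldLeft(rows)((acc, p) => acc.filter(p.test))

  val rows: Seq[Row] =
    List(("London", Some("StrA")), ("Bristol", None), ("New York", Some("StrC")))

  val contentNotNull: Pred =
    Pred(name = "CONTENT IS NOT NULL", deterministic = true, test = (r: Row) => r._2.isDefined)

  // The predicate on the indexer's output, with a switchable determinism flag.
  def cityIndexedIsOne(deterministic: Boolean): Pred =
    Pred(name = "indexer(CITY) = 1.0", deterministic = deterministic,
      test = (r: Row) => indexer(r._1) == 1.0)
}
```

With `cityIndexedIsOne(false)` the null filter runs first and only the "New York" row survives; with `cityIndexedIsOne(true)` the toy optimizer evaluates the indexer on "Bristol" and `run` throws.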

[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-03-02 Thread Joseph K. Bradley (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384078#comment-16384078 ]

Joseph K. Bradley commented on SPARK-22446:
---

I see.  Do you have a sense of how hard it would be to backport this fix 
(definitely to 2.2 and maybe to 2.1 if it's easy)?

[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-02-07 Thread Liang-Chi Hsieh (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356525#comment-16356525 ]

Liang-Chi Hsieh commented on SPARK-22446:
-

2.0 and 2.1 also have this issue.

[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-02-07 Thread Liang-Chi Hsieh (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356506#comment-16356506 ]

Liang-Chi Hsieh commented on SPARK-22446:
-

Yes, this is an issue in Spark 2.2. For earlier versions, let me check.

[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2018-02-07 Thread Joseph K. Bradley (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356340#comment-16356340 ]

Joseph K. Bradley commented on SPARK-22446:
---

[~viirya] Did you confirm this is an issue in Spark 2.2 or earlier?

[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2017-11-05 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239905#comment-16239905
 ] 

Apache Spark commented on SPARK-22446:
--

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/19662

> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> ---
>
> Key: SPARK-22446
> URL: https://issues.apache.org/jira/browse/SPARK-22446
> Project: Spark
>  Issue Type: Bug
>  Components: ML, Optimizer
>Affects Versions: 2.0.0, 2.2.0
> Environment: spark-shell, local mode, macOS Sierra 10.12.6
>Reporter: Greg Bellchambers
>

[jira] [Commented] (SPARK-22446) Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.

2017-11-05 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239897#comment-16239897
 ] 

Liang-Chi Hsieh commented on SPARK-22446:
-

For this special case, the simplest workaround is to set {{handleInvalid}} to
{{keep}}. With {{handleInvalid}} set to {{keep}} the UDF no longer throws, and the
other predicate, {{isnotnull(_3#5)}}, then filters the row out.
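The workaround can be modelled without Spark (in Spark itself it would be `new StringIndexer().setHandleInvalid("keep")`). The sketch assumes the `keep` semantics of Spark 2.2 and later as we understand them, where an unseen label gets the extra index `labels.length` instead of causing an exception; `KeepSketch`, `Row` and `indexKeep` are hypothetical names. It deliberately evaluates the UDF predicate before `isnotnull(CONTENT)`, the unfavourable order, and still gets the right count.

```scala
// Hypothetical, Spark-free model of the handleInvalid = "keep" workaround.
object KeepSketch {
  final case class Row(id: String, city: String, content: Option[String])

  // The three rows of `df` from the description; CONTENT is None for Bristol.
  val rows: List[Row] = List(
    Row("A", "London", Some("StrA")),
    Row("B", "Bristol", None),
    Row("C", "New York", Some("StrC"))
  )

  val labels: Array[String] = Array("London", "New York")
  val labelToIndex: Map[String, Double] =
    labels.zipWithIndex.map { case (l, i) => l -> i.toDouble }.toMap

  // Assumed "keep" semantics: unseen labels map to labels.length instead of throwing.
  def indexKeep(label: String): Double =
    labelToIndex.getOrElse(label, labels.length.toDouble)

  // UDF predicate first, then isnotnull(CONTENT): nothing throws, and the
  // Bristol row (index 2.0) is still removed by the null check.
  def countCityIndexedEquals(target: Double): Int =
    rows.filter(r => indexKeep(r.city) == target).count(_.content.isDefined)
}
```

Here `countCityIndexedEquals(1.0)` returns 1, matching the single New York row, and even `countCityIndexedEquals(2.0)` returns 0 because the Bristol row is dropped by the null check.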

The problem happens in the optimizer, when it pushes predicates down through a
projection. For ordinary Catalyst expressions, evaluating them on data that is
supposed to have been filtered out is not a problem, because the other
predicates still filter those rows out.

UDFs are a special case because they can throw runtime exceptions when applied
to unexpected data, so it is not always safe to push down predicates that
contain them.
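A simplified, Spark-free model of why reordering is harmless for ordinary expressions but not for a throwing UDF. The real Catalyst plan is more involved; here each "plan" is just the order in which two predicates are applied to the three example rows, and `PushdownSketch`, `isNewYork` and `indexer` are hypothetical names.

```scala
import scala.util.Try

// Hypothetical model: a plan is the order in which two filters are applied.
object PushdownSketch {
  final case class Row(city: String, content: Option[String])

  val rows: List[Row] = List(
    Row("London", Some("StrA")),
    Row("Bristol", None),
    Row("New York", Some("StrC"))
  )

  // A total predicate that never throws: evaluation order cannot change the result.
  def isNewYork(r: Row): Boolean = r.city == "New York"
  def safeAsWritten: Int = rows.filter(_.content.isDefined).count(isNewYork)
  def safePushedDown: Int = rows.filter(isNewYork).count(_.content.isDefined)

  // A throwing lookup, like the indexer UDF with the default error behaviour.
  val labelToIndex: Map[String, Double] = Map("London" -> 0.0, "New York" -> 1.0)
  def indexer(label: String): Double =
    labelToIndex.getOrElse(label, throw new IllegalArgumentException(s"Unseen label: $label."))

  // As written: isnotnull(CONTENT) first, so the UDF never sees Bristol.
  def unsafeAsWritten: Int =
    rows.filter(_.content.isDefined).count(r => indexer(r.city) == 1.0)

  // After reordering: the UDF runs on the Bristol row and throws.
  def unsafePushedDown: Try[Int] =
    Try(rows.filter(r => indexer(r.city) == 1.0).count(_.content.isDefined))
}
```

Both safe variants return 1, `unsafeAsWritten` also returns 1, while `unsafePushedDown` is a `Failure`, which is the shape of the reported bug.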

However, not all UDFs are unsafe to push down, so we may not want to disable
pushdown for all UDF predicates. For now, I think we should mark such UDFs as
non-deterministic and disable pushdown for them.
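A toy sketch of how a determinism flag can gate pushdown. It loosely follows the idea that a filter is moved below a projection only when every projected expression is deterministic; the plan types and `pushThroughProject` are hypothetical, and the real rule also rewrites aliases in the condition, which is omitted here. In Spark 2.3 and later, a UDF can be flagged with `UserDefinedFunction.asNondeterministic()`.

```scala
// Hypothetical miniature logical plan, loosely inspired by Catalyst.
object DeterministicPushdownSketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  // Each projected field is (name, isDeterministic).
  final case class Project(fields: List[(String, Boolean)], child: Plan) extends Plan

  // Move a Filter below a Project only if all projected fields are deterministic;
  // otherwise leave the plan unchanged so the filter still runs above the UDF.
  def pushThroughProject(plan: Plan): Plan = plan match {
    case Filter(cond, Project(fields, grandChild)) if fields.forall(_._2) =>
      Project(fields, Filter(cond, grandChild))
    case other => other
  }
}
```

With `CITYIndexed` marked non-deterministic, the filter on `CITYIndexed = 1.0` stays above the projection, so the UDF only ever sees rows that survived `isnotnull(CONTENT)`.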



> Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" 
> exception incorrectly for filtered data.
> ---
>
> Key: SPARK-22446
> URL: https://issues.apache.org/jira/browse/SPARK-22446
> Project: Spark
>  Issue Type: Bug
>  Components: ML, Optimizer
>Affects Versions: 2.0.0, 2.2.0
> Environment: spark-shell, local mode, macOS Sierra 10.12.6
>Reporter: Greg Bellchambers
>
> In the following, the `indexer` UDF defined inside the 
> `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an 
> "Unseen label" error, despite the label not being present in the transformed 
> DataFrame.
> Here is the definition of the indexer UDF in the transform method:
> {code:java}
> val indexer = udf { label: String =>
>   if (labelToIndex.contains(label)) {
> labelToIndex(label)
>   } else {
> throw new SparkException(s"Unseen label: $label.")
>   }
> }
> {code}
> We can demonstrate the error with a very simple example DataFrame.
> {code:java}
> scala> import org.apache.spark.ml.feature.StringIndexer
> import org.apache.spark.ml.feature.StringIndexer
> scala> // first we create a DataFrame with three cities
> scala> val df = List(
>  | ("A", "London", "StrA"),
>  | ("B", "Bristol", null),
>  | ("C", "New York", "StrC")
>  | ).toDF("ID", "CITY", "CONTENT")
> df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more field]
> scala> df.show
> +---+--------+-------+
> | ID|    CITY|CONTENT|
> +---+--------+-------+
> |  A|  London|   StrA|
> |  B| Bristol|   null|
> |  C|New York|   StrC|
> +---+--------+-------+
> scala> // then we remove the row with null in CONTENT column, which removes Bristol
> scala> val dfNoBristol = df.filter($"CONTENT".isNotNull)
> dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: string, CITY: string ... 1 more field]
> scala> dfNoBristol.show
> +---+--------+-------+
> | ID|    CITY|CONTENT|
> +---+--------+-------+
> |  A|  London|   StrA|
> |  C|New York|   StrC|
> +---+--------+-------+
> scala> // now create a StringIndexer for the CITY column and fit to dfNoBristol
> scala> val model = {
>  | new StringIndexer()
>  | .setInputCol("CITY")
>  | .setOutputCol("CITYIndexed")
>  | .fit(dfNoBristol)
>  | }
> model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa2fb
> scala> // the StringIndexerModel has only two labels: "London" and "New York"
> scala> model.labels foreach println
> London
> New York
> scala> // transform our DataFrame to add an index column
> scala> val dfWithIndex = model.transform(dfNoBristol)
> dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 more fields]
> scala> dfWithIndex.show
> +---+--------+-------+-----------+
> | ID|    CITY|CONTENT|CITYIndexed|
> +---+--------+-------+-----------+
> |  A|  London|   StrA|        0.0|
> |  C|New York|   StrC|        1.0|
> +---+--------+-------+-----------+
> {code}
> The unexpected behaviour occurs when we filter `dfWithIndex` for `CITYIndexed`
> equal to 1.0 and perform an action: the `indexer` UDF in the `transform` method
> throws an exception reporting the unseen label "Bristol". From the API user's
> point of view this makes no sense, because no "Bristol" value appears when we
> show all rows of `dfWithIndex`:
> {code:java}
> scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
> 17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
> org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (string) => double)
>   at 
>